receiver-signal / app.py
Awell00's picture
Update app.py
442e214
import numpy as np
from scipy.io.wavfile import write
from scipy.signal import find_peaks
from scipy.fft import fft
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.io.wavfile import read
from scipy import signal
import gradio as gr
import reedsolo
import wavio
from scipy.signal import butter, lfilter
# ---------------Parameters--------------- #
input_file = 'recorded.wav'
output_file = 'output_filtered_receiver.wav'
low_frequency = 18000
high_frequency = 19000
bit_duration = 0.007
sample_rate = 44100
amplitude_scaling_factor = 10.0
# -----------------Record----------------- #
def record(audio):
"""
This function records audio and writes it to a .wav file.
Parameters:
audio (tuple): A tuple containing the sample rate and the audio data.
Returns:
str: A success message if the audio is recorded correctly, otherwise an error message.
"""
try:
# Check if the audio tuple contains exactly two elements
if len(audio) != 2:
return f"Error: Expected a tuple with 2 elements, but got {len(audio)}"
# Unpack the sample rate and data from the audio tuple
sr, data = audio
# Write the audio data to a .wav file
wavio.write("recorded.wav", data, sr)
# Call the filtered function to apply the bandpass filter to the audio data
filtered()
# Return a success message
return f"Audio receive correctly"
except Exception as e:
# If an error occurs, return an error message
return f"Error: {str(e)}"
# -----------------Filter----------------- #
def butter_bandpass(sr, order=5):
"""
This function designs a Butterworth bandpass filter.
Parameters:
sr (int): The sample rate of the audio.
order (int): The order of the filter.
Returns:
tuple: The filter coefficients `b` and `a`.
"""
# Calculate the Nyquist frequency
nyquist = 0.5 * sr
# Normalize the cutoff frequencies with a 500 Hz offset
low = (low_frequency - 500) / nyquist
high = (high_frequency + 500) / nyquist
# Design the Butterworth bandpass filter
coef = butter(order, [low, high], btype='band')
# Extract the filter coefficients
b = coef[0]
a = coef[1]
return b, a
def butter_bandpass_filter(data, sr, order=5):
"""
This function applies the Butterworth bandpass filter to a given data.
Parameters:
data (array): The audio data to be filtered.
sr (int): The sample rate of the audio.
order (int): The order of the filter.
Returns:
array: The filtered audio data.
"""
# Get the filter coefficients
b, a = butter_bandpass(sr, order=order)
# Apply the filter to the data
y = lfilter(b, a, data)
return y
def filtered():
"""
This function reads an audio file, applies the bandpass filter to the audio data,
and then writes the filtered data to an output file.
Returns:
str: A success message if the audio is filtered correctly, otherwise an error message.
"""
# Read the audio data from the input file
sr, data = read(input_file)
# Apply the bandpass filter to the audio data
filtered_data = butter_bandpass_filter(data, sr)
# Write the filtered data to the output file
write(output_file, sr, np.int16(filtered_data))
return "Filtered Audio Generated"
# -----------------Frame----------------- #
def calculate_snr(data, start, end, target_frequency):
"""
This function calculates the Signal-to-Noise Ratio (SNR) for a given frequency within a segment of data.
Parameters:
data (array): The audio data.
start (int): The start index of the segment.
end (int): The end index of the segment.
target_frequency (float): The frequency for which the SNR is to be calculated.
Returns:
float: The calculated SNR.
"""
try:
# Extract the segment from the data
segment = data[start:end]
# Perform a Fast Fourier Transform on the segment
spectrum = np.fft.fft(segment)
# Generate the frequencies corresponding to the FFT coefficients
frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
# Find the index of the target frequency
target_index = np.abs(frequencies - target_frequency).argmin()
# Calculate the amplitude of the target frequency
amplitude = np.abs(spectrum[target_index])
# Define a noise segment
noise_segment = data[100:1000 + len(segment)]
# Perform a Fast Fourier Transform on the noise segment
noise_spectrum = np.fft.fft(noise_segment)
# Calculate the amplitude of the noise at the target frequency
noise_amplitude = np.abs(noise_spectrum[target_index])
# Calculate the SNR
snr = 10 * np.log10(amplitude / noise_amplitude)
return snr
except Exception as e:
# If an error occurs, return an error message
return f"Error: {e}"
def frame_analyse(filename):
"""
This function analyses an audio file and returns the start and end times of the signal of interest.
Parameters:
filename (str): The path to the audio file.
Returns:
tuple: The start and end times of the signal of interest.
"""
try:
# Read the audio file
sr, y = read(filename)
# Define the start and end indices of the first and second parts of the audio data
first_part_start = 0
first_part_end = len(y) // 2
second_part_start = len(y) // 2
second_part_end = len(y)
# Define the segment length and overlap size for the spectrogram
segment_length = 256
overlap_size = 128
# Calculate the spectrogram of the audio data
f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
# Plot the spectrogram
plt.figure()
plt.pcolormesh(t, f, sxx, shading="gouraud")
plt.xlabel("Time [s]")
plt.ylabel("Frequency [Hz]")
plt.title("Spectrogram of the signal")
plt.show()
# Define the target frequency
f0 = 18000
# Find the index of the target frequency
f_idx = np.argmin(np.abs(f - f0))
# Calculate the SNR thresholds for the start and end of the signal
thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
# Find the start and end indices of the signal of interest
t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
t_idx_end = t_idx_start
while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
t_idx_end += 1
# Convert the start and end indices to times
t_start = t[t_idx_start]
t_end = t[t_idx_end]
return t_start, t_end
except Exception as e:
# If an error occurs, return an error message
return f"Error: {e}"
# -----------------Receiver----------------- #
def dominant_frequency(signal_value):
"""
This function calculates the dominant frequency in a given signal.
Parameters:
signal_value (array): The signal data.
Returns:
float: The dominant frequency.
"""
# Perform a Fast Fourier Transform on the signal
yf = fft(signal_value)
# Generate the frequencies corresponding to the FFT coefficients
xf = np.linspace(0.0, sample_rate / 2.0, len(signal_value) // 2)
# Find the peaks in the absolute values of the FFT coefficients
peaks, _ = find_peaks(np.abs(yf[0:len(signal_value) // 2]))
# Return the frequency corresponding to the peak with the highest amplitude
return xf[peaks[np.argmax(np.abs(yf[0:len(signal_value) // 2][peaks]))]]
def binary_to_text(binary):
"""
This function converts a binary string to text.
Parameters:
binary (str): The binary string.
Returns:
str: The converted text.
"""
try:
# Convert each 8-bit binary number to a character and join them together
return ''.join(chr(int(binary[i:i + 8], 2)) for i in range(0, len(binary), 8))
except Exception as e:
# If an error occurs, return an error message
return f"Error: {e}"
def decode_rs(binary_string, ecc_bytes):
"""
This function decodes a Reed-Solomon encoded binary string.
Parameters:
binary_string (str): The binary string.
ecc_bytes (int): The number of error correction bytes used in the encoding.
Returns:
str: The decoded binary string.
"""
# Convert the binary string to a bytearray
byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
# Initialize a Reed-Solomon codec
rs = reedsolo.RSCodec(ecc_bytes)
# Decode the bytearray
corrected_data_tuple = rs.decode(byte_data)
corrected_data = corrected_data_tuple[0]
# Remove trailing null bytes
corrected_data = corrected_data.rstrip(b'\x00')
# Convert the bytearray back to a binary string
corrected_binary_string = ''.join(format(byte, '08b') for byte in corrected_data)
return corrected_binary_string
def manchester_decoding(binary_string):
"""
This function decodes a Manchester encoded binary string.
Parameters:
binary_string (str): The binary string.
Returns:
str: The decoded binary string.
"""
decoded_string = ''
for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
if i + 1 < len(binary_string):
if binary_string[i] == '0' and binary_string[i + 1] == '1':
decoded_string += '0'
elif binary_string[i] == '1' and binary_string[i + 1] == '0':
decoded_string += '1'
else:
print("Error: Invalid Manchester Encoding")
return None
return decoded_string
def signal_to_binary_between_times(filename):
"""
This function converts a signal to a binary string between specified times.
Parameters:
filename (str): The path to the audio file.
Returns:
str: The binary string.
"""
# Get the start and end times of the signal of interest
start_time, end_time = frame_analyse(filename)
# Read the audio file
sr, data = read(filename)
# Calculate the start and end samples of the signal of interest
start_sample = int((start_time - 0.007) * sr)
end_sample = int((end_time - 0.007) * sr)
binary_string = ''
# Convert each sample to a binary digit
for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
signal_value = data[i:i + int(sr * bit_duration)]
frequency = dominant_frequency(signal_value)
if np.abs(frequency - low_frequency) < np.abs(frequency - high_frequency):
binary_string += '0'
else:
binary_string += '1'
# Find the start and end indices of the binary string
index_start = binary_string.find("1000001")
substrings = ["0111110", "011110"]
index_end = -1
for substring in substrings:
index = binary_string.find(substring)
if index != -1:
index_end = index
break
print("Binary String:", binary_string)
binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
# Decode the binary string
decoded_binary_string = decode_rs(binary_string_decoded, 20)
return decoded_binary_string
def receive():
"""
This function receives an audio signal, converts it to a binary string, and then converts the binary string to text.
Returns:
str: The received text.
"""
try:
# Convert the audio signal to a binary string
audio_receive = signal_to_binary_between_times('output_filtered_receiver.wav')
# Convert the binary string to text
return binary_to_text(audio_receive)
except Exception as e:
# If an error occurs, return an error message
return f"Error: {e}"
# -----------------Interface----------------- #
# Start a Gradio Blocks interface
with gr.Blocks() as demo:
input_audio = gr.Audio(sources=["upload"])
output_text = gr.Textbox(label="Record Sound")
btn_convert = gr.Button(value="Convert")
btn_convert.click(fn=record, inputs=input_audio, outputs=output_text)
output_convert = gr.Textbox(label="Received Text")
btn_receive = gr.Button(value="Received Text")
btn_receive.click(fn=receive, outputs=output_convert)
demo.launch()