Spaces:
Sleeping
Sleeping
import numpy as np | |
from scipy.io.wavfile import write | |
from scipy.signal import find_peaks | |
from scipy.fft import fft | |
from tqdm import tqdm | |
import time | |
import matplotlib.pyplot as plt | |
from scipy.io.wavfile import read | |
from scipy import signal | |
import gradio as gr | |
import reedsolo | |
import wavio | |
from scipy.signal import butter, lfilter | |
# ---------------Parameters--------------- # | |
# Tone (Hz) that the receiver classifies as a '0' bit.
low_frequency = 18_000
# Tone (Hz) that the receiver classifies as a '1' bit.
high_frequency = 19_000
# Length of one transmitted bit, in seconds; also the analysis window size.
bit_duration = 0.007
# Sampling rate (Hz) assumed by the spectral helpers in this module.
sample_rate = 44_100
# Not referenced by the receiver code visible in this file.
amplitude_scaling_factor = 10.0
# -----------------Record----------------- # | |
def record(audio):
    """Store an uploaded recording and run the band-pass filtering step.

    Args:
        audio: ``(sample_rate, samples)`` pair as delivered by the audio
            input component, or ``None`` when nothing was uploaded.

    Returns:
        A status string: a confirmation on success, or a message starting
        with ``"Error: "`` describing what went wrong.
    """
    # The audio component hands over None when the user has not provided a
    # file; report that explicitly instead of a cryptic unpacking error.
    if audio is None:
        return "Error: no audio provided"
    try:
        sr, data = audio
        wavio.write("recorded.wav", data, sr)
        # Produce the filtered file that the receive step reads later.
        main()
    except Exception as e:
        # UI boundary: surface any failure as text rather than crashing.
        return f"Error: {e}"
    return "Audio receive correctly"
# -----------------Filter----------------- # | |
def butter_bandpass(lowcut, highcut, sr, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower cut-off frequency, in the same unit as ``sr``.
        highcut: Upper cut-off frequency, in the same unit as ``sr``.
        sr: Sampling rate of the signal to be filtered.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        Tuple ``(b, a)`` with the numerator and denominator coefficients.
    """
    # butter() expects the band edges as fractions of the Nyquist frequency.
    half_rate = 0.5 * sr
    band = [lowcut / half_rate, highcut / half_rate]
    numerator, denominator = butter(order, band, btype='band')
    return numerator, denominator
def butter_bandpass_filter(data, lowcut, highcut, sr, order=5):
    """Band-pass filter ``data`` with a Butterworth design.

    Args:
        data: Samples to filter.
        lowcut: Lower cut-off frequency.
        highcut: Upper cut-off frequency.
        sr: Sampling rate of ``data``.
        order: Filter order forwarded to ``butter_bandpass``.

    Returns:
        The filtered samples as returned by ``scipy.signal.lfilter``.
    """
    numerator, denominator = butter_bandpass(lowcut, highcut, sr, order=order)
    return lfilter(numerator, denominator, data)
def main():
    """Band-pass filter ``recorded.wav`` into ``output_filtered_receiver.wav``.

    The recording is restricted to the 17.5-19.5 kHz band and written back
    as 16-bit PCM.

    Returns:
        The status string ``"Filtered Audio Generated"``.
    """
    input_file = 'recorded.wav'
    output_file = 'output_filtered_receiver.wav'
    lowcut = 17500
    highcut = 19500

    sr, data = read(input_file)
    # wavfile.read yields (samples, channels) for multi-channel files, while
    # the filter runs along the last axis. Mix down to mono so the filter
    # (and the later analysis steps) operate along time.
    if data.ndim > 1:
        data = data.mean(axis=1)

    filtered_data = butter_bandpass_filter(data, lowcut, highcut, sr)

    # Filter overshoot can exceed the int16 range; clip first so the cast
    # saturates instead of wrapping around.
    int16_limits = np.iinfo(np.int16)
    clipped = np.clip(filtered_data, int16_limits.min, int16_limits.max)
    write(output_file, sr, clipped.astype(np.int16))
    return "Filtered Audio Generated"
# -----------------Frame----------------- # | |
def calculate_snr(data, start, end, target_frequency):
    """Compare the level of a target tone against a reference slice, in dB.

    Args:
        data: Full sample array.
        start: First sample index of the analysed window.
        end: Index one past the last sample of the analysed window.
        target_frequency: Frequency (Hz) whose FFT bin is inspected.

    Returns:
        ``10 * log10`` of the ratio between the target-bin magnitude in the
        window and the magnitude at the same bin index in the reference
        slice ``data[100:1000 + len(window)]``.
    """
    window = data[start:end]
    window_spectrum = np.fft.fft(window)
    bin_frequencies = np.fft.fftfreq(len(window_spectrum), 1 / sample_rate)
    target_bin = np.argmin(np.abs(bin_frequencies - target_frequency))
    signal_level = np.abs(window_spectrum[target_bin])

    # NOTE(review): the reference slice can have a different length than the
    # window, so the same bin index need not map to the same frequency.
    reference = data[100:1000 + len(window)]
    reference_level = np.abs(np.fft.fft(reference)[target_bin])

    return 10 * np.log10(signal_level / reference_level)
def frame_analyse(filename):
    """Locate the start and end time of the transmission in a WAV file.

    A spectrogram of the file is plotted, then the row closest to
    ``low_frequency`` is compared against two thresholds obtained from
    ``calculate_snr`` (first half of the file for the start, second half
    for the end).

    Args:
        filename: Path of the WAV file to analyse.

    Returns:
        Tuple ``(t_start, t_end)`` taken from the spectrogram time axis.
    """
    sr, y = read(filename)
    half = len(y) // 2

    segment_length = 256
    overlap_size = 128
    f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)

    fig = plt.figure()
    plt.pcolormesh(t, f, sxx, shading="gouraud")
    plt.xlabel("Time [s]")
    plt.ylabel("Frequency [Hz]")
    plt.title("Spectrogram of the signal")
    plt.show()
    # Release the figure; otherwise every call leaves one more open figure.
    plt.close(fig)

    # Spectrogram row closest to the low carrier tone.
    f_idx = np.argmin(np.abs(f - low_frequency))
    carrier_power = sxx[f_idx]

    thresholds_start = calculate_snr(y, 0, half, low_frequency)
    thresholds_end = calculate_snr(y, half, len(y), high_frequency)

    # First column above the start threshold (argmax yields 0 if none is).
    t_idx_start = int(np.argmax(carrier_power > thresholds_start))

    # The frame ends one column after the last column (at or after the
    # start) that still exceeds the end threshold.
    above_end = np.flatnonzero(carrier_power[t_idx_start:] > thresholds_end)
    if above_end.size:
        t_idx_end = t_idx_start + int(above_end[-1]) + 1
    else:
        t_idx_end = t_idx_start
    # Stay inside the time axis when the signal is active up to the last
    # column (indexing t[len(t)] would raise IndexError).
    t_idx_end = min(t_idx_end, len(t) - 1)

    return t[t_idx_start], t[t_idx_end]
# -----------------Receiver----------------- # | |
def dominant_frequency(signal_value, sr=None):
    """Return the frequency (Hz) of the strongest spectral peak.

    Args:
        signal_value: One-dimensional sequence of samples.
        sr: Sampling rate in Hz. Defaults to the module-level
            ``sample_rate`` when omitted.

    Returns:
        Frequency of the largest local maximum in the positive half of the
        magnitude spectrum. If the spectrum has no interior local maximum,
        the frequency of the overall largest bin is returned instead.

    Raises:
        ValueError: If fewer than two samples are supplied.
    """
    rate = sample_rate if sr is None else sr
    n = len(signal_value)
    if n < 2:
        raise ValueError("signal is too short to estimate a dominant frequency")

    half = n // 2
    magnitudes = np.abs(fft(signal_value))[:half]
    # FFT bin k sits at k * rate / n. A linspace up to rate / 2 would
    # stretch the axis and bias the estimate upwards.
    bin_frequencies = np.arange(half) * rate / n

    peaks, _ = find_peaks(magnitudes)
    if peaks.size:
        strongest = peaks[np.argmax(magnitudes[peaks])]
    else:
        # find_peaks never reports edge bins or flat spectra.
        strongest = int(np.argmax(magnitudes))
    return bin_frequencies[strongest]
def binary_to_text(binary):
    """Turn a string of '0'/'1' characters into text, 8 bits per character.

    Args:
        binary: Bit string; each group of up to 8 bits becomes one character
            via ``chr``.

    Returns:
        The decoded text, or ``"Except: <message>"`` if conversion fails.
    """
    try:
        characters = []
        for offset in range(0, len(binary), 8):
            code_point = int(binary[offset:offset + 8], 2)
            characters.append(chr(code_point))
        return ''.join(characters)
    except Exception as e:
        return f"Except: {e}"
def decode_rs(binary_string, ecc_bytes):
    """Run Reed-Solomon decoding on a bit string.

    Args:
        binary_string: '0'/'1' characters, packed 8 per byte.
        ecc_bytes: Number of error-correction bytes given to ``RSCodec``.

    Returns:
        Bit string of the decoded message with trailing zero bytes removed.
    """
    octets = [binary_string[pos:pos + 8] for pos in range(0, len(binary_string), 8)]
    codeword = bytearray(int(octet, 2) for octet in octets)

    codec = reedsolo.RSCodec(ecc_bytes)
    # The first element of the decode() result is the message itself.
    message = codec.decode(codeword)[0]
    message = message.rstrip(b'\x00')

    return ''.join(f'{byte:08b}' for byte in message)
def manchester_decoding(binary_string):
    """Decode a Manchester-encoded bit string ('01' -> '0', '10' -> '1').

    Args:
        binary_string: Encoded '0'/'1' characters; a trailing unpaired
            character is ignored.

    Returns:
        The decoded bit string, or ``None`` (after printing an error) when
        a pair is neither '01' nor '10'.
    """
    pair_to_bit = {'01': '0', '10': '1'}
    total = len(binary_string)
    bits = []
    for i in tqdm(range(0, total, 2), desc="Decoding"):
        if i + 1 >= total:
            # Unpaired trailing symbol: nothing to decode.
            continue
        bit = pair_to_bit.get(binary_string[i:i + 2])
        if bit is None:
            print("Error: Invalid Manchester Encoding")
            return None
        bits.append(bit)
    return ''.join(bits)
def signal_to_binary_between_times(filename):
    """Demodulate, de-frame and error-correct the message in a WAV file.

    The frame located by ``frame_analyse`` is cut into windows of one bit
    duration; each window becomes '0' or '1' depending on whether its
    dominant frequency is closer to ``low_frequency`` or ``high_frequency``.
    The bits between the start marker and the end marker are then
    Manchester-decoded and passed through Reed-Solomon decoding.

    Args:
        filename: Path of the (filtered) WAV file to decode.

    Returns:
        The corrected payload as a string of '0'/'1' characters.

    Raises:
        ValueError: If the start marker is missing or the payload is not
            valid Manchester code.
    """
    start_time, end_time = frame_analyse(filename)
    sr, data = read(filename)

    samples_per_bit = int(sr * bit_duration)
    # Begin one bit early, but never before the first sample: a negative
    # index would silently wrap around to the end of the array.
    start_sample = max(0, int((start_time - bit_duration) * sr))
    end_sample = min(len(data), int((end_time - bit_duration) * sr))

    bits = []
    for i in tqdm(range(start_sample, end_sample, samples_per_bit)):
        signal_value = data[i:i + samples_per_bit]
        if len(signal_value) < samples_per_bit:
            # A truncated window at the end of the file is not a full bit.
            break
        frequency = dominant_frequency(signal_value)
        if np.abs(frequency - low_frequency) < np.abs(frequency - high_frequency):
            bits.append('0')
        else:
            bits.append('1')
    binary_string = ''.join(bits)
    print("Binary String:", binary_string)

    start_marker = "1000001"
    index_start = binary_string.find(start_marker)
    if index_start == -1:
        raise ValueError("Start marker not found in the received signal")
    payload_start = index_start + len(start_marker)

    # Look for the end marker only after the start marker, so that noise in
    # front of the frame cannot be mistaken for the end of the payload.
    index_end = -1
    for end_marker in ("0111110", "011110"):
        index = binary_string.find(end_marker, payload_start)
        if index != -1:
            index_end = index
            break
    if index_end == -1:
        # Best effort: without an end marker, take everything that follows.
        payload = binary_string[payload_start:]
    else:
        payload = binary_string[payload_start:index_end]

    binary_string_decoded = manchester_decoding(payload)
    if binary_string_decoded is None:
        raise ValueError("Invalid Manchester encoding in the received payload")

    return decode_rs(binary_string_decoded, 20)
def receive():
    """Decode the filtered recording and return the transmitted text.

    Returns:
        The decoded text, or ``"Error: <message>"`` if decoding fails.
    """
    try:
        decoded_bits = signal_to_binary_between_times('output_filtered_receiver.wav')
        text = binary_to_text(decoded_bits)
    except Exception as e:
        return f"Error: {e}"
    return text
# -----------------Interface----------------- # | |
# Two-step UI: "Convert" stores and filters the uploaded audio, then
# "Received Text" decodes the filtered file and shows the message.
demo = gr.Blocks()
with demo:
    input_audio = gr.Audio(sources=["upload"])
    output_text = gr.Textbox(label="Record Sound")
    btn_convert = gr.Button("Convert")
    btn_convert.click(record, inputs=input_audio, outputs=output_text)

    output_convert = gr.Textbox(label="Received Text")
    btn_receive = gr.Button("Received Text")
    btn_receive.click(receive, outputs=output_convert)

demo.launch()