"""Ultrasonic data receiver.

Records audio via a Gradio UI, band-pass filters it around the 18/19 kHz
carrier pair, locates the transmission frame in a spectrogram, then decodes
the FSK -> Manchester -> Reed-Solomon encoded payload back into text.
"""

import time

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import reedsolo
import wavio
from scipy import signal
from scipy.fft import fft
from scipy.io.wavfile import read, write
from scipy.signal import butter, find_peaks, lfilter
from tqdm import tqdm

# ---------------Parameters--------------- #
low_frequency = 18000          # Hz - carrier used for bit '0'
high_frequency = 19000         # Hz - carrier used for bit '1'
bit_duration = 0.007           # seconds per transmitted bit
sample_rate = 44100            # Hz
amplitude_scaling_factor = 10.0


# -----------------Record----------------- #
def record(audio):
    """Persist a Gradio audio tuple to 'recorded.wav' and run the filter stage.

    Parameters
    ----------
    audio : tuple[int, numpy.ndarray]
        ``(sample_rate, samples)`` as delivered by ``gr.Audio``.

    Returns
    -------
    str
        A status message; errors are reported as a string rather than raised
        so the Gradio textbox always receives something to display.
    """
    try:
        sr, data = audio
        wavio.write("recorded.wav", data, sr)
        main()  # immediately produce 'output_filtered_receiver.wav'
        return "Audio received correctly"
    except Exception as e:
        return f"Error: {e}"


# -----------------Filter----------------- #
def butter_bandpass(lowcut, highcut, sr, order=5):
    """Design a Butterworth band-pass filter.

    Parameters
    ----------
    lowcut, highcut : float
        Pass-band edges in Hz.
    sr : int
        Sampling rate in Hz.
    order : int
        Filter order.

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        ``(b, a)`` transfer-function coefficients.
    """
    nyquist = 0.5 * sr
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, sr, order=5):
    """Apply the band-pass filter from :func:`butter_bandpass` to *data*."""
    b, a = butter_bandpass(lowcut, highcut, sr, order=order)
    return lfilter(b, a, data)


def main():
    """Band-pass 'recorded.wav' to 17.5-19.5 kHz and write the result.

    Writes 'output_filtered_receiver.wav' as 16-bit PCM.
    """
    input_file = 'recorded.wav'
    output_file = 'output_filtered_receiver.wav'
    lowcut = 17500
    highcut = 19500
    sr, data = read(input_file)
    filtered_data = butter_bandpass_filter(data, lowcut, highcut, sr)
    write(output_file, sr, np.int16(filtered_data))
    return "Filtered Audio Generated"


# -----------------Frame----------------- #
def calculate_snr(data, start, end, target_frequency):
    """Estimate the SNR of *target_frequency* in ``data[start:end]``.

    The "noise" reference is a fixed slice ``data[100:1000 + len(segment)]``
    taken from early in the recording.

    NOTE(review): 10*log10 of an *amplitude* ratio is a power-style formula;
    20*log10 is conventional for amplitudes - confirm which is intended.
    """
    segment = data[start:end]
    spectrum = np.fft.fft(segment)
    frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
    target_index = np.abs(frequencies - target_frequency).argmin()
    amplitude = np.abs(spectrum[target_index])
    noise_segment = data[100:1000 + len(segment)]
    noise_spectrum = np.fft.fft(noise_segment)
    noise_amplitude = np.abs(noise_spectrum[target_index])
    snr = 10 * np.log10(amplitude / noise_amplitude)
    return snr


def frame_analyse(filename):
    """Locate the start and end times of the transmission inside *filename*.

    Thresholds are SNR estimates of the low/high carriers over the first and
    second halves of the file; the frame bounds are where the 18 kHz bin of
    the spectrogram crosses those thresholds.

    Returns
    -------
    tuple[float, float]
        ``(t_start, t_end)`` in seconds.
    """
    sr, y = read(filename)

    first_part_start = 0
    first_part_end = len(y) // 2
    second_part_start = len(y) // 2
    second_part_end = len(y)

    segment_length = 256
    overlap_size = 128
    f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length,
                                   noverlap=overlap_size)

    # NOTE(review): plt.show() blocks until the window is closed, which will
    # stall a headless/server deployment - consider removing for production.
    plt.figure()
    plt.pcolormesh(t, f, sxx, shading="gouraud")
    plt.xlabel("Time [s]")
    plt.ylabel("Frequency [Hz]")
    plt.title("Spectrogram of the signal")
    plt.show()

    f0 = 18000
    f_idx = np.argmin(np.abs(f - f0))

    thresholds_start = calculate_snr(y, first_part_start, first_part_end,
                                     low_frequency)
    thresholds_end = calculate_snr(y, second_part_start, second_part_end,
                                   high_frequency)

    t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
    t_start = t[t_idx_start]

    t_idx_end = t_idx_start
    while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
        t_idx_end += 1
    # BUGFIX: the loop may exit with t_idx_end == len(t); clamp to the last
    # valid index instead of raising IndexError on t[t_idx_end].
    t_end = t[min(t_idx_end, len(t) - 1)]

    return t_start, t_end


# -----------------Receiver----------------- #
def dominant_frequency(signal_value):
    """Return the strongest peak frequency (Hz) in *signal_value*.

    Uses the positive half of the FFT magnitude spectrum; raises if
    ``find_peaks`` detects no peaks (e.g. on a flat segment).
    """
    yf = fft(signal_value)
    half = len(signal_value) // 2
    xf = np.linspace(0.0, sample_rate / 2.0, half)
    magnitudes = np.abs(yf[0:half])
    peaks, _ = find_peaks(magnitudes)
    return xf[peaks[np.argmax(magnitudes[peaks])]]


def binary_to_text(binary):
    """Decode a string of '0'/'1' characters into text, 8 bits per char."""
    try:
        return ''.join(chr(int(binary[i:i + 8], 2))
                       for i in range(0, len(binary), 8))
    except Exception as e:
        return f"Except: {e}"


def decode_rs(binary_string, ecc_bytes):
    """Reed-Solomon-decode a bit string.

    Parameters
    ----------
    binary_string : str
        '0'/'1' characters, a multiple of 8 long.
    ecc_bytes : int
        Number of error-correction bytes used by the sender.

    Returns
    -------
    str
        The corrected payload as a bit string, trailing NUL bytes stripped.
    """
    byte_data = bytearray(int(binary_string[i:i + 8], 2)
                          for i in range(0, len(binary_string), 8))
    rs = reedsolo.RSCodec(ecc_bytes)
    corrected_data = rs.decode(byte_data)[0].rstrip(b'\x00')
    return ''.join(format(byte, '08b') for byte in corrected_data)


def manchester_decoding(binary_string):
    """Decode a Manchester-encoded bit string ('01' -> '0', '10' -> '1').

    Returns ``None`` if an invalid pair ('00' or '11') is encountered.
    """
    decoded_string = ''
    for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
        if i + 1 < len(binary_string):
            pair = binary_string[i:i + 2]
            if pair == '01':
                decoded_string += '0'
            elif pair == '10':
                decoded_string += '1'
            else:
                print("Error: Invalid Manchester Encoding")
                return None
    return decoded_string


def signal_to_binary_between_times(filename):
    """Demodulate the FSK signal in *filename* into the decoded payload bits.

    Slices the file into bit_duration windows between the frame bounds, maps
    each window's dominant frequency to a bit, strips the start marker
    '1000001' and end marker, Manchester-decodes, then RS-decodes.
    """
    start_time, end_time = frame_analyse(filename)
    sr, data = read(filename)

    # Shift back by one bit period to compensate for detection latency.
    start_sample = int((start_time - 0.007) * sr)
    end_sample = int((end_time - 0.007) * sr)

    binary_string = ''
    start_analyse_time = time.time()
    for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
        signal_value = data[i:i + int(sr * bit_duration)]
        frequency = dominant_frequency(signal_value)
        # Assign the bit by whichever carrier the dominant tone is closest to.
        if np.abs(frequency - low_frequency) < np.abs(frequency - high_frequency):
            binary_string += '0'
        else:
            binary_string += '1'

    index_start = binary_string.find("1000001")
    substrings = ["0111110", "011110"]
    # NOTE(review): if no end marker is found, index_end stays -1 and the
    # slice below drops only the final bit - confirm that is intentional.
    index_end = -1
    for substring in substrings:
        index = binary_string.find(substring)
        if index != -1:
            index_end = index
            break

    print("Binary String:", binary_string)
    binary_string_decoded = manchester_decoding(
        binary_string[index_start + 7:index_end])
    return decode_rs(binary_string_decoded, 20)


def receive():
    """Decode 'output_filtered_receiver.wav' and return the received text."""
    try:
        audio_receive = signal_to_binary_between_times(
            'output_filtered_receiver.wav')
        return binary_to_text(audio_receive)
    except Exception as e:
        return f"Error: {e}"


# -----------------Interface----------------- #
with gr.Blocks() as demo:
    input_audio = gr.Audio(sources=["upload"])
    output_text = gr.Textbox(label="Record Sound")
    btn_convert = gr.Button(value="Convert")
    btn_convert.click(fn=record, inputs=input_audio, outputs=output_text)
    output_convert = gr.Textbox(label="Received Text")
    btn_receive = gr.Button(value="Received Text")
    btn_receive.click(fn=receive, outputs=output_convert)

# BUGFIX: guard the launch so importing this module (e.g. for testing or
# reuse of the decoding helpers) does not start the web server.
if __name__ == "__main__":
    demo.launch()