import time

import numpy as np
import soundfile as sf


def audio_stream_generator(audio_file_path, chunk_size=4096, simulate_realtime=True):
    """Yield the contents of an audio file as a stream of fixed-size chunks.

    The whole file is read into memory with ``soundfile.read``, converted to
    ``np.float32``, down-mixed to mono by averaging the channels, and then
    handed out ``chunk_size`` samples at a time. The final chunk is
    zero-padded so that every yielded array has exactly ``chunk_size``
    samples.

    Because this is a generator, nothing runs (argument validation and file
    loading included) until the first item is requested.

    Args:
        audio_file_path: Audio file to read; passed to ``soundfile.read``.
        chunk_size: Number of samples in each yielded chunk. Must be positive.
        simulate_realtime: If true, sleep for one chunk's duration
            (``chunk_size / sample_rate`` seconds) before yielding each
            chunk. The sleep is a fixed amount per chunk; time spent by the
            consumer between chunks is not compensated for.

    Yields:
        numpy.ndarray: A 1-D ``np.float32`` array of length ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # Reject bad chunk sizes before doing any I/O: zero would otherwise fail
    # with an opaque range() error and a negative value would silently yield
    # nothing.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Load the complete audio file.
    audio_data, sample_rate = sf.read(audio_file_path)

    # Make sure the samples are float32.
    if audio_data.dtype != np.float32:
        audio_data = audio_data.astype(np.float32)

    # Multi-channel data arrives as (frames, channels); average to mono.
    # Checking ndim (rather than the channel count) also flattens a
    # single-column 2-D array, so every chunk below is 1-D.
    if audio_data.ndim > 1:
        audio_data = audio_data.mean(axis=1)

    audio_len = len(audio_data)

    print(f"已加载音频文件: {audio_file_path}")
    print(f"采样率: {sample_rate} Hz")
    print(f"音频长度: {audio_len/sample_rate:.2f} 秒")

    # Duration of one chunk in seconds (unused when not simulating).
    chunk_duration = chunk_size / sample_rate if simulate_realtime else 0

    for pos in range(0, audio_len, chunk_size):
        # Slicing clamps at the end of the array, so the last chunk may be short.
        chunk = audio_data[pos:pos + chunk_size]

        # Zero-pad a short final chunk up to the full chunk size.
        if len(chunk) < chunk_size:
            padded_chunk = np.zeros(chunk_size, dtype=np.float32)
            padded_chunk[:len(chunk)] = chunk
            chunk = padded_chunk

        # Emulate the time a live source would need to produce this chunk.
        if simulate_realtime:
            time.sleep(chunk_duration)

        yield chunk

    print("音频流处理完成")