import os |
import pathlib |
import tempfile |
from pydub import AudioSegment, silence |
import gradio as gr |
import torch |
import torchaudio |
from fairseq2.assets import InProcAssetMetadataProvider, asset_store |
from fairseq2.data import Collater, SequenceData, VocabularyInfo |
from fairseq2.data.audio import ( |
AudioDecoder, |
WaveformToFbankConverter, |
WaveformToFbankOutput, |
) |
from seamless_communication.inference import SequenceGeneratorOptions |
from fairseq2.generation import NGramRepeatBlockProcessor |
from fairseq2.memory import MemoryBlock |
from fairseq2.typing import DataType, Device |
from huggingface_hub import snapshot_download |
from seamless_communication.inference import BatchedSpeechOutput, Translator, SequenceGeneratorOptions |
from seamless_communication.models.generator.loader import load_pretssel_vocoder_model |
from seamless_communication.models.unity import ( |
UnitTokenizer, |
load_gcmvn_stats, |
load_unity_text_tokenizer, |
load_unity_unit_tokenizer, |
) |
from torch.nn import Module |
from seamless_communication.cli.expressivity.evaluate.pretssel_inference_helper import PretsselGenerator |
from utils import LANGUAGE_CODE_TO_NAME |
# Seamless Expressive |
[SeamlessExpressive](https://github.com/facebookresearch/seamless_communication) is a speech-to-speech translation model that captures certain underexplored aspects of prosody such as speech rate and pauses, while preserving the style of one's voice and high content translation quality. |
""" |
# True only when the CACHE_EXAMPLES environment variable is "1" and CUDA is usable.
CACHE_EXAMPLES = os.getenv("CACHE_EXAMPLES") == "1" and torch.cuda.is_available()

# Sample rate (Hz) that the preprocess_audio* helpers resample every segment to.
# The helpers reference this name, so it has to exist at module level.
# NOTE(review): 16 kHz is assumed to be the input rate the Seamless models
# expect -- confirm against the model card before changing it.
AUDIO_SAMPLE_RATE = 16000

# Local directory holding the model checkpoints; overridable via the environment.
CHECKPOINTS_PATH = pathlib.Path(
    os.getenv("CHECKPOINTS_PATH", "/workspace/seamless_communication/demo/expressive/models")
)
if not CHECKPOINTS_PATH.exists():
    # Both repositories are downloaded into the same directory, and only when
    # that directory does not exist yet.
    snapshot_download(repo_id="facebook/seamless-expressive", repo_type="model", local_dir=CHECKPOINTS_PATH)
    snapshot_download(repo_id="facebook/seamless-m4t-v2-large", repo_type="model", local_dir=CHECKPOINTS_PATH)

# Replace every registered environment resolver with one that always answers
# "demo" (presumably so the "@demo" cards declared below are the ones picked).
asset_store.env_resolvers.clear()
asset_store.env_resolvers.append(lambda: "demo")

# Asset cards pointing at the checkpoint files inside CHECKPOINTS_PATH.
demo_metadata = [
    {
        "name": "seamless_expressivity@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/m2m_expressive_unity.pt",
        "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
    },
    {
        "name": "vocoder_pretssel@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/pretssel_melhifigan_wm-final.pt",
    },
    {
        "name": "seamlessM4T_v2_large@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/seamlessM4T_v2_large.pt",
        "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
    },
]
asset_store.metadata_providers.append(InProcAssetMetadataProvider(demo_metadata))

# Reverse lookup: human-readable language name -> language code.
LANGUAGE_NAME_TO_CODE = {v: k for k, v in LANGUAGE_CODE_TO_NAME.items()}
# Run on the first CUDA device in half precision when a GPU is available,
# otherwise fall back to the CPU in single precision.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
dtype = torch.float16 if device.type == "cuda" else torch.float32
# Asset card names (resolved through the "@demo" metadata registered earlier
# in this file).
MODEL_NAME = "seamless_expressivity"
VOCODER_NAME = "vocoder_pretssel"

# SeamlessM4T v2 translator. Elsewhere in this file it is called with task
# "S2TT" and tgt_lang set to the *source* language to obtain source-side text.
m4t_translator = Translator(
    model_name_or_card="seamlessM4T_v2_large",
    vocoder_name_or_card=None,
    device=device,
    dtype=dtype,
)

unit_tokenizer = load_unity_unit_tokenizer(MODEL_NAME)

# Global mean/std statistics, moved to the inference device/dtype; used by
# normalize_fbank to build the "gcmvn_fbank" features.
_gcmvn_mean, _gcmvn_std = load_gcmvn_stats(VOCODER_NAME)
gcmvn_mean = torch.tensor(_gcmvn_mean, device=device, dtype=dtype)
gcmvn_std = torch.tensor(_gcmvn_std, device=device, dtype=dtype)

# Expressive translator. The model card name is a required argument of
# Translator; it was missing here, so MODEL_NAME is now passed explicitly.
translator = Translator(
    model_name_or_card=MODEL_NAME,
    vocoder_name_or_card=None,
    device=device,
    dtype=dtype,
    apply_mintox=False,
)

# Text decoding options for the expressive translator.
text_generation_opts = SequenceGeneratorOptions(
    beam_size=5,
    unk_penalty=torch.inf,
    soft_max_seq_len=(0, 200),
    step_processor=NGramRepeatBlockProcessor(
        ngram_size=10,
    ),
)

# Text decoding options for the M4T translator; identical except for
# soft_max_seq_len.
m4t_text_generation_opts = SequenceGeneratorOptions(
    beam_size=5,
    unk_penalty=torch.inf,
    soft_max_seq_len=(1, 200),
    step_processor=NGramRepeatBlockProcessor(
        ngram_size=10,
    ),
)

# Unit-to-waveform generator. The vocoder card name is its first required
# argument and was missing here, so VOCODER_NAME is now passed explicitly.
pretssel_generator = PretsselGenerator(
    VOCODER_NAME,
    vocab_info=unit_tokenizer.vocab_info,
    device=device,
    dtype=dtype,
)

# Audio bytes -> waveform -> 80-bin filterbank features.
decode_audio = AudioDecoder(dtype=torch.float32, device=device)

convert_to_fbank = WaveformToFbankConverter(
    num_mel_bins=80,
    waveform_scale=2**15,
    channel_last=True,
    standardize=False,
    device=device,
    dtype=dtype,
)
def normalize_fbank(data: WaveformToFbankOutput) -> WaveformToFbankOutput:
    """Add two normalised variants of ``data["fbank"]`` to *data* in place.

    ``data["fbank"]`` is replaced by the features standardised with their own
    mean/std along dim 0, and ``data["gcmvn_fbank"]`` is set to the same raw
    features standardised with the module-level ``gcmvn_mean``/``gcmvn_std``.

    Returns:
        The same mapping that was passed in.
    """
    raw_features = data["fbank"]
    # torch.std_mean returns (std, mean) in that order.
    sigma, mu = torch.std_mean(raw_features, dim=0)
    data["fbank"] = (raw_features - mu) / sigma
    data["gcmvn_fbank"] = (raw_features - gcmvn_mean) / gcmvn_std
    return data


# Batches examples, padding with zeros.
collate = Collater(pad_value=0, pad_to_multiple=1)
from pydub import AudioSegment |
def adjust_audio_duration(input_audio_path, output_audio_path):
    """Pad the output clip with trailing silence up to the input clip's length.

    Both files are loaded with pydub. When the clip at *input_audio_path* is
    longer than the one at *output_audio_path*, the difference is appended to
    the output clip as silence. The output clip is then written back to
    *output_audio_path* as WAV. A longer output clip is left untrimmed.

    Returns:
        *output_audio_path*, unchanged.
    """
    reference_clip = AudioSegment.from_file(input_audio_path)
    generated_clip = AudioSegment.from_file(output_audio_path)

    # len() of an AudioSegment is its duration in milliseconds.
    duration_diff = len(reference_clip) - len(generated_clip)
    if duration_diff > 0:
        print("Duration diff : ",duration_diff)
        padding = AudioSegment.silent(duration=duration_diff)
        generated_clip = generated_clip + padding

    generated_clip.export(output_audio_path, format='wav')
    return output_audio_path
import yt_dlp |
def dowloadYoutubeAudio(url):
    """Download the audio track of *url* with yt-dlp and convert it to WAV.

    The file is written to ``<current working directory>/audio.wav``.

    Args:
        url: Address of the video whose audio should be fetched.

    Returns:
        Path of the produced WAV file.

    Raises:
        RuntimeError: If yt-dlp reports a non-zero status. Previously the
            failure was only printed and the path was returned anyway, so the
            caller could end up processing a missing file or a stale
            ``audio.wav`` left over from an earlier run.
    """
    print("Téléchargement de l'audio YouTube en cours...")

    output_stem = os.getcwd() + "/audio"
    # The FFmpegExtractAudio post-processor appends the codec extension.
    wav_path = output_stem + ".wav"

    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': output_stem,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        error_code = ydl.download([url])

    if error_code != 0:
        print("error : Échec du téléchargement...")
        raise RuntimeError(
            f"yt-dlp failed to download {url!r} (status {error_code})"
        )

    print("Sauvegarde du fichier audio...")
    print("download_finished : ", wav_path)
    return wav_path
def split_audio(input_audio_path):
    """Split an audio file into consecutive chunks of roughly 8-9 seconds.

    The audio is scanned in 10 ms steps. Once the current chunk is at least
    8000 ms long it is cut as soon as its last 300 ms are detected as silence
    (threshold -20 dBFS); if no such silence shows up, it is cut when it
    reaches 8900 ms. Whatever remains at the end becomes the last chunk.

    Chunks are sliced straight out of the decoded audio using start/end
    offsets. The previous implementation grew each chunk by concatenating
    10 ms slices one at a time, which re-copied the accumulated buffer on
    every step, and seeded it with ``AudioSegment.silent(duration=0)``, whose
    own default format takes part in every concatenation.

    Args:
        input_audio_path: Path of the audio file to load with pydub.

    Returns:
        List of ``AudioSegment`` chunks in playback order.
    """
    print("Start Split Audio")
    audio = AudioSegment.from_file(input_audio_path)

    silence_thresh = -20
    min_silence_len = 300
    step_ms = 10
    soft_limit_ms = 8000   # start looking for a silent tail from here on
    hard_limit_ms = 8900   # cut unconditionally once this is reached

    total_ms = len(audio)
    chunks = []
    chunk_start = 0
    for ms in range(0, total_ms, step_ms):
        chunk_end = min(ms + step_ms, total_ms)
        chunk_len = chunk_end - chunk_start
        if chunk_len < soft_limit_ms:
            continue

        # Only the trailing window of the current chunk is inspected.
        tail = audio[chunk_end - min_silence_len:chunk_end]
        if silence.detect_silence(tail, min_silence_len=min_silence_len, silence_thresh=silence_thresh):
            print("Silence détecté, découpage du segment")
        elif chunk_len >= hard_limit_ms:
            print("Durée maximale atteinte, découpage du segment")
        else:
            continue

        chunks.append(audio[chunk_start:chunk_end])
        chunk_start = chunk_end

    if chunk_start < total_ms:
        chunks.append(audio[chunk_start:total_ms])

    print('Nombre de segments valides:', len(chunks))
    return chunks
def remove_prosody_tokens_from_text(text):
    """Drop every ``*`` and ``=`` character and collapse whitespace.

    Runs of whitespace become a single space and leading/trailing whitespace
    is removed.
    """
    without_markers = text.translate(str.maketrans("", "", "*="))
    return " ".join(without_markers.split())
import torchaudio |
def preprocess_audio(input_audio_path: str):
    """Split an audio file into chunks and save each one resampled as WAV.

    Chunks come from ``split_audio``. Each is converted to a float tensor,
    resampled to ``AUDIO_SAMPLE_RATE`` and written to
    ``<cwd>/path_to_temp_folder/segment_<i>.wav``.

    Two conversion problems of the earlier version are fixed:

    * pydub returns multi-channel samples interleaved; they were treated as
      one long mono channel. They are now reshaped to ``(channels, frames)``.
    * the raw integer PCM values were saved as floats without scaling. They
      are now scaled to ``[-1, 1)`` using the segment's sample width.

    Args:
        input_audio_path: Path of the audio file to split.

    Returns:
        List of the written segment paths, in playback order.
    """
    print("preprocess_audio start")
    print("Audio Path :", input_audio_path)
    audio_segments = split_audio(input_audio_path)

    temp_folder = os.path.join(os.getcwd(), "path_to_temp_folder")
    os.makedirs(temp_folder, exist_ok=True)

    segment_paths = []
    for i, segment in enumerate(audio_segments):
        segment_path = os.path.join(temp_folder, f"segment_{i}.wav")

        samples = torch.tensor(segment.get_array_of_samples()).float()
        # De-interleave: [L0, R0, L1, R1, ...] -> (channels, frames).
        frame_count = samples.numel() // segment.channels
        waveform = samples.reshape(frame_count, segment.channels).t().contiguous()
        # Full scale of signed PCM with `sample_width` bytes per sample.
        waveform = waveform / float(1 << (8 * segment.sample_width - 1))

        waveform = torchaudio.functional.resample(
            waveform, orig_freq=segment.frame_rate, new_freq=AUDIO_SAMPLE_RATE
        )
        torchaudio.save(segment_path, waveform, sample_rate=AUDIO_SAMPLE_RATE)

        segment_paths.append(segment_path)
        print("path for :", segment_path)
    return segment_paths
import os |
import torchaudio |
def preprocess_audio22(input_audio_path: str):
    """Split an audio file and write each chunk resampled to AUDIO_SAMPLE_RATE.

    Every chunk from ``split_audio`` is first exported by pydub to
    ``temp_segment_<i>.wav``, loaded back with torchaudio, resampled, and
    saved as ``segment_<i>.wav``; both files live in
    ``<cwd>/path_to_temp_folder``.

    Returns:
        List of the resampled ``segment_<i>.wav`` paths, in playback order.
    """
    print("preprocess_audio start")
    print("Audio Path :", input_audio_path)
    chunks = split_audio(input_audio_path)

    work_dir = os.path.join(os.getcwd(), "path_to_temp_folder")
    os.makedirs(work_dir, exist_ok=True)

    resampled_paths = []
    for index, chunk in enumerate(chunks):
        # Round-trip through a WAV file so torchaudio does the decoding.
        raw_path = os.path.join(work_dir, f"temp_segment_{index}.wav")
        chunk.export(raw_path, format="wav")
        waveform, native_rate = torchaudio.load(raw_path)

        resampled = torchaudio.functional.resample(
            waveform, orig_freq=native_rate, new_freq=AUDIO_SAMPLE_RATE
        )

        final_path = os.path.join(work_dir, f"segment_{index}.wav")
        torchaudio.save(final_path, resampled, sample_rate=AUDIO_SAMPLE_RATE)
        resampled_paths.append(final_path)
        print("Path for :", final_path)
    return resampled_paths
def preprocess_audio222(input_audio_path: str):
    """Split an audio file and export each chunk as WAV without resampling.

    Chunks from ``split_audio`` are written by pydub to
    ``<cwd>/path_to_temp_folder/segment_<i>.wav``.

    Returns:
        List of the written segment paths, in playback order.
    """
    print("preprocess_audio start")
    print("Audio Path :",input_audio_path)
    chunks = split_audio(input_audio_path)

    work_dir = os.getcwd()+"/path_to_temp_folder"
    os.makedirs(work_dir, exist_ok=True)

    exported_paths = []
    for index, chunk in enumerate(chunks):
        target = os.path.join(work_dir, f"segment_{index}.wav")
        chunk.export(target, format="wav")
        exported_paths.append(target)
        print("path for : ",target)
    return exported_paths
def process_segment(segment_path, source_language_code, target_language_code):
    """Translate one audio segment to speech and text in the target language.

    Steps:
      1. Read the file and turn it into collated filterbank features.
      2. Run ``m4t_translator`` (task "S2TT", target = source language) to
         get text for the source audio.
      3. Run the expressive ``translator`` (task "S2ST"), passing that text
         as ``src_text`` and the gcmvn features as prosody input.
      4. Synthesise the waveform with ``pretssel_generator``, save it to
         ``<cwd>/result/segment_audio_<segment file name>`` and pad it with
         ``adjust_audio_duration`` against the input segment.

    Args:
        segment_path: Path of the audio segment to translate.
        source_language_code: Language code of the input speech.
        target_language_code: Language code to translate into.

    Returns:
        ``(path of the generated audio, translated text)``; the text has been
        passed through ``remove_prosody_tokens_from_text``.
    """
    # Feature pipeline: bytes -> waveform -> fbank -> normalised -> batch.
    example = MemoryBlock(pathlib.Path(segment_path).read_bytes())
    for stage in (decode_audio, convert_to_fbank, normalize_fbank, collate):
        example = stage(example)

    fbank = example["fbank"]
    prosody_encoder_input = example["gcmvn_fbank"]

    source_sentences, _ = m4t_translator.predict(
        input=fbank,
        task_str="S2TT",
        tgt_lang=source_language_code,
        text_generation_opts=m4t_text_generation_opts,
    )
    source_text = str(source_sentences[0])

    text_output, unit_output = translator.predict(
        fbank,
        "S2ST",
        tgt_lang=target_language_code,
        src_lang=source_language_code,
        text_generation_opts=text_generation_opts,
        unit_generation_ngram_filtering=False,
        duration_factor=1.0,
        prosody_encoder_input=prosody_encoder_input,
        src_text=source_text,
    )

    speech_output = pretssel_generator.predict(
        unit_output.units,
        tgt_lang=target_language_code,
        prosody_encoder_input=prosody_encoder_input,
    )

    output_name = f"segment_audio_{os.path.basename(segment_path)}"
    segment_output_audio_path = os.path.join(os.getcwd(), "result", output_name)
    os.makedirs(os.path.dirname(segment_output_audio_path), exist_ok=True)

    waveform = speech_output.audio_wavs[0][0].to(torch.float32).cpu()
    torchaudio.save(
        segment_output_audio_path,
        waveform,
        sample_rate=speech_output.sample_rate,
    )

    segment_output_audio_path = adjust_audio_duration(segment_path, segment_output_audio_path)
    text_out = remove_prosody_tokens_from_text(str(text_output[0]))
    print("Audio ici : ",segment_output_audio_path)
    return segment_output_audio_path, text_out
from typing import Tuple |
def run2(
    input_audio_path: str,
    source_language: str,
    target_language: str,
) -> Tuple[str, str]:
    """Translate a whole audio file in one pass (no per-segment processing).

    The file at *input_audio_path* is featurised as a single example, given
    source text by ``m4t_translator``, translated by the expressive
    ``translator`` and synthesised by ``pretssel_generator``. The waveform is
    written to a temporary ``.wav`` file that is not deleted automatically.

    Args:
        input_audio_path: Path of the audio file to translate.
        source_language: Language name; must be a key of LANGUAGE_NAME_TO_CODE.
        target_language: Language name; must be a key of LANGUAGE_NAME_TO_CODE.

    Returns:
        ``(path of the temporary WAV file, translated text)``.

    Raises:
        KeyError: If either language name is unknown.
    """
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
    source_language_code = LANGUAGE_NAME_TO_CODE[source_language]

    # NOTE: the returned segment paths are discarded; this call only has the
    # side effect of writing the segment files.
    preprocess_audio(input_audio_path)

    # Feature pipeline: bytes -> waveform -> fbank -> normalised -> batch.
    example = MemoryBlock(pathlib.Path(input_audio_path).read_bytes())
    for stage in (decode_audio, convert_to_fbank, normalize_fbank, collate):
        example = stage(example)

    fbank = example["fbank"]
    prosody_encoder_input = example["gcmvn_fbank"]

    source_sentences, _ = m4t_translator.predict(
        input=fbank,
        task_str="S2TT",
        tgt_lang=source_language_code,
        text_generation_opts=m4t_text_generation_opts,
    )
    source_text = str(source_sentences[0])

    text_output, unit_output = translator.predict(
        fbank,
        "S2ST",
        tgt_lang=target_language_code,
        src_lang=source_language_code,
        text_generation_opts=text_generation_opts,
        unit_generation_ngram_filtering=False,
        duration_factor=1.0,
        prosody_encoder_input=prosody_encoder_input,
        src_text=source_text,
    )

    speech_output = pretssel_generator.predict(
        unit_output.units,
        tgt_lang=target_language_code,
        prosody_encoder_input=prosody_encoder_input,
    )

    waveform = speech_output.audio_wavs[0][0].to(torch.float32).cpu()
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        torchaudio.save(
            f.name,
            waveform,
            sample_rate=speech_output.sample_rate,
        )

    text_out = remove_prosody_tokens_from_text(str(text_output[0]))
    return f.name, text_out
def run(input_audio_path: str, source_language: str, target_language: str) -> tuple[str, str]:
    """Translate an audio file segment by segment and stitch the results.

    The input is split and resampled by ``preprocess_audio22``; each segment
    goes through ``process_segment``. Translated clips are concatenated in
    order and exported to ``<cwd>/result/audio.wav``; translated texts are
    joined with spaces.

    Args:
        input_audio_path: Path of the audio file to translate.
        source_language: Language name; must be a key of LANGUAGE_NAME_TO_CODE.
        target_language: Language name; must be a key of LANGUAGE_NAME_TO_CODE.

    Returns:
        ``(path of the combined WAV file, combined translated text)``.

    Raises:
        KeyError: If either language name is unknown.
    """
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
    source_language_code = LANGUAGE_NAME_TO_CODE[source_language]

    segment_paths = preprocess_audio22(input_audio_path)
    print("preprocess_audio end")

    translated_texts = []
    combined_audio = AudioSegment.silent(duration=0)
    for segment_path in segment_paths:
        translated_path, translated_text = process_segment(
            segment_path, source_language_code, target_language_code
        )
        translated_texts.append(translated_text)
        combined_audio = combined_audio + AudioSegment.from_file(translated_path)

    result_dir = os.path.join(os.getcwd(), "result")
    output_audio_path = os.path.join(result_dir, "audio.wav")
    os.makedirs(result_dir, exist_ok=True)
    combined_audio.export(output_audio_path, format="wav")

    text_out = remove_prosody_tokens_from_text(" ".join(translated_texts).strip())
    return output_audio_path, text_out
"English", |
"French", |
"German", |
"Spanish", |
] |
from flask import Flask, request, jsonify |
import torch |
import torchaudio |
app = Flask(__name__)


@app.route('/translate', methods=['POST'])
def translate():
    """Translate a server-side audio file and return the result as JSON.

    Expects a JSON object with the string fields ``input_audio_path``,
    ``source_language`` and ``target_language`` (language names that are keys
    of ``LANGUAGE_NAME_TO_CODE``).

    Returns:
        On success, JSON with ``output_audio_path`` and ``output_text``.
        On invalid input, JSON with an ``error`` message and HTTP status 400.
        Previously a missing body, a missing key or an unknown language name
        surfaced as an unhandled exception.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object.'}), 400

    fields = {}
    for key in ('input_audio_path', 'source_language', 'target_language'):
        value = data.get(key)
        if not isinstance(value, str) or not value:
            return jsonify({'error': f"'{key}' is required and must be a non-empty string."}), 400
        fields[key] = value

    for key in ('source_language', 'target_language'):
        if fields[key] not in LANGUAGE_NAME_TO_CODE:
            return jsonify({'error': f"Unsupported {key}: {fields[key]!r}."}), 400

    # SECURITY NOTE(review): the client chooses which file on this machine is
    # read. If this endpoint is reachable by untrusted callers, restrict the
    # path to a dedicated upload directory.
    if not os.path.isfile(fields['input_audio_path']):
        return jsonify({'error': 'input_audio_path does not point to an existing file.'}), 400

    output_audio_path, output_text = run(
        fields['input_audio_path'],
        fields['source_language'],
        fields['target_language'],
    )
    return jsonify({
        'output_audio_path': output_audio_path,
        'output_text': output_text
    })
import os |
# Demo driver: fetch one YouTube clip and translate it from French to English.
# NOTE: these statements sit at module level, so they also run whenever this
# file is imported, not only when it is executed as a script.
url = "https://youtu.be/qb_tHWGJOp8?si=10qB2JApy0q3XY76"
source_language, target_language = "French", "English"

input_audio_path = dowloadYoutubeAudio(url)
print("Audio à traiter : ",input_audio_path)

output_audio_path, output_text = run(input_audio_path, source_language, target_language)
print("output_audio_path : ",output_audio_path)