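# SeamlessExpressive speech-to-speech translation demo.
#
# Pipeline: download or receive an audio file, split it into ~9 s chunks on
# detected silences, transcribe each chunk with SeamlessM4T v2, translate it
# speech-to-speech with the expressive model plus the PretsselGenerator
# vocoder (preserving prosody), then stitch the translated chunks back
# together. A small Flask endpoint exposes the same pipeline over HTTP.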
import os
import pathlib
import tempfile
from typing import Tuple

import torch
import torchaudio
import yt_dlp
from flask import Flask, jsonify, request
from pydub import AudioSegment, silence

from fairseq2.assets import InProcAssetMetadataProvider, asset_store
from fairseq2.data import Collater
from fairseq2.data.audio import (
    AudioDecoder,
    WaveformToFbankConverter,
    WaveformToFbankOutput,
)
from fairseq2.generation import NGramRepeatBlockProcessor
from fairseq2.memory import MemoryBlock
from huggingface_hub import snapshot_download

from seamless_communication.cli.expressivity.evaluate.pretssel_inference_helper import (
    PretsselGenerator,
)
from seamless_communication.inference import SequenceGeneratorOptions, Translator
from seamless_communication.models.unity import (
    load_gcmvn_stats,
    load_unity_unit_tokenizer,
)

from utils import LANGUAGE_CODE_TO_NAME

DESCRIPTION = """\
# Seamless Expressive

[SeamlessExpressive](https://github.com/facebookresearch/seamless_communication) is a speech-to-speech translation model that captures certain underexplored aspects of prosody, such as speech rate and pauses, while preserving the style of one's voice and high content translation quality.
"""

CACHE_EXAMPLES = os.getenv("CACHE_EXAMPLES") == "1" and torch.cuda.is_available()

# Download the model checkpoints on first run.
CHECKPOINTS_PATH = pathlib.Path(
    os.getenv("CHECKPOINTS_PATH", "/workspace/seamless_communication/demo/expressive/models")
)
if not CHECKPOINTS_PATH.exists():
    snapshot_download(repo_id="facebook/seamless-expressive", repo_type="model", local_dir=CHECKPOINTS_PATH)
    snapshot_download(repo_id="facebook/seamless-m4t-v2-large", repo_type="model", local_dir=CHECKPOINTS_PATH)

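# fairseq2 resolves model cards through an asset store. Point the "demo"
# environment at the locally downloaded checkpoints by registering in-process
# metadata, so the model names below resolve to files under CHECKPOINTS_PATH.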
asset_store.env_resolvers.clear()
asset_store.env_resolvers.append(lambda: "demo")

demo_metadata = [
    {
        "name": "seamless_expressivity@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/m2m_expressive_unity.pt",
        "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
    },
    {
        "name": "vocoder_pretssel@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/pretssel_melhifigan_wm-final.pt",
    },
    {
        "name": "seamlessM4T_v2_large@demo",
        "checkpoint": f"file://{CHECKPOINTS_PATH}/seamlessM4T_v2_large.pt",
        "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
    },
]

asset_store.metadata_providers.append(InProcAssetMetadataProvider(demo_metadata))

LANGUAGE_NAME_TO_CODE = {v: k for k, v in LANGUAGE_CODE_TO_NAME.items()}

# Use fp16 on GPU, fp32 on CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
    dtype = torch.float16
else:
    device = torch.device("cpu")
    dtype = torch.float32

MODEL_NAME = "seamless_expressivity"
VOCODER_NAME = "vocoder_pretssel"

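# Two models are loaded: SeamlessM4T v2 transcribes the source speech (S2TT),
# and its transcript is then fed as src_text to the expressive model, which
# performs the actual speech-to-speech translation.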
m4t_translator = Translator(
    model_name_or_card="seamlessM4T_v2_large",
    vocoder_name_or_card=None,
    device=device,
    dtype=dtype,
)
unit_tokenizer = load_unity_unit_tokenizer(MODEL_NAME)

# Global cepstral mean/variance statistics used to normalize the prosody
# encoder input.
_gcmvn_mean, _gcmvn_std = load_gcmvn_stats(VOCODER_NAME)
gcmvn_mean = torch.tensor(_gcmvn_mean, device=device, dtype=dtype)
gcmvn_std = torch.tensor(_gcmvn_std, device=device, dtype=dtype)

translator = Translator(
    MODEL_NAME,
    vocoder_name_or_card=None,
    device=device,
    dtype=dtype,
    apply_mintox=False,
)

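# Beam-search settings: an infinite unk penalty forbids <unk> tokens, and the
# step processor blocks repeated 10-grams to avoid degenerate loops.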
text_generation_opts = SequenceGeneratorOptions(
    beam_size=5,
    unk_penalty=torch.inf,
    soft_max_seq_len=(0, 200),
    step_processor=NGramRepeatBlockProcessor(
        ngram_size=10,
    ),
)
m4t_text_generation_opts = SequenceGeneratorOptions(
    beam_size=5,
    unk_penalty=torch.inf,
    soft_max_seq_len=(1, 200),
    step_processor=NGramRepeatBlockProcessor(
        ngram_size=10,
    ),
)

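# PretsselGenerator turns the predicted discrete units into an expressive
# waveform, conditioned on the prosody encoder input.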
pretssel_generator = PretsselGenerator(
    VOCODER_NAME,
    vocab_info=unit_tokenizer.vocab_info,
    device=device,
    dtype=dtype,
)

decode_audio = AudioDecoder(dtype=torch.float32, device=device)

convert_to_fbank = WaveformToFbankConverter(
    num_mel_bins=80,
    waveform_scale=2**15,
    channel_last=True,
    standardize=False,
    device=device,
    dtype=dtype,
)

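# The fbank features are normalized two ways: per-utterance standardization
# for the translation encoder ("fbank"), and global CMVN statistics for the
# prosody encoder ("gcmvn_fbank").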
def normalize_fbank(data: WaveformToFbankOutput) -> WaveformToFbankOutput:
    fbank = data["fbank"]
    std, mean = torch.std_mean(fbank, dim=0)
    data["fbank"] = fbank.subtract(mean).divide(std)
    data["gcmvn_fbank"] = fbank.subtract(gcmvn_mean).divide(gcmvn_std)
    return data


collate = Collater(pad_value=0, pad_to_multiple=1)

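# The models expect 16 kHz mono input. MAX_INPUT_AUDIO_LENGTH is the intended
# per-segment ceiling in seconds; split_audio below caps segments at ~8.9 s,
# consistent with this limit.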
AUDIO_SAMPLE_RATE = 16000
MAX_INPUT_AUDIO_LENGTH = 10  # seconds

def adjust_audio_duration(input_audio_path, output_audio_path):
    """Pad the translated audio with trailing silence so it is at least as long as the input."""
    input_audio = AudioSegment.from_file(input_audio_path)
    output_audio = AudioSegment.from_file(output_audio_path)

    input_duration = len(input_audio)
    output_duration = len(output_audio)

    duration_diff = input_duration - output_duration

    if duration_diff > 0:
        print("Duration diff:", duration_diff)
        output_audio += AudioSegment.silent(duration=duration_diff)

    output_audio.export(output_audio_path, format="wav")

    return output_audio_path

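# Fetch the best audio-only stream from YouTube and convert it to WAV with
# ffmpeg; yt-dlp appends the codec extension, so the file lands at ./audio.wav.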
def download_youtube_audio(url):
    print("Downloading the YouTube audio...")
    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': os.getcwd() + "/audio",
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        error_code = ydl.download([url])

    if error_code == 0:
        print("Saving the audio file...")
        print("download_finished:", os.getcwd() + "/audio.wav")
    else:
        print("error: download failed...")

    return os.getcwd() + "/audio.wav"

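# Scan the audio in 10 ms steps and accumulate a chunk. Once the chunk reaches
# 8 s, cut at the first detected silence; at 8.9 s, cut unconditionally so no
# segment exceeds the model-friendly length.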
def split_audio(input_audio_path):
    print("Start Split Audio")
    audio = AudioSegment.from_file(input_audio_path)
    silence_thresh = -20  # dBFS; anything quieter counts as silence
    min_silence_len = 300  # ms

    chunks = []
    current_chunk = AudioSegment.silent(duration=0)
    for ms in range(0, len(audio), 10):
        segment = audio[ms:ms + 10]
        current_chunk += segment

        if len(current_chunk) >= 8000:
            if silence.detect_silence(current_chunk[-min_silence_len:], min_silence_len=min_silence_len, silence_thresh=silence_thresh):
                print("Silence detected, cutting the segment")
                chunks.append(current_chunk)
                current_chunk = AudioSegment.silent(duration=0)

            if len(current_chunk) >= 8900:
                print("Maximum duration reached, cutting the segment")
                chunks.append(current_chunk)
                current_chunk = AudioSegment.silent(duration=0)

    if len(current_chunk) > 0:
        chunks.append(current_chunk)

    print("Number of valid segments:", len(chunks))
    return chunks

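# The expressive model decorates its text output with prosody markers ("*" and
# "=", as stripped below); remove them and collapse whitespace before
# returning the text to the caller.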
def remove_prosody_tokens_from_text(text):
    text = text.replace("*", "").replace("=", "")
    text = " ".join(text.split())
    return text



def preprocess_audio(input_audio_path: str):
    """Tensor-based chunking variant: resample each chunk to 16 kHz and save it.

    Kept for reference; run() uses preprocess_audio22 below.
    """
    print("preprocess_audio start")
    print("Audio Path :", input_audio_path)
    audio_segments = split_audio(input_audio_path)
    temp_folder = os.path.join(os.getcwd(), "path_to_temp_folder")
    os.makedirs(temp_folder, exist_ok=True)
    segment_paths = []

    for i, segment in enumerate(audio_segments):
        segment_path = os.path.join(temp_folder, f"segment_{i}.wav")
        # Collapse to mono and scale the raw integer samples to [-1, 1];
        # torchaudio.save expects float waveforms in that range.
        segment = segment.set_channels(1)
        samples = torch.tensor(segment.get_array_of_samples(), dtype=torch.float32)
        segment_tensor = samples.unsqueeze(0) / (1 << (8 * segment.sample_width - 1))

        segment_tensor = torchaudio.functional.resample(segment_tensor, orig_freq=segment.frame_rate, new_freq=AUDIO_SAMPLE_RATE)

        torchaudio.save(segment_path, segment_tensor, sample_rate=AUDIO_SAMPLE_RATE)
        segment_paths.append(segment_path)
        print("path for :", segment_path)

    return segment_paths



def preprocess_audio22(input_audio_path: str):
    """Chunking variant used by run(): export each chunk with pydub, then resample to 16 kHz with torchaudio."""
    print("preprocess_audio start")
    print("Audio Path :", input_audio_path)

    audio_segments = split_audio(input_audio_path)

    temp_folder = os.path.join(os.getcwd(), "path_to_temp_folder")
    os.makedirs(temp_folder, exist_ok=True)

    segment_paths = []
    for i, segment in enumerate(audio_segments):
        # Round-trip through a temporary WAV file so torchaudio can load it.
        temp_segment_path = os.path.join(temp_folder, f"temp_segment_{i}.wav")
        segment.export(temp_segment_path, format="wav")

        arr, org_sr = torchaudio.load(temp_segment_path)
        new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=AUDIO_SAMPLE_RATE)

        segment_path = os.path.join(temp_folder, f"segment_{i}.wav")
        torchaudio.save(segment_path, new_arr, sample_rate=AUDIO_SAMPLE_RATE)

        segment_paths.append(segment_path)
        print("Path for :", segment_path)

    return segment_paths

def preprocess_audio222(input_audio_path: str):
    """Simplest chunking variant: export chunks at their original sample rate (no 16 kHz resampling). Unused."""
    print("preprocess_audio start")
    print("Audio Path :", input_audio_path)
    audio_segments = split_audio(input_audio_path)
    temp_folder = os.path.join(os.getcwd(), "path_to_temp_folder")
    os.makedirs(temp_folder, exist_ok=True)
    segment_paths = []
    for i, segment in enumerate(audio_segments):
        segment_path = os.path.join(temp_folder, f"segment_{i}.wav")
        segment.export(segment_path, format="wav")
        segment_paths.append(segment_path)
        print("path for :", segment_path)

    return segment_paths

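# Per-segment pipeline: decode -> fbank -> normalize -> transcribe the source
# with M4T -> expressive S2ST conditioned on the transcript and prosody input
# -> vocode with Pretssel -> pad to the source segment's duration.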
def process_segment(segment_path, source_language_code, target_language_code):
    with pathlib.Path(segment_path).open("rb") as fb:
        block = MemoryBlock(fb.read())
        example = decode_audio(block)

    example = convert_to_fbank(example)
    example = normalize_fbank(example)
    example = collate(example)

    # First pass: transcribe the source speech so the expressive model can be
    # conditioned on the source text.
    source_sentences, _ = m4t_translator.predict(
        input=example["fbank"],
        task_str="S2TT",
        tgt_lang=source_language_code,
        text_generation_opts=m4t_text_generation_opts,
    )
    source_text = str(source_sentences[0])

    prosody_encoder_input = example["gcmvn_fbank"]
    text_output, unit_output = translator.predict(
        example["fbank"],
        "S2ST",
        tgt_lang=target_language_code,
        src_lang=source_language_code,
        text_generation_opts=text_generation_opts,
        unit_generation_ngram_filtering=False,
        duration_factor=1.0,
        prosody_encoder_input=prosody_encoder_input,
        src_text=source_text,
    )
    speech_output = pretssel_generator.predict(
        unit_output.units,
        tgt_lang=target_language_code,
        prosody_encoder_input=prosody_encoder_input,
    )

    segment_output_audio_path = os.path.join(os.getcwd(), "result", f"segment_audio_{os.path.basename(segment_path)}")
    os.makedirs(os.path.dirname(segment_output_audio_path), exist_ok=True)

    torchaudio.save(
        segment_output_audio_path,
        speech_output.audio_wavs[0][0].to(torch.float32).cpu(),
        sample_rate=speech_output.sample_rate,
    )
    segment_output_audio_path = adjust_audio_duration(segment_path, segment_output_audio_path)

    text_out = remove_prosody_tokens_from_text(str(text_output[0]))
    print("Segment audio written to:", segment_output_audio_path)
    return segment_output_audio_path, text_out

def run2(
    input_audio_path: str,
    source_language: str,
    target_language: str,
) -> Tuple[str, str]:
    """Single-pass variant of run(): translate the whole file at once, without chunking. Unused by the demo."""
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
    source_language_code = LANGUAGE_NAME_TO_CODE[source_language]

    with pathlib.Path(input_audio_path).open("rb") as fb:
        block = MemoryBlock(fb.read())
        example = decode_audio(block)

    example = convert_to_fbank(example)
    example = normalize_fbank(example)
    example = collate(example)

    source_sentences, _ = m4t_translator.predict(
        input=example["fbank"],
        task_str="S2TT",
        tgt_lang=source_language_code,
        text_generation_opts=m4t_text_generation_opts,
    )
    source_text = str(source_sentences[0])

    prosody_encoder_input = example["gcmvn_fbank"]
    text_output, unit_output = translator.predict(
        example["fbank"],
        "S2ST",
        tgt_lang=target_language_code,
        src_lang=source_language_code,
        text_generation_opts=text_generation_opts,
        unit_generation_ngram_filtering=False,
        duration_factor=1.0,
        prosody_encoder_input=prosody_encoder_input,
        src_text=source_text,
    )
    speech_output = pretssel_generator.predict(
        unit_output.units,
        tgt_lang=target_language_code,
        prosody_encoder_input=prosody_encoder_input,
    )

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        torchaudio.save(
            f.name,
            speech_output.audio_wavs[0][0].to(torch.float32).cpu(),
            sample_rate=speech_output.sample_rate,
        )

    text_out = remove_prosody_tokens_from_text(str(text_output[0]))

    return f.name, text_out

def run(input_audio_path: str, source_language: str, target_language: str) -> tuple[str, str]:
    """Translate the input audio chunk by chunk and concatenate the results."""
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
    source_language_code = LANGUAGE_NAME_TO_CODE[source_language]

    segment_paths = preprocess_audio22(input_audio_path)
    print("preprocess_audio end")
    final_text = ""
    final_audio = AudioSegment.silent(duration=0)

    for segment_path in segment_paths:
        segment_audio_path, segment_text = process_segment(segment_path, source_language_code, target_language_code)
        final_text += segment_text + " "
        segment_audio = AudioSegment.from_file(segment_audio_path)
        final_audio += segment_audio

    output_audio_path = os.path.join(os.getcwd(), "result", "audio.wav")
    os.makedirs(os.path.dirname(output_audio_path), exist_ok=True)
    final_audio.export(output_audio_path, format="wav")

    text_out = remove_prosody_tokens_from_text(final_text.strip())

    return output_audio_path, text_out

TARGET_LANGUAGE_NAMES = [
    "English",
    "French",
    "German",
    "Spanish",
]

app = Flask(__name__)


@app.route('/translate', methods=['POST'])
def translate():
    data = request.json
    input_audio_path = data['input_audio_path']
    source_language = data['source_language']
    target_language = data['target_language']

    output_audio_path, output_text = run(input_audio_path, source_language, target_language)

    return jsonify({
        'output_audio_path': output_audio_path,
        'output_text': output_text,
    })
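
# The server is not started in this script; to expose the endpoint, call
# app.run(host="0.0.0.0", port=5000) (the port is an arbitrary choice).
# Example request, assuming a file path reachable by the server:
#
#   curl -X POST http://localhost:5000/translate \
#     -H "Content-Type: application/json" \
#     -d '{"input_audio_path": "/path/to/audio.wav",
#          "source_language": "French", "target_language": "English"}'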


if __name__ == "__main__":
    # Demo: translate a French YouTube clip to English.
    url = "https://youtu.be/qb_tHWGJOp8?si=10qB2JApy0q3XY76"
    input_audio_path = download_youtube_audio(url)

    source_language = "French"
    target_language = "English"
    print("Audio to process:", input_audio_path)
    output_audio_path, output_text = run(input_audio_path, source_language, target_language)

    print("output_audio_path:", output_audio_path)