Spaces:
Running
Running
import gradio as gr | |
import torch | |
import torchaudio | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
from speechtokenizer import SpeechTokenizer | |
from audiotools import AudioSignal | |
import bitsandbytes as bnb # Import bitsandbytes for INT8 quantization | |
import numpy as np | |
from uuid import uuid4 | |
# Special marker tokens that delimit audio spans in the token stream
start_audio_token = "<soa>"
end_audio_token = "<eoa>"
end_sequence_token = "<eos>"

# Codec / generation constants
n_codebooks = 3
max_seq_length = 1024
top_k = 20

# Checkpoint identifier and its tokenizer (downloads are cached in the working directory)
model_path = "Vikhrmodels/llama_asr_tts_24000"
tokenizer = AutoTokenizer.from_pretrained(model_path, cache_dir=".")
from safetensors.torch import load_file | |
def convert_to_16_bit_wav(data):
    """Convert a NumPy audio array to 16-bit signed PCM samples.

    Based on the dtype table in
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.write.html

    Args:
        data: ``numpy.ndarray`` with dtype ``float32``, ``int32``, ``int16``
            or ``uint8``.

    Returns:
        An ``int16`` array with the same shape. ``int16`` input is returned
        as-is (same object). ``float32`` input is peak-normalised so that the
        loudest sample maps to +/-32767; silent or empty input stays silent.

    Raises:
        ValueError: if ``data`` has any other dtype.
    """
    if data.dtype == np.int16:
        return data

    if data.dtype == np.float32:
        # Guard against empty and all-zero input: dividing by a zero peak
        # would fill the signal with NaNs.
        peak = np.abs(data).max() if data.size else 0.0
        if peak > 0:
            data = data / peak
        return (data * 32767).astype(np.int16)

    if data.dtype == np.int32:
        # 32-bit -> 16-bit: drop the low 16 bits (2**16 == 65536).
        return (data / 65536).astype(np.int16)

    if data.dtype == np.uint8:
        # Widen first so that the arithmetic cannot wrap or overflow in a
        # narrow unsigned dtype; 0..255 maps onto -32768..32767.
        return (data.astype(np.int32) * 257 - 32768).astype(np.int16)

    raise ValueError("Audio data cannot be converted to " "16-bit int format.")
# Target device for the speech tokenizer and for the input tensors
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Language model, loaded with INT8 weights and mapped automatically onto the
# available devices.
# NOTE(review): newer transformers releases may prefer
# quantization_config=BitsAndBytesConfig(load_in_8bit=True) over the bare
# load_in_8bit flag - confirm against the pinned version.
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    device_map="auto",
    load_in_8bit=True,
    cache_dir=".",
)

# Speech tokenizer (audio codec): config + checkpoint, switched to eval mode
config_path = "audiotokenizer/speechtokenizer_hubert_avg_config.json"
ckpt_path = "audiotokenizer/SpeechTokenizer.pt"
quantizer = SpeechTokenizer.load_from_checkpoint(config_path, ckpt_path)
quantizer.eval()
# Move every quantizer layer to the target device and freeze it
def freeze_entire_model(model):
    """Disable gradient tracking for every parameter of ``model``.

    Args:
        model: object exposing ``named_parameters()`` (e.g. a
            ``torch.nn.Module``).

    Returns:
        The same ``model`` object; its parameters are modified in place.
    """
    for _, param in model.named_parameters():
        param.requires_grad = False
    return model
# nn.Module.to() and freeze_entire_model() both act in place, so their return
# values do not need to be kept.
for _child_name, submodule in quantizer.named_children():
    submodule.to(device)
    freeze_entire_model(submodule)
# Build the padding tokens used to complete a partial audio frame
def get_audio_padding_tokens(quantizer):
    """Encode a single zero-valued sample to obtain padding codes.

    Args:
        quantizer: codec object exposing ``encode(audio)``.

    Returns:
        dict with one key, ``"audio_tokens"``, holding the encoder output for
        a ``(1, 1, 1)`` all-zero tensor with dimension 1 squeezed out.
    """
    silence = torch.zeros((1, 1, 1), device=device)
    padding_codes = quantizer.encode(silence).squeeze(1)
    # Drop the input tensor and release cached GPU memory before returning.
    del silence
    torch.cuda.empty_cache()
    return {"audio_tokens": padding_codes}
# Decode audio from generated tokens
def decode_audio(tokens, quantizer, pad_tokens, n_original_tokens):
    """Decode the audio span of a generated token sequence into a WAV file.

    The span runs from just after the first ``<soa>`` token (or the start of
    ``tokens`` when none is present) up to the first ``<eoa>`` token (or the
    end of ``tokens``).

    Args:
        tokens: token ids; the caller in this file passes a 1-D tensor
            (one generated sequence).
        quantizer: codec exposing ``decode(codes)`` and ``sample_rate``.
        pad_tokens: 1-D tensor indexed by codebook position, used to complete
            the last frame when the span length is not a multiple of
            ``n_codebooks``.
        n_original_tokens: modulus applied to the token ids before decoding
            (presumably the size of the text vocabulary - verify against the
            training setup).

    Returns:
        Path of the written ``.wav`` file (random UUID name, created in the
        current working directory).

    Raises:
        ValueError: if the selected span contains no audio tokens.
    """
    soa_id = tokenizer(start_audio_token)["input_ids"][-1]
    eoa_id = tokenizer(end_audio_token)["input_ids"][-1]
    soa_positions = torch.nonzero(tokens == soa_id)
    eoa_positions = torch.nonzero(tokens == eoa_id)

    start = soa_positions[0, -1] + 1 if len(soa_positions) else 0
    end = eoa_positions[0, -1] if len(eoa_positions) else tokens.shape[-1]

    audio_tokens = tokens[start:end] % n_original_tokens
    if audio_tokens.numel() == 0:
        # Fail with a clear message instead of handing the codec zero frames.
        raise ValueError("No audio tokens were generated between <soa> and <eoa>.")

    # Complete the last, partially generated frame with padding codes.
    remainder = audio_tokens.shape[-1] % n_codebooks
    if remainder:
        audio_tokens = torch.cat([audio_tokens, pad_tokens[remainder:n_codebooks]], dim=0)

    # (frames, n_codebooks) -> (n_codebooks, 1, frames)
    codes = audio_tokens.view(-1, n_codebooks).t().unsqueeze(1).to(device)
    audio = quantizer.decode(codes).squeeze(0)
    torch.cuda.empty_cache()

    xp = str(uuid4()) + '.wav'
    AudioSignal(audio.detach().cpu().numpy(), quantizer.sample_rate).write(xp)
    return xp
# Usage example
# Inference: text in, audio out
def infer_text_to_audio(text, model, tokenizer, quantizer, max_seq_length=1024, top_k=20):
    """Generate speech for ``text`` and return the path of the written WAV file.

    The prompt is the tokenized text followed by the ``<soa>`` token; the
    sampled continuation is decoded with ``decode_audio``.

    Args:
        text: input text.
        model: causal LM exposing ``generate``.
        tokenizer: tokenizer matching ``model``.
        quantizer: speech codec used to produce padding codes and to decode.
        max_seq_length: upper bound on newly generated tokens.
        top_k: top-k sampling cutoff.

    Returns:
        Path of the generated ``.wav`` file.
    """
    prompt_ids = tokenizer(text, return_tensors="pt")["input_ids"].to(device)
    soa = tokenizer(start_audio_token, return_tensors="pt")["input_ids"][:, -1:].to(device)
    text_tokens = torch.cat([prompt_ids, soa], dim=1)
    attention_mask = torch.ones(text_tokens.size(), device=device)

    output_audio_tokens = model.generate(
        text_tokens,
        attention_mask=attention_mask,
        max_new_tokens=max_seq_length,
        top_k=top_k,
        do_sample=True,
    )

    padding_tokens = get_audio_padding_tokens(quantizer)["audio_tokens"].to(device)
    # NOTE(review): 1024 is presumably the number of audio-code ids appended
    # to the text vocabulary - confirm against the checkpoint.
    return decode_audio(
        output_audio_tokens[0],
        quantizer,
        padding_tokens.t()[0],
        len(tokenizer) - 1024,
    )
# Inference: audio in, text out
def infer_audio_to_text(audio_path, model, tokenizer, quantizer, max_seq_length=1024, top_k=20):
    """Transcribe / answer the audio file at ``audio_path`` as text.

    The waveform is downmixed to mono, resampled to ``quantizer.sample_rate``,
    encoded into codec tokens, wrapped in ``<soa>`` ... ``<eoa>`` and fed to
    the model; ids below the ``<soa>`` id are decoded as text.

    Args:
        audio_path: path of an audio file readable by ``torchaudio.load``.
        model: causal LM exposing ``generate``.
        tokenizer: tokenizer matching ``model``.
        quantizer: speech codec exposing ``encode`` and ``sample_rate``.
        max_seq_length: upper bound on newly generated tokens.
        top_k: top-k sampling cutoff.

    Returns:
        The decoded text.
    """
    waveform, sample_rate = torchaudio.load(audio_path)
    # torchaudio returns (channels, frames). Flattening a multi-channel clip
    # would concatenate the channels end-to-end, so downmix to mono first.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    # The codec expects audio at its own sample rate.
    if sample_rate != quantizer.sample_rate:
        waveform = torchaudio.functional.resample(waveform, sample_rate, quantizer.sample_rate)
    audio = waveform.reshape(1, 1, -1).float().to(device)

    codes = quantizer.encode(audio)
    n_codebooks_a = 1
    # NOTE(review): this slices dimension 1 of `codes`; if encode() returns
    # (n_codebooks, batch, frames) the slice keeps every codebook. Left
    # unchanged - confirm against how the model was trained.
    raw_audio_tokens = codes[:, :n_codebooks_a] + len(tokenizer) - 1024

    soa = tokenizer(start_audio_token, return_tensors="pt")["input_ids"][:, -1:].to(device)
    eoa = tokenizer(end_audio_token, return_tensors="pt")["input_ids"][:, -1:].to(device)
    audio_tokens = torch.cat([soa, raw_audio_tokens.view(1, -1), eoa], dim=1)
    attention_mask = torch.ones(audio_tokens.size(), device=device)

    output_text_tokens = model.generate(
        audio_tokens,
        attention_mask=attention_mask,
        max_new_tokens=max_seq_length,
        top_k=top_k,
        do_sample=True,
    )
    output_text_tokens = output_text_tokens.cpu()[0]

    # Keep only ids below the <soa> id, i.e. drop the audio and marker tokens.
    soa_id = tokenizer(start_audio_token)["input_ids"][-1]
    output_text_tokens = output_text_tokens[output_text_tokens < soa_id]
    return tokenizer.decode(output_text_tokens, skip_special_tokens=True)
# Gradio-facing wrappers bound to the module-level model, tokenizer and quantizer
def infer_text_to_audio_gr(text):
    """Synthesize speech for ``text`` (stripped and upper-cased); return the WAV path."""
    prompt = text.strip().upper()
    return infer_text_to_audio(prompt, model, tokenizer, quantizer)
def infer_audio_to_text_gr(audio_path):
    """Run audio-to-text inference on the file at ``audio_path`` and return the text."""
    return infer_audio_to_text(audio_path, model, tokenizer, quantizer)
# Gradio UI: one interface per direction, shown as two tabs
text_to_audio_interface = gr.Interface(
    title="T2S",
    description="Модель в режиме ответа в аудио",
    fn=infer_text_to_audio_gr,
    inputs=gr.Textbox(label="Input Text"),
    outputs=gr.Audio(label="Аудио Ответ"),
    allow_flagging="never",
)

audio_to_text_interface = gr.Interface(
    title="S2T",
    description="Модель в режиме ответа в тексте",
    fn=infer_audio_to_text_gr,
    inputs=gr.Audio(type="filepath", label="Input Audio"),
    outputs=gr.Textbox(label="Текстовый ответ"),
    allow_flagging="never",
)

# Assemble the tabbed app and start it with a public share link
demo = gr.TabbedInterface(
    [text_to_audio_interface, audio_to_text_interface],
    ["Текст - Аудио", "Аудио - Текст"],
)
demo.launch(share=True)