# NOTE: page banner captured together with this file (not part of the program):
# Spaces status: Runtime error
import os | |
import torch | |
import gradio as gr | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from huggingface_hub import snapshot_download | |
from dotenv import load_dotenv | |
# Load environment variables before anything else reads them.
load_dotenv()

# Number of CPU threads PyTorch may use (adjust based on your CPU cores).
torch.set_num_threads(4)

# Device and dtype selection: bfloat16 on CUDA, float32 on CPU.
if torch.cuda.is_available():
    device = "cuda"
    torch_dtype = torch.bfloat16
else:
    device = "cpu"
    torch_dtype = torch.float32
# Placeholder for a GPU-specific decorator: in CPU mode it does nothing, so
# code written against the decorator keeps working unchanged.
def gpu_decorator(func):
    """Return ``func`` itself, unmodified (identity decorator)."""
    return func
# SNAC is imported here, after the target device has been chosen.
from snac import SNAC

# --- SNAC model -------------------------------------------------------------
print("Loading SNAC model...")
snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz").to(device)
snac_model.eval()  # evaluation mode

# --- Orpheus model ----------------------------------------------------------
model_name = "canopylabs/orpheus-3b-0.1-ft"

# Files to pre-fetch for the Orpheus model: config and safetensors weights.
_ORPHEUS_ALLOW_PATTERNS = [
    "config.json",
    "*.safetensors",
    "model.safetensors.index.json",
]
# Files explicitly skipped by this download (training state, legacy weights,
# tokenizer files). The tokenizer is loaded separately further down via
# AutoTokenizer.from_pretrained.
_ORPHEUS_IGNORE_PATTERNS = [
    "optimizer.pt",
    "pytorch_model.bin",
    "training_args.bin",
    "scheduler.pt",
    "tokenizer.json",
    "tokenizer_config.json",
    "special_tokens_map.json",
    "vocab.json",
    "merges.txt",
    "tokenizer.*",
]
snapshot_download(
    repo_id=model_name,
    allow_patterns=_ORPHEUS_ALLOW_PATTERNS,
    ignore_patterns=_ORPHEUS_IGNORE_PATTERNS,
)

print("Loading Orpheus model...")
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch_dtype)
model.to(device)
model.eval()  # evaluation mode

# Optionally compile the model on CPU when this PyTorch build offers
# torch.compile (PyTorch 2.0+); on failure keep the uncompiled model.
if device == "cpu" and hasattr(torch, "compile"):
    try:
        model = torch.compile(model)
    except Exception as e:
        print("torch.compile not supported:", e)
    else:
        print("Model compiled with torch.compile")

tokenizer = AutoTokenizer.from_pretrained(model_name)
print(f"Orpheus model loaded to {device}")
def process_prompt(prompt, voice, tokenizer, device):
    """Tokenize a voice-prefixed prompt and frame it with marker tokens.

    The text ``"{voice}: {prompt}"`` is tokenized, token 128259 is placed in
    front of it and tokens 128009 and 128260 are appended after it.

    Args:
        prompt: Text to be spoken.
        voice: Voice name placed in front of the prompt text.
        tokenizer: Callable accepting ``(text, return_tensors="pt")`` whose
            result exposes ``input_ids``.
        device: Device the returned tensors are moved to.

    Returns:
        ``(input_ids, attention_mask)`` on ``device``; the mask is all ones
        and has the same shape as ``input_ids``.
    """
    text_ids = tokenizer(f"{voice}: {prompt}", return_tensors="pt").input_ids

    start_marker = torch.tensor([[128259]], dtype=torch.int64)
    end_markers = torch.tensor([[128009, 128260]], dtype=torch.int64)
    framed_ids = torch.cat((start_marker, text_ids, end_markers), dim=1)

    # Every position is a real token, so the mask is all ones.
    mask = torch.ones_like(framed_ids)
    return framed_ids.to(device), mask.to(device)
def parse_output(generated_ids):
    """Extract the audio code list from generated token ids.

    Steps:
      1. Drop everything up to and including the last occurrence of token
         128257 (if it occurs at all).
      2. Take the first row and remove every occurrence of token 128258.
      3. Truncate to a multiple of 7 tokens.
      4. Subtract the offset 128266 from each remaining token.

    Args:
        generated_ids: 2-D integer tensor of token ids (rows are sequences).

    Returns:
        A list of plain Python ints for the first row. Its length is a
        multiple of 7 and may be zero.
    """
    token_to_find = 128257
    token_to_remove = 128258
    code_offset = 128266

    # Column indices of every match, in row-major order; the final entry is
    # the one used as the crop point.
    marker_columns = (generated_ids == token_to_find).nonzero(as_tuple=True)[1]
    if marker_columns.numel() > 0:
        crop_start = marker_columns[-1].item() + 1
        generated_ids = generated_ids[:, crop_start:]

    # Only the first sequence is returned, so only that row is processed.
    row = generated_ids[0]
    row = row[row != token_to_remove]

    # Keep only complete groups of 7 tokens.
    usable_length = (row.size(0) // 7) * 7

    # One vectorised subtraction plus a single transfer to Python ints,
    # instead of building a list of 0-d tensors element by element.
    return (row[:usable_length] - code_offset).tolist()
def redistribute_codes(code_list, snac_model):
    """Split a flat code list into three SNAC layers and decode to audio.

    The codes are consumed in frames of 7. Within a frame, position ``k``
    has ``k * 4096`` subtracted and is routed as follows:

      * position 0          -> layer 1
      * positions 1, 4      -> layer 2
      * positions 2, 3, 5, 6 -> layer 3

    Trailing codes that do not fill a complete frame are ignored.

    Args:
        code_list: Indexable sequence of integer codes (ints or 0-d tensors).
        snac_model: Model exposing ``parameters()`` and ``decode(codes)``.

    Returns:
        The decoded audio as a NumPy array (squeezed, moved to CPU).

    Raises:
        ValueError: If ``code_list`` holds fewer than 7 codes, i.e. not even
            one complete frame.
    """
    snac_device = next(snac_model.parameters()).device

    # Only complete frames are usable. Rounding up here would index past the
    # end of the list whenever six codes are left over.
    frame_count = len(code_list) // 7
    if frame_count == 0:
        raise ValueError(
            "code_list must contain at least one complete frame of 7 codes"
        )

    layer_1, layer_2, layer_3 = [], [], []
    for i in range(frame_count):
        base = 7 * i
        layer_1.append(code_list[base])
        layer_2.append(code_list[base + 1] - 4096)
        layer_3.append(code_list[base + 2] - (2 * 4096))
        layer_3.append(code_list[base + 3] - (3 * 4096))
        layer_2.append(code_list[base + 4] - (4 * 4096))
        layer_3.append(code_list[base + 5] - (5 * 4096))
        layer_3.append(code_list[base + 6] - (6 * 4096))

    # Each layer becomes a tensor with a leading batch dimension of 1.
    codes = [
        torch.tensor(layer, device=snac_device).unsqueeze(0)
        for layer in (layer_1, layer_2, layer_3)
    ]

    # Decoding is inference only; skip autograd bookkeeping.
    with torch.inference_mode():
        audio_hat = snac_model.decode(codes)
    return audio_hat.detach().squeeze().cpu().numpy()
def generate_speech(text, voice, temperature, top_p, repetition_penalty, max_new_tokens, progress=gr.Progress()):
    """Generate speech audio for ``text`` in the given ``voice``.

    Pipeline: build the prompt tensors, sample tokens from the Orpheus
    model, parse them into audio codes and decode those codes with SNAC.

    Args:
        text: Text to speak. ``None``, empty or whitespace-only text yields
            ``None`` without running the model.
        voice: Voice name prefixed to the prompt.
        temperature: Sampling temperature passed to ``model.generate``.
        top_p: Nucleus sampling threshold passed to ``model.generate``.
        repetition_penalty: Repetition penalty passed to ``model.generate``.
        max_new_tokens: Upper bound on generated tokens; coerced to ``int``.
        progress: Gradio progress tracker. The ``gr.Progress()`` default is
            how Gradio expects the tracker to be declared.

    Returns:
        ``(24000, samples)`` on success, or ``None`` if the input is blank or
        any step fails (the error is printed).
    """
    # Guard against None as well as blank strings (an emptied textbox may
    # deliver None rather than "").
    if not text or not text.strip():
        return None

    try:
        progress(0.05, "Processing text...")
        input_ids, attention_mask = process_prompt(text, voice, tokenizer, device)

        progress(0.2, "Generating tokens...")
        with torch.inference_mode():
            generated_ids = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                # UI sliders may hand over a float; generate wants a count.
                max_new_tokens=int(max_new_tokens),
                do_sample=True,
                temperature=temperature,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                num_return_sequences=1,
                eos_token_id=128258,
            )

        progress(0.4, "Parsing tokens...")
        code_list = parse_output(generated_ids)
        if not code_list:
            # Report the real cause instead of an obscure decoder failure.
            raise ValueError("model produced no complete audio frames")

        progress(0.7, "Generating audio...")
        audio_samples = redistribute_codes(code_list, snac_model)

        progress(1.0, "Done")
        return (24000, audio_samples)
    except Exception as e:
        # UI boundary: report the failure and show no audio rather than
        # propagating the exception.
        print(f"Error generating speech: {e}")
        return None
def convert_model_to_onnx():
    """
    Converts the Orpheus model to ONNX format using a dummy prompt.
    The exported file will be saved as 'orpheus_model.onnx' in the working directory.

    Returns:
        A human-readable status string: a success message naming the output
        file, or an error message if tokenization or export failed.
    """
    dummy_prompt = "tara: Hello"
    file_path = "orpheus_model.onnx"
    try:
        # Tokenize inside the try block so that a tokenizer failure is
        # reported the same way as an export failure.
        dummy_input = tokenizer(dummy_prompt, return_tensors="pt").input_ids.to(device)

        # The global model may have been wrapped by torch.compile on CPU.
        # The compiled wrapper keeps the original module under `_orig_mod`;
        # export that module, since the tracing-based exporter is not
        # expected to trace through the compiled wrapper.
        export_target = getattr(model, "_orig_mod", model)

        # Export the model to ONNX format
        torch.onnx.export(
            export_target,
            dummy_input,
            file_path,
            export_params=True,
            opset_version=14,
            input_names=["input_ids"],
            output_names=["logits"],
            # Batch size and sequence length are left dynamic.
            dynamic_axes={
                "input_ids": {0: "batch_size", 1: "sequence_length"},
                "logits": {0: "batch_size", 1: "sequence_length"},
            },
        )
        return f"Model converted to ONNX and saved as '{file_path}'."
    except Exception as e:
        return f"Error during ONNX conversion: {e}"
# UI examples and voice choices. Each example row is
# [text, voice, temperature, top_p, repetition_penalty, max_new_tokens].
examples = [
    ["Hey there my name is Tara, <chuckle> and I'm a speech generation model that can sound like a person.", "tara", 0.6, 0.95, 1.1, 1200],
    ["I've also been taught to understand and produce paralinguistic things like sighing, or chuckling, or yawning!", "dan", 0.7, 0.95, 1.1, 1200],
    ["I live in San Francisco, and have, uhm let's see, 3 billion 7 hundred ... well, let's just say a lot of parameters.", "emma", 0.6, 0.9, 1.2, 1200],
]
VOICES = ["tara", "dan", "josh", "emma"]

with gr.Blocks(title="Orpheus Text-to-Speech") as demo:
    gr.Markdown("""
# 🎵 Orpheus Text-to-Speech
Enter text to hear it converted to natural-sounding speech.
**Tips:**
- Use paralinguistic cues like `<chuckle>` or `<sigh>`.
- Longer text can produce more natural results.
""")

    with gr.Row():
        # Left column: text, voice and generation settings.
        with gr.Column(scale=3):
            text_input = gr.Textbox(
                label="Text to speak",
                placeholder="Enter your text...",
                lines=5,
            )
            voice = gr.Dropdown(choices=VOICES, value="tara", label="Voice")

            with gr.Accordion("Advanced Settings", open=False):
                temperature = gr.Slider(
                    label="Temperature",
                    info="Higher values produce more varied speech",
                    minimum=0.1,
                    maximum=1.5,
                    value=0.6,
                    step=0.05,
                )
                top_p = gr.Slider(
                    label="Top P",
                    info="Nucleus sampling threshold",
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                )
                repetition_penalty = gr.Slider(
                    label="Repetition Penalty",
                    info="Discourage repetition",
                    minimum=1.0,
                    maximum=2.0,
                    value=1.1,
                    step=0.05,
                )
                max_new_tokens = gr.Slider(
                    label="Max Length",
                    info="Maximum generated tokens",
                    minimum=100,
                    maximum=2000,
                    value=1200,
                    step=100,
                )

            with gr.Row():
                submit_btn = gr.Button("Generate Speech", variant="primary")
                clear_btn = gr.Button("Clear")

        # Right column: the generated audio.
        with gr.Column(scale=2):
            audio_output = gr.Audio(label="Generated Speech", type="numpy")

    # Same input components, in generate_speech's parameter order, for both
    # the examples table and the submit button.
    generation_inputs = [text_input, voice, temperature, top_p, repetition_penalty, max_new_tokens]

    # NOTE(review): cache_examples=True asks Gradio to pre-compute outputs by
    # running generate_speech on the example rows - confirm this is
    # affordable on CPU-only hosts.
    gr.Examples(
        examples=examples,
        inputs=generation_inputs,
        outputs=audio_output,
        fn=generate_speech,
        cache_examples=True,
    )

    submit_btn.click(
        fn=generate_speech,
        inputs=generation_inputs,
        outputs=audio_output,
    )

    # Clear resets both the text box and the audio player.
    clear_btn.click(
        fn=lambda: (None, None),
        inputs=[],
        outputs=[text_input, audio_output],
    )

    # ONNX export controls.
    gr.Markdown("## ONNX Conversion")
    onnx_btn = gr.Button("Convert Model to ONNX")
    onnx_output = gr.Textbox(label="Conversion Output")
    onnx_btn.click(fn=convert_model_to_onnx, inputs=[], outputs=onnx_output)

if __name__ == "__main__":
    demo.queue().launch(share=False, ssr_mode=False)