|
from fastapi import HTTPException, Body |
|
from fastapi.responses import StreamingResponse |
|
|
|
import io |
|
from numpy import clip |
|
import soundfile as sf |
|
from pydantic import BaseModel, Field |
|
from fastapi.responses import FileResponse |
|
|
|
|
|
from modules.synthesize_audio import synthesize_audio |
|
from modules.normalization import text_normalize |
|
|
|
from modules import generate_audio as generate |
|
|
|
|
|
from typing import Literal |
|
import pyrubberband as pyrb |
|
|
|
from modules.api import utils as api_utils |
|
from modules.api.Api import APIManager |
|
|
|
import numpy as np |
|
|
|
|
|
class AudioSpeechRequest(BaseModel): |
|
input: str |
|
model: str = "chattts-4w" |
|
voice: str = "female2" |
|
response_format: Literal["mp3", "wav"] = "mp3" |
|
speed: int = Field(1, ge=1, le=10, description="Speed of the audio") |
|
style: str = "" |
|
|
|
|
|
batch_size: int = Field(1, ge=1, le=10, description="Batch size") |
|
spliter_threshold: float = Field( |
|
100, ge=10, le=1024, description="Threshold for sentence spliter" |
|
) |
|
|
|
|
|
async def openai_speech_api( |
|
request: AudioSpeechRequest = Body( |
|
..., description="JSON body with model, input text, and voice" |
|
) |
|
): |
|
try: |
|
model = request.model |
|
input_text = request.input |
|
voice = request.voice |
|
style = request.style |
|
response_format = request.response_format |
|
batch_size = request.batch_size |
|
spliter_threshold = request.spliter_threshold |
|
speed = request.speed |
|
speed = clip(speed, 0.1, 10) |
|
|
|
if not input_text: |
|
raise HTTPException(status_code=400, detail="Input text is required.") |
|
|
|
|
|
text = text_normalize(input_text, is_end=True) |
|
|
|
|
|
params = api_utils.calc_spk_style(spk=voice, style=style) |
|
|
|
spk = params.get("spk", -1) |
|
seed = params.get("seed", 42) |
|
temperature = params.get("temperature", 0.3) |
|
prompt1 = params.get("prompt1", "") |
|
prompt2 = params.get("prompt2", "") |
|
prefix = params.get("prefix", "") |
|
|
|
|
|
sample_rate, audio_data = synthesize_audio( |
|
text, |
|
temperature=temperature, |
|
top_P=0.7, |
|
top_K=20, |
|
spk=spk, |
|
infer_seed=seed, |
|
batch_size=batch_size, |
|
spliter_threshold=spliter_threshold, |
|
prompt1=prompt1, |
|
prompt2=prompt2, |
|
prefix=prefix, |
|
) |
|
|
|
if speed != 1: |
|
audio_data = pyrb.time_stretch(audio_data, sample_rate, speed) |
|
|
|
|
|
buffer = io.BytesIO() |
|
sf.write(buffer, audio_data, sample_rate, format="wav") |
|
buffer.seek(0) |
|
|
|
if response_format == "mp3": |
|
|
|
buffer = api_utils.wav_to_mp3(buffer) |
|
|
|
return StreamingResponse(buffer, media_type="audio/mp3") |
|
|
|
except Exception as e: |
|
import logging |
|
|
|
logging.exception(e) |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
def setup(api_manager: APIManager): |
|
api_manager.post("/v1/openai/audio/speech", response_class=FileResponse)( |
|
openai_speech_api |
|
) |
|
|