# Pipertts / app.py
# Uploaded by Gregniuki
# Commit 482ae7f: "Update app.py"
# Notebook-style settings. The trailing "#@param" / "#@markdown" markers appear
# to be notebook form annotations carried over from the original notebook;
# Python treats them as ordinary comments.
# NOTE(review): enhanced_accessibility is not read anywhere in the visible code.
enhanced_accessibility = False #@param {type:"boolean"}
#@markdown ---
#@markdown #### Please select your language:
#lang_select = "English" #@param ["English", "Spanish"]
#if lang_select == "English":
# lang = "en"
#elif lang_select == "Spanish":
# lang = "es"
#else:
# raise Exception("Language not supported.")
#@markdown ---
# When False the POST handler runs ONNX inference with CPUExecutionProvider;
# otherwise it requests CUDAExecutionProvider.
use_gpu = False #@param {type:"boolean"}
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.responses import FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
# ...
# Mount a directory to serve static files (e.g., CSS and JavaScript)
import logging
# The FastAPI application object that all routes below are registered on.
app = FastAPI()
# Serve the local "static" directory under the /static URL prefix (e.g., CSS and JavaScript).
app.mount("/static", StaticFiles(directory="static"), name="static")
# HTML templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Maps a generated file ID to the path of a synthesized audio file; written by
# the POST "/" handler and read by the "/download" handler.
files = {}
# Configure logging
logging.basicConfig(level=logging.DEBUG)
# Data handed to the "interface.html" template.
# "speaker_options" holds the voice identifiers offered in the form; the chosen
# value is written to config["espeak"]["voice"] by the POST handler, so every
# entry must be a voice name the phonemizer accepts.
# Fixes relative to the previous list:
#   * "vi0vn-x-south" -> "vi-vn-x-south" (typo),
#   * "en-gb-gbcwmd"  -> "en-gb-x-gbcwmd" (missing "-x-"),
#   * the second, duplicate "es" entry was removed.
data = {
    "speaker_options": [
        "en", "en-us", "en-029", "en-gb-x-gbclan", "en-gb-x-rp",
        "en-gb-scotland", "en-gb-x-gbcwmd", "es", "de", "pl",
        "ar", "be", "bn", "bpy", "bs", "bg", "ca", "yue", "hak", "haw",
        "cmn", "hr", "cs", "da", "nl", "eo", "et", "fa", "fa-latn", "fi",
        "fr-be", "fr", "ga", "gd", "ka", "grc", "el", "kl", "gn", "gu",
        "ht", "he", "hi", "hu", "id", "io", "it", "ja", "kn", "kok",
        "ko", "ku", "kk", "ky", "la", "lb", "ltg", "lv", "lfn", "lt",
        "jbo", "mi", "mk", "ms", "ml", "mt", "mr", "nci", "ne", "nb",
        "nog", "or", "om", "pap", "pt-br", "pt", "ro", "ru", "ru-lv", "uk",
        "sjn", "sr", "tn", "sd", "shn", "si", "sk", "sl", "es-419", "sw",
        "sv", "ta", "th", "tk", "tt", "te", "tr", "ug", "ur", "uz",
        "vi-vn-x-central", "vi", "vi-vn-x-south",
    ],
    "default_speaker": "en",
}
# Define a dictionary to store model configurations.
# Filled by the startup hook: config file path -> parsed JSON configuration.
model_configurations = {}
# Define global variables
onnx_models = [] # A list to store model names (replaced by the startup hook)
# NOTE(review): onnx_configs is not referenced by any other visible code.
onnx_configs = []
# Placeholder speaker map passed to the template by both "/" handlers.
speaker_id_map = {
    "speaker1": "Speaker 1 Name",
    "speaker2": "Speaker 2 Name",
    # Add more speaker IDs and names as needed
}
import logging
import math
import sys
from pathlib import Path
from enum import Enum
from typing import Iterable, List, Optional, Union
import numpy as np
import onnxruntime
import glob
#import ipywidgets as widgets
from pydub import AudioSegment
import tempfile
import uuid
import soundfile as sf
#from IPython.display import display, Audio, Markdown, clear_output
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the synthesis form.

    The template receives the voice options (``data``), the detected model
    names and the module-level ``speaker_id_map``.
    """
    context = {
        "request": request,
        "data": data,
        "model_names": onnx_models,
        "speaker_id_map": speaker_id_map,
    }
    return templates.TemplateResponse("interface.html", context)
import json
# Module logger used by the model-loading helpers below.
_LOGGER = logging.getLogger("piper_train.infer_onnx")
import os
# Optional token taken from the HF_TOKEN environment variable (None when unset).
# NOTE(review): read_key is not used anywhere in the visible code.
read_key = os.environ.get('HF_TOKEN', None)
#if not os.path.exists("./content/piper/src/python/lng"):
# import subprocess
# command = "cp -r ./content/piper/notebooks/lng ./content/piper/src/python/lng"
# subprocess.run(command, shell=True)
import sys
#sys.path.append('/content/piper/notebooks')
# Make the bundled piper Python sources importable.
sys.path.append('./content/piper/src/python')
import configparser
class Translator:
    """Translate UI strings using per-language ``.lang`` files.

    Each language is read from ``<cwd>/lng/<language_name>.lang`` with
    ``configparser`` and looked up in its ``[Strings]`` section. English
    (``"en"``) is the source language and is returned untouched.
    """

    def __init__(self):
        # language name -> parsed ConfigParser (an empty parser if the file is missing)
        self.configs = {}

    def load_language(self, language_name):
        """Parse and cache the ``.lang`` file for *language_name* (once)."""
        if language_name in self.configs:
            return
        config = configparser.ConfigParser()
        # ConfigParser.read() silently ignores files it cannot open, so a
        # missing language simply yields an empty parser.
        config.read(os.path.join(os.getcwd(), "lng", f"{language_name}.lang"))
        self.configs[language_name] = config

    def translate(self, language_name, string):
        """Return the translation of *string*, or *string* itself as fallback.

        Raises:
            Exception: if no translation exists and *string* is empty.
        """
        if language_name == "en":
            return string
        self.load_language(language_name)
        config = self.configs[language_name]
        try:
            return config.get("Strings", string)
        except (configparser.NoOptionError, configparser.NoSectionError) as err:
            if string:
                return string
            # The previous trailing ``return 0`` was unreachable and has been dropped.
            raise Exception("language engine error: This translation is corrupt!") from err
#from translator import *
# Shared Translator instance.
# NOTE(review): lan is not used by any other visible code.
lan = Translator()
def detect_onnx_models(path):
    """Find ``*.onnx`` models and ``*.json`` configs directly inside *path*.

    Returns:
        * ``None`` when no ``.onnx`` file is present,
        * ``(model_path, config_path)`` (two strings) when exactly one model
          is present,
        * ``(model_paths, config_paths)`` (two lists) when several are.

    Note that callers have to cope with all three shapes.
    """
    model_files = glob.glob(path + '/*.onnx')
    config_files = glob.glob(path + '/*.json')
    if not model_files:
        return None
    if len(model_files) == 1:
        return model_files[0], config_files[0]
    return model_files, config_files  # Return both lists as a tuple
# Define a dependency function to get the selected_model and selected_speaker_id on startup
#def get_initial_values():
# You can set default values or load them from a configuration file here
# selected_model = onnx_models[0] if onnx_models else "default_model"
# selected_speaker_id = 0 # Default value
# Check if there are onnx models and load the speaker_id_map from the first model's config
# if onnx_models:
# first_model_config = model_configurations.get(onnx_models[0])
# if first_model_config:
# speaker_id_map = first_model_config.get("speaker_id_map")
# if speaker_id_map:
# selected_speaker_id = next(iter(speaker_id_map)) # Get the first speaker_id
# else:
# selected_speaker_id = 0
# return selected_model, selected_speaker_id
@app.get("/get_speaker_id_map")
async def get_speaker_id_map(selected_model: str):
    """Return the speaker map stored in the configuration of *selected_model*.

    The configuration is looked up under ``selected_model + ".json"``.
    An unknown model yields an empty map; a known model whose map is missing
    or empty yields the default ``{"speaker1": "0"}``.
    """
    model_config = model_configurations.get(selected_model + ".json")
    if not model_config:
        # No configuration available for the selected model.
        return {"speaker_id_map": {}}
    # Fall back to a single default speaker when the map is missing or empty.
    speakers = model_config.get("speaker_id_map", {}) or {"speaker1": "0"}
    return {"speaker_id_map": speakers}
@app.on_event("startup")
async def load_model_data():
    """Discover the available models at startup and load their configurations.

    Populates the module globals ``onnx_models`` (list of model paths),
    ``config_names`` (list of config paths), ``models_path`` and
    ``model_configurations`` (config path -> parsed JSON).
    """
    global config_names, onnx_models, model_configurations, models_path
    sys.path.append('./content/piper/src/python')
    models_path = "./content/piper/src/python"
    logging.basicConfig(level=logging.DEBUG)

    # detect_onnx_models() returns None (no model), a pair of strings (exactly
    # one model) or a pair of lists (several models). Normalize all three to
    # two lists: unpacking None raised TypeError, and a bare string was
    # iterated character by character / matched by substring in
    # ``selected_model in onnx_models``.
    detected = detect_onnx_models(models_path)
    if detected is None:
        model_names, config_names = [], []
    else:
        model_names, config_names = detected
        if isinstance(model_names, str):
            model_names = [model_names]
        if isinstance(config_names, str):
            config_names = [config_names]

    onnx_models = model_names

    for config_name in config_names:
        # Load the configuration data for each model (including speaker_id_map).
        config = load_model_configuration(models_path, config_name)
        if config:
            model_configurations[config_name] = config
def load_model_configuration(models_path, config_name):
    """Load one model's JSON configuration.

    Args:
        models_path: Unused; kept so existing callers keep working.
            ``config_name`` is opened exactly as given.
        config_name: Path of the JSON configuration file.

    Returns:
        The parsed configuration, or ``None`` when ``config_name`` does not
        exist or is a directory.
    """
    try:
        # The configuration may contain non-ASCII phoneme keys; read it as
        # UTF-8 instead of relying on the platform's locale encoding.
        with open(config_name, "r", encoding="utf-8") as config_file:
            return json.load(config_file)
    except (FileNotFoundError, IsADirectoryError):
        # Missing file, or config_name points at a directory.
        return None
# Define a dependency function to get the selected_model and selected_speaker_id on startup
#def get_initial_values() -> Tuple[str, str]:
# You can set default values or load them from a configuration file here
# selected_model = onnx_models[0] if onnx_models else "default_model"
# selected_speaker_id = "default_speaker_id" # Default value
# Check if there are onnx models and load the speaker_id_map from the first model's config
# if onnx_models:
# first_model_config = model_configurations.get(onnx_models[0])
# if first_model_config:
# speaker_id_map = first_model_config.get("speaker_id_map")
# if speaker_id_map:
# selected_speaker_id = next(iter(speaker_id_map)) # Get the first speaker_id
#return selected_model, selected_speaker_id
@app.post("/", response_class=HTMLResponse)
async def main(
    request: Request,
    text_input: str = Form(default="1, 2, 3. This is a test. Enter some text to generate."),
    selected_model: str = Form(...),  # Selected model
    selected_speaker_id: str = Form(...),  # Selected speaker ID
    speaker: str = Form(...),
    speed_slider: float = Form(...),
    noise_scale_slider: float = Form(...),
    noise_scale_w_slider: float = Form(...),
    play: bool = Form(True),
):
    """Synthesize ``text_input`` with the selected model and re-render the form.

    On success the audio is exported as MP3 into a fresh temporary directory,
    registered in ``files`` under a random ID, and exposed to the template as
    ``file_url``. When ``selected_model`` is not one of ``onnx_models`` the
    page is rendered with an error snippet and ``file_url`` set to ``None``.
    """
    # Must exist on every path: the template context below always reads it.
    # Previously the "model not found" branch raised UnboundLocalError.
    file_url = None

    if selected_model in onnx_models:
        providers = [
            "CPUExecutionProvider"
            if use_gpu is False
            else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
        ]
        print(onnxruntime.get_device())
        sess_options = onnxruntime.SessionOptions()
        model, config = load_onnx(selected_model, sess_options, providers)
        # The form's voice choice overrides the voice stored in the model config.
        config["espeak"]["voice"] = speaker
        print(text_input)
        print(speaker)
        audio = inferencing(
            model,
            config,
            selected_speaker_id,
            text_input,
            speed_slider,
            noise_scale_slider,
            noise_scale_w_slider,
            play,
        )

        temp_dir = tempfile.mkdtemp()
        renamed_audio_file = os.path.join(temp_dir, "download.mp3")
        audio.export(renamed_audio_file, format="mp3")

        # Register the file under a unique ID so /download can serve it.
        file_id = str(uuid.uuid4())
        files[file_id] = renamed_audio_file
        file_url = f'/download?fileId={file_id}'

        # Restore the form and return the response
        response_html = """
<script>
document.getElementById("loading-message").innerText = "Audio generated successfully!";
document.getElementById("synthesize_button").disabled = false;
</script>
"""
    else:
        # The selected_model is not in the list of detected models.
        response_html = """
<div id="error-message">Selected model not found.</div>
<script>
document.getElementById("synthesize_button").disabled = true;
</script>
"""

    # Pass the necessary data to the HTML template, including speaker_id_map.
    return templates.TemplateResponse("interface.html", {
        "request": request,
        "file_url": file_url,
        "text_input": text_input,
        "data": data,
        "selected_model": selected_model,
        "model_names": onnx_models,
        "selected_speaker_id": selected_speaker_id,
        "speaker_id_map": speaker_id_map,
        "dynamic_content": response_html,
    })
@app.get("/download")
async def download_file(fileId: str):
    """Serve a previously generated audio file as an attachment.

    ``fileId`` is looked up in the ``files`` registry; an unknown ID yields
    the JSON body ``{"error": "File not found"}``.
    """
    filepath = files.get(fileId)
    if not filepath:
        return {"error": "File not found"}
    return FileResponse(filepath, headers={"Content-Disposition": "attachment"})
def load_onnx(model, sess_options, providers):
    """Create an ONNX Runtime session for *model* and load its configuration.

    Args:
        model: Path of the ``.onnx`` file; its config is read from ``<model>.json``.
        sess_options: ``onnxruntime.SessionOptions`` for the session.
        providers: Execution providers passed to ``InferenceSession``.

    Returns:
        ``(session, config)``.
    """
    _LOGGER.debug("Loading model from %s", model)
    config = load_config(model)
    # Keep the path and the session in separate names: rebinding ``model``
    # made the log line below print the session object instead of the path.
    session = onnxruntime.InferenceSession(
        str(model),
        sess_options=sess_options,
        providers=providers,
    )
    _LOGGER.info("Loaded model from %s", model)
    return session, config
def load_config(model):
    """Return the parsed JSON configuration stored next to *model* as ``<model>.json``."""
    # Read as UTF-8 rather than the locale encoding: the configuration may
    # contain non-ASCII phoneme keys.
    with open(f"{model}.json", "r", encoding="utf-8") as file:
        return json.load(file)
# Special symbols looked up in a model's "phoneme_id_map" by phonemes_to_ids().
PAD = "_" # padding (0); appended after every phoneme
BOS = "^" # beginning of sentence
EOS = "$" # end of sentence
class PhonemeType(str, Enum):
    """Phonemization strategies a model configuration can request.

    Members are also ``str`` instances, so they compare equal to the plain
    strings stored in a configuration's ``"phoneme_type"`` field.
    """

    # Phonemize through eSpeak (phonemize_espeak).
    ESPEAK = "espeak"

    # Use the text's code points directly (phonemize_codepoints).
    TEXT = "text"
def phonemize(config, text: str) -> List[List[str]]:
    """Convert *text* into phonemes, one inner list per sentence.

    The strategy is selected by ``config["phoneme_type"]``.

    Raises:
        ValueError: if the phoneme type is neither "espeak" nor "text".
    """
    phoneme_type = config["phoneme_type"]

    if phoneme_type == PhonemeType.TEXT:
        return phonemize_codepoints(text)

    if phoneme_type == PhonemeType.ESPEAK:
        voice = config["espeak"]["voice"]
        if voice == "ar":
            # Arabic diacritization
            # https://github.com/mush42/libtashkeel/
            text = tashkeel_run(text)
        return phonemize_espeak(text, voice)

    raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}')
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
    """Map one sentence's phonemes to model input ids.

    The result is ``BOS``, then every known phoneme followed by ``PAD``,
    then ``EOS``, each expanded through ``config["phoneme_id_map"]``.
    Phonemes missing from the map are skipped with a warning.
    """
    id_map = config["phoneme_id_map"]
    ids: List[int] = list(id_map[BOS])

    for phoneme in phonemes:
        if phoneme not in id_map:
            # This was ``print("...%s", phoneme)``, which never interpolates
            # the phoneme; report it through the module logger instead.
            _LOGGER.warning("Missing phoneme from id map: %s", phoneme)
            continue
        ids.extend(id_map[phoneme])
        ids.extend(id_map[PAD])

    ids.extend(id_map[EOS])
    return ids
def audio_float_to_int16(
    audio: np.ndarray, max_wav_value: float = 32767.0
) -> np.ndarray:
    """Peak-normalize *audio* and convert it to ``int16``.

    The signal is scaled so that its largest absolute sample maps to
    ``max_wav_value``. The peak is floored at 0.01 so that near-silent input
    is not amplified without bound. Values are clipped to
    ``[-max_wav_value, max_wav_value]`` before the cast.

    Empty input returns an empty ``int16`` array instead of failing inside
    ``np.max``.
    """
    if np.size(audio) == 0:
        return np.asarray(audio).astype("int16")

    peak = np.max(np.abs(audio))
    audio_norm = audio * (max_wav_value / max(0.01, peak))
    audio_norm = np.clip(audio_norm, -max_wav_value, max_wav_value)
    return audio_norm.astype("int16")
def inferencing(model, config, sid, line, length_scale, noise_scale, noise_scale_w, auto_play=True):
    """Synthesize *line* sentence by sentence and return it as an ``AudioSegment``.

    Args:
        model: ONNX Runtime session; must provide ``run``.
        config: Model configuration. Reads ``phoneme_type``, ``num_speakers``,
            ``audio.sample_rate`` and whatever ``phonemize`` and
            ``phonemes_to_ids`` need. ``config["phoneme_type"]`` is set to
            "espeak" when it is missing or already "espeak".
        sid: Speaker id (anything ``int()`` accepts) or ``None``; ignored for
            single-speaker models.
        line: Text to synthesize.
        length_scale, noise_scale, noise_scale_w: Values for the model's
            "scales" input.
        auto_play: Unused; kept for interface compatibility.

    Returns:
        The concatenated audio of all sentences.
    """
    # Default to eSpeak when the config does not name a phoneme type.
    phoneme_type = config.get("phoneme_type", PhonemeType.ESPEAK.value)
    if phoneme_type == PhonemeType.ESPEAK.value:
        config["phoneme_type"] = "espeak"

    sentences = phonemize(config, line)

    # Build the speaker input once. The previous code overwrote the ``sid``
    # parameter inside the loop, so from the second sentence on the "speaker
    # id" was the array made for the first sentence.
    speaker_ids = None
    if config["num_speakers"] != 1 and sid is not None:
        speaker_ids = np.asarray([int(sid)], dtype=np.int64)  # 1D array

    scales = np.array(
        [noise_scale, length_scale, noise_scale_w],
        dtype=np.float32,
    )

    audios = []
    for phonemes in sentences:
        phoneme_ids = phonemes_to_ids(config, phonemes)
        input_ids = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        input_lengths = np.array([input_ids.shape[1]], dtype=np.int64)
        audio = model.run(
            None,
            {
                "input": input_ids,
                "input_lengths": input_lengths,
                "scales": scales,
                "sid": speaker_ids,
            },
        )[0].squeeze((0, 1))
        audios.append(audio_float_to_int16(audio.squeeze()))

    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]

    # Use a unique temporary file per call: the previous fixed name
    # "generated_audio.wav" in the shared temp directory let concurrent
    # requests overwrite or delete each other's audio.
    fd, temp_audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        sf.write(temp_audio_path, merged_audio, sample_rate)
        # The file is WAV, so load it as WAV (it used to go through from_mp3).
        return AudioSegment.from_wav(temp_audio_path)
    finally:
        # Always clean up, including when writing or loading fails.
        os.remove(temp_audio_path)
def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    """Subtract a scaled bias spectrum from *audio* in the STFT magnitude domain.

    ``bias_spec`` is expanded along its last axis with ``np.repeat`` until it
    covers the audio spectrogram, then cropped to the same length. Magnitudes
    are floored at zero after the subtraction and recombined with the
    original phase.
    """
    magnitudes, angles = transform(audio)

    bias_len = bias_spec.shape[-1]
    audio_len = magnitudes.shape[-1]
    repeats = max(1, math.ceil(audio_len / bias_len))

    bias_expanded = np.repeat(bias_spec, repeats, axis=-1)[..., :audio_len]
    cleaned = magnitudes - (bias_expanded * denoiser_strength)
    cleaned = np.clip(cleaned, a_min=0.0, a_max=None)

    return inverse(cleaned, angles)
def stft(x, fft_size, hopsamp):
    """Compute the short-time Fourier transform of a time-domain signal.

    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): Frame length. Should be a power of 2, otherwise DFT will be used.
        hopsamp (int): Number of samples between the starts of two frames.

    Returns:
        The STFT. Rows are time slices, columns are frequency bins. Frames
        start at ``0, hopsamp, ...`` for every start strictly below
        ``len(x) - fft_size``.
    """
    # The window is built from fft_size as passed in, before the int conversion.
    window = np.hanning(fft_size)
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)

    frames = []
    for start in range(0, len(x) - fft_size, hopsamp):
        segment = x[start : start + fft_size]
        frames.append(np.fft.rfft(window * segment))
    return np.array(frames)
def istft(X, fft_size, hopsamp):
    """Invert an STFT into a time-domain signal by windowed overlap-add.

    Args:
        X (2-dim Numpy array): Input spectrogram. Rows are time slices, columns are frequency bins.
        fft_size (int): Frame length.
        hopsamp (int): The hop size, in samples.

    Returns:
        The inverse STFT, of length ``rows * hopsamp + fft_size``.
    """
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    window = np.hanning(fft_size)

    total_samples = int(X.shape[0] * hopsamp + fft_size)
    signal = np.zeros(total_samples)

    # One output offset per time slice; each frame is windowed and added in place.
    offsets = range(0, len(signal) - fft_size, hopsamp)
    for frame, start in zip(X, offsets):
        signal[start : start + fft_size] += window * np.real(np.fft.irfft(frame))
    return signal
def inverse(magnitude, phase):
    """Rebuild time-domain signals from magnitude and phase spectrograms.

    Both inputs are indexed as (batch, frequency, time) by this code. Each
    batch item is inverted with ``istft(fft_size=1024, hopsamp=256)`` and the
    results are stacked along the first axis.
    """
    real = magnitude * np.cos(phase)
    imag = magnitude * np.sin(phase)
    stacked = np.concatenate([real, imag], axis=1)

    n_b, n_f, n_t = stacked.shape  # pylint: disable=unpacking-non-sequence
    half = n_f // 2

    # First half of the stacked axis is the real part, second half the imaginary part.
    spectrum = np.empty([n_b, half, n_t], dtype=np.complex64)
    spectrum.real = stacked[:, :half]
    spectrum.imag = stacked[:, half:]

    signals = [
        istft(frames.T, fft_size=1024, hopsamp=256)[None, :]
        for frames in spectrum
    ]
    return np.concatenate(signals, 0)
def transform(input_data):
    """Return the STFT magnitude and phase of every signal in *input_data*.

    Each item is transformed with ``stft(fft_size=1024, hopsamp=256)``; the
    results are stacked along a new first axis, giving arrays indexed as
    (batch, frequency, time).

    Returns:
        ``(magnitude, phase)``.
    """
    real_part = []
    imag_part = []
    for signal in input_data:
        spectrum = stft(signal, fft_size=1024, hopsamp=256).T
        real_part.append(spectrum.real[None, :, :])  # pylint: disable=unsubscriptable-object
        imag_part.append(spectrum.imag[None, :, :])  # pylint: disable=unsubscriptable-object
    real_part = np.concatenate(real_part, 0)
    imag_part = np.concatenate(imag_part, 0)

    magnitude = np.sqrt(real_part**2 + imag_part**2)
    # Pass the arrays themselves: ``ndarray.data`` is a raw memoryview, which
    # only worked because NumPy re-wrapped it through the buffer protocol.
    phase = np.arctan2(imag_part, real_part)
    return magnitude, phase
#@app.get("/")
#async def read_root(request: Request):
# return templates.TemplateResponse("interface.html", {"request": request})
if __name__ == "__main__":
    # Run the app with uvicorn when the file is executed as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)
# main()
# pass
# app()
# Create an instance of the FastAPI class
#app = main()
# Define a route for the root endpoint
#def read_root():
# return {"message": "Hello, World!"}