Mahiruoshi's picture
Update server.py
f4cadb2
raw
history blame
8.1 kB
import argparse
import os
from pathlib import Path
import logging
import re_matching
import uuid
from flask import Flask, request, jsonify, render_template_string
from flask_cors import CORS
logging.getLogger("numba").setLevel(logging.WARNING)
logging.getLogger("markdown_it").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("matplotlib").setLevel(logging.WARNING)
logging.basicConfig(
level=logging.INFO, format="| %(name)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)
import librosa
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import utils
from config import config
import requests
import torch
import commons
from text import cleaned_text_to_sequence, get_bert
from clap_wrapper import get_clap_audio_feature, get_clap_text_feature
from text.cleaner import clean_text
import utils
from models import SynthesizerTrn
from text.symbols import symbols
import sys
from scipy.io.wavfile import write
net_g = None
device = (
"cuda:0"
if torch.cuda.is_available()
else (
"mps"
if sys.platform == "darwin" and torch.backends.mps.is_available()
else "cpu"
)
)
#device = 'cpu'
def get_net_g(model_path: str, device: str, hps):
net_g = SynthesizerTrn(
len(symbols),
hps.data.filter_length // 2 + 1,
hps.train.segment_size // hps.data.hop_length,
n_speakers=hps.data.n_speakers,
**hps.model,
).to(device)
_ = net_g.eval()
_ = utils.load_checkpoint(model_path, net_g, None, skip_optimizer=True)
return net_g
def get_text(text, language_str, hps, device):
norm_text, phone, tone, word2ph = clean_text(text, language_str)
phone, tone, language = cleaned_text_to_sequence(phone, tone, language_str)
#print(text)
if hps.data.add_blank:
phone = commons.intersperse(phone, 0)
tone = commons.intersperse(tone, 0)
language = commons.intersperse(language, 0)
for i in range(len(word2ph)):
word2ph[i] = word2ph[i] * 2
word2ph[0] += 1
bert_ori = get_bert(norm_text, word2ph, language_str, device)
del word2ph
assert bert_ori.shape[-1] == len(phone), phone
if language_str == "ZH":
bert = bert_ori
ja_bert = torch.zeros(1024, len(phone))
en_bert = torch.zeros(1024, len(phone))
elif language_str == "JP":
bert = torch.zeros(1024, len(phone))
ja_bert = bert_ori
en_bert = torch.zeros(1024, len(phone))
else:
raise ValueError("language_str should be ZH, JP or EN")
assert bert.shape[-1] == len(
phone
), f"Bert seq len {bert.shape[-1]} != {len(phone)}"
phone = torch.LongTensor(phone)
tone = torch.LongTensor(tone)
language = torch.LongTensor(language)
return bert, ja_bert, en_bert, phone, tone, language
def infer(
text,
sdp_ratio,
noise_scale,
noise_scale_w,
length_scale,
sid,
reference_audio=None,
emotion='Happy',
):
language= 'JP' if is_japanese(text) else 'ZH'
if isinstance(reference_audio, np.ndarray):
emo = get_clap_audio_feature(reference_audio, device)
else:
emo = get_clap_text_feature(emotion, device)
emo = torch.squeeze(emo, dim=1)
bert, ja_bert, en_bert, phones, tones, lang_ids = get_text(
text, language, hps, device
)
with torch.no_grad():
x_tst = phones.to(device).unsqueeze(0)
tones = tones.to(device).unsqueeze(0)
lang_ids = lang_ids.to(device).unsqueeze(0)
bert = bert.to(device).unsqueeze(0)
ja_bert = ja_bert.to(device).unsqueeze(0)
en_bert = en_bert.to(device).unsqueeze(0)
x_tst_lengths = torch.LongTensor([phones.size(0)]).to(device)
emo = emo.to(device).unsqueeze(0)
del phones
speakers = torch.LongTensor([hps.data.spk2id[sid]]).to(device)
audio = (
net_g.infer(
x_tst,
x_tst_lengths,
speakers,
tones,
lang_ids,
bert,
ja_bert,
en_bert,
emo,
sdp_ratio=sdp_ratio,
noise_scale=noise_scale,
noise_scale_w=noise_scale_w,
length_scale=length_scale,
)[0][0, 0]
.data.cpu()
.float()
.numpy()
)
del x_tst, tones, lang_ids, bert, x_tst_lengths, speakers, ja_bert, en_bert, emo
if torch.cuda.is_available():
torch.cuda.empty_cache()
unique_filename = f"temp{uuid.uuid4()}.wav"
write(unique_filename, 44100, audio)
return unique_filename
def is_japanese(string):
for ch in string:
if ord(ch) > 0x3040 and ord(ch) < 0x30FF:
return True
return False
def loadmodel(model):
try:
_ = net_g.eval()
_ = utils.load_checkpoint(model, net_g, None, skip_optimizer=True)
return "success"
except:
return "error"
def send_audio_to_server(audio_path,text):
url="http://127.0.0.1:3000/response"
files = {'file': open(audio_path, 'rb')}
data = {'text': text}
try:
response = requests.post(url, files=files,data=data)
return response.status_code, response.text
except Exception as e:
return 500, str(e)
app = Flask(__name__)
CORS(app)
@app.route('/')
def tts():
global last_text, last_model
speaker = request.args.get('speaker')
sdp_ratio = float(request.args.get('sdp_ratio', 0.2))
noise_scale = float(request.args.get('noise_scale', 0.6))
noise_scale_w = float(request.args.get('noise_scale_w', 0.8))
length_scale = float(request.args.get('length_scale', 1))
emotion = request.args.get('emotion', 'happy')
text = request.args.get('text')
is_chat = request.args.get('is_chat', 'false').lower() == 'true'
model = request.args.get('model',modelPaths[-1])
if not speaker or not text:
return render_template_string("""
<!DOCTYPE html>
<html>
<head>
<title>TTS API Documentation</title>
</head>
<body>
<iframe src="http://love.soyorin.top" style="width:100%; height:100vh; border:none;"></iframe>
</body>
</html>
""")
if model != last_model:
unique_filename = loadmodel(model)
last_model = model
if is_chat and text == last_text:
# Generate 1 second of silence and return
unique_filename = 'blank.wav'
silence = np.zeros(44100, dtype=np.int16)
write(unique_filename , 44100, silence)
else:
last_text = text
unique_filename = infer(text, sdp_ratio=sdp_ratio, noise_scale=noise_scale, noise_scale_w=noise_scale_w, length_scale=length_scale,sid = speaker, reference_audio=None, emotion=emotion)
status_code, response_text = send_audio_to_server(unique_filename,text)
print(f"Response from server: {response_text} (Status code: {status_code})")
with open(unique_filename ,'rb') as bit:
wav_bytes = bit.read()
os.remove(unique_filename)
headers = {
'Content-Type': 'audio/wav',
'Text': unique_filename .encode('utf-8')}
return wav_bytes, 200, headers
if __name__ == "__main__":
languages = [ "Auto", "ZH", "JP"]
modelPaths = []
for dirpath, dirnames, filenames in os.walk("Data/BangDreamV22/models/"):
for filename in filenames:
modelPaths.append(os.path.join(dirpath, filename))
hps = utils.get_hparams_from_file('Data/BangDreamV22/configs/config.json')
net_g = get_net_g(
model_path=modelPaths[-1], device=device, hps=hps
)
speaker_ids = hps.data.spk2id
speakers = list(speaker_ids.keys())
last_text = ""
last_model = modelPaths[-1]
app.run(host="0.0.0.0", port=5000)