|
import requests |
|
import concurrent.futures |
|
import librosa |
|
import numpy as np |
|
import time |
|
import functools |
|
import soundfile as sf |
|
import os |
|
import gradio as gr |
|
import numpy as np |
|
from scipy.io import wavfile |
|
from io import BytesIO |
|
from typing import Tuple |
|
|
|
|
|
API_KEY = (os.environ["UBERDUCK_USER"], os.environ["UBERDUCK_PASS"]) |
|
API_URL = "https://api.uberduck.ai" |
|
|
|
def start_synthesis(text, voice): |
|
url = f"{API_URL}/speak" |
|
data = { |
|
"speech": text, |
|
"voice": voice, |
|
} |
|
response = requests.post(url, auth=API_KEY, json=data) |
|
response.raise_for_status() |
|
return response.json()["uuid"] |
|
|
|
def check_synthesis_status(uuid): |
|
url = f"{API_URL}/speak-status?uuid={uuid}" |
|
response = requests.get(url, auth=API_KEY) |
|
response.raise_for_status() |
|
return response.json() |
|
|
|
def download_synthesis(url): |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
return response.content |
|
|
|
|
|
@functools.lru_cache(maxsize=100) |
|
def download_and_process_speech(text, voice, sr): |
|
uuid = start_synthesis(text, voice) |
|
status = "started" |
|
|
|
while status != "completed": |
|
synthesis_status = check_synthesis_status(uuid) |
|
url = synthesis_status["path"] |
|
if url: |
|
break |
|
time.sleep(1) |
|
|
|
audio_data = download_synthesis(url) |
|
with open(f"{text}.wav", "wb") as f: |
|
f.write(audio_data) |
|
vocal, _ = librosa.load(f"{text}.wav", sr=sr) |
|
return vocal |
|
|
|
|
|
def place_vocals_on_track(instrumental_file, text_list, voice, name='output', offset=8, time_signature=4): |
|
instrumental, sr = librosa.load(instrumental_file) |
|
tempo, beat_frames = librosa.beat.beat_track(y=instrumental, sr=sr) |
|
beat_times = librosa.frames_to_time(beat_frames, sr=sr) |
|
measure_starts = beat_times[::time_signature] |
|
|
|
vocals = [None] * len(text_list) |
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
futures = {} |
|
for i, text in enumerate(text_list): |
|
if isinstance(voice, dict): |
|
tvoice, ttext = text.split(':', maxsplit=1) |
|
futures[executor.submit(download_and_process_speech, ttext, voice[tvoice], sr)] = i |
|
else: |
|
futures[executor.submit(download_and_process_speech, text, voice, sr)] = i |
|
for future in concurrent.futures.as_completed(futures.keys()): |
|
vocals[futures[future]] = future.result() |
|
|
|
output = np.zeros_like(instrumental) |
|
output[:len(instrumental)] = instrumental |
|
|
|
for i, vocal in enumerate(vocals): |
|
if i < len(measure_starts): |
|
start_sample = librosa.time_to_samples(measure_starts[i+offset], sr=sr) |
|
end_sample = start_sample + len(vocal) |
|
output[start_sample:end_sample] += vocal[:end_sample - start_sample] |
|
|
|
if name is not None: |
|
sf.write(name+'.wav', output, sr, 'PCM_24') |
|
return sr, output |
|
|
|
def solve(text, beat, offset, time_signature): |
|
text = text.replace(",", "").splitlines() |
|
text = [l for l in text if l.strip() and not l.startswith("(") and not l.startswith('[')] |
|
sr, output = place_vocals_on_track(beat, text, "snoop-dogg", name=None, offset=offset, time_signature=time_signature) |
|
return sr, output |
|
|
|
def process_and_play(text: str, file: gr.inputs.Audio, offset, time_signature) -> Tuple[str, gr.outputs.Audio]: |
|
output = BytesIO() |
|
wavfile.write(output, *file) |
|
output.seek(0) |
|
sr, output_wav = solve(text, output, int(offset), int(time_signature)) |
|
output = BytesIO() |
|
wavfile.write(output, sr, output_wav) |
|
output.seek(0) |
|
return (sr, output_wav), |
|
|
|
|
|
inputs = [ |
|
gr.inputs.Textbox(label="Input Text"), |
|
gr.inputs.Audio(label="Input Audio"), |
|
gr.inputs.Number(label="Offset", default=2), |
|
gr.inputs.Number(label="Time Signature", default=8), |
|
] |
|
|
|
outputs = [ |
|
gr.outputs.Audio(label="Processed Audio", type='numpy') |
|
] |
|
|
|
iface = gr.Interface(fn=process_and_play, inputs=inputs, outputs=outputs, title="Text and File Processor") |
|
iface.launch() |