File size: 4,191 Bytes
4d47e65 da299f7 4d47e65 94d75fa 4d47e65 2699d30 4d47e65 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
import requests
import concurrent.futures
import librosa
import numpy as np
import time
import functools
import soundfile as sf
import os
import gradio as gr
import numpy as np
from scipy.io import wavfile
from io import BytesIO
from typing import Tuple
API_KEY = (os.environ["UBERDUCK_USER"], os.environ["UBERDUCK_PASS"])
API_URL = "https://api.uberduck.ai"
def start_synthesis(text, voice):
    """Submit a text-to-speech job to the Uberduck API.

    Parameters
    ----------
    text : str
        Text to synthesize.
    voice : str
        Uberduck voice identifier.

    Returns
    -------
    str
        The uuid of the queued synthesis job (poll it via
        ``check_synthesis_status``).

    Raises
    ------
    requests.HTTPError
        If the API responds with a non-2xx status.
    """
    payload = {"speech": text, "voice": voice}
    response = requests.post(f"{API_URL}/speak", auth=API_KEY, json=payload)
    response.raise_for_status()
    return response.json()["uuid"]
def check_synthesis_status(uuid):
    """Poll the status of a synthesis job.

    Parameters
    ----------
    uuid : str
        Job id returned by ``start_synthesis``.

    Returns
    -------
    dict
        Parsed JSON status payload (contains a ``"path"`` key with the
        audio URL once rendering has finished).

    Raises
    ------
    requests.HTTPError
        If the API responds with a non-2xx status.
    """
    status_url = f"{API_URL}/speak-status?uuid={uuid}"
    response = requests.get(status_url, auth=API_KEY)
    response.raise_for_status()
    return response.json()
def download_synthesis(url):
    """Download the rendered audio file and return its raw bytes.

    Raises ``requests.HTTPError`` on a non-2xx response.
    """
    response = requests.get(url)
    response.raise_for_status()
    return response.content
@functools.lru_cache(maxsize=100)
def download_and_process_speech(text, voice, sr):
    """Synthesize `text` with `voice` and return it as a mono float array.

    Starts an Uberduck job, polls once per second until the rendered audio
    URL appears, downloads it, and decodes it at sample rate `sr`.

    Results are memoized on the (text, voice, sr) triple, so repeated
    lines of lyrics cost only one API round-trip.

    Parameters
    ----------
    text : str
        Text to synthesize.
    voice : str
        Uberduck voice identifier.
    sr : int
        Target sample rate for the decoded audio.

    Returns
    -------
    np.ndarray
        Mono audio samples at sample rate `sr`.
    """
    uuid = start_synthesis(text, voice)
    # Poll until the API publishes the audio URL. The original loop tested a
    # `status` variable that was never updated; the URL itself is the real
    # completion signal.
    url = None
    while not url:
        url = check_synthesis_status(uuid)["path"]
        if not url:
            time.sleep(1)
    audio_data = download_synthesis(url)
    # Name the temp file after the job uuid, not the raw text: lyrics may
    # contain characters that are invalid in (or escape) a file path.
    temp_path = f"{uuid}.wav"
    try:
        with open(temp_path, "wb") as f:
            f.write(audio_data)
        vocal, _ = librosa.load(temp_path, sr=sr)
    finally:
        # Don't leave one scratch WAV per lyric line on disk.
        if os.path.exists(temp_path):
            os.remove(temp_path)
    return vocal
def place_vocals_on_track(instrumental_file, text_list, voice, name='output', offset=8, time_signature=4):
    """Synthesize each line of `text_list` and mix it onto the instrumental,
    one line per detected measure.

    Measures are derived from librosa beat tracking: every
    `time_signature`-th beat starts a new measure, and line ``i`` is placed
    at measure ``i + offset``.

    Parameters
    ----------
    instrumental_file : path or file-like
        Audio source accepted by ``librosa.load``.
    text_list : list[str]
        Lines of lyrics. If `voice` is a dict, each line must be
        ``"voicename:lyric text"``.
    voice : str or dict
        A single voice id, or a mapping of voice-name prefixes to voice ids.
    name : str or None
        Basename for the output WAV; ``None`` skips writing to disk.
    offset : int
        Number of measures to skip before the first vocal line.
    time_signature : int
        Beats per measure.

    Returns
    -------
    (int, np.ndarray)
        Sample rate and the mixed audio.
    """
    instrumental, sr = librosa.load(instrumental_file)
    _, beat_frames = librosa.beat.beat_track(y=instrumental, sr=sr)
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    measure_starts = beat_times[::time_signature]

    # Fetch all vocal lines concurrently; download_and_process_speech is
    # network-bound, so threads overlap the API round-trips.
    vocals = [None] * len(text_list)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {}
        for i, text in enumerate(text_list):
            if isinstance(voice, dict):
                tvoice, ttext = text.split(':', maxsplit=1)
                futures[executor.submit(download_and_process_speech, ttext, voice[tvoice], sr)] = i
            else:
                futures[executor.submit(download_and_process_speech, text, voice, sr)] = i
        for future in concurrent.futures.as_completed(futures):
            vocals[futures[future]] = future.result()

    output = instrumental.copy()
    for i, vocal in enumerate(vocals):
        # BUG FIX: the original guarded `i < len(measure_starts)` but indexed
        # `measure_starts[i + offset]`, raising IndexError for any offset > 0
        # once the lyrics outnumber the remaining measures.
        if i + offset >= len(measure_starts):
            break
        start_sample = librosa.time_to_samples(measure_starts[i + offset], sr=sr)
        if start_sample >= len(output):
            break
        # BUG FIX: clamp the end to the track length; the original added a
        # full-length vocal slice into a shorter output slice, which raises a
        # broadcast ValueError when a line runs past the end of the track.
        end_sample = min(start_sample + len(vocal), len(output))
        output[start_sample:end_sample] += vocal[:end_sample - start_sample]

    if name is not None:
        sf.write(name + '.wav', output, sr, 'PCM_24')
    return sr, output
def solve(text, beat, offset, time_signature):
    """Turn a block of lyrics plus a beat into a finished vocal track.

    Strips commas, drops blank lines and annotation lines (those starting
    with ``(`` or ``[``, e.g. "[Chorus]"), then mixes each remaining line
    onto the beat with the "snoop-dogg" voice.

    Parameters
    ----------
    text : str
        Raw multi-line lyrics.
    beat : path or file-like
        Instrumental audio accepted by ``librosa.load``.
    offset : int
        Measures to skip before the first line.
    time_signature : int
        Beats per measure.

    Returns
    -------
    (int, np.ndarray)
        Sample rate and mixed audio (nothing is written to disk).
    """
    lines = [
        line
        for line in text.replace(",", "").splitlines()
        if line.strip() and not line.startswith(("(", "["))
    ]
    return place_vocals_on_track(
        beat, lines, "snoop-dogg",
        name=None, offset=offset, time_signature=time_signature,
    )
def process_and_play(text: str, file, offset, time_signature) -> Tuple[int, np.ndarray]:
    """Gradio callback: render lyrics over the uploaded instrumental.

    Parameters
    ----------
    text : str
        Lyrics from the textbox.
    file : tuple[int, np.ndarray]
        ``(sample_rate, samples)`` pair delivered by the gradio Audio input.
    offset, time_signature :
        Numeric widget values; coerced to int before use.

    Returns
    -------
    (int, np.ndarray)
        Sample rate and mixed audio, the shape gradio's numpy Audio
        output expects. (The original annotated the return as
        ``Tuple[str, gr.outputs.Audio]``, which did not match.)
    """
    # Re-encode the uploaded (sr, samples) pair as an in-memory WAV so
    # librosa can decode it inside solve().
    buffer = BytesIO()
    wavfile.write(buffer, *file)
    buffer.seek(0)  # rewind so solve() reads from the start
    sr, output_wav = solve(text, buffer, int(offset), int(time_signature))
    # BUG FIX (dead code): the original built a second BytesIO, wrote the
    # result into it, and then returned the raw array anyway — removed.
    return sr, output_wav
# --- Gradio UI wiring -------------------------------------------------------
# Four inputs feed process_and_play(text, file, offset, time_signature);
# the single output plays the mixed track as a (sr, ndarray) pair.
inputs = [
    gr.inputs.Textbox(label="Input Text"),                    # lyrics
    gr.inputs.Audio(label="Input Audio"),                     # instrumental
    gr.inputs.Number(label="Offset", default=2),              # measures to skip
    gr.inputs.Number(label="Time Signature", default=8),      # beats per measure
]
outputs = [gr.outputs.Audio(label="Processed Audio", type='numpy')]

iface = gr.Interface(
    fn=process_and_play,
    inputs=inputs,
    outputs=outputs,
    title="Text and File Processor",
)
iface.launch()