File size: 4,194 Bytes
4d47e65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da299f7
4d47e65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94d75fa
4d47e65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import concurrent.futures
import functools
import os
import tempfile
import time
from io import BytesIO
from typing import Tuple

import gradio as gr
import librosa
import numpy as np
import requests
import soundfile as sf
from scipy.io import wavfile


# HTTP Basic-auth pair for the Uberduck API, read from the environment.
# NOTE(review): raises KeyError at import time if either variable is unset.
API_KEY = (os.environ["UBERDUCK_USER"], os.environ["UBERDUCK_PASS"])
API_URL = "https://api.uberduck.ai"

def start_synthesis(text, voice):
    """Start an asynchronous Uberduck TTS job.

    Args:
        text: The text to synthesize.
        voice: Uberduck voice identifier (e.g. "snoop-dogg").

    Returns:
        The job UUID string, used to poll for completion.

    Raises:
        requests.HTTPError: If the API rejects the request.
        requests.Timeout: If the API does not respond within 30 seconds.
    """
    url = f"{API_URL}/speak"
    data = {
        "speech": text,
        "voice": voice,
    }
    # timeout prevents worker threads from blocking forever on a stalled API.
    response = requests.post(url, auth=API_KEY, json=data, timeout=30)
    response.raise_for_status()
    return response.json()["uuid"]

def check_synthesis_status(uuid):
    """Poll the status of a TTS job started by :func:`start_synthesis`.

    Args:
        uuid: Job identifier returned by ``start_synthesis``.

    Returns:
        The status JSON dict; its "path" field is the download URL once
        the job has finished (null/None until then).

    Raises:
        requests.HTTPError: If the API returns an error status.
        requests.Timeout: If the API does not respond within 30 seconds.
    """
    url = f"{API_URL}/speak-status?uuid={uuid}"
    # This is called once per second per pending job; a timeout keeps a
    # stalled request from wedging the polling loop.
    response = requests.get(url, auth=API_KEY, timeout=30)
    response.raise_for_status()
    return response.json()

def download_synthesis(url):
    """Download the rendered audio from the URL given by the status endpoint.

    Args:
        url: Direct download URL (the "path" field of the status JSON).

    Returns:
        The raw WAV bytes.

    Raises:
        requests.HTTPError: On a non-2xx response.
        requests.Timeout: If the download does not complete within 60 seconds.
    """
    # timeout avoids an indefinite hang if the CDN stalls mid-download.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    return response.content


@functools.lru_cache(maxsize=100)
def download_and_process_speech(text, voice, sr):
    """Synthesize *text* with *voice* and return it as a float waveform at *sr*.

    Results are memoized on (text, voice, sr) so repeated lyric lines are
    only synthesized once.

    Args:
        text: Text to synthesize.
        voice: Uberduck voice identifier.
        sr: Target sample rate for the returned waveform.

    Returns:
        1-D numpy float array of the vocal audio, resampled to ``sr``.
    """
    uuid = start_synthesis(text, voice)

    # Poll until the API publishes a download path for the finished clip.
    # (The original tracked a `status` variable that was never updated; the
    # real exit condition has always been "path" becoming non-null.)
    while True:
        synthesis_status = check_synthesis_status(uuid)
        url = synthesis_status["path"]
        if url:
            break
        time.sleep(1)

    audio_data = download_synthesis(url)
    # Write to a temp file instead of f"{text}.wav": lyric text can contain
    # path separators or exceed filename length limits, and the old file was
    # never cleaned up.
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp.write(audio_data)
            tmp_path = tmp.name
        vocal, _ = librosa.load(tmp_path, sr=sr)
    finally:
        if tmp_path is not None:
            os.remove(tmp_path)
    return vocal


def place_vocals_on_track(instrumental_file, text_list, voice, name='output', offset=8, time_signature=4):
    """Mix one synthesized vocal line per musical measure onto an instrumental.

    Args:
        instrumental_file: Path or file-like object readable by librosa.
        text_list: Lyric lines; one line is placed at the start of each measure.
        voice: Either a single voice id string, or a dict mapping voice names
            to voice ids, in which case each line must be "voicename: lyric".
        name: Basename for the written WAV, or None to skip writing.
        offset: Number of measures to skip before the first vocal line.
        time_signature: Beats per measure used to group detected beats.

    Returns:
        Tuple of (sample_rate, mixed_waveform).
    """
    instrumental, sr = librosa.load(instrumental_file)
    tempo, beat_frames = librosa.beat.beat_track(y=instrumental, sr=sr)
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    # First beat of every measure, assuming `time_signature` beats per bar.
    measure_starts = beat_times[::time_signature]

    # Synthesize all lines concurrently; results are written back into
    # position order regardless of completion order.
    vocals = [None] * len(text_list)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {}
        for i, text in enumerate(text_list):
            if isinstance(voice, dict):
                # "voicename: lyric" selects a per-line voice from the dict.
                tvoice, ttext = text.split(':', maxsplit=1)
                futures[executor.submit(download_and_process_speech, ttext, voice[tvoice], sr)] = i
            else:
                futures[executor.submit(download_and_process_speech, text, voice, sr)] = i
        for future in concurrent.futures.as_completed(futures):
            vocals[futures[future]] = future.result()

    # Was `zeros_like` followed by a full-slice copy — a redundant two-step
    # way to copy the instrumental.
    output = instrumental.copy()

    for i, vocal in enumerate(vocals):
        # Bounds fix: the original checked `i < len(measure_starts)` but
        # indexed `measure_starts[i + offset]`, raising IndexError for long
        # lyric lists.
        if i + offset < len(measure_starts):
            start_sample = librosa.time_to_samples(measure_starts[i + offset], sr=sr)
            # Clip to track length so a vocal running past the end does not
            # cause a shape mismatch on the += below.
            end_sample = min(start_sample + len(vocal), len(output))
            output[start_sample:end_sample] += vocal[:end_sample - start_sample]

    if name is not None:
        sf.write(name + '.wav', output, sr, 'PCM_24')
    return sr, output

def solve(text, beat, offset, time_signature):
    """Extract lyric lines from *text* and rap them over *beat*.

    Strips commas, drops blank lines and section markers (lines starting
    with "(" or "["), then mixes the remaining lines onto the beat with
    the "snoop-dogg" voice. Returns (sample_rate, mixed_waveform).
    """
    lyric_lines = []
    for line in text.replace(",", "").splitlines():
        if not line.strip():
            continue  # blank line
        if line.startswith(("(", "[")):
            continue  # section header / ad-lib marker
        lyric_lines.append(line)
    return place_vocals_on_track(beat, lyric_lines, "snoop-dogg", name=None,
                                 offset=offset, time_signature=time_signature)

def process_and_play(text: str, file: gr.inputs.Audio, offset, time_signature) -> Tuple[str, gr.outputs.Audio]:
    """Gradio handler: mix the lyric text over the uploaded beat.

    Args:
        text: Lyric text from the textbox.
        file: (sample_rate, ndarray) tuple from the Gradio Audio input.
        offset: Measures to skip before the first line (coerced to int).
        time_signature: Beats per measure (coerced to int).

    Returns:
        A 1-tuple holding (sample_rate, waveform) for the single Audio output.
    """
    # Re-serialize the uploaded (sr, array) pair to an in-memory WAV so
    # librosa can load it like a file.
    buffer = BytesIO()
    wavfile.write(buffer, *file)
    buffer.seek(0)  # rewind to the start of the in-memory WAV
    sr, output_wav = solve(text, buffer, int(offset), int(time_signature))
    # Dead-code fix: the original wrote the result into a second BytesIO
    # buffer that was never used; the numpy result is returned directly.
    return (sr, output_wav),


# Gradio UI wiring: text + beat in, mixed audio out.
# NOTE(review): gr.inputs / gr.outputs is the pre-Gradio-3 namespace,
# removed in later releases — confirm the pinned gradio version.
inputs = [
    gr.inputs.Textbox(label="Input Text"),
    gr.inputs.Audio(label="Input Audio"),
    gr.inputs.Number(label="Offset", default=2),  # measures skipped before the first vocal line
    gr.inputs.Number(label="Time Signature", default=8),  # beats per measure for beat grouping
]

outputs = [
    gr.outputs.Audio(label="Processed Audio", type='numpy')
]

# Blocks here serving the web UI; process_and_play runs per submission.
iface = gr.Interface(fn=process_and_play, inputs=inputs, outputs=outputs, title="Text and File Processor")
iface.launch()