# Spaces: Sleeping
import os | |
import gradio as gr | |
import json | |
import asyncio | |
import aiohttp | |
import requests | |
from tqdm.asyncio import tqdm | |
# Base URL prepended to every video-service endpoint below; read once at
# import time from the environment (None when the variable is unset).
API_BASE_URL = os.getenv("DUBPRO_API_BASE_URL")
def fetch_data_from_db(_id, details="GET_ADVANCE_DETAILS", timeout=30):
    """Fetch the details of one video from the video-service API.

    Args:
        _id: Identifier of the video, interpolated into the request URL.
        details: Value sent as the ``type`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            calling UI handler if the service stalls.

    Returns:
        The value stored under the ``"data"`` key of the JSON response.

    Raises:
        Exception: If the service answers with a status other than 200; the
            message is the raw response body.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    API_URL = f"{API_BASE_URL}/video-service/videos/{_id}/get_details"
    headers = {
        "Accept": "*/*",
        "Authorization": os.environ.get("DUBPRO_API_HEADER_KEY"),
    }
    params = {"type": details}

    response = requests.get(API_URL, headers=headers, params=params, timeout=timeout)
    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()["data"]
async def generate_audio(session, video_id, segment_id, text, progress_bar):
    """Send an ``EDIT_TEXT`` update for one segment and tick the progress bar.

    Issues a PUT to the segment endpoint with ``text`` as ``updatedText``.
    A non-200 reply is not raised; the segment id, the text and the response
    body are printed instead. The progress bar is advanced by one afterwards.
    """
    endpoint = f"{API_BASE_URL}/video-service/videos/{video_id}/segment/{segment_id}"
    auth_headers = {"Authorization": os.environ.get("DUBPRO_API_HEADER_KEY")}
    body = {"type": "EDIT_TEXT", "updatedText": text}

    async with session.put(endpoint, headers=auth_headers, json=body) as resp:
        request_failed = resp.status != 200
        if request_failed:
            print(segment_id, text, await resp.text())

    progress_bar.update(1)
async def generate_audios(video_id, segments=None):
    """Run ``generate_audio`` concurrently for every segment of a video.

    Args:
        video_id: Identifier of the video the segments belong to.
        segments: Iterable-with-length of segment dicts, each providing the
            ``"id"`` and ``"updatedText"`` keys. ``None`` (the default) is
            treated as an empty list.
    """
    # Avoid a shared mutable default argument.
    segments = [] if segments is None else segments

    async with aiohttp.ClientSession() as session:
        progress_bar = tqdm(total=len(segments), desc="Generating Audios", unit="segment")
        try:
            tasks = [
                generate_audio(session, video_id, segment["id"], segment["updatedText"], progress_bar)
                for segment in segments
            ]
            await asyncio.gather(*tasks)
        finally:
            # Close the bar even when one of the requests raises.
            progress_bar.close()
def perform_generation(video_id, gen_choice, progress_bar=gr.Progress(track_tqdm=True)):
    """Fetch a video's segments, filter them by ``gen_choice`` and process them.

    The segments come from the ``"targetTranscriptData"`` entry of the fetched
    details. Two choices narrow the list based on whether a segment's
    ``"audioUrl"`` contains the substring ``"updated"``; any other choice keeps
    every segment. Returns the string ``"Done"`` once all requests finished.
    """

    def _has_updated_audio(seg):
        url = seg["audioUrl"]
        return url is not None and "updated" in url

    all_segments = fetch_data_from_db(video_id)["targetTranscriptData"]

    if gen_choice == "Generate only non-generated segments":
        selected = [seg for seg in all_segments if not _has_updated_audio(seg)]
    elif gen_choice == "Re-generate generated only segments":
        selected = [seg for seg in all_segments if _has_updated_audio(seg)]
    else:
        selected = all_segments

    asyncio.run(generate_audios(video_id, segments=selected))
    return "Done"
async def update_tempo(session, video_id, segment_id, tempo):
    """Send an ``EDIT_SPEED`` update carrying ``tempo`` for one segment.

    Issues a PUT to the segment endpoint. A non-200 reply is not raised; the
    response body is printed instead.
    """
    endpoint = f"{API_BASE_URL}/video-service/videos/{video_id}/segment/{segment_id}"
    auth_headers = {"Authorization": os.environ.get("DUBPRO_API_HEADER_KEY")}
    body = {"type": "EDIT_SPEED", "tempo": tempo}

    async with session.put(endpoint, headers=auth_headers, json=body) as resp:
        request_failed = resp.status != 200
        if request_failed:
            print(await resp.text())
async def process_segment(session, video_id, segment, progress_bar):
    """Adjust one segment's tempo when its audio length is off, then tick the bar.

    The slot length is ``endTimestamp - startTimestamp`` rounded to three
    decimals. A tempo update is sent only when the absolute difference between
    the slot length and ``audioDuration`` lies within [0.1, 2.6]; smaller and
    larger differences are left untouched. The progress bar advances by one
    in every case.
    """
    tolerance = 0.1
    slot_length = round(segment["endTimestamp"] - segment["startTimestamp"], 3)
    audio_length = segment["audioDuration"]
    gap = slot_length - audio_length

    if tolerance <= abs(gap) <= (tolerance + 2.5):
        # Audio shorter than the slot: divide by the slot length itself.
        # Otherwise divide by the slot length plus 0.1 of slack.
        divisor = slot_length if gap > 0 else slot_length + 0.1
        new_tempo = round(audio_length / divisor, 3)
        await update_tempo(session, video_id, segment["id"], new_tempo)

    progress_bar.update(1)
async def update_tempos(video_id, segments=None):
    """Run ``process_segment`` concurrently for every segment of a video.

    Args:
        video_id: Identifier of the video the segments belong to.
        segments: Iterable-with-length of segment dicts as consumed by
            ``process_segment``. ``None`` (the default) is treated as an
            empty list.
    """
    # Avoid a shared mutable default argument.
    segments = [] if segments is None else segments

    async with aiohttp.ClientSession() as session:
        progress_bar = tqdm(total=len(segments), desc="Processing Segments", unit="segment")
        try:
            tasks = [
                process_segment(session, video_id, segment, progress_bar)
                for segment in segments
            ]
            await asyncio.gather(*tasks)
        finally:
            # Close the bar even when one of the requests raises.
            progress_bar.close()
def perform_tempo_adjustment(video_id, gen_choice, progress_bar=gr.Progress(track_tqdm=True)):
    """Fetch a video's segments, filter them by ``gen_choice`` and adjust tempos.

    The segments come from the ``"targetTranscriptData"`` entry of the fetched
    details. Two choices narrow the list based on whether a segment's
    ``"audioUrl"`` contains the substring ``"updated"``; any other choice keeps
    every segment. Returns the string ``"Done"`` once all requests finished.
    """

    def _has_updated_audio(seg):
        url = seg["audioUrl"]
        return url is not None and "updated" in url

    all_segments = fetch_data_from_db(video_id)["targetTranscriptData"]

    if gen_choice == "Generate only non-generated segments":
        selected = [seg for seg in all_segments if not _has_updated_audio(seg)]
    elif gen_choice == "Re-generate generated only segments":
        selected = [seg for seg in all_segments if _has_updated_audio(seg)]
    else:
        selected = all_segments

    asyncio.run(update_tempos(video_id, segments=selected))
    return "Done"
def check_details(video_id):
    """Preview the first and last segment of a video and reveal the next step.

    Args:
        video_id: Identifier of the video to look up.

    Returns:
        A ``(text, update)`` pair. ``text`` is the JSON of the first and last
        segment separated by a blank line, and ``update`` makes the bound
        component visible. When the video has no segments, ``text`` is a short
        notice and ``update`` keeps the component hidden instead of failing
        with an ``IndexError``.
    """
    data = fetch_data_from_db(video_id)
    segments = data["targetTranscriptData"]

    # Nothing to preview (and nothing to generate) for an empty transcript.
    if not segments:
        return "No segments found for this video.", gr.update(visible=False)

    results = json.dumps(segments[0]) + "\n\n" + json.dumps(segments[-1])
    return results, gr.update(visible=True)
# UI layout: components render in creation order, so statement order matters.
with gr.Blocks() as demo:
    # --- Audio generation section ---
    gr.Markdown("# Generate Audio")
    video_id_txt = gr.Text(label="Video ID")
    gen_choice_radio = gr.Radio(
        choices=[
            "Generate only non-generated segments",
            "Re-generate generated only segments",
            "Generate all the segments",
        ],
        label="Audio generation style",
        value="Generate only non-generated segments",
    )
    check_settings_btn = gr.Button("Check settings")
    logs_txt = gr.Textbox(label="Logs")
    # Hidden until check_details returns an update that reveals it.
    generate_btn = gr.Button("Generate audio", visible=False)

    check_settings_btn.click(
        fn=check_details,
        inputs=video_id_txt,
        outputs=[logs_txt, generate_btn],
    )
    generate_btn.click(
        fn=perform_generation,
        inputs=[video_id_txt, gen_choice_radio],
        outputs=logs_txt,
    )

    # --- Tempo adjustment section (reuses the video id and choice inputs) ---
    gr.Markdown("## Update Tempo [SAH]")
    update_tempo_btn = gr.Button("Adjust tempo")
    update_tempo_btn.click(
        fn=perform_tempo_adjustment,
        inputs=[video_id_txt, gen_choice_radio],
        outputs=logs_txt,
    )
if __name__ == "__main__":
    # Enable request queuing and protect the app with credentials taken from
    # the environment.
    demo.queue().launch(
        auth=(
            os.environ.get("GRADIO_USERNAME"),
            os.environ.get("GRADIO_PASSWORD"),
        )
    )