Spaces:
Runtime error
Runtime error
import gradio as gr | |
import json | |
import ffmpeg | |
import os | |
from pathlib import Path | |
import time | |
from transformers import pipeline | |
import torch | |
# checkpoint = "openai/whisper-tiny" | |
# checkpoint = "openai/whisper-base" | |
checkpoint = "openai/whisper-small" | |
if torch.cuda.is_available() and torch.cuda.device_count() > 0: | |
from transformers import ( | |
AutomaticSpeechRecognitionPipeline, | |
WhisperForConditionalGeneration, | |
WhisperProcessor, | |
) | |
model = WhisperForConditionalGeneration.from_pretrained( | |
checkpoint).to("cuda").half() | |
processor = WhisperProcessor.from_pretrained(checkpoint) | |
pipe = AutomaticSpeechRecognitionPipeline( | |
model=model, | |
tokenizer=processor.tokenizer, | |
feature_extractor=processor.feature_extractor, | |
batch_size=8, | |
torch_dtype=torch.float16, | |
device="cuda:0" | |
) | |
else: | |
pipe = pipeline(model=checkpoint) | |
# TODO: no longer need to set these manually once the models have been updated on the Hub | |
# whisper-tiny | |
# pipe.model.generation_config.alignment_heads = [[2, 2], [3, 0], [3, 2], [3, 3], [3, 4], [3, 5]] | |
# whisper-base | |
# pipe.model.generation_config.alignment_heads = [[3, 1], [4, 2], [4, 3], [4, 7], [5, 1], [5, 2], [5, 4], [5, 6]] | |
# whisper-small | |
pipe.model.generation_config.alignment_heads = [[5, 3], [5, 9], [ | |
8, 0], [8, 4], [8, 7], [8, 8], [9, 0], [9, 7], [9, 9], [10, 5]] | |
videos_out_path = Path("./videos_out") | |
videos_out_path.mkdir(parents=True, exist_ok=True) | |
samples_data = sorted(Path('examples').glob('*.json')) | |
SAMPLES = [] | |
for file in samples_data: | |
with open(file) as f: | |
sample = json.load(f) | |
SAMPLES.append(sample) | |
VIDEOS = list(map(lambda x: [x['video']], SAMPLES)) | |
async def speech_to_text(video_in): | |
""" | |
Takes a video path to convert to audio, transcribe audio channel to text and char timestamps | |
Using https://huggingface.co/tasks/automatic-speech-recognition pipeline | |
""" | |
video_in = video_in[0] if isinstance(video_in, list) else video_in | |
if (video_in == None): | |
raise ValueError("Video input undefined") | |
video_path = Path(video_in.name) | |
try: | |
# convert video to audio 16k using PIPE to audio_memory | |
audio_memory, _ = ffmpeg.input(video_path).output( | |
'-', format="wav", ac=1, ar=pipe.feature_extractor.sampling_rate).overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True) | |
except Exception as e: | |
raise RuntimeError("Error converting video to audio") | |
try: | |
print(f'Transcribing via local model') | |
output = pipe(audio_memory, chunk_length_s=10, | |
stride_length_s=[4, 2], return_timestamps="word") | |
transcription = output["text"] | |
chunks = output["chunks"] | |
timestamps_var = [{"word": chunk["text"], "timestamp":( | |
chunk["timestamp"][0], chunk["timestamp"][1]), "state": True} for chunk in chunks] | |
words = [(word['word'], '+' if word['state'] else '-') | |
for word in timestamps_var] | |
return (words, timestamps_var, video_in.name) | |
except Exception as e: | |
raise RuntimeError("Error Running inference with local model", e) | |
async def cut_timestamps_to_video(video_in, timestamps_var): | |
video_in = video_in[0] if isinstance(video_in, list) else video_in | |
if (video_in == None or timestamps_var == None): | |
raise ValueError("Inputs undefined") | |
video_path = Path(video_in.name) | |
video_file_name = video_path.stem | |
timestamps_to_cut = [ | |
(timestamps_var[i]['timestamp'][0], timestamps_var[i]['timestamp'][1]) | |
for i in range(len(timestamps_var)) if timestamps_var[i]['state']] | |
between_str = '+'.join( | |
map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut)) | |
if timestamps_to_cut: | |
video_file = ffmpeg.input(video_path) | |
video = video_file.video.filter( | |
"select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB") | |
audio = video_file.audio.filter( | |
"aselect", f'({between_str})').filter("asetpts", "N/SR/TB") | |
output_video = f'./videos_out/{video_file_name}.mp4' | |
ffmpeg.concat(video, audio, v=1, a=1).output( | |
output_video).overwrite_output().global_args('-loglevel', 'quiet').run() | |
else: | |
output_video = video_path | |
return output_video | |
css = """ | |
#words-container { | |
max-height: 400px; | |
overflow-y: scroll !important; | |
} | |
""" | |
with gr.Blocks(css=css) as demo: | |
timestamps_var = gr.JSON(visible=False) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown(""" | |
# Whisper: Word-Level Video Trimming | |
Quick edit a video by trimming out words. | |
Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition) | |
with [Whisper](https://huggingface.co/docs/transformers/model_doc/whisper) | |
""") | |
with gr.Row(): | |
with gr.Column(): | |
file_upload = gr.File( | |
label="Upload Video File", file_count=1, scale=1) | |
video_preview = gr.Video( | |
label="Video Preview", scale=3, interactive=False) | |
# with gr.Row(): | |
# transcribe_btn = gr.Button( | |
# "Transcribe Audio") | |
with gr.Column(): | |
text_in = gr.HighlightedText( | |
label="Transcription", combine_adjacent=False, show_legend=True, color_map={"+": "green", "-": "red"}, elem_id="words-container") | |
with gr.Row(): | |
cut_btn = gr.Button("Cut Video") | |
select_all_words = gr.Button("Select All Words") | |
reset_words = gr.Button("Reset Words") | |
video_out = gr.Video(label="Video Out") | |
with gr.Row(): | |
gr.Examples( | |
fn=speech_to_text, | |
examples=["./examples/ShiaLaBeouf.mp4", | |
"./examples/zuckyuval.mp4", | |
"./examples/cooking.mp4"], | |
inputs=[file_upload], | |
outputs=[text_in, timestamps_var, video_preview], | |
cache_examples=True) | |
with gr.Row(): | |
gr.Markdown(""" | |
#### Video Credits | |
1. [Cooking](https://vimeo.com/573792389) | |
1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0) | |
1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8) | |
""") | |
def select_text(evt: gr.SelectData, timestamps_var): | |
index = evt.index | |
timestamps_var[index]['state'] = not timestamps_var[index]['state'] | |
words = [(word['word'], '+' if word['state'] else '-') | |
for word in timestamps_var] | |
return timestamps_var, words | |
def words_selection(timestamps_var, reset=False): | |
if reset: | |
for word in timestamps_var: | |
word['state'] = True | |
else: | |
# reverse the state of all words | |
for word in timestamps_var: | |
word['state'] = False | |
words = [(word['word'], '+' if word['state'] else '-') | |
for word in timestamps_var] | |
return timestamps_var, words | |
file_upload.upload(speech_to_text, inputs=[file_upload], outputs=[ | |
text_in, timestamps_var, video_preview]) | |
select_all_words.click(words_selection, inputs=[timestamps_var], outputs=[ | |
timestamps_var, text_in], queue=False, show_progress=False) | |
reset_words.click(lambda x: words_selection(x, True), inputs=[timestamps_var], outputs=[ | |
timestamps_var, text_in], queue=False, show_progress=False) | |
text_in.select(select_text, inputs=timestamps_var, | |
outputs=[timestamps_var, text_in], queue=False, show_progress=False) | |
# transcribe_btn.click(speech_to_text, inputs=[file_upload], outputs=[ | |
# text_in, transcription_var, timestamps_var, video_preview]) | |
cut_btn.click(cut_timestamps_to_video, [ | |
file_upload, timestamps_var], [video_out]) | |
demo.queue() | |
if __name__ == "__main__": | |
demo.launch(debug=True) | |