|
import gradio as gr |
|
import cv2 |
|
import os |
|
import spaces |
|
import tempfile |
|
from torchvision import transforms |
|
from torchvision.transforms import Compose |
|
import torch |
|
import numpy as np |
|
from PIL import Image |
|
import torch.nn.functional as F |
|
from pytorchvideo.transforms.functional import predict_depth |
|
from transformers import pipeline, TimesformerModel, VideoMAEImageProcessor |
|
from utils import * |
|
from algorithm import * |
|
|
|
def _list_input_videos(video_path):
    """Expand *video_path* into the list of video files to process.

    * a regular file ending in ``txt`` is read as a list of paths, one per
      line (blank lines are skipped);
    * any other regular file is treated as a single video;
    * anything else is treated as a directory whose non-hidden entries are
      returned in sorted order.
    """
    if os.path.isfile(video_path):
        if video_path.endswith('txt'):
            with open(video_path, 'r') as f:
                return [line for line in f.read().splitlines() if line.strip()]
        return [video_path]

    return sorted(
        os.path.join(video_path, name)
        for name in os.listdir(video_path)
        if not name.startswith('.')
    )


def _read_resized_frames(path, size):
    """Decode every frame of the video at *path*, resized to *size*.

    *size* is ``(width, height)``, the order ``cv2.resize`` expects for its
    ``dsize`` argument. Returns ``(frames, frame_rate)`` where ``frame_rate``
    is the integer-truncated FPS reported by the container.
    """
    capture = cv2.VideoCapture(path)
    try:
        frame_rate = int(capture.get(cv2.CAP_PROP_FPS))
        frames = []
        while capture.isOpened():
            ok, frame = capture.read()
            if not ok:
                break
            frames.append(cv2.resize(frame, size))
    finally:
        # Always free the decoder, even if resizing raises.
        capture.release()
    return frames, frame_rate


@spaces.GPU
def make_video(video_path, outdir='./summarized_video', encoder='Kmeans'):
    """Summarize one or more videos by keeping only the selected frames.

    Args:
        video_path: Path to a video file, to a ``txt`` file listing one video
            path per line, or to a directory of videos.
        outdir: Unused; kept so existing callers keep working. The summary is
            always written to a temporary ``.mp4`` file.
        encoder: Frame-selection algorithm: ``"Kmeans"``,
            ``"Sum of Squared Difference 01"`` or
            ``"Sum of Squared Difference 02"``. Any other value (including
            ``None``) falls back to ``"Kmeans"``.

    Returns:
        Path of the summarized ``.mp4``. When several inputs are processed,
        the path of the last one is returned.

    Raises:
        ValueError: If no input video is found, a video yields no frames, or
            a video is too short to form a single clip.
    """
    if encoder not in ["Kmeans", "Sum of Squared Difference 01", "Sum of Squared Difference 02"]:
        encoder = "Kmeans"

    model, processor, device = load_model()

    # The target frame size comes from the processor config and does not
    # depend on the video, so it is resolved once, outside the loop.
    if "shortest_edge" in processor.size:
        height = width = processor.size["shortest_edge"]
    else:
        height = processor.size["height"]
        width = processor.size["width"]

    clip_sample_rate = 1  # keep every frame as a candidate key frame
    num_frames = 8        # frames handed to extract_features per call

    filenames = _list_input_videos(video_path)
    if not filenames:
        raise ValueError(f"No input videos found at {video_path!r}")

    output_path = None
    for k, filename in enumerate(filenames):
        print('Progress {:}/{:},'.format(k+1, len(filenames)), 'Processing', filename)

        # cv2.resize takes dsize as (width, height).
        frames, frame_rate = _read_resized_frames(filename, (width, height))
        if not frames:
            raise ValueError(f"No frames could be read from {filename!r}")

        key_frames = frames[::clip_sample_rate]

        # Drop the trailing frames that do not fill a complete clip.
        usable_count = len(key_frames) - (len(key_frames) % num_frames)
        final_key_frames = key_frames[:usable_count]

        features = []
        for i in range(0, len(final_key_frames), num_frames):
            # Parenthesised on purpose: report progress every 50 clips.
            if i % (num_frames * 50) == 0:
                print(f"Loading {i}/{len(final_key_frames)}")

            input_frames = final_key_frames[i:i+num_frames]
            batch_features = extract_features(input_frames, device, model, processor)
            features.extend(batch_features.cpu().detach().numpy())

        if not features:
            raise ValueError(
                f"{filename!r} has fewer than {num_frames} frames; nothing to summarize"
            )

        # Target roughly 15% of the feature count, but never zero clusters.
        number_of_clusters = max(1, round(len(features)*0.15))

        # NOTE(review): the selectors come from the project's `algorithm`
        # module; they are assumed to return indices into `frames` - confirm.
        if encoder == "Kmeans":
            selected_frames = kmeans(features, number_of_clusters)
        elif encoder == "Sum of Squared Difference 01":
            selected_frames = tt01(features, 400)
        else:
            selected_frames = tt02(features, 400)

        # Only the file name is needed here; the handle is closed on exit
        # from the with-block and the file is kept (delete=False) so that
        # OpenCV can reopen it for writing.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmpfile:
            output_path = tmpfile.name

        out_height, out_width = frames[0].shape[:2]
        video_writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            frame_rate,
            (out_width, out_height),
        )
        try:
            for idx in selected_frames:
                video_writer.write(frames[idx])
        finally:
            # Release even on failure so the container is finalized.
            video_writer.release()

        print("Completed summarizing the video (wait for a moment to load).")

    return output_path
|
|
|
# Custom stylesheet for the demo page. It caps the height of three element
# ids; none of the components declared in this file sets those ids explicitly.
css = """
#img-display-container {
    max-height: 100vh;
    }
#img-display-input {
    max-height: 80vh;
    }
#img-display-output {
    max-height: 80vh;
    }
"""

# Markdown heading rendered at the top of the page.
title = "# Video Summarization Demo"

# Markdown blurb rendered right below the heading.
description = (
    "Video Summarization using Timesformer.\n"
    "\n"
    "Author: Nguyen Hoai Nam.\n"
)
|
|
|
# Page layout: header text, an input row (video + algorithm picker), a submit
# button and the player that shows the summarized result.
with gr.Blocks(css=css) as demo:
    gr.Markdown(title)
    gr.Markdown(description)
    gr.Markdown("### Video Summarization demo")

    with gr.Row():
        input_video = gr.Video(label="Input Video")
        algorithm_type = gr.Dropdown(
            ["Kmeans", "Sum of Squared Difference 01", "Sum of Squared Difference 02"],
            type="value",
            label='Algorithm',
        )
    submit = gr.Button("Submit")
    processed_video = gr.Video(label="Summarized Video")

    def on_submit(uploaded_video, algorithm_type):
        """Summarize the uploaded video and return the output file path.

        Args:
            uploaded_video: Value of the input video component; ``None`` when
                nothing has been uploaded.
            algorithm_type: Value chosen in the algorithm dropdown; passed to
                ``make_video`` as ``encoder`` (which falls back to
                ``"Kmeans"`` for unrecognized values).

        Returns:
            Whatever ``make_video`` returns, shown in ``processed_video``.

        Raises:
            gr.Error: If no video was uploaded.
        """
        if uploaded_video is None:
            # Surface a readable message in the UI instead of failing deep
            # inside the processing code.
            raise gr.Error("Please upload a video before submitting.")
        return make_video(uploaded_video, encoder=algorithm_type)

    submit.click(on_submit, inputs=[input_video, algorithm_type], outputs=processed_video)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Turn on request queuing first, then start serving the interface.
    queued_demo = demo.queue()
    queued_demo.launch()