import os from typing import List, Tuple import multiprocessing import numpy as np import pandas as pd import streamlit as st import torch from torch import Tensor from decord import VideoReader, cpu from transformers import AutoFeatureExtractor, TimesformerForVideoClassification np.random.seed(0) st.set_page_config( page_title="TimeSFormer", page_icon="🧊", layout="wide", initial_sidebar_state="expanded", menu_items={ "Get Help": "https://www.extremelycoolapp.com/help", "Report a bug": "https://www.extremelycoolapp.com/bug", "About": "# This is a header. This is an *extremely* cool app!", }, ) def sample_frame_indices( clip_len: int, frame_sample_rate: float, seg_len: int ) -> np.ndarray: converted_len = int(clip_len * frame_sample_rate) end_idx = np.random.randint(converted_len, seg_len) start_idx = end_idx - converted_len indices = np.linspace(start_idx, end_idx, num=clip_len) indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64) return indices # @st.cache_resource @st.experimental_singleton def load_model(model_name: str): if "base-finetuned-k400" in model_name or "base-finetuned-k600" in model_name: feature_extractor = AutoFeatureExtractor.from_pretrained( "MCG-NJU/videomae-base-finetuned-kinetics" ) else: feature_extractor = AutoFeatureExtractor.from_pretrained(model_name) model = TimesformerForVideoClassification.from_pretrained(model_name) return feature_extractor, model def inference(file_path: str): videoreader = VideoReader(VIDEO_TMP_PATH, num_threads=1, ctx=cpu(0)) # sample 8 frames videoreader.seek(0) indices = sample_frame_indices( clip_len=8, frame_sample_rate=4, seg_len=len(videoreader) ) video = videoreader.get_batch(indices).asnumpy() inputs = feature_extractor(list(video), return_tensors="pt") with torch.no_grad(): outputs = model(**inputs) logits: Tensor = outputs.logits # model predicts one of the 400 Kinetics-400 classes predicted_label = logits.argmax(-1).item() print(model.config.id2label[predicted_label]) TOP_K = 12 # logits = np.squeeze(logits) logits = logits.squeeze().numpy() indices = np.argsort(logits)[::-1][:TOP_K] values = logits[indices] results: List[Tuple[str, float]] = [] for index, value in zip(indices, values): predicted_label = model.config.id2label[index] print(f"Label: {predicted_label} - {value:.2f}%") results.append((predicted_label, value)) return pd.DataFrame(results, columns=("Label", "Confidence")) st.title("TimeSFormer") with st.expander("INTRODUCTION"): st.text( f"""Streamlit demo for TimeSFormer. Author: Hiep Phuoc Secondary High School Number of CPU(s): {multiprocessing.cpu_count()} """ ) model_name = st.selectbox( "model_name", ( "facebook/timesformer-base-finetuned-k400", "facebook/timesformer-base-finetuned-k600", "facebook/timesformer-base-finetuned-ssv2", "facebook/timesformer-hr-finetuned-k600", "facebook/timesformer-hr-finetuned-k400", "facebook/timesformer-hr-finetuned-ssv2", "fcakyon/timesformer-large-finetuned-k400", "fcakyon/timesformer-large-finetuned-k600", ), ) feature_extractor, model = load_model(model_name) VIDEO_TMP_PATH = os.path.join("tmp", "tmp.mp4") uploadedfile = st.file_uploader("Upload file", type=["mp4"]) if uploadedfile is not None: with st.spinner(): with open(VIDEO_TMP_PATH, "wb") as f: f.write(uploadedfile.getbuffer()) with st.spinner("Processing..."): df = inference(VIDEO_TMP_PATH) st.dataframe(df) st.video(VIDEO_TMP_PATH)