Kaushik066's picture
Update app.py
f95d578 verified
raw
history blame
15.7 kB
# Base Framework
import torch
# For data transformation
from torchvision import transforms
from torchvision.transforms import v2
# For ML Model
import transformers
from transformers import VivitImageProcessor, VivitConfig, VivitModel, VivitForVideoClassification
from transformers import set_seed
# For Data Loaders
import datasets
from torch.utils.data import Dataset, DataLoader
# For GPU
from accelerate import Accelerator, notebook_launcher
# Use PyTorch bridge for Decord
import decord
from decord.bridge import set_bridge
decord.bridge.set_bridge("torch")
from decord import VideoReader
# General Libraries
import os
import PIL
import gc
import pandas as pd
import numpy as np
from torch.nn import Linear, Softmax
import gradio as gr
import cv2
import io
import tempfile
# Mediapipe Library
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
# Constants
CLIP_LENGTH = 32
FRAME_STEPS = 4
CLIP_SIZE = 224
BATCH_SIZE = 1
SEED = 42
# Set the device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# pretrained Model
MODEL_TRANSFORMER = 'google/vivit-b-16x2'
# Set Paths
#model_path = 'vivit_pytorch_loss051.pt'
model_path_2_pytorch = 'vivit_pytorch_GPU_6_acc087.pt'
#model_path_2_transformer = ''
data_path = 'signs'
# Custom CSS to control output video size
custom_css = """
#landmarked_video {
max-height: 300px;
max-width: 600px;
object-fit: fill;
width: 100%;
height: 100%;
}
"""
# Create Mediapipe Objects
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands
mp_face = mp.solutions.face_mesh
mp_pose = mp.solutions.pose
mp_holistic = mp.solutions.holistic
hand_model_path = 'hand_landmarker.task'
pose_model_path = 'pose_landmarker.task'
BaseOptions = mp.tasks.BaseOptions
HandLandmarker = mp.tasks.vision.HandLandmarker
HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode
# Create a hand landmarker instance with the video mode:
options_hand = HandLandmarkerOptions(
base_options=BaseOptions(model_asset_path = hand_model_path),
running_mode=VisionRunningMode.VIDEO)
# Create a pose landmarker instance with the video mode:
options_pose = PoseLandmarkerOptions(
base_options=BaseOptions(model_asset_path=pose_model_path),
running_mode=VisionRunningMode.VIDEO)
detector_hand = vision.HandLandmarker.create_from_options(options_hand)
detector_pose = vision.PoseLandmarker.create_from_options(options_pose)
holistic = mp_holistic.Holistic(
static_image_mode=False,
model_complexity=1,
smooth_landmarks=True,
enable_segmentation=False,
refine_face_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
# Creating Dataset
class CreateDatasetProd():
def __init__(self
, clip_len
, clip_size
, frame_step
):
super().__init__()
self.clip_len = clip_len
self.clip_size = clip_size
self.frame_step = frame_step
# Define a sample transformation pipeline
#self.transform_prod = transforms.v2.Compose([
# transforms.v2.ToImage(),
# transforms.v2.Resize((self.clip_size, self.clip_size)),
# transforms.v2.ToDtype(torch.float32, scale=True)
# ])
self.transform_prod = v2.Compose([
v2.ToImage(),
v2.Resize((self.clip_size, self.clip_size)),
v2.ToDtype(torch.float32, scale=True)
])
def read_video(self, video_path):
# Read the video and convert to frames
vr = VideoReader(video_path)
total_frames = len(vr)
# Determine frame indices based on total frames
if total_frames < self.clip_len:
key_indices = list(range(total_frames))
for _ in range(self.clip_len - len(key_indices)):
key_indices.append(key_indices[-1])
else:
key_indices = list(range(0, total_frames, max(1, total_frames // self.clip_len)))[:self.clip_len]
#load frames
frames = vr.get_batch(key_indices)
del vr
# Force garbage collection
gc.collect()
return frames
def add_landmarks(self, video):
annotated_image = []
for frame in video:
#Convert pytorch Tensor to CV2 image
image = frame.permute(1, 2, 0).numpy() # Convert to (H, W, C) format for mediapipe to work
results = holistic.process(image)
mp_drawing.draw_landmarks(
image,
results.left_hand_landmarks,
mp_hands.HAND_CONNECTIONS,
landmark_drawing_spec = mp_drawing_styles.get_default_hand_landmarks_style(),
connection_drawing_spec = mp_drawing_styles.get_default_hand_connections_style()
)
mp_drawing.draw_landmarks(
image,
results.right_hand_landmarks,
mp_hands.HAND_CONNECTIONS,
landmark_drawing_spec = mp_drawing_styles.get_default_hand_landmarks_style(),
connection_drawing_spec = mp_drawing_styles.get_default_hand_connections_style()
)
mp_drawing.draw_landmarks(
image,
results.pose_landmarks,
mp_holistic.POSE_CONNECTIONS,
landmark_drawing_spec = mp_drawing_styles.get_default_pose_landmarks_style(),
#connection_drawing_spec = None
)
annotated_image.append(torch.from_numpy(image))
del image, results
# Force garbage collection
gc.collect()
return torch.stack(annotated_image)
def create_dataset(self, video_paths):
# Read and process Videos
video = self.read_video(video_paths)
video = torch.from_numpy(video.asnumpy())
#video = transforms.v2.functional.resize(video.permute(0, 3, 1, 2), size=(self.clip_size*2, self.clip_size*3)) # Auto converts to (F, C, H, W) format
video = v2.functional.resize(video.permute(0, 3, 1, 2), size=(self.clip_size*2, self.clip_size*3)) # Auto converts to (F, C, H, W) format
video = self.add_landmarks(video)
# Data Preperation for ML Model without Augmentation
video = self.transform_prod(video.permute(0, 3, 1, 2))
pixel_values = video.to(device)
# Force garbage collection
del video
gc.collect()
return pixel_values #CustomDatasetProd(pixel_values=pixel_values)
# Creating Dataloader object
dataset_prod_obj = CreateDatasetProd(CLIP_LENGTH, CLIP_SIZE, FRAME_STEPS)
# Creating ML Model
class SignClassificationModel(torch.nn.Module):
def __init__(self, model_name, idx_to_label, label_to_idx, classes_len):
super(SignClassificationModel, self).__init__()
self.config = VivitConfig.from_pretrained(model_name, id2label=idx_to_label,
label2id=label_to_idx, hidden_dropout_prob=hyperparameters['dropout_rate'],
attention_probs_dropout_prob=hyperparameters['dropout_rate'],
return_dict=True)
self.backbone = VivitModel.from_pretrained(model_name, config=self.config) # Load ViT model
self.ff_head = Linear(self.backbone.config.hidden_size, classes_len)
def forward(self, images):
x = self.backbone(images).last_hidden_state # Extract embeddings
self.backbone.gradient_checkpointing_enable()
# Reduce along emb_dimension1 (axis 1)
reduced_tensor = x.mean(dim=1)
reduced_tensor = self.ff_head(reduced_tensor)
return reduced_tensor
# Load the model
#model_pretrained = torch.load(model_path, map_location=device, weights_only=False) #torch.device('cpu')
model_pretrained_2 = torch.load(model_path_2_pytorch, map_location=device, weights_only=False)
#model_pretrained_2 = VivitForVideoClassification.from_pretrained(model_path_2_transformer)
# Evaluation Function
def prod_function(model_pretrained, prod_ds):
# Initialize accelerator
accelerator = Accelerator()
if accelerator.is_main_process:
datasets.utils.logging.set_verbosity_warning()
transformers.utils.logging.set_verbosity_info()
else:
datasets.utils.logging.set_verbosity_error()
transformers.utils.logging.set_verbosity_error()
# The seed need to be set before we instantiate the model, as it will determine the random head.
set_seed(SEED)
# There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the prepare method.
accelerated_model, acclerated_prod_ds = accelerator.prepare(model_pretrained, prod_ds)
# Evaluate at the end of the epoch
accelerated_model.eval()
with torch.no_grad():
outputs = accelerated_model(acclerated_prod_ds.unsqueeze(0))
#prod_logits = outputs.squeeze(1)
#prod_pred = prod_logits.argmax(-1)
prod_logits = outputs.logits
prod_softmax = torch.nn.functional.softmax(prod_logits, dim=-1)
prod_pred = prod_softmax.argmax(-1)
return prod_pred
# Function to get landmarked video
def save_video_to_mp4(video_tensor, fps=10):
# Convert pytorch tensor to numpy ndarray
video_numpy = video_tensor.permute(0, 2, 3, 1).cpu().numpy()
# Normalize values to [0, 255] if necessary
if video_numpy.max() <= 1.0:
video_numpy = (video_numpy * 255).astype(np.uint8)
# Create a temporary file to save the video
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
output_path = temp_file.name
## Create an in-memory byte buffer to store the video
#byte_buffer = io.BytesIO()
# Get video dimensions
height, width, channels = video_numpy[0].shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for .mp4
# Create VideoWriter object
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
#out = cv2.VideoWriter(byte_buffer, fourcc, fps, (width, height), isColor=True)
# Write the frames to the output file
for frame in video_numpy:
# Convert RGB back to BGR for OpenCV
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
out.write(frame_bgr)
out.release()
## Return the byte buffer's content (the video as bytes)
#byte_buffer.seek(0)
return output_path #byte_buffer.read()
# Function to list available videos dynamically
def list_videos():
if os.path.exists(data_path):
video_lst = [f for f in os.listdir(data_path) if f.endswith((".mp4", ".mov", ".MOV", ".webm", ".avi"))]
return video_lst
# Function to return the selected video path
def play_video(selected_video):
return os.path.join(data_path, selected_video) if selected_video else None
# Get Landmarked video
# Main Function for tab - Gesture recognition
def translate_sign_language(gesture):
# Create Dataset
prod_ds = dataset_prod_obj.create_dataset(gesture)
prod_video_path = save_video_to_mp4(prod_ds)
#prod_video = np.random.randint(0, 255, (32, 225, 225, 3), dtype=np.uint8)
# Run ML Model
#predicted_prod_label = prod_function(model_pretrained, prod_ds)
predicted_prod_label = prod_function(model_pretrained_2, prod_ds)
# Identify the hand gesture
predicted_prod_label = predicted_prod_label.squeeze(0)
idx_to_label = model_pretrained_2.config.id2label
gesture_translation = idx_to_label[predicted_prod_label.cpu().numpy().item()] # Convert to a scalar
# Frame generator for real-time streaming
#def frame_generator():
# for frame in prod_video:
# yield frame # Stream frame-by-frame
return gesture_translation , prod_video_path # frame_generator
with gr.Blocks(css=custom_css) as demo:
gr.Markdown("# Indian Sign Language Translation App")
# Gesture recognition Tab
with gr.Tab("Gesture recognition"):
#with gr.Row(height=350, variant="panel"): # equal_height=False, show_progress=True
# with gr.Column(scale=1, variant="panel"):
# # Add webcam input for sign language video capture
# video_input = gr.Video(sources=["webcam"], format="mp4", label="Gesture")
# with gr.Column(scale=1, variant="panel"):
# # Display the landmarked video
# video_output = gr.Video(interactive=False, autoplay=True,
# streaming=False, label="Landmarked Gesture",
# elem_id="landmarked_video"
# )
#with gr.Row(variant="panel"): # equal_height=False, show_progress=True
# with gr.Column(scale=1, variant="panel"):
# # Submit the Video
# video_button = gr.Button("Submit")
# with gr.Column(): #scale=1, variant="panel"
# # Add a button or functionality to process the video
# text_output = gr.Textbox(label="Translation in English")
with gr.Row():
with gr.Column(scale=0.9, variant="panel"):
with gr.Row(height=350, variant="panel"):
# Add webcam input for sign language video capture
video_input = gr.Video(sources=["webcam"], format="mp4", label="Gesture")
with gr.Row(variant="panel"):
# Submit the Video
video_button = gr.Button("Submit")
# Add a button or functionality to process the video
text_output = gr.Textbox(label="Translation in English")
with gr.Column(scale=1, variant="panel"):
with gr.Row():
# Display the landmarked video
video_output = gr.Video(interactive=False, autoplay=True,
streaming=False, label="Landmarked Gesture"
#elem_id="landmarked_video"
)
# Set up the interface
video_button.click(translate_sign_language, inputs=video_input, outputs=[text_output, video_output])
#landmarked_video.change(translate_sign_language, inputs=landmarked_video, outputs=[text_output, video_output])
# Indian Sign Language gesture reference tab
with gr.Tab("Indian Sign Language gesture reference"):
with gr.Row(height=500, variant="panel", equal_height=False, show_progress=True):
with gr.Column(scale=1, variant="panel"):
video_dropdown = gr.Dropdown(choices=list_videos(), label="ISL gestures", info="More gestures comming soon!")
search_button = gr.Button("Search Gesture")
with gr.Column(scale=1, variant="panel"):
search_output = gr.Video(streaming=False, label="ISL gestures Video")
# Set up the interface
search_button.click(play_video, inputs=video_dropdown, outputs=search_output)
if __name__ == "__main__":
demo.launch()