# Base Framework import torch # For data transformation from torchvision import transforms from torchvision.transforms import v2 # For ML Model import transformers from transformers import VivitImageProcessor, VivitConfig, VivitModel, VivitForVideoClassification from transformers import set_seed # For Data Loaders import datasets from torch.utils.data import Dataset, DataLoader # For GPU from accelerate import Accelerator, notebook_launcher # Use PyTorch bridge for Decord import decord from decord.bridge import set_bridge decord.bridge.set_bridge("torch") from decord import VideoReader # General Libraries import os import PIL import gc import pandas as pd import numpy as np from torch.nn import Linear, Softmax import gradio as gr import cv2 import io import tempfile # Mediapipe Library import mediapipe as mp from mediapipe.tasks import python from mediapipe.tasks.python import vision from mediapipe import solutions from mediapipe.framework.formats import landmark_pb2 # Constants CLIP_LENGTH = 32 FRAME_STEPS = 4 CLIP_SIZE = 224 BATCH_SIZE = 1 SEED = 42 # Set the device (GPU or CPU) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # pretrained Model MODEL_TRANSFORMER = 'google/vivit-b-16x2' # Set Paths #model_path = 'vivit_pytorch_loss051.pt' model_path_2_pytorch = 'vivit_pytorch_GPU_6_acc087.pt' #model_path_2_transformer = '' data_path = 'signs' # Custom CSS to control output video size custom_css = """ #landmarked_video { max-height: 300px; max-width: 600px; object-fit: fill; width: 100%; height: 100%; } """ # Create Mediapipe Objects mp_drawing = mp.solutions.drawing_utils mp_drawing_styles = mp.solutions.drawing_styles mp_hands = mp.solutions.hands mp_face = mp.solutions.face_mesh mp_pose = mp.solutions.pose mp_holistic = mp.solutions.holistic hand_model_path = 'hand_landmarker.task' pose_model_path = 'pose_landmarker.task' BaseOptions = mp.tasks.BaseOptions HandLandmarker = mp.tasks.vision.HandLandmarker HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions PoseLandmarker = mp.tasks.vision.PoseLandmarker PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions VisionRunningMode = mp.tasks.vision.RunningMode # Create a hand landmarker instance with the video mode: options_hand = HandLandmarkerOptions( base_options=BaseOptions(model_asset_path = hand_model_path), running_mode=VisionRunningMode.VIDEO) # Create a pose landmarker instance with the video mode: options_pose = PoseLandmarkerOptions( base_options=BaseOptions(model_asset_path=pose_model_path), running_mode=VisionRunningMode.VIDEO) detector_hand = vision.HandLandmarker.create_from_options(options_hand) detector_pose = vision.PoseLandmarker.create_from_options(options_pose) holistic = mp_holistic.Holistic( static_image_mode=False, model_complexity=1, smooth_landmarks=True, enable_segmentation=False, refine_face_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) # Creating Dataset class CreateDatasetProd(): def __init__(self , clip_len , clip_size , frame_step ): super().__init__() self.clip_len = clip_len self.clip_size = clip_size self.frame_step = frame_step # Define a sample transformation pipeline #self.transform_prod = transforms.v2.Compose([ # transforms.v2.ToImage(), # transforms.v2.Resize((self.clip_size, self.clip_size)), # transforms.v2.ToDtype(torch.float32, scale=True) # ]) self.transform_prod = v2.Compose([ v2.ToImage(), v2.Resize((self.clip_size, self.clip_size)), v2.ToDtype(torch.float32, scale=True) ]) def read_video(self, video_path): # Read the video and convert to frames vr = VideoReader(video_path) total_frames = len(vr) # Determine frame indices based on total frames if total_frames < self.clip_len: key_indices = list(range(total_frames)) for _ in range(self.clip_len - len(key_indices)): key_indices.append(key_indices[-1]) else: key_indices = list(range(0, total_frames, max(1, total_frames // self.clip_len)))[:self.clip_len] #load frames frames = vr.get_batch(key_indices) del vr # Force garbage collection gc.collect() return frames def add_landmarks(self, video): annotated_image = [] for frame in video: #Convert pytorch Tensor to CV2 image image = frame.permute(1, 2, 0).numpy() # Convert to (H, W, C) format for mediapipe to work results = holistic.process(image) mp_drawing.draw_landmarks( image, results.left_hand_landmarks, mp_hands.HAND_CONNECTIONS, landmark_drawing_spec = mp_drawing_styles.get_default_hand_landmarks_style(), connection_drawing_spec = mp_drawing_styles.get_default_hand_connections_style() ) mp_drawing.draw_landmarks( image, results.right_hand_landmarks, mp_hands.HAND_CONNECTIONS, landmark_drawing_spec = mp_drawing_styles.get_default_hand_landmarks_style(), connection_drawing_spec = mp_drawing_styles.get_default_hand_connections_style() ) mp_drawing.draw_landmarks( image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, landmark_drawing_spec = mp_drawing_styles.get_default_pose_landmarks_style(), #connection_drawing_spec = None ) annotated_image.append(torch.from_numpy(image)) del image, results # Force garbage collection gc.collect() return torch.stack(annotated_image) def create_dataset(self, video_paths): # Read and process Videos video = self.read_video(video_paths) video = torch.from_numpy(video.asnumpy()) #video = transforms.v2.functional.resize(video.permute(0, 3, 1, 2), size=(self.clip_size*2, self.clip_size*3)) # Auto converts to (F, C, H, W) format video = v2.functional.resize(video.permute(0, 3, 1, 2), size=(self.clip_size*2, self.clip_size*3)) # Auto converts to (F, C, H, W) format video = self.add_landmarks(video) # Data Preperation for ML Model without Augmentation video = self.transform_prod(video.permute(0, 3, 1, 2)) pixel_values = video.to(device) # Force garbage collection del video gc.collect() return pixel_values #CustomDatasetProd(pixel_values=pixel_values) # Creating Dataloader object dataset_prod_obj = CreateDatasetProd(CLIP_LENGTH, CLIP_SIZE, FRAME_STEPS) # Creating ML Model class SignClassificationModel(torch.nn.Module): def __init__(self, model_name, idx_to_label, label_to_idx, classes_len): super(SignClassificationModel, self).__init__() self.config = VivitConfig.from_pretrained(model_name, id2label=idx_to_label, label2id=label_to_idx, hidden_dropout_prob=hyperparameters['dropout_rate'], attention_probs_dropout_prob=hyperparameters['dropout_rate'], return_dict=True) self.backbone = VivitModel.from_pretrained(model_name, config=self.config) # Load ViT model self.ff_head = Linear(self.backbone.config.hidden_size, classes_len) def forward(self, images): x = self.backbone(images).last_hidden_state # Extract embeddings self.backbone.gradient_checkpointing_enable() # Reduce along emb_dimension1 (axis 1) reduced_tensor = x.mean(dim=1) reduced_tensor = self.ff_head(reduced_tensor) return reduced_tensor # Load the model #model_pretrained = torch.load(model_path, map_location=device, weights_only=False) #torch.device('cpu') model_pretrained_2 = torch.load(model_path_2_pytorch, map_location=device, weights_only=False) #model_pretrained_2 = VivitForVideoClassification.from_pretrained(model_path_2_transformer) # Evaluation Function def prod_function(model_pretrained, prod_ds): # Initialize accelerator accelerator = Accelerator() if accelerator.is_main_process: datasets.utils.logging.set_verbosity_warning() transformers.utils.logging.set_verbosity_info() else: datasets.utils.logging.set_verbosity_error() transformers.utils.logging.set_verbosity_error() # The seed need to be set before we instantiate the model, as it will determine the random head. set_seed(SEED) # There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the prepare method. accelerated_model, acclerated_prod_ds = accelerator.prepare(model_pretrained, prod_ds) # Evaluate at the end of the epoch accelerated_model.eval() with torch.no_grad(): outputs = accelerated_model(acclerated_prod_ds.unsqueeze(0)) #prod_logits = outputs.squeeze(1) #prod_pred = prod_logits.argmax(-1) prod_logits = outputs.logits prod_softmax = torch.nn.functional.softmax(prod_logits, dim=-1) prod_pred = prod_softmax.argmax(-1) return prod_pred # Function to get landmarked video def save_video_to_mp4(video_tensor, fps=10): # Convert pytorch tensor to numpy ndarray video_numpy = video_tensor.permute(0, 2, 3, 1).cpu().numpy() # Normalize values to [0, 255] if necessary if video_numpy.max() <= 1.0: video_numpy = (video_numpy * 255).astype(np.uint8) # Create a temporary file to save the video temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") output_path = temp_file.name ## Create an in-memory byte buffer to store the video #byte_buffer = io.BytesIO() # Get video dimensions height, width, channels = video_numpy[0].shape fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for .mp4 # Create VideoWriter object out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) #out = cv2.VideoWriter(byte_buffer, fourcc, fps, (width, height), isColor=True) # Write the frames to the output file for frame in video_numpy: # Convert RGB back to BGR for OpenCV frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) out.write(frame_bgr) out.release() ## Return the byte buffer's content (the video as bytes) #byte_buffer.seek(0) return output_path #byte_buffer.read() # Function to list available videos dynamically def list_videos(): if os.path.exists(data_path): video_lst = [f for f in os.listdir(data_path) if f.endswith((".mp4", ".mov", ".MOV", ".webm", ".avi"))] return video_lst # Function to return the selected video path def play_video(selected_video): return os.path.join(data_path, selected_video) if selected_video else None # Get Landmarked video # Main Function for tab - Gesture recognition def translate_sign_language(gesture): # Create Dataset prod_ds = dataset_prod_obj.create_dataset(gesture) prod_video_path = save_video_to_mp4(prod_ds) #prod_video = np.random.randint(0, 255, (32, 225, 225, 3), dtype=np.uint8) # Run ML Model #predicted_prod_label = prod_function(model_pretrained, prod_ds) predicted_prod_label = prod_function(model_pretrained_2, prod_ds) # Identify the hand gesture predicted_prod_label = predicted_prod_label.squeeze(0) idx_to_label = model_pretrained_2.config.id2label gesture_translation = idx_to_label[predicted_prod_label.cpu().numpy().item()] # Convert to a scalar # Frame generator for real-time streaming #def frame_generator(): # for frame in prod_video: # yield frame # Stream frame-by-frame return gesture_translation , prod_video_path # frame_generator with gr.Blocks(css=custom_css) as demo: gr.Markdown("# Indian Sign Language Translation App") # Gesture recognition Tab with gr.Tab("Gesture recognition"): #with gr.Row(height=350, variant="panel"): # equal_height=False, show_progress=True # with gr.Column(scale=1, variant="panel"): # # Add webcam input for sign language video capture # video_input = gr.Video(sources=["webcam"], format="mp4", label="Gesture") # with gr.Column(scale=1, variant="panel"): # # Display the landmarked video # video_output = gr.Video(interactive=False, autoplay=True, # streaming=False, label="Landmarked Gesture", # elem_id="landmarked_video" # ) #with gr.Row(variant="panel"): # equal_height=False, show_progress=True # with gr.Column(scale=1, variant="panel"): # # Submit the Video # video_button = gr.Button("Submit") # with gr.Column(): #scale=1, variant="panel" # # Add a button or functionality to process the video # text_output = gr.Textbox(label="Translation in English") with gr.Row(): with gr.Column(scale=0.9, variant="panel"): with gr.Row(height=350, variant="panel"): # Add webcam input for sign language video capture video_input = gr.Video(sources=["webcam"], format="mp4", label="Gesture") with gr.Row(variant="panel"): # Submit the Video video_button = gr.Button("Submit") # Add a button or functionality to process the video text_output = gr.Textbox(label="Translation in English") with gr.Column(scale=1, variant="panel"): with gr.Row(): # Display the landmarked video video_output = gr.Video(interactive=False, autoplay=True, streaming=False, label="Landmarked Gesture" #elem_id="landmarked_video" ) # Set up the interface video_button.click(translate_sign_language, inputs=video_input, outputs=[text_output, video_output]) #landmarked_video.change(translate_sign_language, inputs=landmarked_video, outputs=[text_output, video_output]) # Indian Sign Language gesture reference tab with gr.Tab("Indian Sign Language gesture reference"): with gr.Row(height=500, variant="panel", equal_height=False, show_progress=True): with gr.Column(scale=1, variant="panel"): video_dropdown = gr.Dropdown(choices=list_videos(), label="ISL gestures", info="More gestures comming soon!") search_button = gr.Button("Search Gesture") with gr.Column(scale=1, variant="panel"): search_output = gr.Video(streaming=False, label="ISL gestures Video") # Set up the interface search_button.click(play_video, inputs=video_dropdown, outputs=search_output) if __name__ == "__main__": demo.launch()