# https://Og2-FoosballAnalytics.hf.space/
import os

# Keras selects its backend from KERAS_BACKEND when `keras` is first imported,
# so the variable must be set before the keras imports below.
# Available backend options are: "jax", "torch", "tensorflow".
os.environ["KERAS_BACKEND"] = "tensorflow"

import asyncio
import base64
import io
import json
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List

import cv2
import keras
import numpy as np
import pandas as pd
import requests
import tensorflow as tf
from fastapi import FastAPI, File, Form, UploadFile, HTTPException
from huggingface_hub import HfApi
from huggingface_hub import hf_hub_download
from keras import layers
from keras.applications.densenet import DenseNet121
from keras.saving import register_keras_serializable
from PIL import Image
from pydantic import BaseModel
#from tensorflow_docs.vis import embed

app = FastAPI()

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


def _resolve_upload_dir(directory: str) -> Path:
    """Return the absolute upload directory for a client-supplied sub-path.

    UPLOAD_DIR is read when the function is called, not when it is defined.

    Raises:
        HTTPException: 400 when ``directory`` is absolute or contains ``..``,
            i.e. when it could point outside UPLOAD_DIR.
    """
    relative = Path(directory)
    if relative.is_absolute() or ".." in relative.parts:
        raise HTTPException(status_code=400, detail="Invalid directory.")
    return (Path(UPLOAD_DIR) / relative).absolute()


def _validate_chunk_request(file_name: str, chunk_index: int, total_chunks: int) -> None:
    """Reject file names with path components and out-of-range chunk indexes.

    Raises:
        HTTPException: 400 when the file name is empty, is ``.``/``..`` or
            contains a directory part, or when the chunk index is not in
            ``[0, total_chunks)``.
    """
    if file_name in {"", ".", ".."} or Path(file_name).name != file_name:
        raise HTTPException(status_code=400, detail="Invalid file name.")
    if total_chunks < 1 or not 0 <= chunk_index < total_chunks:
        raise HTTPException(status_code=400, detail="Invalid chunk index.")


def _merge_chunks(target_dir: Path, file_name: str, total_chunks: int) -> Path:
    """Concatenate ``<file_name>.part0..N-1`` into ``file_name`` and delete the parts.

    All parts are checked up front so that a missing chunk cannot leave a
    truncated file behind with the earlier parts already deleted.

    Raises:
        HTTPException: 400 when at least one part file is missing.
    """
    part_paths = [target_dir / f"{file_name}.part{i}" for i in range(total_chunks)]
    missing = [part.name for part in part_paths if not part.is_file()]
    if missing:
        raise HTTPException(
            status_code=400, detail=f"Missing chunks: {', '.join(missing)}"
        )

    final_file_path = target_dir / file_name
    with open(final_file_path, "wb") as final_file:
        for part_path in part_paths:
            with open(part_path, "rb") as part_file:
                # Stream the copy instead of loading a whole part in memory.
                shutil.copyfileobj(part_file, final_file)
            os.remove(part_path)  # Remove the chunk after merging
    return final_file_path


@app.post("/upload-dropzone/")
async def upload_file(
    file: UploadFile = File(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
    fileName: str = Form(...),
    directory: str = Form(...),
):
    """Store one chunk of a chunked upload; rebuild the file on the last chunk.

    Each chunk is written to ``<UPLOAD_DIR>/<directory>/<fileName>.part<chunkIndex>``.
    When ``chunkIndex + 1 == totalChunks`` the parts are merged into
    ``<UPLOAD_DIR>/<directory>/<fileName>``.

    Returns:
        A dict with ``status`` and ``message``; the response for the final
        chunk also carries ``file_path``.

    Raises:
        HTTPException: 400 for an invalid directory, file name, chunk index or
            missing chunk; 500 for any other failure.
    """
    try:
        print(f"Received: chunkIndex={chunkIndex}, totalChunks={totalChunks}, fileName={fileName}, directory={directory}")

        _validate_chunk_request(fileName, chunkIndex, totalChunks)

        # Create the directory if it doesn't exist
        target_dir = _resolve_upload_dir(directory)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Save the chunk
        chunk_path = target_dir / f"{fileName}.part{chunkIndex}"
        with open(chunk_path, "wb") as f:
            f.write(await file.read())

        # If it's the last chunk, reconstruct the file
        if chunkIndex + 1 == totalChunks:
            final_file_path = _merge_chunks(target_dir, fileName, totalChunks)
            print(f"Final file path: {final_file_path}")

            # List every entry of target_dir (sub-directories included).
            for entry in target_dir.glob("*"):
                print(entry)

            return {
                "status": "success",
                "message": "Chunk uploaded successfully.",
                "file_path": str(final_file_path)
            }

        return {"status": "success", "message": "Chunk uploaded successfully."}
    except HTTPException:
        # Keep the status code chosen by the validation helpers.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e


# Settings used by the Keras model.
MAX_SEQ_LENGTH = 8
NUM_FEATURES = 1024
IMG_SIZE = 128

#center_crop_layer = layers.CenterCrop(IMG_SIZE, IMG_SIZE)
# Resizing is used instead of CenterCrop.
center_crop_layer = layers.Resizing(IMG_SIZE, IMG_SIZE)


def crop_center(frame):
    """Resize a single frame to ``IMG_SIZE`` x ``IMG_SIZE``.

    Despite the name, no cropping happens: ``center_crop_layer`` is a
    ``Resizing`` layer. A batch axis is added for the layer call and squeezed
    out again before returning.
    """
    cropped = center_crop_layer(frame[None, ...])
    cropped = keras.ops.convert_to_numpy(cropped)
    cropped = keras.ops.squeeze(cropped)
    return cropped


def build_feature_extractor():
    """Build the per-frame feature extractor.

    The model applies the DenseNet preprocessing function and then a
    DenseNet121 (ImageNet weights, no top, average pooling) to inputs of shape
    ``(IMG_SIZE, IMG_SIZE, 3)``.
    """
    feature_extractor = DenseNet121(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.densenet.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()


@keras.saving.register_keras_serializable()
class PositionalEmbedding(layers.Layer):
    """Add a learned position embedding to every step of a sequence."""

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def build(self, input_shape):
        self.position_embeddings.build(input_shape)

    def call(self, inputs):
        # The inputs are of shape: `(batch_size, frames, num_features)`
        inputs = keras.ops.cast(inputs, self.compute_dtype)
        length = keras.ops.shape(inputs)[1]
        positions = keras.ops.arange(start=0, stop=length, step=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions
@keras.saving.register_keras_serializable()
class TransformerEncoder(layers.Layer):
    """Self-attention block followed by a two-layer dense projection.

    Both sub-blocks use a residual connection and layer normalization.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=keras.activations.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


#model = keras.saving.load_model("hf://Og2/videoclassif")
model = keras.saving.load_model(
    "hf://Og2/videoclassif",
    custom_objects={
        'PositionalEmbedding': PositionalEmbedding,
        'TransformerEncoder': TransformerEncoder,
    },
)

# Hugging Face model repository and the file to read from it.
model_repo = "Og2/videoclassif"
file_name = "labels.txt"

# Download the labels file from the Hugging Face Hub.
labels_file_path = hf_hub_download(repo_id=model_repo, filename=file_name)

# One label per line; the line order is used as the class index.
with open(labels_file_path, "r", encoding="utf-8") as labels_file:
    class_labels = [line.strip() for line in labels_file]


def load_video(path, max_frames=0, offload_to_cpu=False):
    """Read a video with OpenCV and return its resized RGB frames as an array.

    Args:
        path: Video location; ``os.PathLike`` objects are converted to ``str``
            before being handed to OpenCV.
        max_frames: Stop after this many frames; ``0`` reads the whole video.
        offload_to_cpu: When the Keras backend is "torch", move every frame to
            the CPU.

    Returns:
        ``np.ndarray`` built from the list of frames returned by ``crop_center``.
    """
    print("## load_video ##")
    if isinstance(path, os.PathLike):
        # Not every OpenCV build accepts pathlib objects as a file name.
        path = os.fspath(path)
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = frame[:, :, [2, 1, 0]]  # BGR -> RGB
            frame = crop_center(frame)
            if offload_to_cpu and keras.backend.backend() == "torch":
                frame = frame.to("cpu")
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    print("load_video finalized !")
    if offload_to_cpu and keras.backend.backend() == "torch":
        return np.array([frame.to("cpu").numpy() for frame in frames])
    return np.array(frames)


def prepare_single_video(frames):
    """Turn up to ``MAX_SEQ_LENGTH`` frames into a feature tensor for the model.

    Shorter inputs are padded with all-zero frames. A frame whose mean is not
    positive (the padding, for instance) keeps an all-zero feature vector and
    is not sent through the feature extractor.

    Returns:
        ``np.ndarray`` of shape ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)``, float32.
    """
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    frames = np.asarray(frames)
    if len(frames) == 0:
        # Nothing to extract; an empty array cannot be padded frame-wise.
        return frame_features

    # Pad shorter videos.
    if len(frames) < MAX_SEQ_LENGTH:
        diff = MAX_SEQ_LENGTH - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3), dtype=frames.dtype)
        # np.concatenate takes ONE sequence of arrays; its second positional
        # argument is the axis, so the arrays must be wrapped in a tuple.
        frames = np.concatenate((frames, padding))

    frames = frames[None, ...]

    # Extract features from the frames of the current video.
    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            if np.mean(batch[j, :]) > 0.0:
                frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
            else:
                frame_features[i, j, :] = 0.0

    return frame_features


def predict_action_legacy(video):
    """Classify a whole video in one pass and write the top 5 to ``result.json``.

    This function used to be named ``predict_action`` and was replaced at
    import time by the endpoint of the same name defined further down, so it
    could never be reached under that name. It is kept under a distinct name.

    Returns:
        Dict mapping each of the 5 most probable labels to its probability.
    """
    print("##### to be cancellled #####")
    frames = load_video(video, offload_to_cpu=True)
    frame_features = prepare_single_video(frames)
    probabilities = model.predict(frame_features)[0]

    # Keep the top 5.
    top_5_indices = np.argsort(probabilities)[::-1][:5]
    results = {class_labels[i]: float(probabilities[i]) for i in top_5_indices}

    # Save the JSON to a file.
    output_file = "result.json"
    with open(output_file, "w") as f:
        json.dump(results, f)
    return results


# A ThreadPoolExecutor keeps the heavy work off the event loop.
executor = ThreadPoolExecutor(max_workers=10)  # adjust max_workers as needed

# The event loop only keeps weak references to tasks; holding them here
# prevents a running task from being garbage-collected.
_background_tasks = set()


def _on_background_task_done(task):
    """Forget a finished background task and report its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        print(f"Video processing failed: {error!r}")


@app.post("/predict-action/")
async def predict_action(uuid: str):
    """Start the analysis of the video stored under ``<UPLOAD_DIR>/<uuid>``.

    The response is sent immediately; the analysis runs in the background.

    Raises:
        HTTPException: 400 when ``uuid`` is not a plain relative path, 404 when
            the directory does not exist or holds no video file.
    """
    print("##### predict-action started #####")

    relative = Path(uuid)
    if relative.is_absolute() or ".." in relative.parts:
        raise HTTPException(status_code=400, detail="Invalid uuid.")

    # Target directory.
    target_dir = (Path(UPLOAD_DIR) / relative).absolute()
    if not target_dir.is_dir():
        raise HTTPException(status_code=404, detail="Upload directory not found.")

    # List every entry of target_dir (sub-directories included).
    entries = list(target_dir.iterdir())
    for entry in entries:
        print(entry)

    # Common video extensions.
    video_extensions = {".mp4", ".avi", ".mkv", ".mov", ".flv", ".wmv", ".webm"}
    video_files = [entry for entry in entries if entry.suffix.lower() in video_extensions]

    if not video_files:
        raise HTTPException(status_code=404, detail="No video file found.")

    file_path = video_files[0]
    if len(video_files) == 1:
        print(f"Video file found: {file_path}")
    else:
        print("Several video file or multiple video files found in the directory.")

    # Start the background task and keep a reference to it.
    task = asyncio.create_task(run_video_processing(file_path))
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)

    return {"message": "Prediction started. Please check back later for results."}


async def run_video_processing(file_path: str):
    """Run ``predict_video`` in the thread pool so the event loop stays free."""
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, predict_video, file_path)
    return result


def predict_video(video):
    """Classify a video in segments of ``MAX_SEQ_LENGTH`` frames.

    Each segment contributes one row (start/end frame and its top 5 labels
    with probabilities) to a DataFrame that is passed to ``ComputeStatistics``.

    Returns:
        The JSON string produced by ``ComputeStatistics``.
    """
    print("##### predict_video started #####")

    # Load the frames of the video.
    frames = load_video(video, offload_to_cpu=True)

    segment_size = MAX_SEQ_LENGTH
    total_frames = len(frames)
    print("total_frames = ", total_frames)

    data = []
    # The last segment may hold fewer than segment_size frames.
    for start_idx in range(0, total_frames, segment_size):
        segment = frames[start_idx:start_idx + segment_size]
        frame_features = prepare_single_video(segment)
        probabilities = model.predict(frame_features)[0]

        # Top 5 most probable classes.
        top_5_indices = np.argsort(probabilities)[::-1][:5]
        top_5_classes = [(class_labels[i], probabilities[i]) for i in top_5_indices]

        row = {
            "start_frame": start_idx,
            # The end frame never exceeds the last frame of the video.
            "end_frame": min(start_idx + segment_size - 1, total_frames - 1),
        }

        for rank, (label, prob) in enumerate(top_5_classes, start=1):
            row[f"top{rank}"] = label
            row[f"top{rank}%"] = prob

        # Fill the remaining ranks when fewer than 5 classes are available.
        for rank in range(len(top_5_classes) + 1, 6):
            row[f"top{rank}"] = None
            row[f"top{rank}%"] = None

        data.append(row)

    df = pd.DataFrame(data)

    print("##### DataFrame created #####")
    print(df)

    results = ComputeStatistics(df)
    return results


def ComputeStatistics(df):
    """Aggregate per-segment predictions into match statistics.

    Args:
        df: DataFrame with a ``top1`` label column and a ``start_frame``
            column. A frame without these columns (no segment analysed)
            yields zero counts and ``None`` times instead of raising.

    Returns:
        JSON string (``indent=4``) of the statistics dictionary.
    """
    top1 = df["top1"] if "top1" in df.columns else pd.Series(dtype="object")
    start_frames = df["start_frame"] if "start_frame" in df.columns else None

    def count_prefix(prefix):
        return int(top1.str.startswith(prefix, na=False).sum())

    def count_exact(label):
        return int((top1 == label).sum())

    def first_time(prefix):
        # Time of the first matching segment; frames are converted with a
        # fixed factor of 1/30.
        if start_frames is None:
            return None
        hits = top1.str.startswith(prefix, na=False)
        if not hits.any():
            return None
        return (1 / 30) * start_frames[hits].iloc[0]

    totalShots1 = count_prefix("Shot_1")
    totalShots2 = count_prefix("Shot_2")
    totalGoal1 = count_prefix("Goal_1")
    totalGoal2 = count_prefix("Goal_2")
    totalBlock1 = count_exact("Block_1-1")  # exact match
    totalBlock2 = count_exact("Block_2-1")  # exact match

    goalConceeded = totalGoal2
    goal1_1 = count_prefix("Goal_1-3")
    goal1_2 = count_prefix("Goal_1-2")
    goal1_5 = count_prefix("Goal_1-5")
    save1 = totalBlock2

    # Winner: 1 only when side 1 scored strictly more goals.
    vistory = 1 if totalGoal1 > totalGoal2 else 2

    # Save rates.
    saveRate1 = totalBlock1 / (totalBlock1 + totalGoal2) if (totalBlock1 + totalGoal2) > 0 else 0
    saveRate2 = totalBlock2 / (totalBlock2 + totalGoal1) if (totalBlock2 + totalGoal1) > 0 else 0

    timeFirstGoal1 = first_time("Goal_1")
    timeFirstGoal2 = first_time("Goal_2")

    # Conversion rate.
    convertionRate1 = totalGoal1 / totalShots1 if totalShots1 > 0 else 0

    # A clean sheet means the opponent scored no goal. The previous condition
    # (`> 0`) reported a clean sheet exactly when goals had been conceded.
    cleanSheet1 = 1 if totalGoal2 == 0 else 0
    cleanSheet2 = 1 if totalGoal1 == 0 else 0

    statistics = {
        "goalConceeded": goalConceeded,
        "totalShots1": totalShots1,
        "goal1_1": goal1_1,
        "goal1_2": goal1_2,
        "goal1_5": goal1_5,
        "save1": save1,
        "timeFirstGoal1": timeFirstGoal1,
        "timeFirstGoal2": timeFirstGoal2,
        "convertionRate1": convertionRate1,
        "totalShots2": totalShots2,
        "totalGoal2": totalGoal2,
        "totalGoal1": totalGoal1,
        "totalBlock1": totalBlock1,
        "totalBlock2": totalBlock2,
        "vistory": vistory,
        "saveRate1": saveRate1,
        "saveRate2": saveRate2,
        "cleanSheet1": cleanSheet1,
        "cleanSheet2": cleanSheet2
    }

    def to_native(value):
        # json cannot serialize NumPy scalars or pandas timestamps.
        if isinstance(value, (np.integer, np.floating)):
            return value.item()
        if isinstance(value, pd.Timestamp):
            return value.isoformat()
        return value

    statistics = {key: to_native(value) for key, value in statistics.items()}

    statistics_json = json.dumps(statistics, indent=4)

    print("##### Statistics JSON #####")
    print(statistics_json)

    return statistics_json


# Rebinds the module-level UPLOAD_DIR: every handler that reads UPLOAD_DIR at
# request time uses this temporary chunk directory.
UPLOAD_DIR = Path("/app/uploads")
HF_TOKEN = os.getenv('HF_TOKEN')  # Hugging Face token, read from the environment
DATASET_REPO = "Og2/myDataSet"  # Replace with your dataset

api = HfApi()


def _dataset_staging_dir(directory: str) -> Path:
    """Return ``UPLOAD_DIR / directory`` after rejecting escaping paths.

    Raises:
        HTTPException: 400 when ``directory`` is absolute or contains ``..``.
    """
    relative = Path(directory)
    if relative.is_absolute() or ".." in relative.parts:
        raise HTTPException(status_code=400, detail="Invalid directory.")
    return Path(UPLOAD_DIR) / relative


def _assemble_chunks(target_dir: Path, file_name: str, total_chunks: int) -> Path:
    """Merge ``<file_name>.part0..N-1`` into ``file_name`` and delete the parts.

    Raises:
        HTTPException: 400 when a part is missing; nothing is merged or
            deleted in that case.
    """
    import shutil

    part_paths = [target_dir / f"{file_name}.part{i}" for i in range(total_chunks)]
    missing = [part.name for part in part_paths if not part.is_file()]
    if missing:
        raise HTTPException(
            status_code=400, detail=f"Missing chunks: {', '.join(missing)}"
        )

    final_file_path = target_dir / file_name
    with open(final_file_path, "wb") as final_file:
        for part_path in part_paths:
            with open(part_path, "rb") as part_file:
                shutil.copyfileobj(part_file, final_file)
            os.remove(part_path)  # Delete the chunk after merging
    return final_file_path


@app.post("/upload-dataset/")
async def upload_file(
    file: UploadFile = File(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
    fileName: str = Form(...),
    directory: str = Form(...),
):
    """Receive one chunk; on the last one, rebuild the file and push it to the dataset.

    The merged file is uploaded to ``DATASET_REPO`` under
    ``<directory>/<fileName>`` and the local copy is removed afterwards,
    whether or not the upload succeeded.

    Raises:
        HTTPException: 400 for an invalid directory, file name, chunk index or
            missing chunk; 500 for any other failure.
    """
    try:
        print(f"Received: chunkIndex={chunkIndex}, totalChunks={totalChunks}, fileName={fileName}, directory={directory}")

        if fileName in {"", ".", ".."} or Path(fileName).name != fileName:
            raise HTTPException(status_code=400, detail="Invalid file name.")
        if totalChunks < 1 or not 0 <= chunkIndex < totalChunks:
            raise HTTPException(status_code=400, detail="Invalid chunk index.")

        # Create the temporary directory if needed.
        target_dir = _dataset_staging_dir(directory)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Save the chunk.
        chunk_path = target_dir / f"{fileName}.part{chunkIndex}"
        with open(chunk_path, "wb") as f:
            f.write(await file.read())

        # Rebuild the file once the last chunk has been received.
        if chunkIndex + 1 == totalChunks:
            final_file_path = _assemble_chunks(target_dir, fileName, totalChunks)
            print(f"Final file created: {final_file_path}")

            try:
                # Upload to Hugging Face.
                api.upload_file(
                    path_or_fileobj=str(final_file_path),
                    path_in_repo=f"{directory}/{fileName}",  # stored in a sub-folder of the dataset
                    repo_id=DATASET_REPO,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            finally:
                # Never leave the merged file behind, even on a failed upload.
                os.remove(final_file_path)

            return {
                "status": "success",
                "message": "File uploaded successfully to Hugging Face.",
                "hf_url": f"https://huggingface.co/datasets/{DATASET_REPO}/blob/main/{directory}/{fileName}"
            }

        return {"status": "success", "message": "Chunk uploaded successfully."}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e


@app.get("/list-videos/")
async def list_videos():
    """List the video files of ``DATASET_REPO`` with their Hub URLs.

    Raises:
        HTTPException: 500 when the repository listing fails.
    """
    try:
        # Fetch the file list of the dataset.
        files = api.list_repo_files(repo_id=DATASET_REPO, repo_type="dataset", token=HF_TOKEN)

        # Keep video files only; matching ignores the case of the extension.
        video_extensions = (".mp4", ".avi", ".mov", ".mkv", ".flv")
        video_files = [f for f in files if f.lower().endswith(video_extensions)]

        videos_list = [
            {"file_name": f, "url": f"https://huggingface.co/datasets/{DATASET_REPO}/blob/main/{f}"}
            for f in video_files
        ]

        return {"status": "success", "videos": videos_list}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch videos: {str(e)}") from e


@app.get("/get-video-frames/")
async def get_video_frames(file_name: str, frame_id: int) -> dict:
    """Return up to 30 frames of a dataset video as base64-encoded PNG images.

    Frames ``max(0, frame_id)`` to ``min(total_frames, frame_id + 30) - 1`` are
    read, converted to RGB and resized to 128x128.

    Raises:
        HTTPException: 404 when the download does not answer 200, 500 when
            the video cannot be opened or any other step fails.
    """
    import tempfile
    from urllib.parse import quote

    cap = None
    temp_video_path = None
    try:
        # URL of the video file inside the dataset; the name is percent-encoded
        # so that characters such as '#' or '?' stay part of the path.
        video_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{quote(file_name, safe='/')}"

        # Download the video; send the token only when one is configured.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
        response = requests.get(video_url, headers=headers, timeout=60)

        if response.status_code != 200:
            raise HTTPException(status_code=404, detail="Vidéo introuvable dans le dataset")

        # OpenCV needs a file on disk. A unique temporary file keeps
        # concurrent requests from overwriting each other's video.
        suffix = Path(file_name).suffix or ".mp4"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
            temp_video_path = temp_file.name
            temp_file.write(response.content)

        cap = cv2.VideoCapture(temp_video_path)

        if not cap.isOpened():
            raise HTTPException(status_code=500, detail="Impossible de charger la vidéo")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Range of frames to extract: at most 30 frames.
        start_frame = max(0, frame_id)
        end_frame = min(total_frames, frame_id + 30)

        frames = []
        frame_size = (128, 128)  # image size used for Bubble

        # Seek once; consecutive reads then return consecutive frames.
        if start_frame < end_frame:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        for i in range(start_frame, end_frame):
            ret, frame = cap.read()
            if not ret:
                break  # stop when a read fails

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(frame_rgb).resize(frame_size)

            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format="PNG")

            # Base64 data URL, the format used for Bubble.
            img_base64 = f"data:image/png;base64,{base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')}"

            frames.append({"frame_index": i, "image": img_base64})

        return {"status": "success", "frames": frames}

    except HTTPException:
        # Keep the 404 / 500 chosen above instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur lors de l'extraction des frames : {str(e)}") from e
    finally:
        if cap is not None:
            cap.release()
        if temp_video_path is not None:
            try:
                os.remove(temp_video_path)
            except OSError:
                pass  # best-effort cleanup of the temporary file