# Hugging Face Space page header captured with the file (status: Running).
# https://Og2-FoosballAnalytics.hf.space/
from fastapi import FastAPI, File, Form, UploadFile, HTTPException | |
from pathlib import Path | |
import os | |
from pydantic import BaseModel | |
import json | |
import tensorflow as tf | |
import numpy as np | |
import cv2 | |
import keras | |
from keras.saving import register_keras_serializable | |
from keras import layers | |
from huggingface_hub import hf_hub_download | |
from keras.applications.densenet import DenseNet121 | |
from concurrent.futures import ThreadPoolExecutor | |
import asyncio | |
import pandas as pd | |
from typing import List | |
from huggingface_hub import HfApi | |
import requests | |
import io | |
import base64 | |
from PIL import Image | |
#from tensorflow_docs.vis import embed | |
# FastAPI application object for this service.
app = FastAPI()
# Directory (relative to the working directory) that receives uploaded chunks.
# NOTE: this name is rebound further down in this file to Path("/app/uploads").
UPLOAD_DIR = "uploads"
# Make sure the directory exists as soon as the module is imported.
os.makedirs(UPLOAD_DIR, exist_ok=True)
async def upload_file(
    file: UploadFile = File(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
    fileName: str = Form(...),
    directory: str = Form(...),
):
    """Store one chunk of a chunked upload and merge all parts on the last chunk.

    Args:
        file: Uploaded chunk payload.
        chunkIndex: Zero-based index of this chunk.
        totalChunks: Total number of chunks that make up the file.
        fileName: Name of the final file; must be a bare file name.
        directory: Sub-directory of ``UPLOAD_DIR`` that receives the file.

    Returns:
        A status dict. When the last chunk was received, it also carries the
        path of the merged file under ``"file_path"``.

    Raises:
        HTTPException: 400 when the chunk index is out of range or when
            ``fileName`` / ``directory`` would escape ``UPLOAD_DIR``;
            500 for any other failure.
    """
    try:
        print(f"Received: chunkIndex={chunkIndex}, totalChunks={totalChunks}, fileName={fileName}, directory={directory}")

        # Both values come straight from the request form: refuse anything
        # that could point outside the upload root.
        if fileName in ("", ".", "..") or Path(fileName).name != fileName:
            raise HTTPException(status_code=400, detail="Invalid file name.")
        if not 0 <= chunkIndex < totalChunks:
            raise HTTPException(status_code=400, detail="Invalid chunk index.")
        upload_root = Path(UPLOAD_DIR).resolve()
        target_dir = (upload_root / directory).resolve()
        if target_dir != upload_root and upload_root not in target_dir.parents:
            raise HTTPException(status_code=400, detail="Invalid directory.")

        # Create the directory if it doesn't exist
        target_dir.mkdir(parents=True, exist_ok=True)

        # Save the chunk
        chunk_path = target_dir / f"{fileName}.part{chunkIndex}"
        with open(chunk_path, "wb") as f:
            f.write(await file.read())

        # Any chunk but the last one: nothing more to do.
        if chunkIndex + 1 != totalChunks:
            return {"status": "success", "message": "Chunk uploaded successfully."}

        # Last chunk: rebuild the file from its parts. This relies on every
        # earlier part already being on disk; a missing part surfaces as a 500.
        final_file_path = target_dir / fileName
        with open(final_file_path, "wb") as final_file:
            for i in range(totalChunks):
                part_path = target_dir / f"{fileName}.part{i}"
                with open(part_path, "rb") as part_file:
                    final_file.write(part_file.read())
                os.remove(part_path)  # Remove the chunk after merging
        print(f"Final file path: {final_file_path}")

        # Log the content of the target directory (sub-directories included).
        # A dedicated loop name avoids shadowing the `file` parameter.
        for entry in target_dir.glob("*"):
            print(entry)

        return {
            "status": "success",
            "message": "Chunk uploaded successfully.",
            "file_path": str(final_file_path)
        }
    except HTTPException:
        # Keep the status code of deliberate HTTP errors (e.g. the 400s above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
# Available backend options are: "jax", "torch", "tensorflow".
# NOTE(review): `keras` is already imported near the top of this file, so this
# assignment probably comes too late to influence backend selection - confirm
# and, if so, move it above the keras import.
os.environ["KERAS_BACKEND"] = "tensorflow"
# Constants shared by the feature extractor and the sequence classifier.
MAX_SEQ_LENGTH = 8  # number of frames per sequence fed to the classifier
NUM_FEATURES = 1024  # length of the feature vector stored for each frame
IMG_SIZE = 128  # height and width of the frames given to the feature extractor
# center_crop_layer = layers.CenterCrop(IMG_SIZE, IMG_SIZE)
# Frames are resized instead of being center-cropped.
center_crop_layer = layers.Resizing(IMG_SIZE, IMG_SIZE)
def crop_center(frame):
    """Run a single frame through ``center_crop_layer``.

    A leading batch axis is added for the layer call; the layer output goes
    through ``keras.ops.convert_to_numpy`` and then ``keras.ops.squeeze``,
    whose result is returned.
    """
    batched = frame[None, ...]
    resized = keras.ops.convert_to_numpy(center_crop_layer(batched))
    return keras.ops.squeeze(resized)
def build_feature_extractor():
    """Build the per-frame feature extractor.

    The model chains DenseNet preprocessing with a DenseNet121 backbone
    (ImageNet weights, no classification head, average pooling) on inputs of
    shape ``(IMG_SIZE, IMG_SIZE, 3)``.

    Returns:
        A ``keras.Model`` named ``"feature_extractor"``.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = DenseNet121(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=frame_shape,
    )

    inputs = keras.Input(frame_shape)
    outputs = backbone(keras.applications.densenet.preprocess_input(inputs))
    return keras.Model(inputs, outputs, name="feature_extractor")


# Built once at import time and shared by every request.
feature_extractor = build_feature_extractor()
class PositionalEmbedding(layers.Layer):
    """Layer that adds a position embedding to every step of a sequence."""

    def __init__(self, sequence_length, output_dim, **kwargs):
        """Create the position embedding table.

        Args:
            sequence_length: Number of entries in the embedding table.
            output_dim: Size of each embedding vector.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def build(self, input_shape):
        """Build the inner embedding layer with the incoming shape."""
        self.position_embeddings.build(input_shape)

    def call(self, inputs):
        """Return ``inputs`` plus the embedding of each position index."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        inputs = keras.ops.cast(inputs, self.compute_dtype)
        # One position index per step along axis 1.
        length = keras.ops.shape(inputs)[1]
        positions = keras.ops.arange(start=0, stop=length, step=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention and a dense projection, each followed by
    a residual connection and layer normalization."""

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: Used as ``key_dim`` of the attention layer and as the
                output width of the dense projection.
            dense_dim: Width of the hidden dense layer of the projection.
            num_heads: Number of attention heads.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=keras.activations.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply the encoder block to ``inputs``.

        Args:
            inputs: Sequence tensor, used as both query and value.
            mask: Optional mask passed to the attention layer as
                ``attention_mask``.
        """
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        # Residual connection around the attention, then normalization.
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        # Residual connection around the projection, then normalization.
        return self.layernorm_2(proj_input + proj_output)
# model = keras.saving.load_model("hf://Og2/videoclassif")
# Load the classifier from the Hugging Face Hub; the custom layer classes are
# handed over explicitly through `custom_objects`.
model = keras.saving.load_model("hf://Og2/videoclassif", custom_objects={'PositionalEmbedding': PositionalEmbedding, 'TransformerEncoder': TransformerEncoder})
# Hugging Face repository and file that hold the class labels.
model_repo = "Og2/videoclassif"  # replace with your own model repository
file_name = "labels.txt"  # file to download from that repository
# Download the labels file from the Hugging Face Hub.
labels_file_path = hf_hub_download(repo_id=model_repo, filename=file_name)
with open(labels_file_path, "r") as file:
    class_labels = [line.strip() for line in file]  # one stripped entry per line
# print("Label list rebuilt from the file:")
# print(class_labels)
def load_video(path, max_frames=0, offload_to_cpu=False):
    """Read a video with OpenCV and return its preprocessed frames.

    Each frame has its channel order reversed (indices ``[2, 1, 0]``) and is
    passed through ``crop_center``.

    Args:
        path: Video location handed to ``cv2.VideoCapture``.
        max_frames: Reading stops once this many frames were collected; the
            default of 0 never matches, so the whole video is read.
        offload_to_cpu: When true and the Keras backend is "torch", frames are
            moved to the CPU.

    Returns:
        ``np.ndarray`` built from the list of collected frames.
    """
    print("## load_video ##")
    move_to_cpu = offload_to_cpu and keras.backend.backend() == "torch"
    capture = cv2.VideoCapture(path)
    collected = []
    try:
        grabbed, bgr = capture.read()
        while grabbed:
            rgb = crop_center(bgr[:, :, [2, 1, 0]])
            if move_to_cpu:
                rgb = rgb.to("cpu")
            collected.append(rgb)
            if len(collected) == max_frames:
                break
            grabbed, bgr = capture.read()
    finally:
        capture.release()
        print("load_video finalized !")

    if move_to_cpu:
        return np.array([item.to("cpu").numpy() for item in collected])
    return np.array(collected)
def prepare_single_video(frames):
    """Turn the frames of one clip into the feature tensor fed to the classifier.

    Clips shorter than ``MAX_SEQ_LENGTH`` are padded with all-zero frames;
    longer clips only contribute their first ``MAX_SEQ_LENGTH`` frames. Frames
    whose mean is not positive (such as the padding) keep an all-zero feature
    vector and are never sent to the feature extractor.

    Args:
        frames: Sequence of frames; non-empty input is expected to be shaped
            ``(n_frames, IMG_SIZE, IMG_SIZE, 3)``.

    Returns:
        ``float32`` array of shape ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)``.
    """
    frames = np.asarray(frames)
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    # Nothing to extract from an empty clip: return the all-zero features.
    if len(frames) == 0:
        return frame_features

    # Pad shorter videos. `np.concatenate` takes the arrays as ONE sequence;
    # passing them as two positional arguments makes the second one the axis.
    if len(frames) < MAX_SEQ_LENGTH:
        diff = MAX_SEQ_LENGTH - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3), dtype=frames.dtype)
        frames = np.concatenate((frames, padding))

    frames = frames[None, ...]

    # Extract features from the frames of the current video.
    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            if np.mean(batch[j, :]) > 0.0:
                frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
            else:
                frame_features[i, j, :] = 0.0

    return frame_features
def predict_action(video):
    """Classify a whole video and return its five most probable labels.

    The result (label -> probability) is also written to ``result.json`` in
    the current working directory.

    NOTE: a coroutine with the same name is defined later in this module and
    replaces this function at import time.
    """
    print("##### to be cancellled #####")
    features = prepare_single_video(load_video(video, offload_to_cpu=True))
    probabilities = model.predict(features)[0]

    # Indices of the five highest probabilities, best first.
    results = {}
    for idx in np.argsort(probabilities)[::-1][:5]:
        results[class_labels[idx]] = float(probabilities[idx])

    # Persist the result as JSON.
    with open("result.json", "w") as handle:
        json.dump(results, handle)

    return results
# A ThreadPoolExecutor is used to offload the heavy tasks.
executor = ThreadPoolExecutor(max_workers=10)  # adjust max_workers to your needs
# Strong references to the in-flight background tasks: the event loop only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it completes.
_background_tasks = set()


async def predict_action(uuid: str):
    """Start the analysis of the video uploaded under ``UPLOAD_DIR/<uuid>``.

    The processing runs in the background; the response is returned as soon as
    the task has been scheduled.

    Args:
        uuid: Name of the upload sub-directory that contains the video.

    Returns:
        A dict with a message telling that the prediction has started.

    Raises:
        HTTPException: 404 when the directory does not exist (or would lie
            outside ``UPLOAD_DIR``) or when it contains no video file.
    """
    print("##### predict-action started #####")

    # `uuid` comes from the request: only a directory below the upload root
    # is acceptable.
    upload_root = Path(UPLOAD_DIR).resolve()
    target_dir = (upload_root / uuid).resolve()
    if upload_root not in target_dir.parents or not target_dir.is_dir():
        raise HTTPException(status_code=404, detail="Upload directory not found.")

    # Log every entry of the directory (sub-directories included). Sorting
    # makes the choice below deterministic when several videos are present.
    entries = sorted(target_dir.iterdir())
    for entry in entries:
        print(entry)

    # Common video file extensions.
    video_extensions = {".mp4", ".avi", ".mkv", ".mov", ".flv", ".wmv", ".webm"}
    video_files = [entry for entry in entries if entry.suffix.lower() in video_extensions]

    # Without a video there is nothing to schedule; say so right away instead
    # of letting the background task fail unnoticed.
    if not video_files:
        raise HTTPException(status_code=404, detail="No video file found in the directory.")

    file_path = video_files[0]
    if len(video_files) == 1:
        print(f"Video file found: {file_path}")
    else:
        print("Several video file or multiple video files found in the directory.")

    # Start the background task; the path is passed as `str`, matching the
    # annotation of `run_video_processing`.
    task = asyncio.create_task(run_video_processing(str(file_path)))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {"message": "Prediction started. Please check back later for results."}
async def run_video_processing(file_path: str):
    """Run ``predict_video`` on ``file_path`` in the shared thread pool.

    Offloading the call to ``executor`` keeps the event loop free while the
    video is processed. Returns whatever ``predict_video`` returns.
    """
    running_loop = asyncio.get_running_loop()
    return await running_loop.run_in_executor(executor, predict_video, file_path)
def predict_video(video):
    """Classify a video segment by segment and return the match statistics.

    The frames are split into consecutive segments of ``MAX_SEQ_LENGTH``
    frames (the last one may be shorter). For each segment the five most
    probable classes are stored in one row of a DataFrame, which is then
    handed to ``ComputeStatistics``.

    Args:
        video: Video location passed to ``load_video``.

    Returns:
        The value returned by ``ComputeStatistics``.
    """
    print("##### predict_video started #####")

    frames = load_video(video, offload_to_cpu=True)
    total_frames = len(frames)
    print("total_frames = ", total_frames)

    last_frame = total_frames - 1
    rows = []
    for start_idx in range(0, total_frames, MAX_SEQ_LENGTH):
        # One segment of MAX_SEQ_LENGTH frames (fewer at the end of the video).
        segment = frames[start_idx:start_idx + MAX_SEQ_LENGTH]
        probabilities = model.predict(prepare_single_video(segment))[0]

        # Class indices ordered by decreasing probability, five at most.
        ranked = np.argsort(probabilities)[::-1][:5]

        row = {
            "start_frame": start_idx,
            # The last segment must not run past the end of the video.
            "end_frame": min(start_idx + MAX_SEQ_LENGTH - 1, last_frame),
        }
        for rank in range(1, 6):
            if rank <= len(ranked):
                class_idx = ranked[rank - 1]
                row[f"top{rank}"] = class_labels[class_idx]
                row[f"top{rank}%"] = probabilities[class_idx]
            else:
                # Fewer than five classes available: leave the slot empty.
                row[f"top{rank}"] = None
                row[f"top{rank}%"] = None
        rows.append(row)

    df = pd.DataFrame(rows)
    print("##### DataFrame created #####")
    print(df)

    return ComputeStatistics(df)
def ComputeStatistics(df):
    """Aggregate the per-segment predictions into match statistics.

    Only the ``top1`` label of each row is considered; labels are matched by
    prefix (``Goal_1``, ``Shot_2``, ...) or exactly (``Block_1-1``,
    ``Block_2-1``). An empty DataFrame, or rows without a ``top1`` label,
    yield zero counts instead of an error.

    Args:
        df: DataFrame with one row per segment; the columns read here are
            ``top1`` and ``start_frame``.

    Returns:
        The statistics serialized as a JSON string (indent of 4).
    """
    # A DataFrame built from no rows has no columns at all.
    if "top1" in df.columns:
        top1 = df["top1"]
    else:
        top1 = pd.Series([], dtype="object")

    def count_prefix(prefix):
        # `na=False` counts rows without a label as non-matching.
        return int(top1.str.startswith(prefix, na=False).sum())

    def count_exact(label):
        return int((top1 == label).sum())

    def first_time(prefix):
        # Time of the first row whose top1 label starts with `prefix`.
        matches = top1.str.startswith(prefix, na=False)
        if "start_frame" not in df.columns or not matches.any():
            return None
        # presumably the videos run at 30 frames per second - TODO confirm
        return (1 / 30) * float(df.loc[matches, "start_frame"].iloc[0])

    totalGoal1 = count_prefix("Goal_1")
    totalGoal2 = count_prefix("Goal_2")
    totalShots1 = count_prefix("Shot_1")
    totalShots2 = count_prefix("Shot_2")
    totalBlock1 = count_exact("Block_1-1")
    totalBlock2 = count_exact("Block_2-1")

    # NOTE(review): a tie (including 0-0) is reported as 2 - confirm intent.
    vistory = 1 if totalGoal1 > totalGoal2 else 2

    # Save rate = blocks / (blocks + goals conceded), 0 when there was nothing to save.
    faced1 = totalBlock1 + totalGoal2
    faced2 = totalBlock2 + totalGoal1
    saveRate1 = totalBlock1 / faced1 if faced1 > 0 else 0
    saveRate2 = totalBlock2 / faced2 if faced2 > 0 else 0

    # Conversion rate of player/team 1.
    convertionRate1 = totalGoal1 / totalShots1 if totalShots1 > 0 else 0

    # A clean sheet means the opponent did NOT score.
    cleanSheet1 = 1 if totalGoal2 == 0 else 0
    cleanSheet2 = 1 if totalGoal1 == 0 else 0

    # Every value below is a native int, float or None, so the dictionary is
    # JSON-serializable as is. Keys are part of the output and kept verbatim.
    statistics = {
        "goalConceeded": totalGoal2,
        "totalShots1": totalShots1,
        # NOTE(review): "goal1_1" counts the "Goal_1-3" prefix - confirm mapping.
        "goal1_1": count_prefix("Goal_1-3"),
        "goal1_2": count_prefix("Goal_1-2"),
        "goal1_5": count_prefix("Goal_1-5"),
        "save1": totalBlock2,  # exact matches of "Block_2-1"
        "timeFirstGoal1": first_time("Goal_1"),
        "timeFirstGoal2": first_time("Goal_2"),
        "convertionRate1": convertionRate1,
        "totalShots2": totalShots2,
        "totalGoal2": totalGoal2,
        "totalGoal1": totalGoal1,
        "totalBlock1": totalBlock1,
        "totalBlock2": totalBlock2,
        "vistory": vistory,
        "saveRate1": saveRate1,
        "saveRate2": saveRate2,
        "cleanSheet1": cleanSheet1,
        "cleanSheet2": cleanSheet2
    }

    statistics_json = json.dumps(statistics, indent=4)
    print("##### Statistics JSON #####")
    print(statistics_json)

    return statistics_json
# Temporary directory used to store the uploaded chunks. This rebinds the
# UPLOAD_DIR name assigned near the top of this file.
UPLOAD_DIR = Path("/app/uploads")
# Hugging Face access token, read from the HF_TOKEN environment variable
# (None when the variable is not set).
HF_TOKEN = os.getenv('HF_TOKEN')
# Dataset repository that receives the uploaded videos; replace with your own.
DATASET_REPO = "Og2/myDataSet"
# Hugging Face Hub client shared by the functions below.
api = HfApi()
async def upload_file(
    file: UploadFile = File(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
    fileName: str = Form(...),
    directory: str = Form(...),
):
    """Store one chunk of a chunked upload; on the last chunk, merge the parts
    and push the file to the Hugging Face dataset ``DATASET_REPO``.

    Args:
        file: Uploaded chunk payload.
        chunkIndex: Zero-based index of this chunk.
        totalChunks: Total number of chunks that make up the file.
        fileName: Name of the final file; must be a bare file name.
        directory: Sub-directory of ``UPLOAD_DIR`` (and of the dataset) that
            receives the file.

    Returns:
        A status dict. After the last chunk it also carries the dataset URL of
        the uploaded file under ``"hf_url"``.

    Raises:
        HTTPException: 400 when the chunk index is out of range or when
            ``fileName`` / ``directory`` would escape ``UPLOAD_DIR``;
            500 for any other failure.
    """
    try:
        print(f"Received: chunkIndex={chunkIndex}, totalChunks={totalChunks}, fileName={fileName}, directory={directory}")

        # Both values come straight from the request form: refuse anything
        # that could point outside the upload root.
        if fileName in ("", ".", "..") or Path(fileName).name != fileName:
            raise HTTPException(status_code=400, detail="Invalid file name.")
        if not 0 <= chunkIndex < totalChunks:
            raise HTTPException(status_code=400, detail="Invalid chunk index.")
        upload_root = Path(UPLOAD_DIR).resolve()
        target_dir = (upload_root / directory).resolve()
        if target_dir != upload_root and upload_root not in target_dir.parents:
            raise HTTPException(status_code=400, detail="Invalid directory.")

        # Create the temporary directory if needed.
        target_dir.mkdir(parents=True, exist_ok=True)

        # Save the chunk.
        chunk_path = target_dir / f"{fileName}.part{chunkIndex}"
        with open(chunk_path, "wb") as f:
            f.write(await file.read())

        # Any chunk but the last one: nothing more to do.
        if chunkIndex + 1 != totalChunks:
            return {"status": "success", "message": "Chunk uploaded successfully."}

        # Last chunk: rebuild the file from its parts. This relies on every
        # earlier part already being on disk; a missing part surfaces as a 500.
        final_file_path = target_dir / fileName
        with open(final_file_path, "wb") as final_file:
            for i in range(totalChunks):
                part_path = target_dir / f"{fileName}.part{i}"
                with open(part_path, "rb") as part_file:
                    final_file.write(part_file.read())
                os.remove(part_path)  # Remove the chunks after merging
        print(f"Final file created: {final_file_path}")

        # Path inside the dataset, derived from the validated directory so it
        # cannot contain ".." segments.
        relative_dir = target_dir.relative_to(upload_root).as_posix()
        repo_path = fileName if relative_dir == "." else f"{relative_dir}/{fileName}"

        try:
            # Upload to the Hugging Face dataset.
            api.upload_file(
                path_or_fileobj=str(final_file_path),
                path_in_repo=repo_path,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        finally:
            # Drop the local copy whether or not the upload succeeded, so a
            # failed upload does not leave the merged file behind.
            if final_file_path.exists():
                os.remove(final_file_path)

        return {
            "status": "success",
            "message": "File uploaded successfully to Hugging Face.",
            "hf_url": f"https://huggingface.co/datasets/{DATASET_REPO}/blob/main/{repo_path}"
        }
    except HTTPException:
        # Keep the status code of deliberate HTTP errors (e.g. the 400s above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
async def list_videos():
    """List the video files stored in the dataset ``DATASET_REPO``.

    Returns:
        A dict with ``"status"`` and ``"videos"``; each video entry holds the
        file name and its URL on the Hugging Face website.

    Raises:
        HTTPException: 500 when the file list cannot be fetched.
    """
    # Suffixes treated as videos; extend the tuple when needed.
    video_suffixes = (".mp4", ".avi", ".mov", ".mkv", ".flv")
    try:
        # Fetch the list of files of the dataset.
        repo_files = api.list_repo_files(repo_id=DATASET_REPO, repo_type="dataset", token=HF_TOKEN)

        # Keep the videos only and attach their full URL.
        videos_list = []
        for name in repo_files:
            if name.endswith(video_suffixes):
                videos_list.append(
                    {"file_name": name, "url": f"https://huggingface.co/datasets/{DATASET_REPO}/blob/main/{name}"}
                )

        return {"status": "success", "videos": videos_list}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch videos: {str(e)}")
async def get_video_frames(file_name: str, frame_id: int) -> dict:
    """Return up to 30 consecutive frames of a dataset video as base64 PNGs.

    The video is downloaded from ``DATASET_REPO``, written to a private
    temporary file for OpenCV, and the frames from ``frame_id`` (clamped to 0)
    up to ``frame_id + 30`` (clamped to the frame count) are resized to
    128x128 and encoded as ``data:image/png;base64,...`` strings.

    Args:
        file_name: Path of the video inside the dataset repository.
        frame_id: Index of the first frame to extract.

    Returns:
        ``{"status": "success", "frames": [{"frame_index": int, "image": str}, ...]}``

    Raises:
        HTTPException: 404 when the video cannot be downloaded; 500 when it
            cannot be opened or for any other failure.
    """
    import tempfile  # local import: not part of this file's top-level imports

    cap = None
    temp_video_path = None
    try:
        # URL of the video file in the dataset.
        video_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_name}"

        # Download the video; only send the token when one is configured.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
        # Without a timeout a stalled connection would hang the request forever.
        response = requests.get(video_url, headers=headers, timeout=60)
        if response.status_code != 200:
            raise HTTPException(status_code=404, detail="Vidéo introuvable dans le dataset")

        # OpenCV needs a file on disk. A unique temporary file keeps concurrent
        # requests from overwriting each other's video.
        suffix = Path(file_name).suffix or ".mp4"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(response.content)
            temp_video_path = tmp.name

        # Open the video with OpenCV.
        cap = cv2.VideoCapture(temp_video_path)
        if not cap.isOpened():
            raise HTTPException(status_code=500, detail="Impossible de charger la vidéo")

        # Total number of frames in the video.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Range of frames to extract: at most 30 frames starting at frame_id.
        start_frame = max(0, frame_id)
        end_frame = min(total_frames, frame_id + 30)

        frames = []
        frame_size = (128, 128)  # image size expected by the Bubble client

        # Seek once, then read sequentially instead of seeking for every frame.
        if start_frame < end_frame:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        for i in range(start_frame, end_frame):
            ret, frame = cap.read()
            if not ret:
                break  # Stop when reading fails

            # Convert BGR to RGB, then resize.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(frame_rgb).resize(frame_size)

            # Encode the image as PNG, then as base64 with the data-URI prefix.
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format="PNG")
            img_base64 = f"data:image/png;base64,{base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')}"

            frames.append({"frame_index": i, "image": img_base64})

        return {"status": "success", "frames": frames}

    except HTTPException:
        # Keep the status code of deliberate HTTP errors (e.g. the 404 above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur lors de l'extraction des frames : {str(e)}")
    finally:
        # Release the capture and delete the temporary file on every path.
        if cap is not None:
            cap.release()
        if temp_video_path is not None and os.path.exists(temp_video_path):
            os.remove(temp_video_path)