Spaces:
Running
Running
import gradio as gr | |
import cv2 | |
import numpy as np | |
import tensorflow as tf | |
import cv2 | |
import json | |
import numpy as np | |
from concurrent.futures import ThreadPoolExecutor | |
import dlib | |
import tempfile | |
import shutil | |
import os | |
from tensorflow.keras.applications.inception_v3 import preprocess_input | |
model_path = 'deepfake_detection_model.h5' | |
model = tf.keras.models.load_model(model_path) | |
IMG_SIZE = (299, 299) | |
MOTION_THRESHOLD = 20 | |
FRAME_SKIP = 2 | |
no_of_frames = 10 | |
MAX_FRAMES=no_of_frames | |
detector = dlib.get_frontal_face_detector() | |
def extract_faces_from_frame(frame, detector): | |
""" | |
Detects faces in a frame and returns the resized faces. | |
Parameters: | |
- frame: The video frame to process. | |
- detector: Dlib face detector. | |
Returns: | |
- resized_faces (list): List of resized faces detected in the frame. | |
""" | |
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
faces = detector(gray_frame) | |
resized_faces = [] | |
for face in faces: | |
x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom() | |
crop_img = frame[y1:y2, x1:x2] | |
if crop_img.size != 0: | |
resized_face = cv2.resize(crop_img, IMG_SIZE) | |
resized_faces.append(resized_face) | |
# Debug: Log the number of faces detected | |
#print(f"Detected {len(resized_faces)} faces in current frame") | |
return resized_faces | |
def process_frame(video_path, detector, frame_skip): | |
""" | |
Processes frames to extract motion and face data concurrently. | |
Parameters: | |
- cap: OpenCV VideoCapture object. | |
- detector: Dlib face detector. | |
- frame_skip (int): Number of frames to skip for processing. | |
Returns: | |
- motion_frames (list): List of motion-based face images. | |
- all_faces (list): List of all detected faces for fallback. | |
""" | |
prev_frame = None | |
frame_count = 0 | |
motion_frames = [] | |
all_faces = [] | |
cap = cv2.VideoCapture(video_path) | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# Skip frames to improve processing speed | |
if frame_count % frame_skip != 0: | |
frame_count += 1 | |
continue | |
# Debug: Log frame number being processed | |
#print(f"Processing frame {frame_count}") | |
# # Resize frame to reduce processing time (optional, adjust size as needed) | |
# frame = cv2.resize(frame, (640, 360)) | |
# Extract faces from the current frame | |
faces = extract_faces_from_frame(frame, detector) | |
all_faces.extend(faces) # Store all faces detected, including non-motion | |
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
if prev_frame is None: | |
prev_frame = gray_frame | |
frame_count += 1 | |
continue | |
# Calculate frame difference to detect motion | |
frame_diff = cv2.absdiff(prev_frame, gray_frame) | |
motion_score = np.sum(frame_diff) | |
# Debug: Log the motion score | |
#print(f"Motion score: {motion_score}") | |
# Check if motion is above the defined threshold and add the face to motion frames | |
if motion_score > MOTION_THRESHOLD and faces: | |
motion_frames.extend(faces) | |
prev_frame = gray_frame | |
frame_count += 1 | |
cap.release() | |
return motion_frames, all_faces | |
def select_well_distributed_frames(motion_frames, all_faces, no_of_frames): | |
""" | |
Selects well-distributed frames from the detected motion and fallback faces. | |
Parameters: | |
- motion_frames (list): List of frames with detected motion. | |
- all_faces (list): List of all detected faces. | |
- no_of_frames (int): Required number of frames. | |
Returns: | |
- final_frames (list): List of selected frames. | |
""" | |
# Case 1: Motion frames exceed the required number | |
if len(motion_frames) >= no_of_frames: | |
interval = len(motion_frames) // no_of_frames | |
distributed_motion_frames = [motion_frames[i * interval] for i in range(no_of_frames)] | |
return distributed_motion_frames | |
# Case 2: Motion frames are less than the required number | |
needed_frames = no_of_frames - len(motion_frames) | |
# If all frames together are still less than needed, return all frames available | |
if len(motion_frames) + len(all_faces) < no_of_frames: | |
#print(f"Returning all available frames: {len(motion_frames) + len(all_faces)}") | |
return motion_frames + all_faces | |
interval = max(1, len(all_faces) // needed_frames) | |
additional_faces = [all_faces[i * interval] for i in range(needed_frames)] | |
combined_frames = motion_frames + additional_faces | |
interval = max(1, len(combined_frames) // no_of_frames) | |
final_frames = [combined_frames[i * interval] for i in range(no_of_frames)] | |
return final_frames | |
def extract_frames(no_of_frames, video_path): | |
motion_frames, all_faces = process_frame(video_path, detector, FRAME_SKIP) | |
final_frames = select_well_distributed_frames(motion_frames, all_faces, no_of_frames) | |
return final_frames | |
def predict_video(model, video_path): | |
""" | |
Predict if a video is REAL or FAKE using the trained model. | |
Parameters: | |
- model: The loaded deepfake detection model. | |
- video_path: Path to the video file to be processed. | |
Returns: | |
- str: 'REAL' or 'FAKE' based on the model's prediction. | |
""" | |
# Extract frames from the video | |
frames = extract_frames(no_of_frames, video_path) | |
original_frames = frames | |
# Convert the frames list to a 5D tensor (1, time_steps, height, width, channels) | |
if len(frames) < MAX_FRAMES: | |
# Pad with zero arrays to match MAX_FRAMES | |
while len(frames) < MAX_FRAMES: | |
frames.append(np.zeros((299, 299, 3), dtype=np.float32)) | |
frames = frames[:MAX_FRAMES] | |
frames = np.array(frames) | |
frames = preprocess_input(frames) | |
# Expand dims to fit the model input shape | |
input_data = np.expand_dims(frames, axis=0) # Shape becomes (1, MAX_FRAMES, 299, 299, 3) | |
# Predict using the model | |
prediction = model.predict(input_data) | |
probability = prediction[0][0] # Get the probability for the first (and only) sample | |
# Convert probability to class label | |
if probability >=0.6: | |
predicted_label='FAKE' | |
else: | |
predicted_label = 'REAL' | |
probability=1-probability | |
return original_frames, predicted_label, probability | |
def display_frames_and_prediction(video_file): | |
# Ensure file is completely uploaded and of correct type | |
if video_file is None: | |
return [], "<div style='color: red;'>No video uploaded!</div>", "" | |
# Check file size (10 MB limit) | |
if os.path.getsize(video_file) > 10 * 1024 * 1024: # 10 MB | |
return [], "<div style='color: red;'>File size exceeds 10 MB limit!</div>", "" | |
# Check file extension | |
if not video_file.endswith('.mp4'): | |
return [], "<div style='color: red;'>Only .mp4 files are allowed!</div>", "" | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: | |
temp_file_path = temp_file.name | |
with open(video_file, 'rb') as src_file: | |
with open(temp_file_path, 'wb') as dest_file: | |
shutil.copyfileobj(src_file, dest_file) | |
frames, predicted_label, confidence = predict_video(model, temp_file_path) | |
os.remove(temp_file_path) | |
confidence_text = f"Confidence: {confidence:.2%}" | |
prediction_style = ( | |
f"<div style='color: {'green' if predicted_label == 'REAL' else 'red'}; " | |
"text-align: center; font-size: 24px; font-weight: bold; " | |
"border: 2px solid; padding: 10px; border-radius: 5px;'>" | |
f"{predicted_label}</div>" | |
) | |
return frames, prediction_style, confidence_text | |
iface = gr.Interface( | |
fn=display_frames_and_prediction, | |
inputs=gr.File(label="Upload Video", interactive=True), | |
outputs=[ | |
gr.Gallery(label="Extracted Frames"), | |
gr.HTML(label="Prediction"), | |
gr.Textbox(label="Confidence", interactive=False) | |
], | |
title="Deepfake Detection", | |
description="Upload a video to determine if it is REAL or FAKE based on the deepfake detection model.", | |
css="app.css", | |
examples=[ | |
["examples/abarnvbtwb.mp4"], | |
["examples/aapnvogymq.mp4"], | |
], | |
cache_examples="lazy" | |
) | |
iface.launch() |