import os
import cv2
import torch
import zipfile
import librosa
import numpy as np
import tensorflow_addons  # imported for its side effects: registers TFA ops the saved model may reference
import tensorflow as tf
from facenet_pytorch import MTCNN
from rawnet import RawNet

# Set random seed for reproducibility.
tf.random.set_seed(42)

# Unpack the pretrained EfficientNet-B0 SavedModel.
local_zip = "./efficientnet-b0.zip"
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall()
zip_ref.close()

# Load the image/video model.
model = tf.keras.models.load_model("efficientnet-b0/")


class DetectionPipeline:
    """Pipeline class for preprocessing video, image, or audio inputs for inference."""

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Constructor for DetectionPipeline class.

        Keyword Arguments:
            n_frames {int} -- Total number of frames to load. These will be evenly
                spaced throughout the video. If not specified (i.e., None), all
                frames will be loaded. (default: {None})
            batch_size {int} -- Batch size to use with the MTCNN face detector.
                (default: {60})
            resize {float} -- Fraction by which to resize frames from the original
                prior to face detection. A value less than 1 results in downsampling
                and a value greater than 1 results in upsampling. (default: {None})
            input_modality {str} -- One of 'video', 'image', or 'audio'.
                (default: {'video'})
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Load the input and preprocess it according to the configured modality.

        Arguments:
            filename -- Path to the video/audio file, or a BGR image array for
                the image modality.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            # Create video reader and find length.
            v_cap = cv2.VideoCapture(filename)
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Pick 'n_frames' evenly spaced frames to sample.
            if self.n_frames is None:
                sample = np.arange(0, v_len)
            else:
                sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)

            # Loop through frames.
            faces = []
            frames = []
            for j in range(v_len):
                success = v_cap.grab()
                if j in sample:
                    # Load frame.
                    success, frame = v_cap.retrieve()
                    if not success:
                        continue
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    # Optionally rescale the frame by the given fraction.
                    if self.resize is not None:
                        h, w = frame.shape[:2]
                        frame = cv2.resize(frame, (int(w * self.resize), int(h * self.resize)))
                    frames.append(frame)

                    # When the batch is full (or this is the last sampled frame),
                    # resize the current frame to the model's input size and keep it.
                    if len(frames) % self.batch_size == 0 or j == sample[-1]:
                        face2 = cv2.resize(frame, (224, 224))
                        faces.append(face2)
            v_cap.release()
            return faces

        elif self.input_modality == 'image':
            print('Input modality is image.')
            # Preprocess for the image modality: BGR -> RGB, then resize to the
            # model's 224x224 input.
            print('Reading image')
            image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (224, 224))
            return image

        elif self.input_modality == 'audio':
            print('Input modality is audio.')
            # Load the audio and add a batch dimension for the PyTorch model.
            x, sr = librosa.load(filename)
            x_pt = torch.Tensor(x)
            x_pt = torch.unsqueeze(x_pt, dim=0)
            return x_pt

        else:
            raise ValueError("Invalid input modality. Must be 'video', 'image', or 'audio'.")
detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')


def deepfakes_video_predict(input_video):
    """Run the image model over sampled video frames and average the scores."""
    faces = detection_video_pipeline(input_video)

    real_res = []
    fake_res = []
    for face in faces:
        # Scale pixels to [0, 1] and predict on a single-frame batch.
        face2 = face / 255
        pred = model.predict(np.expand_dims(face2, axis=0))[0]
        real, fake = pred[0], pred[1]
        real_res.append(real)
        fake_res.append(fake)

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    if real_mean >= 0.6:
        text = "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean * 100), 3)) + "%"
    else:
        text = "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean * 100, 3)) + "%"
    return text


def deepfakes_image_predict(input_image):
    """Run the image model on a single image."""
    face = detection_image_pipeline(input_image)
    # Scale pixels to [0, 1] and predict on a single-image batch.
    face2 = face / 255
    pred = model.predict(np.expand_dims(face2, axis=0))[0]
    real, fake = pred[0], pred[1]
    if real > 0.5:
        text2 = "The image is REAL. \n Deepfakes Confidence: " + str(round(100 - (real * 100), 3)) + "%"
    else:
        text2 = "The image is FAKE. \n Deepfakes Confidence: " + str(round(fake * 100, 3)) + "%"
    return text2


def load_audio_model():
    """Build RawNet2 and load its pretrained checkpoint on CPU."""
    d_args = {
        "nb_samp": 64600,
        "first_conv": 1024,
        "in_channels": 1,
        "filts": [20, [20, 20], [20, 128], [128, 128]],
        "blocks": [2, 4],
        "nb_fc_node": 1024,
        "gru_node": 1024,
        "nb_gru_layer": 3,
        "nb_classes": 2,
    }
    model = RawNet(d_args=d_args, device='cpu')
    # Load checkpoint.
    ckpt = torch.load('RawNet2.pth', map_location=torch.device('cpu'))
    model.load_state_dict(ckpt)
    return model


audio_label_map = {
    0: "Real audio",
    1: "Fake audio",
}


def deepfakes_audio_predict(input_audio):
    """Perform inference on audio given as a (samples, sample_rate) tuple."""
    x, sr = input_audio
    x_pt = torch.Tensor(x)
    x_pt = torch.unsqueeze(x_pt, dim=0)

    # Load model.
    model = load_audio_model()

    # Perform inference and take the argmax over the two class outputs.
    logits = model(x_pt)
    logits_np = logits.detach().numpy()
    result = np.argmax(logits_np)
    return audio_label_map[result]
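

# --- Usage sketch (not part of the original script) ---
# A minimal example of how these predict functions might be wired together.
# The file paths below are hypothetical placeholders; substitute real media
# files. Assumes the EfficientNet SavedModel and the RawNet2.pth checkpoint
# are available as loaded above.
if __name__ == "__main__":
    # Video: the pipeline opens the file itself, so pass a path.
    print(deepfakes_video_predict("sample_video.mp4"))  # hypothetical path

    # Image: the pipeline expects a BGR array, e.g. from cv2.imread.
    bgr_image = cv2.imread("sample_image.jpg")  # hypothetical path
    print(deepfakes_image_predict(bgr_image))

    # Audio: deepfakes_audio_predict unpacks a (samples, sample_rate) tuple,
    # which matches librosa.load's return value.
    print(deepfakes_audio_predict(librosa.load("sample_audio.wav")))  # hypothetical path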