"""Gradio front end that swaps faces in an uploaded video using Refacer.

On start-up the script downloads the ``inswapper_128.onnx`` model if it is
not already on disk, creates a CPU-only ``Refacer`` instance, builds the
Gradio UI and (when run as a script) serves it on port 7860.
"""

import multiprocessing
import os

import cv2
import gradio as gr
import requests

from refacer import Refacer

# Hugging Face URL to download the model
model_url = "https://huggingface.co/ofter/4x-UltraSharp/resolve/main/inswapper_128.onnx"
model_path = "./inswapper_128.onnx"

# Number of face tabs shown in the UI; run() splits its inputs by this count.
NUM_FACES = 5

# Frame rate used when the source video does not report a usable one.
DEFAULT_FPS = 30.0


def download_model():
    """Download the face-swap model to ``model_path`` unless it already exists.

    The response is streamed to a temporary ``.part`` file and renamed only
    after the transfer finishes, so an interrupted download never leaves a
    truncated file that a later run would mistake for a complete model.

    Raises:
        RuntimeError: If the server answers with a status other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    if os.path.exists(model_path):
        print("Model already exists.")
        return

    print("Downloading inswapper_128.onnx...")
    partial_path = model_path + ".part"
    try:
        # (connect timeout, read timeout) in seconds, so a dead connection
        # cannot hang start-up forever.
        with requests.get(model_url, stream=True, timeout=(10, 60)) as response:
            if response.status_code != 200:
                raise RuntimeError(
                    f"Failed to download the model. Status code: {response.status_code}"
                )
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        os.replace(partial_path, model_path)
    finally:
        # Only present if the download failed part-way through.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print("Model downloaded successfully!")


# Download the model when the script runs
download_model()

# Initialize Refacer class (force CPU mode).
# Kept at module level so pool workers can reach it: forked workers inherit
# it, spawned workers rebuild it when they re-import this module.
refacer = Refacer(force_cpu=True)


def _reface_frame(frame, faces):
    """Run the shared Refacer instance on one frame with a list of face specs."""
    # NOTE(review): upstream Refacer.reface appears to expect a video path as
    # its first argument - confirm this Refacer build accepts single frames.
    return refacer.reface(frame, faces)


def process_frame(frame, origin_face, destination_face, threshold):
    """Reface a single frame using one origin/destination pair.

    Args:
        frame: The frame to process.
        origin_face: Image of the face to replace.
        destination_face: Image of the face to put in its place.
        threshold: Matching threshold handed through to Refacer.

    Returns:
        Whatever ``refacer.reface`` returns for this frame.
    """
    return _reface_frame(frame, [{
        'origin': origin_face,
        'destination': destination_face,
        'threshold': threshold,
    }])


def _collect_faces(origins, destinations, thresholds):
    """Build Refacer face specs, skipping slots missing either image."""
    return [
        {'origin': origin_face, 'destination': destination_face, 'threshold': threshold}
        for origin_face, destination_face, threshold in zip(origins, destinations, thresholds)
        if origin_face is not None and destination_face is not None
    ]


def process_video(video_path, origins, destinations, thresholds, max_processes=2):
    """Reface every frame of a video in parallel and write the result to disk.

    Every frame is processed with the full set of configured faces. Slots
    where the origin or the destination image is ``None`` are ignored.

    Args:
        video_path: Path of the video to read.
        origins: Sequence of images of faces to replace (``None`` for unused).
        destinations: Sequence of replacement face images, parallel to
            ``origins``.
        thresholds: Sequence of matching thresholds, parallel to ``origins``.
        max_processes: Number of worker processes in the pool.

    Returns:
        The path of the written video, ``"processed_video.mp4"``.

    Raises:
        ValueError: If no usable face pair was given, the video cannot be
            opened, or it contains no frames.
        RuntimeError: If the output video file cannot be opened for writing.
    """
    faces = _collect_faces(origins, destinations, thresholds)
    if not faces:
        raise ValueError(
            "At least one face needs both an origin and a destination image."
        )

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Read all frames from the video
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path}")

    # `not fps > 0` also catches NaN, which some containers report.
    if not fps > 0:
        fps = DEFAULT_FPS

    # Parallel processing of frames with limited processes (for CPU optimization)
    with multiprocessing.Pool(processes=max_processes) as pool:
        processed_frames = pool.starmap(
            _reface_frame,
            [(frame, faces) for frame in frames],
        )

    # The writer only accepts frames whose size matches the size it was opened
    # with, so take the size from the frames that will actually be written.
    # Assumes the processed frames are image arrays with a `.shape`, which
    # `VideoWriter.write` requires anyway.
    height, width = processed_frames[0].shape[:2]

    # Saving the processed frames back into a video
    output_video_path = "processed_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Compression using mp4 codec
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    try:
        if not out.isOpened():
            raise RuntimeError(f"Could not open {output_video_path} for writing.")
        for frame in processed_frames:
            out.write(frame)
    finally:
        out.release()

    return output_video_path


def run(video_path, *vars):
    """Gradio callback: split the flat UI inputs and reface the video.

    Args:
        video_path: Path of the uploaded video, or ``None`` if none was given.
        *vars: ``NUM_FACES`` origin images, then ``NUM_FACES`` destination
            images, then ``NUM_FACES`` thresholds, in that order.

    Returns:
        Path of the refaced video.

    Raises:
        gr.Error: If the inputs are incomplete; Gradio shows the message to
            the user instead of treating it as a video path.
    """
    # Split the inputs into origins, destinations, and thresholds
    origins = vars[:NUM_FACES]
    destinations = vars[NUM_FACES:2 * NUM_FACES]
    thresholds = vars[2 * NUM_FACES:]

    # Ensure there are no index errors by limiting the number of inputs
    if (
        len(origins) != NUM_FACES
        or len(destinations) != NUM_FACES
        or len(thresholds) != NUM_FACES
    ):
        raise gr.Error("Please provide input for all faces.")
    if video_path is None:
        raise gr.Error("Please upload a video first.")
    if not _collect_faces(origins, destinations, thresholds):
        raise gr.Error(
            "Please provide an origin and a destination image for at least one face."
        )

    refaced_video_path = process_video(video_path, origins, destinations, thresholds)
    print(f"Refaced video can be found at {refaced_video_path}")
    return refaced_video_path


# Prepare Gradio components
origin = []
destination = []
thresholds = []

with gr.Blocks() as demo:
    with gr.Row():
        gr.Markdown("# Refacer")
    with gr.Row():
        video_input = gr.Video(label="Original video", format="mp4")
        video_output = gr.Video(label="Refaced video", interactive=False, format="mp4")

    for i in range(NUM_FACES):
        with gr.Tab(f"Face #{i+1}"):
            with gr.Row():
                origin.append(gr.Image(label="Face to replace"))
                destination.append(gr.Image(label="Destination face"))
            with gr.Row():
                thresholds.append(
                    gr.Slider(label="Threshold", minimum=0.0, maximum=1.0, value=0.2)
                )

    with gr.Row():
        button = gr.Button("Reface", variant="primary")

    # Input order must match the slicing done in run().
    button.click(
        fn=run,
        inputs=[video_input] + origin + destination + thresholds,
        outputs=[video_output],
    )

# Launch the Gradio app.
# Guarded so that multiprocessing workers started with the "spawn" method,
# which re-import this module, do not each try to start another server.
if __name__ == "__main__":
    demo.launch(show_error=True, server_name="0.0.0.0", server_port=7860)