import gradio as gr
import cv2
import torch
import numpy as np
import os
from control_cogvideox.cogvideox_transformer_3d import CogVideoXTransformer3DModel
from control_cogvideox.controlnet_cogvideox_transformer_3d import ControlCogVideoXTransformer3DModel
from pipeline_cogvideox_controlnet_5b_i2v_instruction2 import ControlCogVideoXPipeline
from diffusers.utils import export_to_video
from diffusers import AutoencoderKLCogVideoX
from transformers import T5EncoderModel, T5Tokenizer
from diffusers.schedulers import CogVideoXDDIMScheduler
from omegaconf import OmegaConf
from einops import rearrange
import decord
from typing import List
from tqdm import tqdm
import PIL
import torch.nn.functional as F
from torchvision import transforms
import spaces
from huggingface_hub import snapshot_download
import time

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def download_model():
    snapshot_download(repo_id="THUDM/CogVideoX-5b-I2V", local_dir="./cogvideox-5b-i2v")
    print("Download completed")


def download_model_senorita():
    snapshot_download(repo_id="PengWeixuanSZU/Senorita-2M", local_dir="./senorita-2m")
    print("Download completed")


download_model()
download_model_senorita()
print("Downloaded successfully!")


def get_prompt(file: str):
    with open(file, "r") as f:
        a = f.readlines()
    return a  # a[0]: positive prompt, a[1]: negative prompt


def unwrap_model(state_dict):
    # Strip the "module." prefix that DDP training adds to parameter names.
    new_state_dict = {}
    for key in state_dict:
        new_state_dict[key.split("module.")[1]] = state_dict[key]
    return new_state_dict


def init_pipe():
    i2v = True
    if i2v:
        key = "i2v"
    else:
        key = "t2v"

    noise_scheduler = CogVideoXDDIMScheduler(
        **OmegaConf.to_container(
            OmegaConf.load(f"./cogvideox-5b-{key}/scheduler/scheduler_config.json")
        )
    )
    text_encoder = T5EncoderModel.from_pretrained(f"./cogvideox-5b-{key}/", subfolder="text_encoder", torch_dtype=torch.float16)
    vae = AutoencoderKLCogVideoX.from_pretrained(f"./cogvideox-5b-{key}/", subfolder="vae", torch_dtype=torch.float16)
    tokenizer = T5Tokenizer.from_pretrained(f"./cogvideox-5b-{key}/tokenizer", torch_dtype=torch.float16)

    # Base transformer: 32 input channels for I2V (video latents + image condition), 16 for T2V.
    config = OmegaConf.to_container(
        OmegaConf.load(f"./cogvideox-5b-{key}/transformer/config.json")
    )
    if i2v:
        config["in_channels"] = 32
    else:
        config["in_channels"] = 16
    transformer = CogVideoXTransformer3DModel(**config)

    # ControlNet branch: same backbone config, but only 6 layers and 16 control channels.
    control_config = OmegaConf.to_container(
        OmegaConf.load(f"./cogvideox-5b-{key}/transformer/config.json")
    )
    if i2v:
        control_config["in_channels"] = 32
    else:
        control_config["in_channels"] = 16
    control_config["num_layers"] = 6
    control_config["control_in_channels"] = 16
    controlnet_transformer = ControlCogVideoXTransformer3DModel(**control_config)

    all_state_dicts = torch.load("./senorita-2m/models_half/ff_controlnet_half.pth", map_location="cpu", weights_only=True)
    transformer_state_dict = unwrap_model(all_state_dicts["transformer_state_dict"])
    controlnet_transformer_state_dict = unwrap_model(all_state_dicts["controlnet_transformer_state_dict"])
    transformer.load_state_dict(transformer_state_dict, strict=True)
    controlnet_transformer.load_state_dict(controlnet_transformer_state_dict, strict=True)

    transformer = transformer.half()
    controlnet_transformer = controlnet_transformer.half()

    vae = vae.eval()
    text_encoder = text_encoder.eval()
    transformer = transformer.eval()
    controlnet_transformer = controlnet_transformer.eval()

    pipe = ControlCogVideoXPipeline(
        tokenizer,
        text_encoder,
        vae,
        transformer,
        noise_scheduler,
        controlnet_transformer,
    )
    pipe.vae.enable_slicing()
    pipe.vae.enable_tiling()
    pipe.enable_model_cpu_offload()
    return pipe
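

# Usage sketch (illustrative only): `inference` below expects uint8 frame tensors shaped
# (batch, frames, height, width, channels). It rescales them to [-1, 1], encodes them with
# the CogVideoX VAE, and hands the latents to the ControlNet-augmented pipeline;
# `target_images` may be None, in which case no first-frame condition is concatenated.
# Assuming the checkpoints above are downloaded and `PIPE` has been built by `init_pipe()`,
# a standalone call could look like this (file name and prompts are hypothetical):
#
#   vr = decord.VideoReader("input.mp4")
#   frames = vr.get_batch(list(range(33))).asnumpy()
#   frames = np.array([cv2.resize(f, (768, 448)) for f in frames])
#   source = torch.from_numpy(frames)[None, ...]          # shape (1, 33, 448, 768, 3)
#   edited = inference(source, None, "positive prompt", "negative prompt",
#                      PIPE, PIPE.vae, 4.0, 448, 768, 0)   # returns a list of PIL images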


def inference(source_images, target_images, text_prompt, negative_prompt, pipe, vae, guidance_scale, h, w, random_seed) -> List[PIL.Image.Image]:
    torch.manual_seed(random_seed)
    pipe.vae.to(DEVICE)
    pipe.transformer.to(DEVICE)
    pipe.controlnet_transformer.to(DEVICE)

    # Map uint8 pixel values to [-1, 1] for the VAE.
    source_pixel_values = source_images / 127.5 - 1.0
    source_pixel_values = source_pixel_values.to(torch.float16).to(DEVICE)
    if target_images is not None:
        target_pixel_values = target_images / 127.5 - 1.0
        target_pixel_values = target_pixel_values.to(torch.float16).to(DEVICE)
    bsz, f, h, w, c = source_pixel_values.shape

    with torch.no_grad():
        # Encode the source video into latents for the ControlNet condition.
        source_pixel_values = rearrange(source_pixel_values, "b f h w c -> b c f h w")
        source_latents = vae.encode(source_pixel_values).latent_dist.sample()
        source_latents = source_latents.to(torch.float16)
        source_latents = source_latents * vae.config.scaling_factor
        source_latents = rearrange(source_latents, "b c f h w -> b f c h w")
        if target_images is not None:
            # Encode only the first target frame as the image condition, then zero-pad it
            # along the frame axis so it lines up with the video latents.
            target_pixel_values = rearrange(target_pixel_values, "b f h w c -> b c f h w")
            images = target_pixel_values[:, :, :1, ...]
            image_latents = vae.encode(images).latent_dist.sample()
            image_latents = image_latents.to(torch.float16)
            image_latents = image_latents * vae.config.scaling_factor
            image_latents = rearrange(image_latents, "b c f h w -> b f c h w")
            image_latents = torch.cat([image_latents, torch.zeros_like(source_latents)[:, 1:]], dim=1)
            latents = torch.cat([image_latents, source_latents], dim=2)
        else:
            image_latents = None
            latents = source_latents

    a = time.perf_counter()
    video = pipe(
        prompt=text_prompt,
        negative_prompt=negative_prompt,
        video_condition=source_latents,   # input to the ControlNet branch
        video_condition2=image_latents,   # concatenated with the noised latents
        height=h,
        width=w,
        num_frames=f,
        num_inference_steps=20,
        interval=6,
        guidance_scale=guidance_scale,
        generator=torch.Generator(device=DEVICE).manual_seed(random_seed),
    ).frames[0]
    b = time.perf_counter()
    print(f"Denoised 20 steps in {b - a:.1f}s")
    return video


@spaces.GPU(duration=300)
def process_video(video_file, image_file, positive_prompt, negative_prompt, guidance, random_seed, choice, progress=gr.Progress(track_tqdm=True)) -> str:
    if choice == 33:
        video_shard = 1
    elif choice == 65:
        video_shard = 2
    pipe = PIPE
    h = 448
    w = 768
    frames_per_shard = 33

    # Load and resize the first-frame guidance image (BGR -> RGB).
    image = cv2.imread(image_file)
    resized_image = cv2.resize(image, (768, 448))
    resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
    image = torch.from_numpy(resized_image)

    # Load and resize the first 33 frames of the source video.
    vr = decord.VideoReader(video_file)
    frames = vr.get_batch(list(range(33))).asnumpy()
    _, src_h, src_w, _ = frames.shape
    resized_frames = [cv2.resize(frame, (768, 448)) for frame in frames]
    images = torch.from_numpy(np.array(resized_frames))

    target_path = "outputvideo"
    source_images = images[None, ...]
    target_images = image[None, None, ...]

    video: List[PIL.Image.Image] = []
    for i in progress.tqdm(range(video_shard)):
        if i > 0:
            # First-frame guidance: condition this shard on the last frame generated so far.
            first_frame = transforms.ToTensor()(video[-1])
            first_frame = first_frame * 255.0
            first_frame = rearrange(first_frame, "c h w -> h w c")
            target_images = first_frame[None, None, ...]
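        # Generate this shard and append its frames to the running output; when
        # choice == 65 the loop runs twice, chaining two 33-frame shards together.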
        video += inference(
            source_images,
            target_images,
            positive_prompt,
            negative_prompt,
            pipe,
            pipe.vae,
            guidance,
            h,
            w,
            random_seed,
        )

    # Resize back to the source aspect ratio at height 448 and write the result.
    video = [frame.resize((int(src_w / src_h * 448), 448)) for frame in video]
    os.makedirs(f"./{target_path}", exist_ok=True)
    # Tag the output file with the character just before the ".mp4" extension of the input.
    output_path: str = f"./{target_path}/output_{video_file[-5]}.mp4"
    export_to_video(video, output_path, fps=8)
    return output_path


PIPE = init_pipe()

with gr.Blocks() as demo:
    gr.Markdown(
        """
# Señorita-2M: A High-Quality Instruction-based Dataset for General Video Editing by Video Specialists

[Paper](https://arxiv.org/abs/2502.06734) | [Code](https://github.com/zibojia/SENORITA) | [Huggingface](https://huggingface.co/datasets/SENORITADATASET/Senorita)

This is the official implementation of Señorita. The original model requires 50 denoising steps to generate a video. However, due to GPU usage limits on Hugging Face Spaces, we have reduced the number of denoising steps to 20, which takes about 240s to generate one video, so the results may be slightly affected. Thank you for your understanding!

This UI was made by [PengWeixuanSZU](https://huggingface.co/PengWeixuanSZU).
"""
    )
    with gr.Row():
        video_input = gr.Video(label="Video input")
        image_input = gr.Image(type="filepath", label="First frame guidance")
    with gr.Row():
        with gr.Column():
            positive_prompt = gr.Textbox(label="Positive prompt", value="")
            negative_prompt = gr.Textbox(label="Negative prompt", value="")
            seed = gr.Slider(minimum=0, maximum=2147483647, step=1, value=0, label="Seed")
            guidance_slider = gr.Slider(minimum=1, maximum=10, value=4, label="Guidance")
            choice = gr.Radio(choices=[33, 65], label="Frame number", value=33)
        with gr.Column():
            video_output = gr.Video(label="Video output")
    with gr.Row():
        submit_button = gr.Button("Generate")
        submit_button.click(
            fn=process_video,
            inputs=[video_input, image_input, positive_prompt, negative_prompt, guidance_slider, seed, choice],
            outputs=video_output,
        )
    with gr.Row():
        gr.Examples(
            [
                ["assets/0.mp4", "assets/0_edit.png", get_prompt("assets/0.txt")[0], get_prompt("assets/0.txt")[1], 4, 0, 33],
                ["assets/1.mp4", "assets/1_edit.png", get_prompt("assets/1.txt")[0], get_prompt("assets/1.txt")[1], 4, 0, 33],
                ["assets/2.mp4", "assets/2_edit.png", get_prompt("assets/2.txt")[0], get_prompt("assets/2.txt")[1], 4, 0, 33],
                ["assets/3.mp4", "assets/3_edit.png", get_prompt("assets/3.txt")[0], get_prompt("assets/3.txt")[1], 4, 0, 33],
                ["assets/4.mp4", "assets/4_edit.png", get_prompt("assets/4.txt")[0], get_prompt("assets/4.txt")[1], 4, 0, 33],
                ["assets/5.mp4", "assets/5_edit.png", get_prompt("assets/5.txt")[0], get_prompt("assets/5.txt")[1], 4, 0, 33],
                ["assets/6.mp4", "assets/6_edit.png", get_prompt("assets/6.txt")[0], get_prompt("assets/6.txt")[1], 4, 0, 33],
                ["assets/7.mp4", "assets/7_edit.png", get_prompt("assets/7.txt")[0], get_prompt("assets/7.txt")[1], 4, 0, 33],
                ["assets/8.mp4", "assets/8_edit.png", get_prompt("assets/8.txt")[0], get_prompt("assets/8.txt")[1], 4, 0, 33],
            ],
            inputs=[video_input, image_input, positive_prompt, negative_prompt, guidance_slider, seed, choice],
            outputs=video_output,
            fn=process_video,
            cache_examples=False,
        )

demo.queue().launch()