# MEMO / hf_gradio_app.py
import os, random, time
import uuid
import tempfile, shutil
from pydub import AudioSegment
import gradio as gr
from huggingface_hub import snapshot_download
# Download models
os.makedirs("checkpoints", exist_ok=True)
# List of subdirectories to create inside "checkpoints"
subfolders = [
    "vae",
    "wav2vec2",
    "emotion2vec_plus_large",
]
# Create each subdirectory
for subfolder in subfolders:
    os.makedirs(os.path.join("checkpoints", subfolder), exist_ok=True)
snapshot_download(
    repo_id="memoavatar/memo",
    local_dir="./checkpoints",
)
snapshot_download(
    repo_id="stabilityai/sd-vae-ft-mse",
    local_dir="./checkpoints/vae",
)
snapshot_download(
    repo_id="facebook/wav2vec2-base-960h",
    local_dir="./checkpoints/wav2vec2",
)
snapshot_download(
    repo_id="emotion2vec/emotion2vec_plus_large",
    local_dir="./checkpoints/emotion2vec_plus_large",
)
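# Optional sanity check (a minimal sketch, not part of the original app): verify that
# the expected checkpoint folders exist before the models are loaded. The "misc"
# paths are assumed to be provided by the memoavatar/memo snapshot downloaded above.
for _required in [
    "./checkpoints/vae",
    "./checkpoints/wav2vec2",
    "./checkpoints/emotion2vec_plus_large",
    "./checkpoints/misc/face_analysis",
    "./checkpoints/misc/vocal_separator",
]:
    if not os.path.isdir(_required):
        print(f"Warning: expected checkpoint folder is missing: {_required}")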
import torch
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler
from tqdm import tqdm
from memo.models.audio_proj import AudioProjModel
from memo.models.image_proj import ImageProjModel
from memo.models.unet_2d_condition import UNet2DConditionModel
from memo.models.unet_3d import UNet3DConditionModel
from memo.pipelines.video_pipeline import VideoPipeline
from memo.utils.audio_utils import extract_audio_emotion_labels, preprocess_audio, resample_audio
from memo.utils.vision_utils import preprocess_image, tensor_to_video
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
weight_dtype = torch.bfloat16
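# Note: bfloat16 weights are assumed to be supported by the target GPU here. A more
# conservative fallback (not used in the original app) could be:
#   weight_dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32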
with torch.inference_mode():
    # Load the pretrained components
    vae = AutoencoderKL.from_pretrained("./checkpoints/vae").to(device=device, dtype=weight_dtype)
    reference_net = UNet2DConditionModel.from_pretrained("./checkpoints", subfolder="reference_net", use_safetensors=True)
    diffusion_net = UNet3DConditionModel.from_pretrained("./checkpoints", subfolder="diffusion_net", use_safetensors=True)
    image_proj = ImageProjModel.from_pretrained("./checkpoints", subfolder="image_proj", use_safetensors=True)
    audio_proj = AudioProjModel.from_pretrained("./checkpoints", subfolder="audio_proj", use_safetensors=True)

    # Freeze all weights and switch to eval mode
    vae.requires_grad_(False).eval()
    reference_net.requires_grad_(False).eval()
    diffusion_net.requires_grad_(False).eval()
    image_proj.requires_grad_(False).eval()
    audio_proj.requires_grad_(False).eval()

    # Memory-efficient attention for the two UNets
    reference_net.enable_xformers_memory_efficient_attention()
    diffusion_net.enable_xformers_memory_efficient_attention()

    # Assemble the video generation pipeline
    noise_scheduler = FlowMatchEulerDiscreteScheduler()
    pipeline = VideoPipeline(vae=vae, reference_net=reference_net, diffusion_net=diffusion_net, scheduler=noise_scheduler, image_proj=image_proj)
    pipeline.to(device=device, dtype=weight_dtype)
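# Note: the enable_xformers_memory_efficient_attention() calls above assume the
# xformers package is installed in the environment; if it is not, those two calls
# would need to be removed or wrapped in a try/except.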
def process_audio(file_path, temp_dir):
    # Load the audio file
    audio = AudioSegment.from_file(file_path)
    # Trim the audio if it is longer than 8 seconds
    max_duration = 8 * 1000  # 8 seconds in milliseconds
    if len(audio) > max_duration:
        audio = audio[:max_duration]
    # Save the processed audio in the temporary directory
    output_path = os.path.join(temp_dir, "trimmed_audio.wav")
    audio.export(output_path, format="wav")
    # Return the path to the trimmed file
    print(f"Processed audio saved at: {output_path}")
    return output_path
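# Illustrative usage of process_audio (hypothetical paths, not executed by the app):
#   temp_dir = tempfile.mkdtemp()
#   trimmed = process_audio("input_speech.wav", temp_dir)  # at most 8 s, saved as trimmed_audio.wav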
@torch.inference_mode()
def generate(input_video, input_audio, seed, progress=gr.Progress(track_tqdm=True)):
    # On the shared Space, trim the uploaded audio to keep queue times short;
    # os.environ.get avoids a KeyError when SPACE_ID is not set (e.g. running locally)
    is_shared_ui = "fffiloni/MEMO" in os.environ.get("SPACE_ID", "")
    temp_dir = None
    if is_shared_ui:
        temp_dir = tempfile.mkdtemp()
        input_audio = process_audio(input_audio, temp_dir)
        print(f"Processed file was stored temporarily at: {input_audio}")
    # Inference settings
    resolution = 512
    num_generated_frames_per_clip = 16
    fps = 30
    num_init_past_frames = 2
    num_past_frames = 16
    inference_steps = 20
    cfg_scale = 3.5

    # Seed 0 means "pick a random seed"
    if seed == 0:
        random.seed(int(time.time()))
        seed = random.randint(0, 18446744073709551615)
    generator = torch.manual_seed(seed)
    img_size = (resolution, resolution)
    # Preprocess the reference image and extract its face embedding
    pixel_values, face_emb = preprocess_image(face_analysis_model="./checkpoints/misc/face_analysis", image_path=input_video, image_size=resolution)

    output_dir = "./outputs"
    os.makedirs(output_dir, exist_ok=True)
    cache_dir = os.path.join(output_dir, "audio_preprocess")
    os.makedirs(cache_dir, exist_ok=True)

    # Resample the audio to 16 kHz for the wav2vec2 feature extractor
    input_audio = resample_audio(input_audio, os.path.join(cache_dir, f"{os.path.basename(input_audio).split('.')[0]}-16k.wav"))

    if is_shared_ui:
        # Clean up the temporary directory holding the trimmed audio
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            print(f"Temporary directory {temp_dir} deleted.")
    audio_emb, audio_length = preprocess_audio(
        wav_path=input_audio,
        num_generated_frames_per_clip=num_generated_frames_per_clip,
        fps=fps,
        wav2vec_model="./checkpoints/wav2vec2",
        vocal_separator_model="./checkpoints/misc/vocal_separator/Kim_Vocal_2.onnx",
        cache_dir=cache_dir,
        device=device,
    )
    audio_emotion, num_emotion_classes = extract_audio_emotion_labels(
        model="./checkpoints",
        wav_path=input_audio,
        emotion2vec_model="./checkpoints/emotion2vec_plus_large",
        audio_length=audio_length,
        device=device,
    )
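    # The loop below renders num_generated_frames_per_clip frames per clip, conditioning
    # each clip on the reference image plus the last frames of the previous clip
    # (the model's memory-guided past-frame conditioning).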
    video_frames = []
    num_clips = audio_emb.shape[0] // num_generated_frames_per_clip
    for t in tqdm(range(num_clips), desc="Generating video clips"):
        if len(video_frames) == 0:
            # First clip: repeat the reference image as the initial "past" frames
            past_frames = pixel_values.repeat(num_init_past_frames, 1, 1, 1)
            past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device)
            pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0)
        else:
            # Subsequent clips: reuse the tail of the previously generated clip
            past_frames = video_frames[-1][0]
            past_frames = past_frames.permute(1, 0, 2, 3)
            past_frames = past_frames[0 - num_past_frames :]
            past_frames = past_frames * 2.0 - 1.0
            past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device)
            pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0)
        pixel_values_ref_img = pixel_values_ref_img.unsqueeze(0)

        # Slice the audio embeddings and emotion labels for this clip
        audio_tensor = (
            audio_emb[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])]
            .unsqueeze(0)
            .to(device=audio_proj.device, dtype=audio_proj.dtype)
        )
        audio_tensor = audio_proj(audio_tensor)
        audio_emotion_tensor = audio_emotion[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])]
        pipeline_output = pipeline(
            ref_image=pixel_values_ref_img,
            audio_tensor=audio_tensor,
            audio_emotion=audio_emotion_tensor,
            emotion_class_num=num_emotion_classes,
            face_emb=face_emb,
            width=img_size[0],
            height=img_size[1],
            video_length=num_generated_frames_per_clip,
            num_inference_steps=inference_steps,
            guidance_scale=cfg_scale,
            generator=generator,
        )
        video_frames.append(pipeline_output.videos)
    # Concatenate the clips and trim to the audio length
    video_frames = torch.cat(video_frames, dim=2)
    video_frames = video_frames.squeeze(0)
    video_frames = video_frames[:, :audio_length]

    # Save the output video with the audio track muxed in
    unique_id = str(uuid.uuid4())
    video_path = os.path.join(output_dir, f"memo-{seed}_{unique_id}.mp4")
    tensor_to_video(video_frames, video_path, input_audio, fps=fps)
    return video_path
with gr.Blocks(analytics_enabled=False) as demo:
    with gr.Column():
        gr.Markdown("# MEMO: Memory-Guided Diffusion for Expressive Talking Video Generation")
        gr.Markdown("Note: on fffiloni's shared UI, input audio is trimmed to a maximum of 8 seconds, so everyone can get a taste without too much waiting time in the queue.")
        gr.Markdown("Duplicate the Space to skip the queue and use full-length audio.")
gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href="https://github.com/memoavatar/memo">
<img src='https://img.shields.io/badge/GitHub-Repo-blue'>
</a>
<a href="https://memoavatar.github.io/">
<img src='https://img.shields.io/badge/Project-Page-green'>
</a>
<a href="https://arxiv.org/abs/2412.04448">
<img src='https://img.shields.io/badge/ArXiv-Paper-red'>
</a>
<a href="https://huggingface.co/spaces/fffiloni/MEMO?duplicate=true">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
</a>
<a href="https://huggingface.co/fffiloni">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF">
</a>
</div>
""")
        with gr.Row():
            with gr.Column():
                input_video = gr.Image(label="Upload Input Image", type="filepath")
                input_audio = gr.Audio(label="Upload Input Audio", type="filepath")
                seed = gr.Number(label="Seed (0 for Random)", value=0, precision=0)
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                generate_button = gr.Button("Generate")

        generate_button.click(
            fn=generate,
            inputs=[input_video, input_audio, seed],
            outputs=[video_output],
        )
demo.queue().launch(share=False, show_api=False, show_error=True)