Spaces:
Runtime error
Runtime error
import gradio as gr | |
from gradio import Theme | |
import os, cv2, torch, time, random | |
import numpy as np | |
from moviepy.editor import * | |
import boto3 | |
from botocore.client import Config | |
from botocore.exceptions import NoCredentialsError | |
from diffusers import DiffusionPipeline, EulerAncestralDiscreteScheduler | |
from PIL import Image | |
from yt_dlp import YoutubeDL | |
pipe = DiffusionPipeline.from_pretrained("timbrooks/instruct-pix2pix", torch_dtype=torch.float16, safety_checker=None) | |
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config) | |
pipe.enable_xformers_memory_efficient_attention() | |
pipe.unet.to(memory_format=torch.channels_last) | |
pipe = pipe.to("cuda") | |
s3_access_key = "juj22qxqxql7u2pl6nomgxu3ip7a" | |
s3_secret_key = "j3uwidtozhboy5vczhymhzkkjsaumznnqlzck5zjs5qxgsung4ukk" | |
s3_endpoint = "https://gateway.storjshare.io" | |
s3 = boto3.client("s3", aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key, endpoint_url=s3_endpoint, config=Config(signature_version="s3v4")) | |
my_theme = Theme.from_hub("bethecloud/storj_theme") | |
def download_video(url): | |
ydl_opts = {'overwrites':True, 'format':'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4', 'outtmpl':'/content/video.mp4'} | |
with YoutubeDL(ydl_opts) as ydl: | |
ydl.download(url) | |
return f"/content/video.mp4" | |
def pix2pix(prompt, text_guidance_scale, image_guidance_scale, image, steps, neg_prompt="", width=512, height=512, seed=0): | |
if seed == 0: | |
seed = random.randint(0, 2147483647) | |
generator = torch.Generator("cuda").manual_seed(seed) | |
try: | |
image = Image.open(image) | |
ratio = min(height / image.height, width / image.width) | |
image = image.resize((int(image.width * ratio), int(image.height * ratio)), Image.LANCZOS) | |
result = pipe( | |
prompt, | |
negative_prompt=neg_prompt, | |
image=image, | |
num_inference_steps=int(steps), | |
image_guidance_scale=image_guidance_scale, | |
guidance_scale=text_guidance_scale, | |
generator=generator, | |
) | |
return result.images, result.nsfw_content_detected, seed | |
except Exception as e: | |
return None, None, error_str(e) | |
def error_str(error, title="Error"): | |
return ( | |
f"""#### {title} | |
{error}""" | |
if error | |
else "" | |
) | |
def get_frames(video_in): | |
frames = [] | |
clip = VideoFileClip(video_in) | |
if clip.fps > 30: | |
print("vide rate is over 30, resetting to 30") | |
clip_resized = clip.resize(height=512) | |
clip_resized.write_videofile("video_resized.mp4", fps=30, verbose=False) | |
else: | |
print("video rate is OK") | |
clip_resized = clip.resize(height=512) | |
clip_resized.write_videofile("video_resized.mp4", fps=clip.fps, verbose=False) | |
print("video resized to 512 height") | |
cap= cv2.VideoCapture("video_resized.mp4") | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
print("video fps: " + str(fps)) | |
i=0 | |
while(cap.isOpened()): | |
ret, frame = cap.read() | |
if ret == False: | |
break | |
cv2.imwrite('in'+str(i)+'.jpg',frame) | |
frames.append('in'+str(i)+'.jpg') | |
i+=1 | |
cap.release() | |
cv2.destroyAllWindows() | |
print("broke the video into frames") | |
return frames, fps | |
def create_video(frames, fps): | |
clips = [ImageClip(m).set_duration(1 / fps) for m in frames] | |
concat_clip = concatenate_videoclips(clips, method="compose") | |
concat_clip.write_videofile("/content/output.mp4", fps=fps, verbose=False) | |
return "/content/output.mp4" | |
def infer(prompt,video_in, seed_in, trim_value): | |
print(prompt) | |
break_vid = get_frames(video_in) | |
frames_list= break_vid[0] | |
fps = break_vid[1] | |
n_frame = int(trim_value*fps) | |
if n_frame >= len(frames_list): | |
print("video is shorter than the cut value") | |
n_frame = len(frames_list) | |
result_frames = [] | |
print("set stop frames to: " + str(n_frame)) | |
for i in frames_list[0:int(n_frame)]: | |
pix2pix_img = pix2pix(prompt,5.5,1.5,i,15,"",512,512,seed_in) | |
images = pix2pix_img[0] | |
rgb_im = images[0].convert("RGB") | |
rgb_im.save(f"out-{i}.jpg") | |
result_frames.append(f"out-{i}.jpg") | |
print("frame " + i + "/" + str(n_frame) + ": done;") | |
final_vid = create_video(result_frames, fps) | |
print("Done!") | |
return final_vid | |
def upload_to_storj(bucket_name, file_path): | |
try: | |
file_name = os.path.basename(file_path) | |
s3.upload_file(file_path, bucket_name, file_name) | |
print(f"{file_name} uploaded to {bucket_name}") | |
except NoCredentialsError: | |
print("Credentials not available") | |
with gr.Blocks(theme=my_theme) as demo: | |
with gr.Column(elem_id="col-container"): | |
with gr.Row(): | |
with gr.Column(): | |
input_text = gr.Textbox(show_label=False, value="https://link.storjshare.io/s/jwn3ljgyj4ynlg6qtqj6yrbo63ha/demo/dancing.mp4?view") | |
input_download_button = gr.Button(value="Enter a link from Storj Linkshare") | |
prompt = gr.Textbox(label="Prompt", placeholder="Enter new style of video", show_label=False, elem_id="prompt-in") | |
video_inp = gr.Video(label="Video source", source="upload", type="filepath", elem_id="input-vid") | |
input_download_button.click(download_video, inputs=[input_text], outputs=[video_inp]) | |
with gr.Column(): | |
video_out = gr.Video(label="Pix2pix video result", type="filepath", elem_id="video-output") | |
with gr.Row(): | |
seed_inp = gr.Slider(label="Seed", minimum=0, maximum=2147483647, step=1, value=69) | |
trim_in = gr.Slider(label="Cut video at (s)", minimun=1, maximum=600, step=1, value=1) | |
submit_btn = gr.Button("Generate transformed video with Storj") | |
inputs = [prompt,video_inp,seed_inp, trim_in] | |
def process_and_upload(prompt, video_inp, seed_inp, trim_in): | |
final_vid = infer(prompt, video_inp, seed_inp, trim_in) | |
# Specify the name of the bucket to upload to | |
storj_bucket_name = "huggingface-demo" | |
upload_to_storj(storj_bucket_name, final_vid) | |
return final_vid | |
submit_btn.click(process_and_upload, inputs=inputs, outputs=[video_out]) | |
demo.queue().launch(debug=True, share=True, inline=False) |