text2vid2storj / app.py
bethecloud's picture
Update app.py
204ccd1
raw
history blame
6.35 kB
import gradio as gr
from gradio import Theme
import os, cv2, torch, time, random
import numpy as np
from moviepy.editor import *
import boto3
from botocore.client import Config
from botocore.exceptions import NoCredentialsError
from diffusers import DiffusionPipeline, EulerAncestralDiscreteScheduler
from PIL import Image
from yt_dlp import YoutubeDL
pipe = DiffusionPipeline.from_pretrained("timbrooks/instruct-pix2pix", torch_dtype=torch.float16, safety_checker=None)
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
pipe.enable_xformers_memory_efficient_attention()
pipe.unet.to(memory_format=torch.channels_last)
pipe = pipe.to("cuda")
s3_access_key = "juj22qxqxql7u2pl6nomgxu3ip7a"
s3_secret_key = "j3uwidtozhboy5vczhymhzkkjsaumznnqlzck5zjs5qxgsung4ukk"
s3_endpoint = "https://gateway.storjshare.io"
s3 = boto3.client("s3", aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key, endpoint_url=s3_endpoint, config=Config(signature_version="s3v4"))
my_theme = Theme.from_hub("bethecloud/storj_theme")
def download_video(url):
ydl_opts = {'overwrites':True, 'format':'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4', 'outtmpl':'/content/video.mp4'}
with YoutubeDL(ydl_opts) as ydl:
ydl.download(url)
return f"/content/video.mp4"
def pix2pix(prompt, text_guidance_scale, image_guidance_scale, image, steps, neg_prompt="", width=512, height=512, seed=0):
if seed == 0:
seed = random.randint(0, 2147483647)
generator = torch.Generator("cuda").manual_seed(seed)
try:
image = Image.open(image)
ratio = min(height / image.height, width / image.width)
image = image.resize((int(image.width * ratio), int(image.height * ratio)), Image.LANCZOS)
result = pipe(
prompt,
negative_prompt=neg_prompt,
image=image,
num_inference_steps=int(steps),
image_guidance_scale=image_guidance_scale,
guidance_scale=text_guidance_scale,
generator=generator,
)
return result.images, result.nsfw_content_detected, seed
except Exception as e:
return None, None, error_str(e)
def error_str(error, title="Error"):
return (
f"""#### {title}
{error}"""
if error
else ""
)
def get_frames(video_in):
frames = []
clip = VideoFileClip(video_in)
if clip.fps > 30:
print("vide rate is over 30, resetting to 30")
clip_resized = clip.resize(height=512)
clip_resized.write_videofile("video_resized.mp4", fps=30, verbose=False)
else:
print("video rate is OK")
clip_resized = clip.resize(height=512)
clip_resized.write_videofile("video_resized.mp4", fps=clip.fps, verbose=False)
print("video resized to 512 height")
cap= cv2.VideoCapture("video_resized.mp4")
fps = cap.get(cv2.CAP_PROP_FPS)
print("video fps: " + str(fps))
i=0
while(cap.isOpened()):
ret, frame = cap.read()
if ret == False:
break
cv2.imwrite('in'+str(i)+'.jpg',frame)
frames.append('in'+str(i)+'.jpg')
i+=1
cap.release()
cv2.destroyAllWindows()
print("broke the video into frames")
return frames, fps
def create_video(frames, fps):
clips = [ImageClip(m).set_duration(1 / fps) for m in frames]
concat_clip = concatenate_videoclips(clips, method="compose")
concat_clip.write_videofile("/content/output.mp4", fps=fps, verbose=False)
return "/content/output.mp4"
def infer(prompt,video_in, seed_in, trim_value):
print(prompt)
break_vid = get_frames(video_in)
frames_list= break_vid[0]
fps = break_vid[1]
n_frame = int(trim_value*fps)
if n_frame >= len(frames_list):
print("video is shorter than the cut value")
n_frame = len(frames_list)
result_frames = []
print("set stop frames to: " + str(n_frame))
for i in frames_list[0:int(n_frame)]:
pix2pix_img = pix2pix(prompt,5.5,1.5,i,15,"",512,512,seed_in)
images = pix2pix_img[0]
rgb_im = images[0].convert("RGB")
rgb_im.save(f"out-{i}.jpg")
result_frames.append(f"out-{i}.jpg")
print("frame " + i + "/" + str(n_frame) + ": done;")
final_vid = create_video(result_frames, fps)
print("Done!")
return final_vid
def upload_to_storj(bucket_name, file_path):
try:
file_name = os.path.basename(file_path)
s3.upload_file(file_path, bucket_name, file_name)
print(f"{file_name} uploaded to {bucket_name}")
except NoCredentialsError:
print("Credentials not available")
with gr.Blocks(theme=my_theme) as demo:
with gr.Column(elem_id="col-container"):
with gr.Row():
with gr.Column():
input_text = gr.Textbox(show_label=False, value="https://link.storjshare.io/s/jwn3ljgyj4ynlg6qtqj6yrbo63ha/demo/dancing.mp4?view")
input_download_button = gr.Button(value="Enter a link from Storj Linkshare")
prompt = gr.Textbox(label="Prompt", placeholder="Enter new style of video", show_label=False, elem_id="prompt-in")
video_inp = gr.Video(label="Video source", source="upload", type="filepath", elem_id="input-vid")
input_download_button.click(download_video, inputs=[input_text], outputs=[video_inp])
with gr.Column():
video_out = gr.Video(label="Pix2pix video result", type="filepath", elem_id="video-output")
with gr.Row():
seed_inp = gr.Slider(label="Seed", minimum=0, maximum=2147483647, step=1, value=69)
trim_in = gr.Slider(label="Cut video at (s)", minimun=1, maximum=600, step=1, value=1)
submit_btn = gr.Button("Generate transformed video with Storj")
inputs = [prompt,video_inp,seed_inp, trim_in]
def process_and_upload(prompt, video_inp, seed_inp, trim_in):
final_vid = infer(prompt, video_inp, seed_inp, trim_in)
# Specify the name of the bucket to upload to
storj_bucket_name = "huggingface-demo"
upload_to_storj(storj_bucket_name, final_vid)
return final_vid
submit_btn.click(process_and_upload, inputs=inputs, outputs=[video_out])
demo.queue().launch(debug=True, share=True, inline=False)