"""Gradio demo: synthesize a video from text, store it on Storj, and show it.

The app runs a ModelScope text-to-video pipeline, uploads the result to a
Storj bucket through the S3-compatible gateway, creates a presigned download
URL, and fetches the video back through that URL for display.

Configuration is read from environment variables:
    S3_ACCESS_KEY  access key for the S3-compatible gateway
    S3_SECRET_KEY  secret key for the S3-compatible gateway
    S3_ENDPOINT    gateway URL (defaults to the Storj hosted gateway)
"""

import os
import tempfile
from typing import Optional, Tuple

import boto3
import gradio as gr
import requests
from botocore.exceptions import NoCredentialsError
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline

# Read S3 keys from environment variables. Secrets must never be committed to
# source control; any key that was previously hardcoded in this file should be
# treated as compromised and rotated.
s3_access_key = os.environ.get("S3_ACCESS_KEY")
s3_secret_key = os.environ.get("S3_SECRET_KEY")
s3_endpoint = os.environ.get("S3_ENDPOINT", "https://gateway.storjshare.io")

# Bucket that receives the generated videos.
BUCKET_NAME = "huggingface-demo"

# Lifetime of the presigned download URL, in seconds.
PRESIGNED_URL_TTL = 3600

# Timeout (seconds) for downloading the video back from the presigned URL.
DOWNLOAD_TIMEOUT = 60

# Initialize Storj with boto3
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
)

p = pipeline("text-to-video-synthesis", "damo/text-to-video-synthesis")


async def generate_video(
    text_input: str, duration: int
) -> Tuple[Optional[str], Optional[str]]:
    """Generate a video from text, upload it to Storj, and return it.

    Args:
        text_input: Prompt passed to the pipeline under the ``'text'`` key.
        duration: Value passed to the pipeline under the ``'duration'`` key.

    Returns:
        A ``(local_video_path, storj_video_url)`` pair: the path of a
        temporary ``.mp4`` file downloaded from Storj and the presigned URL it
        was downloaded from. Returns ``(None, None)`` when no S3 credentials
        are available.

    Raises:
        requests.HTTPError: If downloading from the presigned URL fails.
    """
    # NOTE(review): this coroutine performs blocking work (model inference,
    # upload, download); confirm that is acceptable for the serving setup.
    test_text = {'text': text_input, 'duration': duration}
    output_video_path = p(test_text,)[OutputKeys.OUTPUT_VIDEO]
    video_key = os.path.basename(output_video_path)

    try:
        # Upload to Storj
        with open(output_video_path, "rb") as f:
            s3.upload_fileobj(f, BUCKET_NAME, video_key)

        # Generate signed URL for uploaded video
        storj_video_url = s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": BUCKET_NAME, "Key": video_key},
            ExpiresIn=PRESIGNED_URL_TTL,
        )
    except NoCredentialsError:
        print("Credentials not available")
        # One value per declared output component.
        return None, None

    # Download the video from Storj to a temporary file
    with requests.get(
        storj_video_url, stream=True, timeout=DOWNLOAD_TIMEOUT
    ) as response:
        response.raise_for_status()
        # delete=False: the file must outlive this function so the path can
        # be returned to the caller.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            for chunk in response.iter_content(chunk_size=8192):
                temp_file.write(chunk)
            local_video_path = temp_file.name

    return local_video_path, storj_video_url


# Use the custom Gradio theme
storj_theme = gr.Theme.from_hub("bethecloud/storj_theme")

iface = gr.Interface(
    fn=generate_video,
    inputs=[
        gr.inputs.Textbox(lines=3, label="Input text"),
        gr.inputs.Slider(
            minimum=1, maximum=60, step=1, default=5, label="Duration (seconds)"
        ),
    ],
    outputs=[
        gr.outputs.Video(label="Generated Video"),
        gr.outputs.Textbox(label="Storj Video URL"),
    ],
    title="Text to Video to Storj",
    theme=storj_theme,  # Pass the theme to the Interface
)

# Set share to False when deploying to Hugging Face Spaces
iface.launch(share=False, debug=True)