"""Flask front-end for a remote workflow server.

Exposes two JSON endpoints:

* ``POST /generate_image`` - queue a text-to-image workflow and return the
  resulting images as base64-encoded PNGs.
* ``POST /image_to_video`` - queue an image-to-video workflow and return the
  generated MP4 as an attachment.

The workflow server is reached over HTTP (``SERVER_ADDRESS``) for the
``/prompt``, ``/history`` and ``/view`` endpoints and over a WebSocket
(``WS_ADDRESS``) for execution progress messages.
"""

import base64
import io
import json
import os
import random
import urllib.error
import urllib.parse
import urllib.request
import uuid

import requests
import websocket
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request, send_file
from PIL import Image
from werkzeug.utils import secure_filename

# Load environment variables from the .env file
load_dotenv()

# Initialize Flask app
app = Flask(__name__)

ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'webp'}  # Supported image types

# Server and websocket addresses come from environment variables
server_address = os.getenv("SERVER_ADDRESS")
ws_address = os.getenv("WS_ADDRESS")

# Unique client ID shared by every request handled by this process
client_id = str(uuid.uuid4())


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _json_body():
    """Return the request's JSON object, or ``{}`` if absent or not an object.

    ``request.json`` fails on non-JSON bodies (e.g. multipart uploads), which
    would make the file-upload branch of ``image_to_video`` unreachable.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _load_workflow(filename):
    """Load a workflow JSON file from the ``workflows`` directory next to this file."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(current_dir, 'workflows', filename)
    with open(workflow_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _fetch_bytes(url, token):
    """GET ``url`` with a Bearer token and return the raw response body.

    Raises:
        urllib.error.HTTPError: re-raised after logging the status and body.
    """
    req = urllib.request.Request(url)
    req.add_header("Authorization", f"Bearer {token}")
    try:
        with urllib.request.urlopen(req) as response:
            return response.read()
    except urllib.error.HTTPError as e:
        print(f"HTTP Error: {e.code} - {e.reason}")
        print(e.read())
        raise


def _open_websocket(token):
    """Open a WebSocket to ``ws_address`` authenticated with ``token``.

    The client id and token are URL-encoded so that characters such as
    ``&``, ``#`` or spaces in the token cannot corrupt the query string.
    The caller is responsible for closing the returned socket.
    """
    query = urllib.parse.urlencode({'clientId': client_id, 'token': token})
    ws = websocket.WebSocket()
    try:
        ws.connect(f"{ws_address}?{query}",
                   header={"Authorization": f"Bearer {token}"})
    except Exception:
        ws.close()  # do not leak a half-open socket on a failed handshake
        raise
    return ws


def _wait_for_completion(ws, prompt_id):
    """Block until the server reports that ``prompt_id`` finished executing.

    Completion is signalled by an ``executing`` message whose ``node`` is
    ``None`` for the matching prompt id. Non-text frames are skipped because
    they cannot be parsed as JSON status messages.
    """
    while True:
        out = ws.recv()
        if not isinstance(out, str):
            continue
        message = json.loads(out)
        if message.get('type') != 'executing':
            continue
        data = message['data']
        if data['node'] is None and data['prompt_id'] == prompt_id:
            return  # Execution is done


# ---------------------------------------------------------------------------
# Workflow-server client functions
# ---------------------------------------------------------------------------

def get_image(filename, subfolder, image_type, token):
    """Download one output image from the server's ``/view`` endpoint as bytes."""
    url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type}
    url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
    return _fetch_bytes(url, token)


def get_images(ws, workflow, token):
    """Queue ``workflow``, wait for it to finish and download its images.

    Args:
        ws: a connected WebSocket on which progress messages are received.
        workflow: the workflow dict to submit.
        token: Bearer token used for the HTTP calls.

    Returns:
        Dict mapping each output node id to a list of image byte strings
        (empty list for nodes without an ``images`` output).

    Raises:
        ValueError: if the prompt cannot be queued or no history is returned.
    """
    # queue_prompt already returns the prompt id string; indexing it again
    # with ['prompt_id'] raised TypeError.
    prompt_id = queue_prompt(workflow, token)
    _wait_for_completion(ws, prompt_id)

    # make_request returns None on HTTP/URL errors, so guard before indexing.
    history = (get_history(prompt_id, token) or {}).get(prompt_id)
    if history is None:
        raise ValueError(f"No history returned for prompt {prompt_id}.")

    output_images = {}
    for node_id, node_output in history.get('outputs', {}).items():
        output_images[node_id] = [
            get_image(image['filename'], image['subfolder'], image['type'], token)
            for image in node_output.get('images', [])
        ]
    return output_images


def fetch_video(video_data, token):
    """Download ``video_data['filename']`` from the server's ``/download`` endpoint."""
    # URL-encode the filename so spaces or '&' cannot break the query string.
    query = urllib.parse.urlencode({'file': video_data['filename']})
    return _fetch_bytes(f"{server_address}/download?{query}", token)


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

# Default route for home welcome
@app.route('/')
def home():
    """Render the landing page."""
    return render_template('home.html')


# Generate image route
@app.route('/generate_image', methods=['POST'])
def generate_image():
    """Generate images from ``text_prompt`` and return them as base64 PNGs.

    Expects a JSON body with ``text_prompt`` and an ``Authorization`` header
    carrying a base64-encoded token (optionally prefixed with ``Bearer ``).
    Responds with ``{'images': [...]}`` or ``{'error': ...}``.
    """
    data = _json_body()

    # Extract the token from the request headers
    token = request.headers.get('Authorization')
    if token is None:
        return jsonify({'error': 'No token provided'}), 400
    if token.startswith("Bearer "):
        token = token.split(" ")[1]

    # Base64 decode the encoded token. binascii.Error and UnicodeDecodeError
    # are both ValueError subclasses.
    try:
        token = base64.b64decode(token).decode("utf-8")
    except ValueError:
        return jsonify({'error': 'Invalid token encoding'}), 400

    if 'text_prompt' not in data:
        return jsonify({'error': 'No text prompt provided'}), 400
    text_prompt = data['text_prompt']

    workflow = _load_workflow('flux1_dev_checkpoint_workflow_api.json')
    workflow["6"]["inputs"]["text"] = text_prompt

    # For model Flux1.dev: random 15-digit integer seed
    seednum = random.randint(100000000000000, 999999999999999)
    workflow["31"]["inputs"]["seed"] = seednum

    try:
        ws = _open_websocket(token)
    except (websocket.WebSocketException, OSError) as e:
        return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500

    try:
        images = get_images(ws, workflow, token)
    except ValueError as e:
        return jsonify({'error': str(e)}), 500
    finally:
        ws.close()  # always release the socket, even when generation fails

    output_images_base64 = []
    for node_images in images.values():
        for image_data in node_images:
            # Re-encode whatever format the server produced as PNG.
            image = Image.open(io.BytesIO(image_data))
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
            output_images_base64.append(img_str)

    return jsonify({'images': output_images_base64})


# Get image route
@app.route('/get_image/<filename>', methods=['GET'])
def get_image_file(filename):
    """Serve an image file by name.

    The route needs the ``<filename>`` variable: without it Flask calls the
    view with no arguments and every request fails. Only names with an
    allowed image extension are served so that other files next to the
    application (for example ``.env``) cannot be downloaded.
    """
    if not allowed_file(filename):
        return jsonify({'error': 'Unsupported image format'}), 400
    return send_file(filename, mimetype='image/png')


def allowed_file(filename):
    """Check if the uploaded file has an allowed extension."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def save_base64_image(b64_string):
    """Decode a base64 string (optionally a data URI) and save it under /tmp.

    Returns:
        A URL string built from the fixed host prefix and the saved file path.

    Raises:
        ValueError: if decoding or writing fails (original error is chained).
    """
    try:
        # Handle Data URI scheme if present
        if ',' in b64_string:
            header, encoded = b64_string.split(',', 1)
            ext = header.split('/')[1].split(';')[0] if '/' in header else 'png'
        else:
            encoded = b64_string
            ext = 'png'

        # Decode the image data
        image_data = base64.b64decode(encoded)

        # Ensure directory exists, then write to a unique path
        os.makedirs('/tmp/', exist_ok=True)
        image_path = f"/tmp/{uuid.uuid4()}.{ext}"
        with open(image_path, 'wb') as f:
            f.write(image_data)

        print(f"Image saved at: {image_path}")
        return f"https://gosign-de-comfyui-api.hf.space{image_path}"
    except Exception as e:
        raise ValueError(f"Failed to save image: {e}") from e


def make_request(url, data=None, headers=None):
    """Send an HTTP request and return the decoded JSON response.

    Returns ``None`` (after logging) on ``HTTPError``/``URLError``; callers
    must check for that.
    """
    # urllib.request.Request calls headers.items(), so None must become {}.
    req = urllib.request.Request(url, data=data, headers=headers or {})
    try:
        with urllib.request.urlopen(req) as response:
            response_body = response.read().decode()
            return json.loads(response_body)
    except urllib.error.HTTPError as e:
        print(f"HTTPError: {e.code}, {e.reason}")
        print(e.read().decode())  # Print detailed error response
    except urllib.error.URLError as e:
        print(f"URLError: {e.reason}")
    return None


# Helper: Queue the prompt
def queue_prompt(workflow, token):
    """Submit ``workflow`` to ``/prompt`` and return the new prompt id.

    Raises:
        ValueError: if the request failed or the response has no ``prompt_id``.
    """
    payload = {"prompt": workflow, "client_id": client_id}
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    response = make_request(f"{server_address}/prompt",
                            data=json.dumps(payload).encode('utf-8'),
                            headers=headers)
    if not response or 'prompt_id' not in response:
        raise ValueError("Failed to queue the prompt. Check the request or API response.")
    return response['prompt_id']


def get_history(prompt_id, token):
    """Fetch ``/history/<prompt_id>``; returns the parsed JSON or ``None`` on error."""
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    return make_request(f"{server_address}/history/{prompt_id}", headers=headers)


def get_video_data(filename, subfolder, token):
    """Download a generated video from the server's ``/view`` endpoint as bytes."""
    url_values = {'filename': filename, 'subfolder': subfolder}
    url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
    return _fetch_bytes(url, token)


# Helper: Upload Image to Hugging Face
def upload_image_to_hf(image_path, token):
    """Upload the file at ``image_path`` as multipart form data and return its URL.

    Raises:
        ValueError: if the server does not answer with status 200.
    """
    with open(image_path, 'rb') as img:
        # Do not set Content-Type here: with ``files=`` requests must generate
        # the multipart/form-data header itself (it carries the boundary).
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.post(
            "https://gosign-de-image-to-video.hf.space",
            headers=headers,
            files={'file': img}
        )
    if response.status_code != 200:
        raise ValueError(f"Failed to upload image: {response.text}")
    return response.json().get('url')


# Route: Image to Video
@app.route('/image_to_video', methods=['POST'])
def image_to_video():
    """Generate a video from an image plus ``text_prompt`` and return the MP4.

    Accepts either a JSON body (``text_prompt`` and ``base64_image``) or a
    multipart form (``text_prompt`` field and ``image`` file). Requires an
    ``Authorization: Bearer <token>`` header.
    """
    data = _json_body()

    # Extract and validate token
    # NOTE(review): unlike generate_image, the token is not base64-decoded
    # here - confirm which behaviour the backend expects.
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    # Validate text prompt (JSON body first, then form fields for uploads)
    text_prompt = data.get('text_prompt') or request.form.get('text_prompt')
    if not text_prompt:
        return jsonify({'error': 'Text prompt is required'}), 400

    # Handle uploaded image or base64 image
    image_file = request.files.get('image')
    base64_image = data.get('base64_image') or request.form.get('base64_image')

    if image_file:
        # Validate and save the uploaded image
        if not allowed_file(image_file.filename):
            return jsonify({'error': 'Unsupported image format'}), 400
        filename = secure_filename(image_file.filename)
        image_path = f"/tmp/{uuid.uuid4()}_{filename}"
        image_file.save(image_path)
    elif base64_image:
        # Save base64 image
        try:
            image_path = save_base64_image(base64_image)
            print(f"Image saved at HF: {image_path}")
        except Exception as e:
            return jsonify({'error': f'Invalid base64 image data: {str(e)}'}), 400
    else:
        return jsonify({'error': 'Image is required (file or base64)'}), 400

    # Load workflow configuration and fill in the inputs
    workflow = _load_workflow('cogvideox_image_to_video_workflow_api.json')
    workflow["30"]["inputs"]["prompt"] = text_prompt
    workflow["36"]["inputs"]["image"] = image_path
    workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion"
    workflow["57"]["inputs"]["seed"] = random.randint(100000000000000, 999999999999999)

    # WebSocket connection to queue and monitor workflow
    try:
        ws = _open_websocket(token)
    except (websocket.WebSocketException, OSError) as e:
        return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500

    try:
        try:
            prompt_id = queue_prompt(workflow, token)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400
        # Wait for workflow execution to complete
        _wait_for_completion(ws, prompt_id)
    finally:
        ws.close()  # the socket was previously never closed on this route

    # Fetch video details; get_history returns None on request failure
    history = (get_history(prompt_id, token) or {}).get(prompt_id, {})
    video_data = None
    for node_output in history.get('outputs', {}).values():
        videos = node_output.get('videos')
        if videos:
            video = videos[0]
            video_data = get_video_data(video['filename'], video['subfolder'], token)

    if not video_data:
        return jsonify({'error': 'Failed to generate video'}), 500

    # Save a copy of the generated video, then return it from memory
    local_video_path = f"/tmp/generated_video_{uuid.uuid4()}.mp4"
    with open(local_video_path, 'wb') as f:
        f.write(video_data)

    return send_file(
        io.BytesIO(video_data),
        mimetype='video/mp4',
        as_attachment=True,
        download_name='generated_video.mp4'
    )


if __name__ == '__main__':
    # debug=True is deliberately not passed: with host 0.0.0.0 it would
    # expose the interactive debugger to the network.
    app.run(host='0.0.0.0', port=7860)