Spaces:
Running
Running
File size: 4,908 Bytes
36dd645 9a4edab 1e9882c 9a4edab 1e9882c 9a4edab 524f3f2 9a4edab 524f3f2 1e9882c 8352ac4 524f3f2 9a4edab 1e9882c 9a4edab 1e9882c a2398fe 1e9882c a2398fe 1e9882c 9a4edab 1e9882c a2398fe 1e9882c 9a4edab 1e9882c b45d081 1e9882c 6be94e8 1e9882c 6be94e8 13fad22 1e9882c 13fad22 1e9882c f326d9e 1e9882c 4e8bac1 1e9882c 9a4edab 1e9882c 9a4edab 1e9882c 9a4edab 1e9882c 4e6c3f2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import os
import io
import json
import base64
import random
import urllib.request
import urllib.parse
import websocket
import uuid
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from PIL import Image
# Load environment variables from the .env file
load_dotenv()
# Initialize Flask app
app = Flask(__name__)
# Set server and websocket addresses from environment variables
server_address = os.getenv("SERVER_ADDRESS")
ws_address = os.getenv("WS_ADDRESS")
# Generate a unique client ID
client_id = str(uuid.uuid4())
def make_request(url, data=None, headers=None):
req = urllib.request.Request(url, data=data, headers=headers)
with urllib.request.urlopen(req) as response:
return json.loads(response.read())
def queue_prompt(prompt, token):
payload = {"prompt": prompt, "client_id": client_id}
data = json.dumps(payload).encode('utf-8')
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
return make_request(f"{server_address}/prompt", data=data, headers=headers)
def get_image(filename, subfolder, image_type, token):
url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type}
url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
try:
return urllib.request.urlopen(req).read()
except urllib.error.HTTPError as e:
print(f"HTTP Error: {e.code} - {e.reason}")
print(e.read())
raise
def get_history(prompt_id, token):
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
return make_request(f"{server_address}/history/{prompt_id}", headers=headers)
def get_images(ws, prompt, token):
prompt_id = queue_prompt(prompt, token)['prompt_id']
output_images = {}
while True:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break # Execution is done
history = get_history(prompt_id, token)[prompt_id]
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
images_output = []
if 'images' in node_output:
for image in node_output['images']:
image_data = get_image(image['filename'], image['subfolder'], image['type'], token)
images_output.append(image_data)
output_images[node_id] = images_output
return output_images
# Default route for home welcome
@app.route('/')
def home():
return render_template('home.html')
# Generate image route
@app.route('/generate_image', methods=['POST'])
def generate_image():
data = request.json
# Extract the token from the request headers
# token = request.headers.get('Authorization')
token = "Bearer JDJiJDEyJDgwbUJwQTFrQ0JYdS9lR2R4ZEZWdmV3WS9VTmlCeHNtc2txbnBITjR4Qm96ZmFnVUkvNDlh"
if token is None:
return jsonify({'error': 'No token provided'}), 400
if token.startswith("Bearer "):
token = token.split(" ")[1]
# Base64 decode the encoded token
token = base64.b64decode(token).decode("utf-8")
if 'text_prompt' not in data:
return jsonify({'error': 'No text prompt provided'}), 400
text_prompt = data['text_prompt']
# Get the path to the current file's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(current_dir, 'workflow_api.json')
with open(file_path, 'r', encoding='utf-8') as file:
workflow_jsondata = file.read()
prompt = json.loads(workflow_jsondata)
prompt["6"]["inputs"]["text"] = text_prompt
prompt["7"]["inputs"]["text"] = "text, watermark, low quality, extra hands, extra legs."
seednum = random.randint(1, 9999999999999)
prompt["3"]["inputs"]["seed"] = seednum
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}")
images = get_images(ws, prompt, token)
ws.close()
output_images_base64 = []
for node_id in images:
for image_data in images[node_id]:
image = Image.open(io.BytesIO(image_data))
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
output_images_base64.append(img_str)
return jsonify({'images': output_images_base64})
@app.route('/get_image/<filename>', methods=['GET'])
def get_image_file(filename):
return send_file(filename, mimetype='image/png')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860) # Removed 'debug=True'
|