|
from flask import Flask, request, jsonify, send_file |
|
from flask_cors import CORS |
|
import asyncio |
|
import tempfile |
|
import os |
|
from threading import RLock |
|
from huggingface_hub import InferenceClient |
|
from PIL import Image |
|
from io import BytesIO |
|
|
|
myapp = Flask(__name__) |
|
CORS(myapp) |
|
|
|
lock = RLock() |
|
HF_TOKEN = os.environ.get("HF_TOKEN") |
|
|
|
inference_timeout = 600 |
|
|
|
@myapp.route('/') |
|
def home(): |
|
return "Welcome to the Image Background Remover!" |
|
|
|
|
|
def get_model_from_name(model_name): |
|
return model_name if model_name in models else None |
|
|
|
|
|
async def infer(client, prompt, seed=1, timeout=inference_timeout, model="prompthero/openjourney-v4"): |
|
task = asyncio.create_task( |
|
asyncio.to_thread(client.text_to_image, prompt=prompt, seed=seed, model=model) |
|
) |
|
await asyncio.sleep(0) |
|
try: |
|
result = await asyncio.wait_for(task, timeout=timeout) |
|
except (Exception, asyncio.TimeoutError) as e: |
|
print(e) |
|
print(f"Task timed out for model: {model}") |
|
if not task.done(): |
|
task.cancel() |
|
result = None |
|
|
|
if task.done() and result is not None: |
|
with lock: |
|
|
|
image_bytes = BytesIO() |
|
|
|
result.save(image_bytes, format='PNG') |
|
image_bytes.seek(0) |
|
|
|
|
|
temp_image = tempfile.NamedTemporaryFile(suffix=".png", delete=False) |
|
with open(temp_image.name, "wb") as f: |
|
f.write(image_bytes.read()) |
|
|
|
return temp_image.name |
|
return None |
|
|
|
|
|
@myapp.route('/generate_image', methods=['POST']) |
|
def generate_image(): |
|
data = request.get_json() |
|
|
|
|
|
prompt = data.get('prompt', '') |
|
seed = data.get('seed', 1) |
|
model_name = data.get('model', 'prompthero/openjourney-v4') |
|
|
|
if not prompt: |
|
return jsonify({"error": "Prompt is required"}), 400 |
|
|
|
|
|
model = get_model_from_name(model_name) |
|
if not model: |
|
return jsonify({"error": f"Model '{model_name}' not found in available models"}), 400 |
|
|
|
try: |
|
|
|
client = InferenceClient(token=HF_TOKEN) |
|
|
|
|
|
result_path = asyncio.run(infer(client, prompt, seed, model=model)) |
|
if result_path: |
|
return send_file(result_path, mimetype='image/png') |
|
else: |
|
return jsonify({"error": "Failed to generate image"}), 500 |
|
except Exception as e: |
|
print(f"Error in generate_image: {str(e)}") |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
if __name__ == "__main__": |
|
myapp.run(host='0.0.0.0', port=7860) |