import os import numpy as np import random from pathlib import Path from PIL import Image import streamlit as st from huggingface_hub import InferenceClient, AsyncInferenceClient from gradio_client import Client, handle_file import asyncio from concurrent.futures import ThreadPoolExecutor import yaml import cv2 import dlib # Cargar configuración try: with open("config.yaml", "r") as file: credentials = yaml.safe_load(file) except Exception as e: st.error(f"Error al cargar el archivo de configuración: {e}") credentials = {"username": "", "password": ""} MAX_SEED = np.iinfo(np.int32).max HF_TOKEN_UPSCALER = os.environ.get("HF_TOKEN_UPSCALER") client = AsyncInferenceClient() llm_client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") DATA_PATH = Path("./data") DATA_PATH.mkdir(exist_ok=True) def run_async(func): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) executor = ThreadPoolExecutor(max_workers=1) result = loop.run_in_executor(executor, func) return loop.run_until_complete(result) async def generate_image(combined_prompt, model, width, height, scales, steps, seed): seed = int(seed) if seed != -1 else random.randint(0, MAX_SEED) try: image = await client.text_to_image( prompt=combined_prompt, height=height, width=width, guidance_scale=scales, num_inference_steps=steps, model=model ) return image, seed except Exception as e: return f"Error al generar imagen: {e}", None def get_upscale_finegrain(prompt, img_path, upscale_factor): try: client = Client("finegrain/finegrain-image-enhancer", hf_token=HF_TOKEN_UPSCALER) result = client.predict( input_image=handle_file(img_path), prompt=prompt, upscale_factor=upscale_factor ) return result[1] if isinstance(result, list) and len(result) > 1 else None except Exception: return None def save_prompt(prompt_text, seed): try: prompt_file_path = DATA_PATH / f"prompt_{seed}.txt" with open(prompt_file_path, "w") as prompt_file: prompt_file.write(prompt_text) return prompt_file_path except Exception as e: st.error(f"Error al guardar el prompt: {e}") return None async def gen(prompt, basemodel, width, height, scales, steps, seed, upscale_factor, process_upscale, process_enhancer, language): combined_prompt = f"{prompt} {await improve_prompt(prompt, language) if process_enhancer else ''}".strip() seed = int(seed) if seed != -1 else random.randint(0, MAX_SEED) progress_bar = st.progress(0) image, seed = await generate_image(combined_prompt, basemodel, width, height, scales, steps, seed) progress_bar.progress(50) if isinstance(image, str) and image.startswith("Error"): progress_bar.empty() return [image, None, combined_prompt] image_path = save_image(image, seed) prompt_file_path = save_prompt(combined_prompt, seed) if process_upscale: upscale_image_path = get_upscale_finegrain(combined_prompt, image_path, upscale_factor) if upscale_image_path: Image.open(upscale_image_path).save(DATA_PATH / f"upscale_image_{seed}.jpg", format="JPEG") progress_bar.progress(100) image_path.unlink() return [str(DATA_PATH / f"upscale_image_{seed}.jpg"), str(prompt_file_path)] progress_bar.progress(100) return [str(image_path), str(prompt_file_path)] async def improve_prompt(prompt, language): instruction = ( "Con esta idea, describe en español un prompt detallado de txt2img en un máximo de 500 caracteres, " "con iluminación, atmósfera, elementos cinematográficos y en su caso personajes..." if language == "es" else "With this idea, describe in English a detailed txt2img prompt in 500 characters at most, " "add illumination, atmosphere, cinematic elements, and characters if needed..." ) formatted_prompt = f"{prompt}: {instruction}" response = llm_client.text_generation(formatted_prompt, max_new_tokens=500) improved_text = response.get('generated_text', '').strip() if 'generated_text' in response else response.strip() return improved_text[:500] if len(improved_text) > 500 else improved_text def save_image(image, seed): try: image_path = DATA_PATH / f"image_{seed}.jpg" image.save(image_path, format="JPEG") return image_path except Exception as e: st.error(f"Error al guardar la imagen: {e}") return None def get_storage(): files = [file for file in DATA_PATH.glob("*.jpg") if file.is_file()] files.sort(key=lambda x: x.stat().st_mtime, reverse=True) usage = sum(file.stat().st_size for file in files) return [str(file.resolve()) for file in files], f"Uso total: {usage / (1024.0 ** 3):.3f}GB" def get_prompts(): prompt_files = [file for file in DATA_PATH.glob("*.txt") if file.is_file()] return {file.stem.replace("prompt_", ""): file for file in prompt_files} def delete_image(image_path): try: if Path(image_path).exists(): Path(image_path).unlink() st.success(f"Imagen {image_path} borrada.") else: st.error("El archivo de imagen no existe.") except Exception as e: st.error(f"Error al borrar la imagen: {e}") def swap_faces(image_path): try: image = cv2.imread(str(image_path)) detector = dlib.get_frontal_face_detector() predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat") faces = detector(image) if len(faces) != 2: st.error("Se necesitan exactamente dos caras para realizar el intercambio.") return None landmarks1 = predictor(image, faces[0]) landmarks2 = predictor(image, faces[1]) points1 = np.array([[p.x, p.y] for p in landmarks1.parts()]) points2 = np.array([[p.x, p.y] for p in landmarks2.parts()]) mask1 = np.zeros(image.shape[:2], dtype=np.uint8) cv2.fillConvexPoly(mask1, cv2.convexHull(points1), 255) mask2 = np.zeros(image.shape[:2], dtype=np.uint8) cv2.fillConvexPoly(mask2, cv2.convexHull(points2), 255) face1 = cv2.bitwise_and(image, image, mask=mask1) face2 = cv2.bitwise_and(image, image, mask=mask2) image[mask1 == 255] = face2[mask1 == 255] image[mask2 == 255] = face1[mask2 == 255] swapped_image_path = DATA_PATH / f"swapped_image_{Path(image_path).stem}.jpg" cv2.imwrite(str(swapped_image_path), image) return str(swapped_image_path) except Exception as e: st.error(f"Error en el face swap: {e}") return None def main(): st.set_page_config(layout="wide") prompt = st.sidebar.text_input("Descripción de la imagen", max_chars=900) process_enhancer = st.sidebar.checkbox("Mejorar Prompt", value=False) language = st.sidebar.selectbox("Idioma", ["en", "es"]) basemodel = st.sidebar.selectbox("Modelo Base", ["black-forest-labs/FLUX.1-DEV", "black-forest-labs/FLUX.1-schnell"]) format_option = st.sidebar.selectbox("Formato", ["9:16", "16:9"]) process_upscale = st.sidebar.checkbox("Procesar Escalador", value=False) upscale_factor = st.sidebar.selectbox("Factor de Escala", [2, 4, 8], index=0) scales = st.sidebar.slider("Escalado", 1, 20, 10) steps = st.sidebar.slider("Pasos", 1, 100, 20) seed = st.sidebar.number_input("Semilla", value=-1) width, height = (720, 1280) if format_option == "9:16" else (1280, 720) if st.sidebar.button("Generar Imagen"): with st.spinner("Mejorando y generando imagen..."): result = asyncio.run(gen(prompt, basemodel, width, height, scales, steps, seed, upscale_factor, process_upscale, process_enhancer, language)) image_paths, prompt_file = result[0], result[1] st.write(f"Image paths: {image_paths}") if image_paths and Path(image_paths).exists(): st.image(image_paths, caption="Imagen generada", use_column_width=True) if prompt_file and Path(prompt_file).exists(): with open(prompt_file, "r") as file: st.text_area("Prompt utilizado", file.read(), height=150) st.sidebar.header("Galería de Imágenes") image_storage, usage = get_storage() st.sidebar.write(usage) for img_path in image_storage: st.sidebar.image(img_path, width=100) if st.sidebar.button("Borrar Imagen"): delete_image(image_paths) if st.sidebar.button("Intercambiar Caras"): if image_paths: swapped_path = swap_faces(image_paths) if swapped_path: st.image(swapped_path, caption="Imagen con caras intercambiadas", use_column_width=True) if __name__ == "__main__": main()