|
import torch |
|
import logging |
|
from transformers import TextIteratorStreamer, AutoProcessor, LlavaForConditionalGeneration |
|
from diffusers import DiffusionPipeline |
|
import gradio as gr |
|
import numpy as np |
|
from PIL import Image, ImageDraw |
|
import threading |
|
import openai |
|
import os |
|
import spaces |
|
|
|
|
|
# Module-wide logger; the root logger is configured at DEBUG verbosity.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

# OpenAI credential, read once at import time (None when the variable is unset).
API_KEY = os.environ.get('OPEN_AI_API_KEYS')
|
|
|
# HTML header rendered via gr.Markdown above the image-generation controls.
DESCRIPTION = """
<div>
<h1 style="text-align: center;">Chimera Image Generation</h1>
<p style="text-align: center;">This contains a Stable Diffusor from <a href="https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0"><b>stabilityai/stable-diffusion-xl-base-1.0</b></a></p>
</div>
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Vision-language model used to answer questions about uploaded images.
# Loaded in half precision and placed on the first CUDA device.
llava_model = LlavaForConditionalGeneration.from_pretrained(
    "xtuner/llava-llama-3-8b-v1_1-transformers",
    low_cpu_mem_usage=True,
    torch_dtype=torch.float16,
)
llava_model.to("cuda:0")

# Stop generation at token id 128009.
# NOTE(review): presumably the id of "<|eot_id|>" for this tokenizer — confirm.
llava_model.generation_config.eos_token_id = 128009

# Processor paired with the same checkpoint (prompt + image preprocessing).
processor = AutoProcessor.from_pretrained("xtuner/llava-llama-3-8b-v1_1-transformers")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# SDXL base pipeline: produces the initial latents for a prompt.
base = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16",
).to("cuda:0")

# SDXL refiner pipeline: finishes denoising the base latents. It reuses the
# base pipeline's second text encoder and VAE instead of loading its own.
refiner = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-refiner-1.0",
    text_encoder_2=base.text_encoder_2,
    vae=base.vae,
    torch_dtype=torch.float16,
    # Spelled `use_safetensors` (plural), matching the base pipeline call; the
    # previous `use_safetensor` was not the keyword the loader expects.
    use_safetensors=True,
    variant="fp16",
).to("cuda:0")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared state: multimodal_and_generation stores "text" or "image" under the
# "mode" key so that bot_comms knows which kind of stream it received.
chat_mode: dict = {}
|
|
|
def _latest_image_path(message, history):
    """Return the path of the image the user is asking about, or None."""
    files = message.get("files")
    if files:
        newest = files[-1]
        # An uploaded file may arrive as a dict carrying "path" or as a bare path.
        return newest["path"] if isinstance(newest, dict) else newest

    # No new upload: fall back to the most recent image found in the history
    # (turns whose first element is a tuple hold a file path at index 0).
    image_path = None
    for turn in history or ():
        if isinstance(turn[0], tuple):
            image_path = turn[0][0]
    return image_path


def multimodal_and_generation(message, history):
    """Start generating a reply and return the stream that produces it.

    When no image is attached to the message or present in the history, the
    text is sent to OpenAI's gpt-3.5-turbo and the streaming response object
    is returned. Otherwise the image and text are passed to the LLaVA model,
    generation is started on a background thread, and the
    TextIteratorStreamer receiving its output is returned.

    As a side effect, chat_mode["mode"] is set to "text" or "image" so the
    caller can tell which kind of stream it got.

    Args:
        message: The user's turn, either a plain string or a dict with a
            "text" entry and an optional "files" list.
        history: Previous turns of the conversation.

    Returns:
        An OpenAI streaming response (text mode) or a TextIteratorStreamer
        (image mode).
    """
    # Normalise a bare string so the dict lookups below are always valid.
    if isinstance(message, str):
        message = {"text": message}
    user_text = message.get("text", "")

    image_path = _latest_image_path(message, history)

    if image_path is None:
        chat_mode["mode"] = "text"
        client = openai.OpenAI(api_key=API_KEY)
        return client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant called 'chimera'."},
                {"role": "user", "content": user_text},
            ],
            stream=True,
        )

    chat_mode["mode"] = "image"
    # NOTE(review): the prompt has no role header / end-of-turn special tokens;
    # confirm this matches the chat template expected by the checkpoint.
    prompt = f"user\n\n<image>\n{user_text}assistant\n\n"

    # The processor consumes the pixel data inside the block, so the file
    # handle can be released as soon as the tensors are built.
    with Image.open(image_path) as image:
        inputs = processor(prompt, image, return_tensors='pt').to(0, torch.float16)

    streamer = TextIteratorStreamer(
        processor.tokenizer,
        skip_special_tokens=False,
        skip_prompt=True,
    )
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024, do_sample=False)

    # generate() blocks until completion, so it runs on its own thread while
    # the caller iterates over the streamer.
    thread = threading.Thread(target=llava_model.generate, kwargs=generation_kwargs)
    thread.start()

    return streamer
|
|
|
|
|
@spaces.GPU(duration=120)
def diffusing(prompt: str,
              history=None):
    """Generate one image for a text prompt using the SDXL base and refiner.

    The base pipeline runs the first 80% of a 40-step denoising schedule and
    hands its latents to the refiner, which runs the remaining 20%.

    Args:
        prompt: Text description of the image to generate.
        history: Unused. Optional so the function can be wired to a single
            Gradio input (the image prompt) without a missing-argument error.

    Returns:
        The first image produced by the refiner pipeline.
    """
    # Shared by both stages so the hand-over point stays consistent.
    num_steps = 40
    handover = 0.8

    image_base = base(
        prompt=prompt,
        num_inference_steps=num_steps,
        denoising_end=handover,
        output_type="latent",  # keep latents; the refiner decodes the final image
    ).images

    image = refiner(
        prompt=prompt,
        num_inference_steps=num_steps,
        denoising_start=handover,
        image=image_base,
    ).images[0]

    return image
|
|
|
def check_cuda_availability():
    """Return a short status string naming CUDA device 0, or saying none exists."""
    if not torch.cuda.is_available():
        return "No CUDA device found."
    device_name = torch.cuda.get_device_name(0)
    return f"GPU: {device_name}"
|
|
|
|
|
# NOTE(review): not referenced by any code visible in this file — confirm
# whether it is still needed.
image_created: dict = {}
|
|
|
@spaces.GPU(duration=120)
def bot_comms(message, history):
    """Stream the assistant's reply to Gradio as a growing string.

    Args:
        message: The user's turn; a plain string or a dict with a "text"
            entry. Plain strings are wrapped into the dict form.
        history: Previous turns, forwarded to multimodal_and_generation.

    Yields:
        The reply accumulated so far (each yield replaces the previous one).
    """
    if not isinstance(message, dict):
        message = {"text": message}

    if message["text"] == "check cuda":
        logger.debug("Checking CUDA availability.")
        # This function is a generator, so the status must be yielded; a
        # plain `return value` would never reach the chat window.
        yield check_cuda_availability()
        return

    stream = multimodal_and_generation(message, history)

    # multimodal_and_generation records which backend it used under the
    # "mode" key of the module-level chat_mode dict.
    # NOTE(review): chat_mode is shared by all requests in the process, so
    # concurrent chats could overwrite each other's mode — confirm acceptable.
    mode = chat_mode.get("mode")

    buffer = ""

    if mode == "text":
        for chunk in stream:
            # Skip chunks that carry no choices at all.
            if chunk is None or not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if not hasattr(delta, "content"):
                continue
            logger.debug("\n\nFound the chunk in stream for gpt-3.5\n\n")
            text = delta.content
            if text:
                buffer += text
            yield buffer

    elif mode == "image":
        for new_text in stream:
            # Special tokens are not skipped by the streamer, so drop the
            # end-of-turn marker and anything after it.
            buffer += new_text.split("<|eot_id|>")[0]
            yield buffer
|
|
|
# Input box for the chat column: accepts text plus image uploads.
chat_input = gr.MultimodalTextbox(
    placeholder="Enter your question or upload an image.",
    file_types=["images"],
    show_label=False,
    interactive=True,
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Two-column page: image generation on the left, chat on the right.
with gr.Blocks(fill_height=True) as demo:
    with gr.Row():

        # Left column: prompt -> SDXL image.
        with gr.Column():
            gr.Markdown(DESCRIPTION)
            image_prompt = gr.Textbox(label="Image Prompt")
            output_image = gr.Image(label="Generated Image")
            generate_image_button = gr.Button("Generate Image")
            generate_image_button.click(
                diffusing,
                inputs=image_prompt,
                outputs=output_image,
            )

        # Right column: multimodal chat backed by bot_comms.
        with gr.Column():
            gr.Markdown('''
<div>
<h1 style="text-align: center;">Chimera Text Generation</h1>
<p style="text-align: center;">This contains a Generative LLM from <a href="https://openai.com/"><b>Open AI</b></a> called GPT-3.5-Turbo</p>
</div>
''')
            chat = gr.ChatInterface(
                bot_comms,
                textbox=chat_input,
                multimodal=True,
            )

# Start the Gradio server.
demo.launch()