|
import torch |
|
import logging |
|
from transformers import TextIteratorStreamer, AutoProcessor, LlavaForConditionalGeneration |
|
from diffusers import DiffusionPipeline |
|
import gradio as gr |
|
import numpy as np |
|
from PIL import Image |
|
import threading |
|
import openai |
|
import os |
|
import spaces |
|
|
|
|
|
# Configure the root logger at DEBUG so the handlers' debug messages are
# emitted, then create this module's own logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# OpenAI credential, read once at import time from the environment.
# It is None when the variable is not set.
API_KEY = os.environ.get("OPEN_AI_API_KEYS")
|
|
|
# HTML banner shown at the top of the page (passed to gr.Markdown below).
# NOTE(review): the characters after "Chimera" in the <h1> look like a
# mis-encoded emoji — confirm the intended glyph and re-save the file as UTF-8.
DESCRIPTION = """
<div>
<h1 style="text-align: center;">Chimera πͺ</h1>
<p>This contains a Stable Diffusor from <a href="https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0"><b>stabilityai/stable-diffusion-xl-base-1.0</b></a> and a Multimodal from <a href="https://huggingface.co/xtuner/llava-llama-3-8b-v1_1-transformers"><b>xtuner/llava-llama-3-8b-v1_1-transformers</b></a></p>
</div>
"""
|
|
|
|
|
# Checkpoint shared by the vision-language model and its processor.
_LLAVA_MODEL_ID = "xtuner/llava-llama-3-8b-v1_1-transformers"

# Load the weights in half precision and place the model on the first GPU.
llava_model = LlavaForConditionalGeneration.from_pretrained(
    _LLAVA_MODEL_ID,
    torch_dtype=torch.float16,
    low_cpu_mem_usage=True,
).to("cuda:0")

# The processor prepares the prompt text and image for the model and exposes
# the tokenizer used when streaming generated text.
processor = AutoProcessor.from_pretrained(_LLAVA_MODEL_ID)

# Stop generation on token id 128009.
# NOTE(review): presumably the Llama-3 end-of-turn token — confirm against the
# tokenizer of this checkpoint.
llava_model.generation_config.eos_token_id = 128009
|
|
|
|
|
# First stage: SDXL base text-to-image pipeline in half precision on the GPU.
base = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    variant="fp16",
    use_safetensors=True,
)
base.to("cuda")

# Second stage: the SDXL *refiner* checkpoint. diffusing() calls this pipeline
# with `image=` (the base stage's latents) and `denoising_start=`; the refiner
# pipeline accepts those arguments, whereas a second copy of the base
# text-to-image pipeline does not. The second text encoder and the VAE are
# shared with `base` so they are not loaded twice.
refiner = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-refiner-1.0",
    text_encoder_2=base.text_encoder_2,
    vae=base.vae,
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16",
)
refiner.to("cuda")
|
|
|
class ModeManager:
    """Hold the bot's current interaction mode.

    The mode starts unset (``None``); until one is set, ``get_mode`` reports
    ``DEFAULT_MODE``.
    """

    # Mode reported while no mode has been set.
    DEFAULT_MODE = "chatting"

    def __init__(self):
        self.mode = None

    def set_mode(self, mode):
        """Store ``mode`` as the current mode (no validation is applied)."""
        self.mode = mode

    def get_mode(self):
        """Return the current mode, or ``DEFAULT_MODE`` if none has been set."""
        return self.mode if self.mode is not None else self.DEFAULT_MODE


# Single module-level instance: everything that reads `mode_manager` sees the
# same mode.
mode_manager = ModeManager()
|
|
|
def multimodal_and_generation(message, history):
    """
    Start a streamed reply to a chat message, optionally grounded on an image.

    The image is the last entry of ``message["files"]`` when that list is
    non-empty; otherwise it is taken from the most recent history turn whose
    first element is a tuple. Without an image the text is sent to the OpenAI
    chat-completions API; with an image, LLaVA generation is started on a
    background thread.

    Args:
        message: Mapping with a ``"text"`` entry and an optional ``"files"``
            list whose items are either paths or dicts with a ``"path"`` key.
        history: Sequence of chat turns; a turn whose first element is a tuple
            is treated as a file upload, with the path as the tuple's first
            item.

    Returns:
        An iterable of reply pieces: the OpenAI streaming response (chunk
        objects) when no image is involved, otherwise a
        ``TextIteratorStreamer`` that yields strings.
    """
    logger.debug("Message: %s (type: %s)", message, type(message))

    image_path = None
    files = message.get("files")
    if files:
        latest = files[-1]
        image_path = latest["path"] if isinstance(latest, dict) else latest
    else:
        # Walk the history backwards so the most recent upload wins.
        for turn in reversed(history or []):
            if isinstance(turn[0], tuple):
                image_path = turn[0][0]
                break

    user_text = message["text"]

    if image_path is None:
        logger.debug("Input Prompt: %s (type: %s)", user_text, type(user_text))
        client = openai.OpenAI(api_key=API_KEY)
        return client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant called 'chimera'."},
                {"role": "user", "content": user_text},
            ],
            stream=True,
        )

    # Llama-3 chat layout from the checkpoint's model card: each role is
    # wrapped in header tokens and the user turn is closed with <|eot_id|>.
    prompt = (
        "<|start_header_id|>user<|end_header_id|>\n\n"
        f"<image>\n{user_text}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )

    # Run the processor while the file is still open so the pixel data is read
    # before the handle is released. Keyword arguments keep the call
    # independent of the processor's positional parameter order.
    with Image.open(image_path) as image:
        inputs = processor(text=prompt, images=image, return_tensors="pt").to(0, torch.float16)

    streamer = TextIteratorStreamer(
        processor.tokenizer,
        skip_special_tokens=False,
        skip_prompt=True,
    )
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024, do_sample=False)

    # generate() blocks until it finishes, so it runs on its own thread while
    # the caller consumes the streamer.
    thread = threading.Thread(target=llava_model.generate, kwargs=generation_kwargs)
    thread.start()

    return streamer
|
|
|
def diffusing(prompt, num_inference_steps=40, high_noise_frac=0.8):
    """
    Generate an image for ``prompt`` with the two-stage ``base`` -> ``refiner`` pipelines.

    Args:
        prompt: Text prompt passed to both stages.
        num_inference_steps: Step count passed to both stages; they must use
            the same value, so it is named once here.
        high_noise_frac: Fraction of the denoising schedule handled by
            ``base`` (its ``denoising_end``); ``refiner`` resumes from the
            same point (its ``denoising_start``).

    Returns:
        The first image of the ``refiner`` pipeline's output.
    """
    # Stage 1: stop early and hand over latents instead of a decoded image.
    latents = base(
        prompt=prompt,
        num_inference_steps=num_inference_steps,
        denoising_end=high_noise_frac,
        output_type="latent",
    ).images

    # Stage 2: continue from the same point in the schedule.
    return refiner(
        prompt=prompt,
        num_inference_steps=num_inference_steps,
        denoising_start=high_noise_frac,
        image=latents,
    ).images[0]
|
|
|
def check_cuda_availability():
    """Return the name of CUDA device 0, or a fallback message if CUDA is unavailable."""
    if not torch.cuda.is_available():
        return "No CUDA device found."
    device_name = torch.cuda.get_device_name(0)
    return f"GPU: {device_name}"
|
|
|
# Module-level mode flag, left in place for anything that still reads it;
# bot_comms() takes the active mode from `mode_manager` instead.
mode = ""


def _chunk_text(chunk):
    """Return the text carried by one streamed chunk, or "" if it has none."""
    if chunk is None:
        return ""
    if isinstance(chunk, str):
        # String chunks come from the TextIteratorStreamer path, which is
        # created with skip_special_tokens=False, so the end-of-turn marker
        # can appear in the text; drop it before showing the reply.
        return chunk.replace("<|eot_id|>", "")
    # Otherwise expect an OpenAI-style chunk; it may carry no choices.
    choices = getattr(chunk, "choices", None)
    if not choices:
        return ""
    delta = getattr(choices[0], "delta", None)
    return getattr(delta, "content", None) or ""


@spaces.GPU(duration=120)
def bot_comms(message, history):
    """
    Handle one chat turn between Gradio and the models.

    The texts "check cuda", "imagery" and "chatting" are control commands
    (report the GPU, switch to image generation, switch back to chat). Any
    other text is used as an image prompt in imagery mode, or answered by
    ``multimodal_and_generation`` otherwise.

    Args:
        message: The user's turn. The interface is multimodal, so this is a
            mapping with ``"text"`` and ``"files"``; a plain string is also
            accepted and treated as text with no files.
        history: Previous chat turns, forwarded to
            ``multimodal_and_generation``.

    Yields:
        A status string for control commands, the generated image in imagery
        mode, or the reply accumulated so far after each streamed piece.
    """
    mode = mode_manager.get_mode()
    logger.debug("bot_comms called with message: %s and mode: %s", message, mode)

    # Commands and prompts live in the "text" field of a multimodal message;
    # comparing the whole mapping against a string would never match.
    if isinstance(message, dict):
        text = message.get("text") or ""
    else:
        text = "" if message is None else str(message)
        message = {"text": text, "files": []}
    command = text.strip()

    if command == "check cuda":
        logger.debug("Checking CUDA availability.")
        yield check_cuda_availability()
        return

    if command == "imagery":
        logger.debug("Switching to imagery mode.")
        mode_manager.set_mode("imagery")
        yield "Imagery On! Type your prompt to make the image 🖼️"
        return

    if command == "chatting":
        logger.debug("Switching to chatting mode.")
        mode_manager.set_mode("chatting")
        # NOTE(review): the trailing characters look like a mis-encoded emoji;
        # left untouched because the intended glyph cannot be recovered here.
        yield "Imagery Off. Ask me any questions. βοΈ"
        return

    if mode == "imagery":
        logger.debug("Processing imagery prompt.")
        # NOTE(review): the image object is yielded as-is — confirm the
        # installed Gradio Chatbot accepts this value type.
        yield diffusing(text)
        return

    # Every remaining mode is handled as chat.
    logger.debug("On chatting or no mode.\n\n")
    stream = multimodal_and_generation(message, history)

    # The stream yields OpenAI chunk objects (text-only) or plain strings
    # (image path); _chunk_text handles both. Yield the running reply so the
    # UI shows it growing.
    buffer = ""
    for chunk in stream:
        piece = _chunk_text(chunk)
        if piece:
            buffer += piece
            yield buffer
|
|
|
# Chat widgets are created up front and handed to the ChatInterface below.
chatbot = gr.Chatbot(label="Chimera AI", height=600)
chat_input = gr.MultimodalTextbox(
    placeholder="Enter your question or upload an image.",
    file_types=["images"],
    interactive=True,
    show_label=False,
)

# Page layout: the description banner on top, the chat interface filling the rest.
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown(DESCRIPTION)
    gr.ChatInterface(
        fn=bot_comms,
        chatbot=chatbot,
        textbox=chat_input,
        multimodal=True,
        fill_height=True,
    )

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|
|