import os
import threading

import gradio as gr
import spaces
import torch
from diffusers import DiffusionPipeline
from openai import OpenAI
from PIL import Image
from transformers import AutoProcessor, LlavaForConditionalGeneration, TextIteratorStreamer

API_KEY = os.getenv('OPEN_AI_API_KEYS')

DESCRIPTION = '''
<div>
<h1 style="text-align: center;">Chimera</h1>
<p>This Space combines Stable Diffusion XL from <a href="https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0"><b>stabilityai/stable-diffusion-xl-base-1.0</b></a> with the multimodal model <a href="https://huggingface.co/xtuner/llava-llama-3-8b-v1_1-transformers"><b>xtuner/llava-llama-3-8b-v1_1-transformers</b></a>.</p>
</div>
'''
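
# LLaVA (Llama-3-8B vision-language) model, loaded in fp16.
# 128009 is Llama-3's <|eot_id|> end-of-turn token; using it as the EOS token
# makes generation stop at the end of the assistant's reply.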
llava_model = LlavaForConditionalGeneration.from_pretrained(
    "xtuner/llava-llama-3-8b-v1_1-transformers",
    torch_dtype=torch.float16,
    low_cpu_mem_usage=True,
)
llava_model.to("cuda:0")

processor = AutoProcessor.from_pretrained("xtuner/llava-llama-3-8b-v1_1-transformers")
llava_model.generation_config.eos_token_id = 128009
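
# SDXL base + refiner pair. The refiner reuses the base pipeline's second
# text encoder and VAE, so those shared components are only loaded once.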
base = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    variant="fp16",
    use_safetensors=True,
)
base.to("cuda")

refiner = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-refiner-1.0",
    text_encoder_2=base.text_encoder_2,
    vae=base.vae,
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16",
)
refiner.to("cuda")
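

# Routes a message to one of two backends: text-only messages go to the OpenAI
# chat API, while messages that contain (or follow) an image go to the local
# LLaVA model. Both paths return an iterable stream of text chunks.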
def multimodal_and_generation(message, history):
    print(f"Message:\n{message}\nType:\n{type(message)}")
    image_path = None
    if message["files"]:
        # Newer Gradio versions pass file dicts; older ones pass plain paths.
        if isinstance(message["files"][-1], dict):
            image_path = message["files"][-1]["path"]
        else:
            image_path = message["files"][-1]
    else:
        # Fall back to the most recent image in the conversation history.
        for hist in history:
            if isinstance(hist[0], tuple):
                image_path = hist[0][0]

    if image_path is None:
        input_prompt = message["text"]
        client = OpenAI(api_key=API_KEY)
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant called 'chimera'."},
                {"role": "user", "content": input_prompt},
            ],
            stream=True,
        )
        return stream

    # Llama-3 chat template used by llava-llama-3-8b-v1_1-transformers.
    prompt = (
        "<|start_header_id|>user<|end_header_id|>\n\n"
        f"<image>\n{message['text']}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )
    image = Image.open(image_path)
    inputs = processor(text=prompt, images=image, return_tensors="pt").to("cuda:0", torch.float16)
    streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024, do_sample=False)

    # Run generation in a background thread so tokens can be consumed from the
    # streamer as they are produced.
    thread = threading.Thread(target=llava_model.generate, kwargs=generation_kwargs)
    thread.start()
    return streamer
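

# Two-stage SDXL generation ("ensemble of expert denoisers"): the base model
# denoises the first 80% of the schedule and returns latents
# (denoising_end=0.8); the refiner resumes at the same point
# (denoising_start=0.8) and produces the final image.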
def diffusing(prompt):
    image = base(
        prompt=prompt,
        num_inference_steps=40,
        denoising_end=0.8,
        output_type="latent",
    ).images
    image = refiner(
        prompt=prompt,
        num_inference_steps=40,
        denoising_start=0.8,
        image=image,
    ).images[0]
    return image


def check_cuda_availability():
    if torch.cuda.is_available():
        return f"GPU: {torch.cuda.get_device_name(0)}"
    return "No CUDA device found."


# Current mode: "" or "chatting" for text chat, "imagery" for image generation.
mode = ""
@spaces.GPU(duration=120)
def bot_comms(message, history):
    global mode

    # The Blocks UI sends a plain string; normalize it to the dict shape that
    # multimodal_and_generation expects.
    if isinstance(message, str):
        message = {"text": message, "files": []}
    text = message["text"]

    if text == "check cuda":
        return [check_cuda_availability()]

    if text == "imagery":
        mode = text
        return ["Imagery On! Type your prompt to make the image 🖼️"]

    if text == "chatting":
        mode = text
        return ["Imagery Off. Ask me any questions. ✏️"]

    if mode == "imagery":
        print("On imagery\n\n")
        return [diffusing(prompt=text)]

    # mode == "chatting" or no mode set yet.
    print("On chatting or no mode.\n\n")
    stream = multimodal_and_generation(message=message, history=history)
    outputs = []
    for chunk in stream:
        # The OpenAI stream yields chunk objects, while TextIteratorStreamer
        # yields plain strings; handle both.
        piece = chunk if isinstance(chunk, str) else chunk.choices[0].delta.content
        if piece:
            outputs.append(piece)
    return ["".join(outputs)]
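

# Minimal Gradio Blocks UI: user() echoes the message into the chat history,
# then bot_response() fills in the reply (text or a generated image).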
with gr.Blocks() as demo:
    gr.Markdown(DESCRIPTION)
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    with gr.Row():
        submit = gr.Button("Submit")

    def user(message, history):
        # Clear the textbox and append the user's message to the history.
        return "", history + [[message, None]]

    def bot_response(history):
        # user() has already cleared the textbox, so read the message back out
        # of the last history entry rather than from the input component.
        message = history[-1][0]
        response = bot_comms(message, history[:-1])[0]
        if isinstance(response, Image.Image):
            # gr.Chatbot displays media given as a (filepath,) tuple.
            response.save("generated.png")
            response = ("generated.png",)
        history[-1][1] = response
        return history

    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot_response, chatbot, chatbot
    )
    submit.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot_response, chatbot, chatbot
    )

if __name__ == "__main__":
    demo.launch(share=True)