import torch
import logging
from transformers import TextIteratorStreamer, AutoProcessor, LlavaForConditionalGeneration
from diffusers import DiffusionPipeline
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
import threading
import openai
import os
import spaces

# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Retrieve the OpenAI API key from the environment
API_KEY = os.getenv('OPEN_AI_API_KEYS')
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">Chimera Image Generation</h1>
<p style="text-align: center;">This demo uses the Stable Diffusion XL pipeline from <a href="https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0"><b>stabilityai/stable-diffusion-xl-base-1.0</b></a></p>
</div>
'''
# DESCRIPTION = '''
# <div>
# <h1 style="text-align: center;">Chimera Image Generation</h1>
# <p>This contains a Stable Diffusor from <a href="https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0"><b>stabilityai/stable-diffusion-xl-base-1.0</b></a> and a Multimodal from <a href="https://huggingface.co/xtuner/llava-llama-3-8b-v1_1-transformers"><b>xtuner/llava-llama-3-8b-v1_1-transformers</b></a></p>
# </div>
# '''

# Initialize the models
# llava_model = LlavaForConditionalGeneration.from_pretrained(
#     "xtuner/llava-llama-3-8b-v1_1-transformers",
#     torch_dtype=torch.float16,
#     low_cpu_mem_usage=True,
# )
# llava_model.to("cuda:0")
# processor = AutoProcessor.from_pretrained("xtuner/llava-llama-3-8b-v1_1-transformers")
# llava_model.generation_config.eos_token_id = 128009

# # Initialize Stable Diffusion pipelines
# base = DiffusionPipeline.from_pretrained(
#     "stabilityai/stable-diffusion-xl-base-1.0",
#     torch_dtype=torch.float16,
#     variant="fp16",
#     use_safetensors=True,
# )
# base.to('cuda')
# refiner = DiffusionPipeline.from_pretrained(
#     "stabilityai/stable-diffusion-xl-refiner-1.0",
#     text_encoder_2=base.text_encoder_2,
#     vae=base.vae,
#     torch_dtype=torch.float16,
#     use_safetensors=True,
#     variant="fp16",
# )
# refiner.to('cuda')
# Load both the base and refiner SDXL pipelines onto the GPU
base = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16",
).to("cuda:0")
refiner = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-refiner-1.0",
    text_encoder_2=base.text_encoder_2,
    vae=base.vae,
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16",
).to("cuda:0")
class ModeManager:
    """Tracks whether the app is in 'chatting' or 'imagery' mode."""

    def __init__(self):
        self.mode = None

    def set_mode(self, mode):
        self.mode = mode

    def get_mode(self):
        return self.mode if self.mode is not None else "chatting"


mode_manager = ModeManager()
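# Example (sketch): the app defaults to "chatting" until another mode is set,
# e.g. mode_manager.set_mode("imagery"); mode_manager.get_mode() -> "imagery"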
def multimodal_and_generation(message, history):
    """
    Generates a response based on the input message and optionally an image.
    """
    # image_path = None
    # if "files" in message and message["files"]:
    #     if type(message["files"][-1]) == dict:
    #         image_path = message["files"][-1]["path"]
    #     else:
    #         image_path = message["files"][-1]
    # else:
    #     for hist in history:
    #         if type(hist[0]) == tuple:
    #             image_path = hist[0][0]
    # if image_path is None:
    input_prompt = message if isinstance(message, str) else message.get("text", "")
    client = openai.OpenAI(api_key=API_KEY)
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant called 'chimera'."},
            {"role": "user", "content": input_prompt},
        ],
        stream=True,
    )
    return stream
    # else:
    #     prompt = f"user\n\n<image>\n{message['text']}assistant\n\n"
    #     image = Image.open(image_path)
    #     inputs = processor(prompt, image, return_tensors='pt').to(0, torch.float16)
    #     streamer = TextIteratorStreamer(processor.tokenizer, **{"skip_special_tokens": False, "skip_prompt": True})
    #     generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024, do_sample=False)
    #     thread = threading.Thread(target=llava_model.generate, kwargs=generation_kwargs)
    #     thread.start()
    #     return streamer
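# Sketch of consuming the OpenAI stream returned above outside of Gradio
# (assumes OPEN_AI_API_KEYS is set to a valid key in the environment):
# for chunk in multimodal_and_generation("Hello there", history=[]):
#     delta = chunk.choices[0].delta.content
#     if delta:
#         print(delta, end="", flush=True)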
# Function that takes a text prompt and generates an image with the SDXL base + refiner pipelines
def diffusing(prompt: str, history=None):
    """
    Takes a text prompt, runs the base pipeline for the first 80% of the
    denoising steps (returning latents), then hands the latents to the
    refiner to produce the final image.
    """
    # n_steps = int(n_steps)
    # Generate latents with the base pipeline
    image_base = base(
        prompt=prompt,
        num_inference_steps=40,
        denoising_end=0.8,
        output_type="latent",
    ).images
    # Refine the latents into the final image
    image = refiner(
        prompt=prompt,
        num_inference_steps=40,
        denoising_start=0.8,
        image=image_base,
    ).images[0]
    return image
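# Sketch of calling the pipeline directly (hypothetical prompt; requires the
# SDXL pipelines above to be loaded on a CUDA device):
# img = diffusing("a watercolor painting of a chimera")
# img.save("chimera.png")  # `img` is the PIL.Image returned by the refiner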
def check_cuda_availability():
    if torch.cuda.is_available():
        return f"GPU: {torch.cuda.get_device_name(0)}"
    else:
        return "No CUDA device found."
mode = "" | |
# logger.debug(f"\n\nthis is the mode before calling it in bot_comms: {mode}\n\n") | |
# Image created from diffusing | |
image_created = {} | |
def bot_comms(message, history):
    """
    Handles communication between Gradio and the models.
    """
    # Ensure message is a dictionary
    if not isinstance(message, dict):
        message = {"text": message}

    if message.get("text", "") == "check cuda":
        logger.debug("Checking CUDA availability.")
        # bot_comms is a generator, so yield the result instead of returning it
        yield check_cuda_availability()
        return

    # if message["text"] == "imagery":
    #     logger.debug("Switching to imagery mode.")
    #     # mode_manager.set_mode("imagery")
    #     mode += "imagery"
    #     # logger.debug(f"\nimagery mode: {mode}\n")
    #     return "Imagery On! Type your prompt to make the image 🖼️"
    # if message["text"] == "chatting":
    #     logger.debug("Switching to chatting mode.")
    #     # mode_manager.set_mode("chatting")
    #     mode += "chatting"
    #     # logger.debug(f"\nchatting mode: {mode}\n")
    #     return "Imagery Off. Ask me any questions. ✍️"
    # if mode == "imagery":
    #     logger.debug("Processing imagery prompt.")
    #     if isinstance(message, dict) and "text" in message:
    #         message = message["text"]
    #     image = diffusing(message)
    #     # mode_gradio("imagery")
    #     image_created["Image"] = image
    #     return image

    buffer = ""
    gpt_outputs = []
    # if mode == "chatting" or mode == "":
    #     logger.debug("On chatting or no mode.\n\n")
    stream = multimodal_and_generation(message, history)
    # mode += "chatting"
    for chunk in stream:
        if chunk is not None and hasattr(chunk.choices[0].delta, "content"):
            logger.debug("\n\nFound the chunk in stream for gpt-3.5\n\n")
            text = chunk.choices[0].delta.content
            if text:
                gpt_outputs.append(text)
                buffer += text
                yield "".join(gpt_outputs)
chatbot = gr.Chatbot(height=600, label="Chimera AI")
# chat_input = gr.MultimodalTextbox(interactive=True, file_types=["images"], placeholder="Enter your question or upload an image.", show_label=False)

# with gr.Blocks(fill_height=True) as demo:
#     gr.Markdown(DESCRIPTION)
#     # image_output = gr.Image(type="pil", label="Generated Image")
#     # def process_response(message, history):
#     #     response = bot_comms(message, history)
#     #     if isinstance(response, tuple) and len(response) == 2:
#     #         text, image = response
#     #         return text, image
#     #     return response, None
#     # chatbot_output = gr.Chatbot(height=600, label="Chimera AI")
#     # chat_input.submit(process_response, inputs=[chat_input, chatbot], outputs=[chatbot_output, image_output])
#     if mode_manager.get_mode() == "imagery":
#         # Ensures a unique block ID for image output
#         gr.Interface(
#             fn=diffusing,
#             inputs="text",
#             outputs="image",
#             fill_height=True,
#         )
#         # with gr.Blocks():
#         #     gr.Interface(
#         #         fn=diffusing,
#         #         inputs='text',
#         #         outputs='image',
#         #         fill_height=True,
#         #     )
#         # # Customize chatinterface to handle tuples
#         # # def custom_fn(*args, **kwargs):
#         # #     result = list(bot_comms(*args, **kwargs))
#         # #     output = []
#         # #     for item in result:
#         # #         if isinstance(item, tuple) and isinstance(item[1], Image.Image):
#         # #             output.append((item[0], None))
#         # #             output.append((None, item[1]))
#         # #         else:
#         # #             output.append(item)
#         # #     return output
#     else:
#         # Unique block ID for chat interface
#         with gr.Blocks():
#             gr.ChatInterface(
#                 fn=bot_comms,
#                 chatbot=chatbot,
#                 fill_height=True,
#                 multimodal=True,
#                 textbox=chat_input,
#             )
# if __name__ == "__main__":
#     demo.launch()
with gr.Blocks(fill_height=True) as demo:
    with gr.Row():
        with gr.Column():
            gr.Markdown(DESCRIPTION)
            image_prompt = gr.Textbox(label="Image Prompt")
            output_image = gr.Image(label="Generated Image")
            generate_image_button = gr.Button("Generate Image")
            generate_image_button.click(fn=diffusing, inputs=image_prompt, outputs=output_image)
        with gr.Column():
            gr.Markdown('''
            <div>
            <h1 style="text-align: center;">Chimera Text Generation</h1>
            <p style="text-align: center;">This contains a generative LLM from <a href="https://openai.com/"><b>OpenAI</b></a> called GPT-3.5 Turbo</p>
            </div>
            ''')
            # text_prompt = gr.Textbox(label="Text Prompt")
            # text_output = gr.Textbox(label="Generated Text")
            # generate_text_button = gr.Button("Generate Text")
            # generate_text_button.click(fn=bot_comms, inputs=text_prompt, outputs=text_output)
            chat = gr.ChatInterface(fn=bot_comms)
# gr.Markdown(DESCRIPTION)
# if mode == "chatting":
#     gr.Interface(
#         fn=bot_comms,
#         inputs="text",
#         outputs="text",
#         fill_height=True,
#     )
# # logger.debug(f"\n|now on chat interface|\n")
# gr.ChatInterface(
#     fn=bot_comms,
#     chatbot=chatbot,
#     fill_height=True,
#     multimodal=True,
#     textbox=chat_input,
# )

# if __name__ == "__main__":
#     demo.launch()
demo.launch()