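"""OpenAI chatbot with a ChromaDB-backed retrieval layer and a Gradio UI.

User queries are embedded with OpenAI's embedding API, matched against a
persistent ChromaDB collection, and the closest match is folded into the
prompt sent to the chat model. The __main__ block additionally verifies
that a Hugging Face token is available before launching the interface.
"""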
import os

import chromadb
import gradio as gr
import numpy as np
import openai
from dotenv import load_dotenv
from huggingface_hub import HfApi

load_dotenv()


class OpenAIChatbot:
    def __init__(self, api_key):
        self.embedding_model = "text-embedding-3-large"
        self.chat_model = "gpt-4o"
        # Hold a dedicated client rather than mutating the module-level API key.
        self.client = openai.OpenAI(api_key=api_key)

    def get_response(self, prompt):
        """Get a response from the OpenAI chat model."""
        try:
            response = self.client.chat.completions.create(
                model=self.chat_model,
                messages=[
                    {"role": "system", "content": "You are a helpful AI assistant."},
                    {"role": "user", "content": prompt},
                ],
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error generating response: {e}")
            return "Error: Unable to generate a response."

    def text_to_embedding(self, text):
        """Convert text to an embedding using the OpenAI embedding model."""
        try:
            response = self.client.embeddings.create(
                model=self.embedding_model,
                input=text,
            )
            embedding = np.array(response.data[0].embedding)
            print(f"Generated embedding for text: {text}")
            return embedding
        except Exception as e:
            print(f"Error generating embedding: {e}")
            return None


class LocalEmbeddingStore:
    def __init__(self, storage_dir="./chromadb_storage_openai_upgrade"):
        self.client = chromadb.PersistentClient(path=storage_dir)
        self.collection_name = "chatbot_docs"
        self.collection = self.client.get_or_create_collection(name=self.collection_name)

    def search_embedding(self, query_embedding, num_results=3):
        """Search for the most relevant documents based on embedding similarity."""
        if query_embedding.shape[0] != 3072:
            raise ValueError("Query embedding dimensionality must be 3072.")

        print(f"Query embedding: {query_embedding}")
        results = self.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=num_results,
        )
        print(f"Search results: {results}")
        # Only IDs and metadata are stored (see add_embedding below), so return the
        # matching IDs and their distances; a lower distance means a closer match.
        return results["ids"], results["distances"]

    def add_embedding(self, document_id, embedding, metadata=None):
        """Add a new embedding to the collection."""
        self.collection.add(
            ids=[document_id],
            embeddings=[embedding.tolist()],
            metadatas=[metadata] if metadata else None,
        )
        print(f"Added embedding for document ID: {document_id}")
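
# Illustrative only: nothing in this script populates the collection, so queries
# will come back empty until documents are indexed. A minimal sketch, assuming an
# OpenAIChatbot instance is available; the IDs and texts below are hypothetical.
def seed_demo_documents(chatbot, store):
    """Embed a few placeholder texts and index them in the local store."""
    sample_docs = {
        "doc-1": "Placeholder text describing topic A.",
        "doc-2": "Placeholder text describing topic B.",
    }
    for doc_id, text in sample_docs.items():
        embedding = chatbot.text_to_embedding(text)
        if embedding is not None:
            store.add_embedding(doc_id, embedding, metadata={"text": text})
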
class RAGSystem:
    def __init__(self, openai_client, embedding_store):
        self.openai_client = openai_client
        self.embedding_store = embedding_store

    def get_most_relevant_document(self, query_embedding, distance_threshold=0.7):
        """Retrieve the closest stored document ID based on embedding distance."""
        ids, distances = self.embedding_store.search_embedding(query_embedding)

        # ChromaDB returns distances rather than similarities: smaller values mean
        # a closer match, so reject results whose distance exceeds the threshold.
        if not ids or not ids[0] or distances[0][0] > distance_threshold:
            print("No relevant documents found or the closest match is too far away.")
            return None, None
        return ids[0][0], distances[0][0]

    def chat_with_rag(self, user_input):
        """Handle the retrieval-augmented generation flow for one user turn."""
        query_embedding = self.openai_client.text_to_embedding(user_input)
        if query_embedding is None or query_embedding.size == 0:
            return "Failed to generate embeddings."

        context_document_id, distance = self.get_most_relevant_document(query_embedding)
        if not context_document_id:
            return "No relevant documents found."

        context_metadata = f"Metadata for {context_document_id}"

        prompt = f"""Context (distance {distance:.2f}):
{context_metadata}

User: {user_input}
AI:"""
        return self.openai_client.get_response(prompt)
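
# Illustrative only: how the pieces fit together without the Gradio UI. Assumes
# OPENAI_API_KEY is set in the environment and the collection has been seeded.
#
#   chatbot = OpenAIChatbot(os.getenv("OPENAI_API_KEY"))
#   store = LocalEmbeddingStore()
#   rag = RAGSystem(openai_client=chatbot, embedding_store=store)
#   print(rag.chat_with_rag("What is in document doc-1?"))
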
def chat_ui(user_input, api_key, chat_history):
    """Handle chat interactions and update the conversation history."""
    chat_history = chat_history or []

    if not api_key.strip():
        # Return the history (not a bare string) so the Chatbot component stays valid.
        chat_history.append((user_input, "Please provide your OpenAI API key before proceeding."))
        return chat_history

    if not user_input.strip():
        return chat_history

    chatbot = OpenAIChatbot(api_key)
    embedding_store = LocalEmbeddingStore(storage_dir="./chromadb_storage_openai_upgrade")
    rag_system = RAGSystem(openai_client=chatbot, embedding_store=embedding_store)

    ai_response = rag_system.chat_with_rag(user_input)
    chat_history.append((user_input, ai_response))
    return chat_history

with gr.Blocks() as demo:
    api_key_input = gr.Textbox(label="Enter your OpenAI API Key", placeholder="API Key here...", type="password")
    chat_history = gr.Chatbot(label="OpenAI Chatbot with RAG", elem_id="chatbox")
    user_input = gr.Textbox(placeholder="Enter your prompt here...")
    submit_button = gr.Button("Submit")

    submit_button.click(chat_ui, inputs=[user_input, api_key_input, chat_history], outputs=chat_history)

if __name__ == "__main__":
    # Verify that a Hugging Face token is present and valid before launching the app.
    hf_token = os.getenv("HUGGINGFACE_TOKEN")
    if not hf_token:
        raise ValueError("Hugging Face token not found in environment variables.")

    try:
        api = HfApi()
        api.whoami(token=hf_token)
        print("Hugging Face token is valid and has the necessary permissions.")
    except Exception as e:
        print(f"Error verifying Hugging Face token: {e}")
        raise ValueError("Invalid Hugging Face token or insufficient permissions.")

    demo.launch()