import os

import cohere
import gradio as gr
import torch
from dotenv import load_dotenv
from peft import PeftModel
from pinecone import Pinecone
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load API keys and tokens from a local .env file.
load_dotenv()

# Maximum number of tokens fed to the model when building a prompt.
max_seq_length = 2048

# Credentials pulled from the environment.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
hf_token = os.getenv("HF_TOKEN")

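# Optional: to reduce GPU memory, the base model could instead be loaded in
# 4-bit. A minimal sketch, assuming the bitsandbytes package is installed and
# using transformers' BitsAndBytesConfig (not enabled in this script):
#
#   from transformers import BitsAndBytesConfig
#   bnb_config = BitsAndBytesConfig(
#       load_in_4bit=True,
#       bnb_4bit_compute_dtype=torch.float16,
#   )
#   base_model = AutoModelForCausalLM.from_pretrained(
#       base_model_name, token=hf_token, quantization_config=bnb_config
#   )
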
class ModelInterface:
    def __init__(self):
        # Fine-tuned LoRA adapter and the base model it was trained on.
        model_name = "Aradhya15/Mistral7b_hypertuned"
        base_model_name = "mistralai/Mistral-7B-v0.1"

        # Load the base model and tokenizer from the Hugging Face Hub.
        base_model = AutoModelForCausalLM.from_pretrained(base_model_name, token=hf_token)
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name, token=hf_token)
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # Attach the fine-tuned PEFT adapter to the base model.
        self.model = PeftModel.from_pretrained(base_model, model_name)

        # Pick the device; half precision is only used on the GPU.
        if torch.cuda.is_available():
            print("GPU is available and ready to use:", torch.cuda.get_device_name(0))
            self.device = torch.device("cuda")
            self.model = self.model.half()
        else:
            print("GPU not detected; using CPU.")
            self.device = torch.device("cpu")

        self.model = self.model.to(self.device)
        print(f"Model moved to {self.device}")

        # Cohere generates the query embeddings; Pinecone serves the vector search.
        self.cohere_client = cohere.Client(api_key=os.getenv("COHERE_API_KEY"))
        pc = Pinecone(api_key=PINECONE_API_KEY)
        self.index = pc.Index("cohere-pinecone-tree")

    def generate_response(self, query):
        try:
            # Embed the query with Cohere so it can be matched against the index.
            response = self.cohere_client.embed(
                texts=[query],
                model="embed-english-light-v2.0"
            )
            query_embedding = response.embeddings[0]

            # Retrieve the most relevant passages from Pinecone.
            results = self.index.query(
                vector=query_embedding,
                top_k=5,
                include_metadata=True
            )
            retrieved_context = "\n".join(
                match["metadata"]["text"] for match in results["matches"]
            )

            # Build the prompt from the retrieved context and the user's question.
            inputs = self.tokenizer(
                f"Context: {retrieved_context}\nUser: {query}",
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=max_seq_length
            ).to(self.device)

            # Sampling must be enabled for temperature/top_p to take effect.
            outputs = self.model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=64,
                do_sample=True,
                temperature=0.3,
                top_p=0.9,
                repetition_penalty=1.2
            )

            # Decode only the newly generated tokens, not the echoed prompt.
            generated_tokens = outputs[0][inputs["input_ids"].shape[1]:]
            response_text = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
            return response_text

        except Exception as e:
            return f"Error generating response: {str(e)}"

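# The Pinecone index "cohere-pinecone-tree" is assumed to already exist and to
# be populated with Cohere embeddings. If it still needs to be created, a rough
# sketch with the serverless API might look like this (the dimension is an
# assumption and must match the embedding model's output size):
#
#   from pinecone import ServerlessSpec
#   pc = Pinecone(api_key=PINECONE_API_KEY)
#   if "cohere-pinecone-tree" not in pc.list_indexes().names():
#       pc.create_index(
#           name="cohere-pinecone-tree",
#           dimension=1024,  # assumed size of embed-english-light-v2.0 vectors
#           metric="cosine",
#           spec=ServerlessSpec(cloud="aws", region="us-east-1"),
#       )
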
def create_interface():
    interface = ModelInterface()

    # Gradio calls this for every submitted question.
    def predict(message):
        return interface.generate_response(message)

    iface = gr.Interface(
        fn=predict,
        inputs=gr.Textbox(label="Enter your question"),
        outputs=gr.Textbox(label="Response"),
        title="RAG-Enhanced LLM Assistant",
        description="Ask a question and get a response enhanced with retrieved context.",
        examples=[
            ["What are the best practices for tree planting?"],
            ["How can I improve soil quality in my garden?"]
        ]
    )

    return iface

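# launch() serves the app locally by default; it also accepts optional
# arguments such as share=True (temporary public link) or
# server_name="0.0.0.0" (listen on all interfaces) if the app needs to be
# reachable from other machines.
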
if __name__ == "__main__":
    iface = create_interface()
    iface.launch()