# FastAPI RAG service: upload a PDF, embed its text (plus OCR'd images) into
# Pinecone, then answer questions over the stored content with a Groq LLM.
from fastapi import FastAPI, UploadFile,File,HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_groq import ChatGroq
from langchain_pinecone import PineconeVectorStore
from langchain_core.runnables import RunnablePassthrough
from pathlib import Path
import uvicorn
import shutil
import os
import hashlib
from pinecone import Pinecone
import fitz
import pytesseract
from PIL import Image
from langchain.schema import Document
import io
app = FastAPI()
# Wide-open CORS: any origin/method/header may call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Scratch directory where uploads are written before ingestion.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Name of the pre-created Pinecone index holding the document embeddings.
index_name = "pinecone-chatbot"

load_dotenv()
# Propagate secrets from .env into the process environment only when present:
# os.environ[key] = None raises TypeError, so a missing variable must be skipped.
for _key in ("HF_TOKEN", "PINECONE_API_KEY", "GROQ_API_KEY"):
    _value = os.getenv(_key)
    if _value is not None:
        os.environ[_key] = _value

# LLM used for answering, and the sentence-transformer used for embedding.
llm = ChatGroq(model_name="Llama3-8b-8192")
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# RAG prompt: answer strictly from the retrieved context.
prompt = '''You are given a context below. Use it to answer the question that follows.
Provide a concise and factual response. If the answer is not in the context, simply state "I don't know based on context provided."
<context>
{context}
</context>
Question: {question}
Answer:'''

parser = StrOutputParser()

# Low-level Pinecone handle used for direct upsert/delete operations.
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
index = pc.Index(name=index_name)
def generate_file_id(file_path):
    """Return the MD5 hex digest of the file's contents as a stable file id.

    Reads the file in fixed-size chunks so large uploads are never loaded
    into memory all at once (the original read the whole file in one call).

    Args:
        file_path: path to the file to hash.

    Returns:
        32-character lowercase hex MD5 digest string.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter(callable, sentinel) yields 64 KiB chunks until EOF (b"").
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def delete_existing_embedding(file_id):
    """Wipe every vector from the Pinecone index before a new ingestion.

    NOTE(review): `file_id` is accepted but never used — the index only ever
    holds one document at a time, so the whole index is cleared rather than
    deleting by id. Confirm before relying on per-file deletion semantics.
    """
    stats = index.describe_index_stats()
    vector_count = stats["total_vector_count"]
    # Pinecone raises on delete_all against an empty namespace in some
    # configurations, so only delete when something is actually stored.
    if vector_count > 0:
        index.delete(delete_all=True)
def tempUploadFile(filePath, file):
    """Persist an incoming upload's underlying stream to `filePath` on disk."""
    with open(filePath, "wb") as destination:
        # Stream in 64 KiB chunks rather than copying the handle wholesale.
        while True:
            chunk = file.file.read(64 * 1024)
            if not chunk:
                break
            destination.write(chunk)
def loadAndSplitDocuments(filePath):
    """Load a PDF with PyMuPDFLoader and split it into overlapping text chunks.

    Alternative, text-only loader (no OCR) — ingestion currently goes through
    loadAndSplitPdfFile instead.
    """
    pages = PyMuPDFLoader(filePath).load()
    chunker = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=500)
    return chunker.split_documents(pages)
def loadAndSplitPdfFile(filePath):
    """Extract native text and OCR'd image text from a PDF as split Documents.

    For every page: the page's text (if non-blank) becomes one Document, and
    each embedded image is OCR'd with pytesseract; images that yield text each
    become their own Document tagged with type="image" metadata.

    Args:
        filePath: path to the PDF file to ingest.

    Returns:
        List of Document chunks produced by RecursiveCharacterTextSplitter
        (chunk_size=2000, chunk_overlap=500).
    """
    documents = []
    # Context manager ensures the PyMuPDF handle is closed even on OCR errors
    # (the original never closed it).
    with fitz.open(filePath) as pdf:
        for page_number, page in enumerate(pdf, start=1):
            text = page.get_text("text")  # native text layer of the page
            # str() so the metadata stays JSON-serializable when filePath is a Path.
            metadata = {"source": str(filePath), "page": page_number}
            if text.strip():
                documents.append(Document(page_content=text, metadata=metadata))
            # OCR every embedded image on the page.
            for img_index, img_info in enumerate(page.get_images(full=True)):
                xref = img_info[0]
                image_bytes = pdf.extract_image(xref)["image"]
                # Distinct name avoids shadowing the loop's image-info tuple.
                pil_image = Image.open(io.BytesIO(image_bytes))
                ocr_text = pytesseract.image_to_string(pil_image)
                if ocr_text.strip():
                    img_metadata = {**metadata, "type": "image", "image_index": img_index}
                    documents.append(Document(page_content=ocr_text, metadata=img_metadata))
    splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=500)
    return splitter.split_documents(documents)
def prepare_retriever(filePath="", load_from_pinecone=False):
    """Build a retriever over the Pinecone index, or ingest a PDF into it.

    Args:
        filePath: path to a local PDF to chunk, embed, and upsert.
        load_from_pinecone: when True, skip ingestion entirely and return a
            retriever over the already-populated index.

    Returns:
        A top-5 similarity retriever when load_from_pinecone is True;
        otherwise None (after upserting, or immediately if filePath is empty).
    """
    if load_from_pinecone:
        vector_store = PineconeVectorStore.from_existing_index(index_name, embeddings)
        return vector_store.as_retriever(search_kwargs={"k": 5})
    if not filePath:
        return None

    doc_chunks = loadAndSplitPdfFile(filePath)
    # Batch-embed every chunk in one model call instead of one embed_query
    # call per chunk (the original looped, which is much slower).
    texts = [doc.page_content for doc in doc_chunks]
    vectors = embeddings.embed_documents(texts)

    vector_data = []
    for i, (doc, embedding) in enumerate(zip(doc_chunks, vectors)):
        if not embedding:
            continue  # skip chunks the model produced no vector for
        metadata = {
            "text": doc.page_content,
            "source": doc.metadata.get("source", "unknown"),
            "page": doc.metadata.get("page", i),  # fall back to chunk index
        }
        vector_data.append((str(i), embedding, metadata))

    print(f"Upserting {len(vector_data)} records into Pinecone...")
    index.upsert(vectors=vector_data)
    print("Upsert complete")
    return None
def get_retriever_chain(retriever):
    """Compose the RAG pipeline: retrieve context, fill the prompt template,
    query the LLM, and parse the reply to a plain string."""
    chat_prompt = ChatPromptTemplate.from_template(prompt)
    inputs = {"context": retriever, "question": RunnablePassthrough()}
    return inputs | chat_prompt | llm | parser
@app.post("/UploadFileInStore")
def UploadFileInStore(file: UploadFile = File(...)):
    """Ingest an uploaded PDF: save it, wipe old vectors, embed and upsert.

    Raises:
        HTTPException: 400 when the upload is missing a name or is not a PDF.
    """
    # Case-insensitive extension check — the original rejected ".PDF".
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="File must be a pdf file")
    # Use only the basename: the client-supplied filename is untrusted and
    # could otherwise traverse outside UPLOAD_DIR (e.g. "../../x.pdf").
    filePath = Path(UPLOAD_DIR) / Path(file.filename).name
    tempUploadFile(filePath, file)
    try:
        file_id = generate_file_id(filePath)
        delete_existing_embedding(file_id)
        prepare_retriever(filePath)
    finally:
        # Always remove the scratch copy, even when ingestion raises.
        if os.path.exists(filePath):
            os.remove(filePath)
    return JSONResponse({"message": "File uploaded successfully"})
@app.get("/QnAFromPdf")
async def QnAFromPdf(query: str):
    """Answer `query` against the PDF content already stored in Pinecone."""
    pinecone_retriever = prepare_retriever(load_from_pinecone=True)
    rag_chain = get_retriever_chain(pinecone_retriever)
    return rag_chain.invoke(query)
# Serve the API with uvicorn when run directly (not when imported).
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)