from datetime import datetime
from pymongo import MongoClient
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
import re
import json
import streamlit as st
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import os
import pinecone
from dotenv import load_dotenv
from bson import ObjectId
import google.generativeai as genai
# ---------------------------------------------------------------------------
# Module-level configuration and shared clients.
# load_dotenv() runs first so the os.getenv() calls below can see values
# supplied through a .env file.
# ---------------------------------------------------------------------------
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
ABOUT_COMPANY_COLLECTION = os.getenv("COMPANY_COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")

# MongoDB: `collection` holds the image/PDF documents looked up by _id,
# `collection2` holds the company description document.
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db[ABOUT_COMPANY_COLLECTION]

# Gemini: one API key is shared by the embeddings model, the LangChain chat
# model (text queries) and the native SDK model (audio queries).
genai.configure(api_key=FLASH_API)
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=FLASH_API)
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
model2 = genai.GenerativeModel('models/gemini-1.5-flash')

# Pinecone vector index queried by search_pinecone().
pc = pinecone.Pinecone(
    api_key=PINECONE_API  # Your Pinecone API key
)
index = pc.Index(PINECONE_INDEX)

# Folder where recorded audio queries are written before being uploaded.
temp_audio_folder = "temp-audio"
os.makedirs(temp_audio_folder, exist_ok=True)

# Company description used as context in the LLM prompts. Bind a default
# first so the name always exists, even when the document is missing;
# otherwise later reads of `about_company` raise NameError.
about_company = ""
about_company_doc = collection2.find_one({"type": "about_company"})
if about_company_doc:
    about_company = about_company_doc.get('company_description', '')
    print(about_company)
def process_user_query(user_query, about_company=""):
    """Translate/reformulate a text query with the chat model and extract keywords.

    The prompt asks the model for an object with a ``query`` string and a
    ``keywords`` list; the first ``{...}`` span in the reply is parsed.

    Args:
        user_query: Raw query text typed by the user (any language).
        about_company: Company description passed to the prompt as context.

    Returns:
        ``(enhanced_query, keywords)`` on success, or ``(None, None)`` when
        the reply contains no ``{...}`` span or any step raises.
    """
    import ast  # local import: only needed for the literal-parsing fallback

    try:
        # No f-string here, so we avoid additional formatting complications
        prompt_template = ChatPromptTemplate.from_template("""
Given is a user query. Your task is to first translate the user query from any other language to English if not already in English.
Then you have to reformulate that translated query such that it is contextually rich and consistent.Do not add too much extra information in the query.
Also extract important keywords from this query. Return the result in the format given below.
Expected output format:
{{'query':'String',
'keywords':['String']
}}
Given below is the user query which is related to a company about which is given below, but this company information is just to understand what the query might be related to, until explicitly asked you do not need to include company information in query:
"About Company": {about_company}
"Query":
{user_query}
""")
        # Chain the prompt with LLM for response generation
        chain = prompt_template | model
        result = chain.invoke({
            "about_company": about_company,
            "user_query": user_query
        })
        print(f"Model response for reformulated query is {result.content}")

        # Use non-greedy regex and handle multiline content
        match = re.search(r"\{[\s\S]*?\}", result.content.strip())
        if not match:
            print("No JSON data found in the model response.")
            return None, None

        raw = match.group(0)  # JSON-like content as a string
        # Parse without rewriting quotes first: blindly turning every ' into "
        # breaks replies whose text contains apostrophes ("company's ...").
        try:
            data = json.loads(raw)
        except ValueError:
            try:
                # The prompt shows a single-quoted example, so the reply may
                # be a Python-style literal rather than strict JSON.
                data = ast.literal_eval(raw)
            except (ValueError, SyntaxError, TypeError):
                # Last resort: the original quote-replacement behaviour.
                data = json.loads(raw.replace("'", '"'))

        enhanced_query = data.get('query', '')
        keywords = data.get('keywords', [])
        return enhanced_query, keywords
    except Exception as e:
        print(f"Error occurred while processing query using LLM: {e}")
        return None, None
def filter_chunks_by_keywords_images(chunks, keywords):
    """Pick up to three image chunks, preferring those whose description mentions a keyword.

    Args:
        chunks: Search matches; each must expose ``['metadata']`` (a mapping,
            optionally with ``'description'``) and ``['score']``.
        keywords: Iterable of keyword strings; matching is a case-insensitive
            substring test against the description.

    Returns:
        A list of ``{'chunk': ..., 'keyword_count': int}`` dicts ordered by
        ``(keyword_count, score)`` descending. If any chunk matches a keyword,
        only matching chunks are returned (at most three); otherwise the top
        three by similarity score are returned.
    """
    # Drop blank keywords: "" is a substring of every description and would
    # otherwise register as a hit on every chunk.
    keyword_set = {kw.strip().lower() for kw in (keywords or ()) if kw and kw.strip()}

    scored = []
    for chunk in chunks:
        description = chunk['metadata'].get('description', '').lower()
        hits = sum(1 for kw in keyword_set if kw in description)
        scored.append({'chunk': chunk, 'keyword_count': hits})

    # Rank by keyword hits first, similarity score second.
    ranked = sorted(
        scored,
        key=lambda item: (item['keyword_count'], item['chunk']['score']),
        reverse=True,
    )

    matched = [item for item in ranked if item['keyword_count'] > 0]
    if matched:
        return matched[:3]
    # No keyword hits at all: every count is 0, so `ranked` is already in
    # pure similarity order.
    return ranked[:3]
def filter_chunks_by_keywords_pdf(chunks, keywords):
    """Pick the best chunk per PDF and return the top three PDFs.

    Chunks are grouped by ``metadata['url']``; within a group the chunk with
    the highest ``(keyword_count, score)`` wins. The winners are then ranked
    by the same key.

    Args:
        chunks: Search matches; each must expose ``['metadata']`` (a mapping,
            optionally with ``'description'`` and ``'url'``) and ``['score']``.
        keywords: Iterable of keyword strings; matching is a case-insensitive
            substring test against the description.

    Returns:
        A list of at most three dicts with keys ``'chunk'``,
        ``'keyword_count'`` and ``'similarity_score'``, best first.
    """
    # Drop blank keywords: "" is a substring of every description and would
    # otherwise inflate every chunk's keyword count.
    keyword_set = {kw.strip().lower() for kw in (keywords or ()) if kw and kw.strip()}

    def relevance(entry):
        return (entry['keyword_count'], entry['similarity_score'])

    # Step 1: keep only the most relevant chunk for each PDF URL.
    best_per_pdf = {}
    for chunk in chunks:
        metadata = chunk['metadata']
        description = metadata.get('description', '').lower()
        pdf_url = metadata.get('url')  # Identifier used to group chunks by PDF
        candidate = {
            'chunk': chunk,
            'keyword_count': sum(1 for kw in keyword_set if kw in description),
            'similarity_score': chunk['score'],
        }
        current = best_per_pdf.get(pdf_url)
        if current is None or relevance(candidate) > relevance(current):
            best_per_pdf[pdf_url] = candidate

    # Step 2: rank the per-PDF winners.
    ranked = sorted(best_per_pdf.values(), key=relevance, reverse=True)

    # Step 3: slicing already copes with fewer than three entries.
    return ranked[:3]
def get_images_from_chunks(chunks):
    """Load the MongoDB document behind each selected image chunk.

    Each item is expected to carry the search match under ``'chunk'``; the
    match's ``metadata['mongo_id']`` is used as the document ``_id``. Items
    without an id, or whose document is not found, are skipped.

    Returns:
        A list of ``{'image': document, 'similarity_score': score}`` dicts.
    """
    results = []
    for entry in chunks:
        match = entry['chunk']
        raw_id = match['metadata'].get('mongo_id')
        if not raw_id:
            continue
        document = collection.find_one({"_id": ObjectId(raw_id)})
        if not document:
            continue
        results.append({
            'image': document,
            'similarity_score': match['score'],
        })
    return results
def get_pdfs_from_chunks(chunks):
    """Load the MongoDB document behind each selected PDF chunk.

    Each item is expected to carry the search match under ``'chunk'``; the
    match's ``metadata['mongo_id']`` is used as the document ``_id``. Items
    without an id, or whose document is not found, are skipped.

    Returns:
        A list of dicts with keys ``'pdf'`` (the document),
        ``'similarity_score'``, ``'page_number'`` and ``'chunk_text'``.
    """
    results = []
    for entry in chunks:
        match = entry['chunk']
        metadata = match['metadata']
        raw_id = metadata.get('mongo_id')
        if not raw_id:
            continue
        document = collection.find_one({"_id": ObjectId(raw_id)})
        if not document:
            continue
        results.append({
            'pdf': document,
            'similarity_score': match['score'],
            'page_number': metadata.get('page_number'),
            # The chunk text is carried along so it can be displayed
            'chunk_text': metadata.get('description', 'No description available'),
        })
    return results
def format_date(timestamp):
    """Render a POSIX timestamp as a date like "January 05, 2024" (local time)."""
    moment = datetime.fromtimestamp(timestamp)
    return f"{moment:%B %d, %Y}"
# def display_images(images):
# images = sorted(images, key=lambda x: x['similarity_score'], reverse=True)
# num_images = len(images)
# if num_images == 0:
# st.write("No images to display.")
# return
#
# # Iterate over the images in steps of 3 to create rows
# st.write("Here are the matching images for your query")
# for start_idx in range(0, num_images, 3):
# # Determine the number of columns for this row (could be less than 3 in the last row)
# num_cols = min(3, num_images - start_idx)
# cols = st.columns(num_cols)
#
# # Display images in the current row
# for idx in range(num_cols):
# img_info = images[start_idx + idx]
# col = cols[idx]
# with col:
# image_data = img_info['image']
# similarity_score = img_info['similarity_score']
#
# # Display the image using object_url directly with consistent sizing
# st.markdown(
# f"""
#
#

#
# Similarity Score: {similarity_score:.4f}
#
# """,
# unsafe_allow_html=True
# )
#
# # Expander for image details
# with st.expander("View Image Details"):
# st.write(f"**File Name:** {image_data.get('name', 'N/A')}")
# st.write(f"**Date Uploaded:** {format_date(image_data.get('upload_date', datetime.now().timestamp()))}")
# st.write(f"**Description:** {image_data.get('description', 'No description available')}")
#
# # Display tags if available
# tags = ", ".join(image_data.get("tags", []))
# st.write(f"**Tags:** {tags if tags else 'No tags'}")
#
# # Display categories if available
# categories = ", ".join(image_data.get("categories", []))
# st.write(f"**Categories:** {categories if categories else 'No categories'}")
#
# # Download link
# st.markdown(
# f"Download Image",
# unsafe_allow_html=True
# )
def _render_in_rows(items, render_item, per_row=3):
    """Lay *items* out in Streamlit rows of up to *per_row* columns, calling *render_item* in each."""
    for start_idx in range(0, len(items), per_row):
        row_items = items[start_idx:start_idx + per_row]
        # The last row may hold fewer items, so size the columns to the row.
        cols = st.columns(len(row_items))
        for col, item in zip(cols, row_items):
            with col:
                render_item(item)


def _show_image_card(img_info):
    """Render one image result: similarity score, details expander and download link."""
    image_data = img_info['image']
    similarity_score = img_info['similarity_score']
    st.markdown(
        f"""
Similarity Score: {similarity_score:.4f}
""",
        unsafe_allow_html=True
    )
    with st.expander("View Image Details"):
        st.write(f"**File Name:** {image_data.get('name', 'N/A')}")
        st.write(
            f"**Date Uploaded:** {format_date(image_data.get('upload_date', datetime.now().timestamp()))}")
        st.write(f"**Description:** {image_data.get('description', 'No description available')}")
        tags = ", ".join(image_data.get("tags", []))
        st.write(f"**Tags:** {tags if tags else 'No tags'}")
        categories = ", ".join(image_data.get("categories", []))
        st.write(f"**Categories:** {categories if categories else 'No categories'}")
        st.markdown(
            f"Download Image",
            unsafe_allow_html=True
        )


def _show_pdf_card(pdf_info):
    """Render one PDF result inside an expander titled with the document name."""
    pdf_data = pdf_info['pdf']
    similarity_score = pdf_info['similarity_score']
    chunk_text = pdf_info['chunk_text']
    # The page number comes from optional chunk metadata and may be missing;
    # int(None) would raise and abort rendering of every remaining result.
    try:
        page_label = int(pdf_info['page_number'])
    except (TypeError, ValueError):
        page_label = "N/A"
    with st.expander(f"{pdf_data.get('name', 'PDF Document')}"):
        st.write(f"**Page Number:** {page_label}")
        # NOTE(review): chunk_text is rendered with unsafe_allow_html=True;
        # confirm the indexed text is trusted or escape it before display.
        st.markdown(
            f"""
Chunk:
{chunk_text}
""",
            unsafe_allow_html=True
        )
        st.write(f"**Similarity Score:** {similarity_score:.4f}")
        tags = ", ".join(pdf_data.get("tags", []))
        st.write(f"**Tags:** {tags if tags else 'No tags'}")
        categories = ", ".join(pdf_data.get("categories", []))
        st.write(f"**Categories:** {categories if categories else 'No categories'}")
        st.markdown(
            f"Download PDF",
            unsafe_allow_html=True
        )


def display_results(images, pdfs):
    """Render image and PDF search results, each section in rows of three columns.

    Args:
        images: List of ``{'image': document, 'similarity_score': float}``.
        pdfs: List of ``{'pdf': document, 'similarity_score': float,
            'page_number': ..., 'chunk_text': str}``.

    Both lists are shown in descending similarity order; an empty list
    produces a "No ... to display." message instead of a section.
    """
    # Display Images Section
    images = sorted(images, key=lambda x: x['similarity_score'], reverse=True)
    if images:
        st.write("### Here are the matching images for your query")
        _render_in_rows(images, _show_image_card)
    else:
        st.write("No images to display.")

    # Display PDFs Section in rows of three columns
    pdfs = sorted(pdfs, key=lambda x: x['similarity_score'], reverse=True)
    if pdfs:
        st.write("### Here are the matching PDFs for your query")
        _render_in_rows(pdfs, _show_pdf_card)
    else:
        st.write("No PDFs to display.")
def upload_audio_google(audio_path):
    """Upload the audio file at *audio_path* via ``genai.upload_file``.

    Returns:
        The uploaded file handle, or ``None`` if the upload (or the log line
        that follows it) raises.
    """
    try:
        uploaded = genai.upload_file(path=audio_path, display_name="Query Audio")
        print(f"Uploaded file '{uploaded.display_name}' as: {uploaded.uri}")
        return uploaded
    except Exception as exc:
        print(f"error occured while uploading audio to google : {exc}")
    return None
def extract_query_from_audio(audio_file):
    """Ask the Gemini model to transcribe/translate an audio query and extract keywords.

    The prompt (which embeds the module-level ``about_company`` text) asks
    for an object with a ``query`` string and a ``keywords`` list; the first
    ``{...}`` span in the reply is parsed.

    Args:
        audio_file: Uploaded audio handle passed to ``model2.generate_content``.

    Returns:
        ``(enhanced_query, keywords)`` on success. ``(None, None)`` in every
        failure case (empty response, no ``{...}`` span, or any exception),
        so callers can always unpack the result.
    """
    import ast  # local import: only needed for the literal-parsing fallback

    try:
        prompt=f""" Given is a user query related to a company in form of audio, your task is to understand the user query and convert it to text. If the audio is not in english then transalte it to english textual query. Make sure the generated query is consistent and contextual.Also extract important keywords from the query.
For the context I am providing with company information {about_company}
Expected output format : {{
"query":"String",
"keywords":["String"]
}}
"""
        response = model2.generate_content(
            [prompt, audio_file]
        )
        if not response:
            # Previously this path fell off the end and returned a bare None,
            # which broke tuple unpacking in the caller.
            print("No response received from the model.")
            return None, None

        print(response.text)
        match = re.search(r"\{[\s\S]*?\}", response.text)
        if not match:
            print("No JSON data found in the model response.")
            return None, None

        raw = match.group(0)  # JSON-like content as a string
        # Parse without rewriting quotes first: blindly turning every ' into "
        # breaks replies whose text contains apostrophes ("company's ...").
        try:
            data = json.loads(raw)
        except ValueError:
            try:
                data = ast.literal_eval(raw)  # Python-style single-quoted dict
            except (ValueError, SyntaxError, TypeError):
                # Last resort: the original quote-replacement behaviour.
                data = json.loads(raw.replace("'", '"'))

        enhanced_query = data.get('query', '')
        keywords = data.get('keywords', [])
        return enhanced_query, keywords
    except Exception as e:
        print(f"error occured in extracting query from audio {e}")
        return None, None
def search_pinecone(k,filetype,query_embedding):
    """Query the Pinecone index for the *k* nearest vectors of one file type.

    Args:
        k: Number of matches to request (``top_k``).
        filetype: Value the ``filetype`` metadata field must equal.
        query_embedding: Query vector.

    Returns:
        The raw result of ``index.query`` with metadata included.
    """
    metadata_filter = {"filetype": filetype}
    return index.query(
        vector=query_embedding,
        top_k=k,
        include_metadata=True,
        filter=metadata_filter,
    )
def _search_and_display(query, keywords):
    """Embed *query*, fetch image and PDF matches from Pinecone, and render them."""
    query_embedding = embeddings.embed_query(query)
    matches_image = search_pinecone(5, "Image", query_embedding)['matches']
    matches_pdf = search_pinecone(20, "PDF", query_embedding)['matches']

    if not matches_image and not matches_pdf:
        print(f"No matching PDFs and Images found for your query")
        st.write(f"No matching PDFs and Images found for your query")
        return

    images = []
    pdfs = []
    if matches_image:
        top_chunks_images = filter_chunks_by_keywords_images(matches_image, keywords)
        if top_chunks_images:
            # Retrieve the image documents from MongoDB
            images = get_images_from_chunks(top_chunks_images)
    if matches_pdf:
        top_chunks_pdf = filter_chunks_by_keywords_pdf(matches_pdf, keywords)
        if top_chunks_pdf:
            pdfs = get_pdfs_from_chunks(top_chunks_pdf)
    print(f"len of pdfs : {len(pdfs)}")
    display_results(images, pdfs)


def _handle_audio_query(audio_value):
    """Save the recording, turn it into a query via Gemini, search, then clean up."""
    # Microseconds keep file names distinct when two recordings arrive
    # within the same second.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    audio_file_path = os.path.join(temp_audio_folder, f"audio_query_{timestamp}.wav")
    audio_file = None
    try:
        # Save the audio input to the file
        with open(audio_file_path, "wb") as f:
            f.write(audio_value.getvalue())
        print(f"Audio saved to {audio_file_path}")

        audio_file = upload_audio_google(audio_file_path)
        audio_query, audio_keywords = None, None
        if audio_file:
            # Guard against a bare None result so unpacking cannot fail.
            audio_query, audio_keywords = extract_query_from_audio(audio_file) or (None, None)

        if audio_query and audio_keywords:
            _search_and_display(audio_query, audio_keywords)
        else:
            st.error(f"Sorry could not process your request, please try again later!")
    finally:
        # Delete the uploaded copy from Google, if the upload succeeded.
        if audio_file:
            try:
                genai.delete_file(audio_file.name)
                print(f"deleted audio file from google storage")
            except Exception as e:
                print(f"failed to delete audio file from google storage: {e}")
        # Remove only this request's recording; sweeping the whole folder
        # could delete a file another session is still uploading.
        try:
            if os.path.isfile(audio_file_path) or os.path.islink(audio_file_path):
                os.unlink(audio_file_path)
                print(f"Deleted file: {audio_file_path}")
        except Exception as e:
            print(f"Failed to delete {audio_file_path}. Reason: {e}")


def search():
    """Render the search page: a text query form and an audio query form.

    A "Back" button switches ``st.session_state.page`` to ``"home"`` and
    reruns the app. Each form, once submitted, turns the input into an
    English query plus keywords, searches Pinecone and displays the matching
    images and PDFs, or shows an error if the query could not be processed.
    """
    if st.button("Back", key="back_button"):
        st.session_state.page = "home"
        st.rerun()

    st.title("AI Inspired Smart Search Engine")

    # --- Text search ---
    st.subheader("Multilingual text search 🖊️")
    user_query = st.text_input("Enter your search query:")
    if user_query and st.button("submit query", key="submit_query"):
        with st.spinner("Processing your query, please wait"):
            enhanced_query, keywords = process_user_query(user_query, about_company)
            if enhanced_query and keywords:
                _search_and_display(enhanced_query, keywords)
            else:
                st.error(f"Sorry could not process your request, please try again later!")

    # Spacer between the two sections. NOTE(review): the original literal was
    # split across two lines (a syntax error); presumably it held spacing
    # markup - confirm the intended content.
    st.markdown("\n", unsafe_allow_html=True)

    # --- Audio search ---
    st.subheader("Multilingual Audio Search 🗣️")
    audio_value = st.audio_input("Record your query")
    if audio_value and st.button("Submit Audio", key="audio-button"):
        with st.spinner("Processing your query, please wait"):
            _handle_audio_query(audio_value)