Spaces:
Sleeping
Sleeping
from datetime import datetime | |
from pymongo import MongoClient | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_core.messages import HumanMessage | |
import re | |
import json | |
import streamlit as st | |
from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
import os | |
import pinecone | |
from dotenv import load_dotenv | |
from bson import ObjectId | |
import google.generativeai as genai | |
import requests | |
import fitz | |
import base64 | |
from PIL import Image | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import time | |
# ---------------------------------------------------------------------------
# Module-level setup: configuration, service clients, working directories and
# Streamlit session defaults. Everything below runs once per script execution.
# ---------------------------------------------------------------------------

# Pull settings from a local .env file (if present) into the environment.
load_dotenv()

# All connection settings are read from environment variables.
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
ABOUT_COMPANY_COLLECTION = os.getenv("COMPANY_COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")

# MongoDB: `collection` is queried by `_id` for image/PDF documents,
# `collection2` holds the "about_company" document read below.
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db[ABOUT_COMPANY_COLLECTION]

# Google Generative AI: embeddings, a LangChain chat model and a raw SDK model
# (the SDK model is the one used for audio input).
genai.configure(api_key=FLASH_API)
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=FLASH_API)
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
model2 = genai.GenerativeModel('models/gemini-1.5-flash')

# Pinecone vector index used for similarity search.
pc = pinecone.Pinecone(
    api_key=PINECONE_API  # Your Pinecone API key
)
index = pc.Index(PINECONE_INDEX)

# Scratch folder for recorded audio queries.
temp_audio_folder = "temp-audio"
os.makedirs(temp_audio_folder, exist_ok=True)

# Default to an empty description so `about_company` is always defined, even
# when the "about_company" document does not exist in the collection.
about_company = ""
about_company_doc = collection2.find_one({"type": "about_company"})
if about_company_doc:
    about_company = about_company_doc.get('company_description', '')

# Scratch folders for downloaded PDFs and the page images rendered from them.
DOWNLOAD_DIR = "downloaded_pdfs"
IMAGE_DIR = "extracted_images"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(IMAGE_DIR, exist_ok=True)

# Streamlit session defaults: last results plus flags describing the last query.
if 'images' not in st.session_state:
    st.session_state.images = []
if 'pdfs' not in st.session_state:
    st.session_state.pdfs = []
if 'query_submitted' not in st.session_state:
    st.session_state.query_submitted = False
if 'audio' not in st.session_state:
    st.session_state.audio = False
def cleanup_directories():
    """Remove every file and symlink directly inside the PDF and image folders.

    The PDF download directory is emptied first, then the image extraction
    directory. A failure on one entry is reported on stdout and the sweep
    continues with the next entry; sub-directories are left alone.
    """
    sweep_plan = ((DOWNLOAD_DIR, "PDF"), (IMAGE_DIR, "Image"))
    for folder, label in sweep_plan:
        for entry in os.listdir(folder):
            entry_path = os.path.join(folder, entry)
            try:
                removable = os.path.isfile(entry_path) or os.path.islink(entry_path)
                if not removable:
                    continue
                os.unlink(entry_path)  # Remove the file
                print(f"Deleted {label} file: {entry_path}")
            except Exception as e:
                print(f"Failed to delete {entry_path}. Reason: {e}")
def process_user_query(user_query, about_company=""):
    """Translate a user query to English and extract its keywords via the LLM.

    Args:
        user_query: Raw query text typed by the user (any language).
        about_company: Company description forwarded to the chain input. The
            prompt template below does not reference it; it is passed along
            exactly as before.

    Returns:
        ``(query, keywords)`` parsed from the model's JSON answer, or
        ``(None, None)`` when the model call fails or no JSON object is found.
    """
    try:
        # Plain (non f-) string: the doubled braces are template escapes.
        prompt_template = ChatPromptTemplate.from_template("""
        Given is a user query. Your task is to first translate the user query from any other language to English if not already in English.
        Then you have to extract important keywords from this query. Return the result in the format given below.
        Instructions:
        1. Give the output in JSON format defined below
        Expected output format:
        {{"query":"String",
        "keywords":["String"]
        }}
        This query will be related to Ministry of Statistics and Programme Implementation(MOSPI), and the statistics stored by this organisation.
        "Query":
        {user_query}
        """)
        # Chain the prompt with LLM for response generation
        chain = prompt_template | model
        result = chain.invoke({
            "about_company": about_company,
            "user_query": user_query
        })
        print(f"Model response for reformulated query is {result.content}")

        # Non-greedy match: the expected object is flat, so the first closing
        # brace ends it.
        match = re.search(r"\{[\s\S]*?\}", result.content.strip())
        if not match:
            print("No JSON data found in the model response.")
            return None, None

        json_data = match.group(0)
        try:
            # Parse the answer as-is first. Rewriting quotes up front would
            # corrupt valid JSON whose values contain apostrophes
            # (e.g. "India's GDP").
            data = json.loads(json_data)
        except json.JSONDecodeError:
            # Fallback for answers that use single-quoted, Python-style dicts.
            data = json.loads(json_data.replace("'", '"'))

        enhanced_query = data.get('query', '')
        keywords = data.get('keywords', [])
        return enhanced_query, keywords
    except Exception as e:
        print(f"Error occurred while processing query using LLM: {e}")
        return None, None
def filter_chunks_by_keywords_images(chunks, keywords):
    """Pick up to three image matches, preferring those that mention keywords.

    Each match is scored by how many distinct (stripped, lower-cased) keywords
    occur as substrings of its ``metadata['description']``.

    Args:
        chunks: Matches, each with a ``metadata`` mapping and a ``score``.
        keywords: Iterable of keyword strings.

    Returns:
        A list of ``{'chunk': match, 'keyword_count': int}`` entries: the best
        three keyword-matching entries (ordered by keyword count, then score),
        or, when nothing matches any keyword, the three highest-scoring entries.
    """
    wanted = {kw.strip().lower() for kw in keywords}

    scored = []
    for candidate in chunks:
        description = candidate['metadata'].get('description', '').lower()
        hits = sum(1 for kw in wanted if kw in description)
        scored.append({'chunk': candidate, 'keyword_count': hits})

    # Rank by keyword hits first, similarity score second.
    ranked = sorted(
        scored,
        key=lambda entry: (entry['keyword_count'], entry['chunk']['score']),
        reverse=True,
    )
    with_hits = [entry for entry in ranked if entry['keyword_count'] > 0]
    if with_hits:
        # Slicing covers both "three or more" and "fewer than three" matches.
        return with_hits[:3]

    # No keyword matched anywhere: fall back to similarity score alone.
    by_similarity = sorted(scored, key=lambda entry: entry['chunk']['score'], reverse=True)
    return by_similarity[:3]
def filter_chunks_by_keywords_pdf(chunks, keywords):
    """Pick the best match from each PDF and return the top three PDFs.

    Matches are grouped by ``metadata['url']``. Within a group only the entry
    with the highest ``(keyword_count, similarity_score)`` pair is kept, where
    ``keyword_count`` is the number of distinct (stripped, lower-cased)
    keywords found as substrings of ``metadata['description']``.

    Args:
        chunks: Matches, each with a ``metadata`` mapping and a ``score``.
        keywords: Iterable of keyword strings.

    Returns:
        Up to three ``{'chunk', 'keyword_count', 'similarity_score'}`` entries,
        one per PDF URL, ordered best first.
    """
    wanted = {kw.strip().lower() for kw in keywords}

    def relevance(entry):
        # More keyword hits wins; similarity score breaks ties.
        return (entry['keyword_count'], entry['similarity_score'])

    best_by_url = {}
    for candidate in chunks:
        meta = candidate['metadata']
        description = meta.get('description', '').lower()
        url = meta.get('url')  # Unique identifier for each PDF
        entry = {
            'chunk': candidate,
            'keyword_count': sum(1 for kw in wanted if kw in description),
            'similarity_score': candidate['score'],
        }
        incumbent = best_by_url.get(url)
        if incumbent is None or relevance(entry) > relevance(incumbent):
            best_by_url[url] = entry

    ranked = sorted(best_by_url.values(), key=relevance, reverse=True)
    return ranked[:3]
def get_images_from_chunks(chunks):
    """Fetch the MongoDB document behind each selected image match.

    Args:
        chunks: Entries shaped like ``{'chunk': match, ...}`` where the match
            carries ``metadata['mongo_id']`` and ``score``.

    Returns:
        A list of ``{'image': document, 'similarity_score': score}`` dicts.
        Entries without a ``mongo_id`` or without a stored document are skipped.
    """
    found = []
    for entry in chunks:
        match = entry['chunk']
        raw_id = match['metadata'].get('mongo_id')
        if not raw_id:
            continue
        document = collection.find_one({"_id": ObjectId(raw_id)})
        if not document:
            continue
        found.append({
            'image': document,
            'similarity_score': match['score'],
        })
    return found
def get_pdfs_from_chunks(chunks):
    """Fetch the MongoDB document behind each selected PDF match.

    Args:
        chunks: Entries shaped like ``{'chunk': match, ...}`` where the match
            carries ``metadata['mongo_id']``, ``metadata['page_number']`` and
            ``score``.

    Returns:
        A list of ``{'pdf': document, 'similarity_score': score,
        'page_number': page}`` dicts. Entries without a ``mongo_id`` or without
        a stored document are skipped.
    """
    found = []
    for entry in chunks:
        match = entry['chunk']
        meta = match['metadata']
        raw_id = meta.get('mongo_id')
        page = meta.get('page_number')
        if not raw_id:
            continue
        document = collection.find_one({"_id": ObjectId(raw_id)})
        if not document:
            continue
        found.append({
            'pdf': document,
            'similarity_score': match['score'],
            'page_number': page,
        })
    return found
def format_date(timestamp):
    """Render a POSIX timestamp as a date such as "January 05, 2024".

    The timestamp is interpreted in the local timezone.
    """
    moment = datetime.fromtimestamp(timestamp)
    return moment.strftime("%B %d, %Y")
# Dialog function for viewing the PDF page image
def show_dialog(image, chunk, page_number):
    """Render a PDF page image followed by the text chunk extracted from it.

    Args:
        image: Path (or file object) accepted by ``PIL.Image.open``.
        chunk: Text extracted from the page; may be ``None`` or empty when the
            extraction step produced nothing.
        page_number: Page number shown in the image caption.

    Any error while rendering is printed to stdout rather than raised.
    """
    import html  # local import: only needed to escape the chunk below

    print("entering dialog box")
    try:
        # Display the image
        img = Image.open(image)
        st.image(img, caption=f"Page Number: {page_number}", use_container_width=True)
        st.markdown("### Relevant Chunk", unsafe_allow_html=True)

        # The chunk is model output derived from document text and is rendered
        # with unsafe_allow_html=True, so escape it to keep stray markup in
        # the text from being interpreted as HTML.
        if chunk:
            chunk_html = html.escape(str(chunk))
        else:
            chunk_html = "No relevant chunk available."

        # Display chunk with styling
        st.markdown(f"""
        <div style="background-color: #FFEB3B; padding: 10px; border-radius: 10px; color: black;">
        {chunk_html}
        </div>
        """, unsafe_allow_html=True)
    except Exception as e:
        print(f"Error occurred in displaying image: {e}")
# Display the results of the image and PDFs | |
def _coerce_page_number(value):
    """Return ``value`` as an int, or None when it is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _render_image_card(img_info):
    """Draw one image result: thumbnail, similarity score and a details expander."""
    image_data = img_info['image']
    similarity_score = img_info['similarity_score']
    st.markdown(
        f"""
        <div style='text-align: center;'>
        <img src='{image_data['object_url']}' alt='Image' style='width:250px; height:250px; object-fit: cover; border-radius: 8px;' />
        <p><strong>Similarity Score:</strong> {similarity_score:.4f}</p>
        </div>
        """,
        unsafe_allow_html=True
    )
    with st.expander("View Image Details"):
        st.write(f"**File Name:** {image_data.get('name', 'N/A')}")
        st.write(f"**Date Uploaded:** {format_date(image_data.get('upload_date', datetime.now().timestamp()))}")
        st.write(f"**Description:** {image_data.get('description', 'No description available')}")
        tags = ", ".join(image_data.get("tags", []))
        st.write(f"**Tags:** {tags if tags else 'No tags'}")
        categories = ", ".join(image_data.get("categories", []))
        st.write(f"**Categories:** {categories if categories else 'No categories'}")
        st.markdown(
            f"<a href='{image_data['object_url']}' class='download-link' download>Download Image</a>",
            unsafe_allow_html=True
        )


def _render_pdf_card(pdf_info, key_prefix):
    """Draw one PDF result: details expander, chunk viewer and similarity score."""
    pdf_data = pdf_info['pdf']
    similarity_score = pdf_info['similarity_score']
    # The page image and extracted chunk are only present when the PDF was
    # downloaded and rendered successfully, so read them defensively.
    extracted_image_path = pdf_info.get("image")
    relevant_chunk = pdf_info.get("relevant_chunk")
    page_number = _coerce_page_number(pdf_info.get('page_number'))
    print(f"extracted image path is : {extracted_image_path}")

    # Expander for each PDF in a column
    with st.expander(f"{pdf_data.get('name', 'PDF Document')}"):
        st.write(f"**File Name:** {pdf_data.get('name', 'N/A')}")
        st.write(f"**Page Number:** {page_number if page_number is not None else 'N/A'}")
        st.write(
            f"**Date Uploaded:** {format_date(pdf_data.get('upload_date', datetime.now().timestamp()))}")
        tags = ", ".join(pdf_data.get("tags", []))
        st.write(f"**Tags:** {tags if tags else 'No tags'}")
        categories = ", ".join(pdf_data.get("categories", []))
        st.write(f"**Categories:** {categories if categories else 'No categories'}")
        st.markdown(
            f"<a href='{pdf_data['object_url']}' class='download-link' download>Download PDF</a>",
            unsafe_allow_html=True
        )
        if extracted_image_path:
            # Button that reveals the page image and the extracted chunk
            if st.button("View chunk", key=f"{key_prefix}_{pdf_data.get('name', 'pdf')}"):
                print("button is pressed")
                show_dialog(extracted_image_path, relevant_chunk, page_number)
        else:
            st.write("Page preview is not available for this document.")

    st.markdown(
        f"""<div style='text-align: center;'>
        <p><strong>Similarity Score:</strong> {similarity_score:.4f}</p></div>""",
        unsafe_allow_html=True
    )


def display_results(images, pdfs, type):
    """Render image and PDF search results in rows of up to three columns.

    Args:
        images: Dicts with ``'image'`` (stored document) and
            ``'similarity_score'``.
        pdfs: Dicts with ``'pdf'`` (stored document), ``'similarity_score'``,
            ``'page_number'`` and, when page rendering succeeded, ``'image'``
            and ``'relevant_chunk'``.
        type: Label of the query kind (``"text"`` or ``"audio"``) used to build
            widget keys. The name shadows the builtin but is kept so existing
            callers keep working.

    Both sections are ordered by descending similarity score.
    """
    # Display Images Section
    images = sorted(images, key=lambda x: x['similarity_score'], reverse=True)
    if images:
        st.write("### Here are the matching images for your query")
        for start_idx in range(0, len(images), 3):
            row = images[start_idx:start_idx + 3]
            for col, img_info in zip(st.columns(len(row)), row):
                with col:
                    _render_image_card(img_info)
    else:
        st.write("No images to display.")

    # Display PDFs Section in rows of three columns
    pdfs = sorted(pdfs, key=lambda x: x['similarity_score'], reverse=True)
    if pdfs:
        st.write("### Here are the matching PDFs for your query")
        for start_idx in range(0, len(pdfs), 3):
            row = pdfs[start_idx:start_idx + 3]
            for offset, (col, pdf_info) in enumerate(zip(st.columns(len(row)), row)):
                with col:
                    # The position is part of the widget key so two PDFs that
                    # share a file name never produce duplicate button keys.
                    _render_pdf_card(pdf_info, f"chunk_{type}_{start_idx + offset}")
    else:
        st.write("No PDFs to display.")
def upload_audio_google(audio_path):
    """Upload the audio file at ``audio_path`` through the Google GenAI SDK.

    Returns:
        The uploaded file handle, or ``None`` when the upload (or the log line
        reporting it) raises.
    """
    try:
        uploaded = genai.upload_file(path=audio_path, display_name="Query Audio")
        print(f"Uploaded file '{uploaded.display_name}' as: {uploaded.uri}")
    except Exception as e:
        print(f"error occured while uploading audio to google : {e}")
        return None
    return uploaded
def extract_query_from_audio(audio_file):
    """Transcribe an uploaded audio query to English text plus keywords.

    Args:
        audio_file: File handle returned by the GenAI upload call.

    Returns:
        ``(query, keywords)`` parsed from the model's JSON answer. Every
        failure path (empty response, no JSON object, model/parse error)
        returns ``(None, None)`` so callers can always unpack two values.
    """
    try:
        prompt = f""" Given is a user query related in form of audio, your task is to understand the user query and convert it to text. If the audio is not in english then transalte it to english textual query.Also extract important keywords from the query.
Expected output format : {{
"query":"String",
"keywords":["String"]
}}
"""
        response = model2.generate_content(
            [prompt, audio_file]
        )
        if not response:
            # Previously this path fell through and returned a bare None,
            # which broke tuple unpacking at the call site.
            print("No response received from the model.")
            return None, None

        print(response.text)
        match = re.search(r"\{[\s\S]*?\}", response.text)
        if not match:
            print("No JSON data found in the model response.")
            return None, None

        json_data = match.group(0)
        try:
            # Parse as-is first: blindly swapping quotes corrupts valid JSON
            # whose values contain apostrophes.
            data = json.loads(json_data)
        except json.JSONDecodeError:
            # Fallback for single-quoted, Python-style dicts.
            data = json.loads(json_data.replace("'", '"'))

        enhanced_query = data.get('query', '')
        keywords = data.get('keywords', [])
        return enhanced_query, keywords
    except Exception as e:
        print(f"error occured in extracting query from audio {e}")
        return None, None
def search_pinecone(k, filetype, query_embedding):
    """Query the Pinecone index for the ``k`` nearest vectors of one file type.

    Args:
        k: Number of matches requested (``top_k``).
        filetype: Value the ``tag`` metadata field must equal.
        query_embedding: Query vector.

    Returns:
        The raw query response, with metadata included for every match.
    """
    query_args = {
        "vector": query_embedding,
        "top_k": k,
        "include_metadata": True,
        "filter": {"tag": filetype},
    }
    return index.query(**query_args)
def download_pdf(url, filename):
    """Download a PDF into ``DOWNLOAD_DIR`` and return the saved path.

    Args:
        url: Address of the PDF.
        filename: Name to store the file under. Only its final path component
            is used, so a stored name cannot escape ``DOWNLOAD_DIR``.

    Returns:
        Path of the saved file, or ``None`` when the URL or name is missing,
        the request fails or times out, or the file cannot be written.
    """
    safe_name = os.path.basename(str(filename)) if filename else ""
    if not url or not safe_name:
        print(f"Error downloading PDF: missing url or filename (url={url!r}, filename={filename!r})")
        return None

    try:
        # Without a timeout a stalled server would block this worker forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raise an error for bad responses
        pdf_path = os.path.join(DOWNLOAD_DIR, safe_name)
        with open(pdf_path, 'wb') as f:
            f.write(response.content)
        print(f"Downloaded PDF: {pdf_path}")
        return pdf_path
    except requests.exceptions.RequestException as e:
        print(f"Error downloading PDF: {e}")
        return None
    except OSError as e:
        print(f"Error saving PDF: {e}")
        return None
def convert_pdf_page_to_image(pdf_path, page_number):
    """Render one page of a PDF to a PNG file inside ``IMAGE_DIR``.

    Args:
        pdf_path: Path of the PDF on disk.
        page_number: 1-based page number; any value ``int()`` accepts.

    Returns:
        Path of the written PNG, or ``None`` when the page number is out of
        range or anything fails while opening/rendering/saving.
    """
    try:
        page_number = int(page_number)
        doc = fitz.open(pdf_path)
        try:
            # Check if the page number is valid
            if page_number < 1 or page_number > doc.page_count:
                print(f"Error: Page number {page_number} is out of bounds. Document only has {doc.page_count} pages.")
                return None
            # Load the page (PyMuPDF uses 0-based indexing, so subtract 1 from the page number)
            page = doc.load_page(page_number - 1)
            # Render the page as an image (pixmap)
            pix = page.get_pixmap()
            # Include the PDF's name in the image name: several PDFs are
            # rendered concurrently and a bare "page_N.png" would let two
            # documents with the same page number overwrite each other.
            pdf_stem = os.path.splitext(os.path.basename(str(pdf_path)))[0]
            image_path = os.path.join(IMAGE_DIR, f"{pdf_stem}_page_{page_number}.png")
            pix.save(image_path)
        finally:
            # Always release the document handle, including on early return.
            doc.close()
        print(f"Converted page {page_number} to image: {image_path}")
        return image_path
    except Exception as e:
        print(f"Error converting PDF to image: {e}")
        return None
def process_pdf(page_number, chunk, doc):
    """Download the PDF described by ``doc`` and render one page as an image.

    Args:
        page_number: Page to render (passed through to the converter).
        chunk: Unused; kept for signature compatibility.
        doc: Stored document providing ``object_url`` and ``name``.

    Returns:
        Path of the rendered page image, or ``None`` when the download or the
        rendering did not succeed.
    """
    source_url = doc.get("object_url")
    file_name = doc.get("name")
    local_pdf = download_pdf(source_url, file_name)
    if not local_pdf:
        return None
    return convert_pdf_page_to_image(local_pdf, page_number)
# Instructions sent with every page image. The user query itself is appended
# per call by process_image_using_llm.
system_prompt_text = f"""
Given is the extracted image of a PDF page relevant to a user query. Your task is to see the user query and see the page image and then return the relevant chunk of text from the page, which might be related to user query.
Follow the instructions given below:
1. Do not summarise the text, you have to give the chunk as it is from the page.
2. Give the output in a JSON format defined below
Output Format: {{
"chunk" : "String"
}}
"""


def process_image_using_llm(image, user_query=None):
    """Ask the LLM for the passage of a page image that relates to a query.

    Args:
        image: Base64-encoded PNG data of the page (no ``data:`` prefix).
        user_query: Query the passage should relate to. Optional so existing
            single-argument callers keep working; when omitted the model only
            receives the generic instructions.

    Returns:
        The extracted chunk text, or ``None`` when the model returns nothing
        usable or any error occurs.
    """
    try:
        prompt_text = system_prompt_text
        if user_query:
            # The instructions refer to "the user query", so the model needs
            # to actually see it.
            prompt_text = f"{system_prompt_text}\nUser query:\n{user_query}\n"

        # Send the image and prompt to the LLM. Page images are saved as PNG,
        # so the data URL declares image/png.
        message = HumanMessage(
            content=[
                {"type": "text", "text": prompt_text},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}},
            ],
        )
        response = model.invoke([message])
        print(f" LLM response for page : {response}")
        if not response:
            return None

        result = response.content
        print(f"llm result for relevant chunk extraction is :{result}")
        if not result:
            return None

        # Take everything from the first "{" to the last "}". This copes with
        # Markdown code fences around the answer; str.strip() with a string
        # argument removes a *set of characters*, not a prefix/suffix, so it
        # is not a reliable way to peel the fence off.
        match = re.search(r"\{[\s\S]*\}", result)
        if not match:
            print("No JSON data found in the model response.")
            return None

        parsed_data = json.loads(match.group(0))
        chunk = parsed_data.get("chunk")
        return chunk if chunk else None
    except Exception as e:
        print(f"error occurred in in llm call for chunk extraction: {e}")
        return None


def get_relevant_chunk_from_image(image_path, enhanced_query):
    """Read a page image from disk and extract the passage matching the query.

    Args:
        image_path: Path of the rendered page image.
        enhanced_query: English query text forwarded to the LLM.

    Returns:
        The extracted chunk text, or ``None`` when extraction fails.
    """
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")
    return process_image_using_llm(image_data, enhanced_query)
def process_single_pdf(pdf, enhanced_query):
    """Attach a rendered page image and its relevant text chunk to a PDF record.

    The record is updated in place. ``pdf["image"]`` and
    ``pdf["relevant_chunk"]`` are always present afterwards (``None`` when the
    download, rendering or extraction did not succeed), so display code can
    rely on the keys existing.

    Args:
        pdf: Record with ``'pdf'`` (stored document) and ``'page_number'``.
        enhanced_query: English query text used for chunk extraction.

    Returns:
        The same record, or ``None`` when ``pdf`` is not a usable mapping.
    """
    try:
        # Defaults first, so a failure below still leaves a displayable record.
        pdf.setdefault("relevant_chunk", None)
        pdf.setdefault("image", None)

        page_number = pdf.get("page_number")
        chunk = pdf.get("chunk")
        mongo_doc = pdf.get("pdf")
        image_path = process_pdf(page_number, chunk, mongo_doc)
        if image_path:
            pdf["relevant_chunk"] = get_relevant_chunk_from_image(image_path, enhanced_query)
            pdf["image"] = image_path
        return pdf
    except Exception as e:
        print(f"Error processing PDF: {e}")
        # Keep the matched document in the results even without a preview.
        return pdf if isinstance(pdf, dict) else None
def process_pdfs_in_parallel(pdfs, enhanced_query):
    """Run ``process_single_pdf`` over every record using a thread pool.

    Args:
        pdfs: Records to process.
        enhanced_query: Query text forwarded to each task.

    Returns:
        The truthy task results, in the order the tasks finished (not the
        input order).
    """
    with ThreadPoolExecutor() as pool:
        # Queue one task per PDF up front, then harvest as each one finishes.
        pending = [pool.submit(process_single_pdf, item, enhanced_query) for item in pdfs]
        outcomes = (task.result() for task in as_completed(pending))
        return [outcome for outcome in outcomes if outcome]
def _find_matches(query_text, keywords):
    """Run the vector search and return ``(images, pdfs)``, or None if nothing matched."""
    query_embedding = embeddings.embed_query(query_text)
    matches_image = search_pinecone(5, "Image", query_embedding)['matches']
    matches_pdf = search_pinecone(20, "PDF", query_embedding)['matches']
    if not matches_image and not matches_pdf:
        return None

    images = []
    pdfs = []
    if matches_image:
        top_chunks_images = filter_chunks_by_keywords_images(matches_image, keywords)
        if top_chunks_images:
            images = get_images_from_chunks(top_chunks_images)
    if matches_pdf:
        top_chunks_pdf = filter_chunks_by_keywords_pdf(matches_pdf, keywords)
        if top_chunks_pdf:
            pdfs = get_pdfs_from_chunks(top_chunks_pdf)
            if pdfs:
                # Each record is enriched in place with its page image and
                # relevant chunk, so `pdfs` itself is what gets displayed and
                # every matched document stays listed.
                process_pdfs_in_parallel(pdfs, query_text)
    return images, pdfs


def _publish_results(results, is_audio):
    """Store search results in session state and render them (or a no-match note)."""
    if results is None:
        st.write("No matching PDFs and Images found for your query")
        return
    images, pdfs = results
    st.session_state.images = images
    st.session_state.pdfs = pdfs
    st.session_state.query_submitted = True  # Mark query as submitted
    st.session_state.audio = is_audio
    display_results(images, pdfs, "audio" if is_audio else "text")


def _discard_audio(audio_file):
    """Delete the uploaded audio from Google storage and empty the temp audio folder."""
    if audio_file is not None:
        try:
            genai.delete_file(audio_file.name)
        except Exception as e:
            print(f"Failed to delete audio file from Google storage: {e}")
    for filename in os.listdir(temp_audio_folder):
        file_path = os.path.join(temp_audio_folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")


def search():
    """Render the search page: multilingual text search and audio search.

    A submitted query (typed or recorded) is converted to an English query
    plus keywords, matched against the vector index and displayed. The latest
    results are kept in ``st.session_state`` so they are redrawn on reruns
    that do not submit a new query. The "Back" button clears that state and
    switches ``st.session_state.page`` to ``"home"``.
    """
    if st.button("Back", key="back_button"):
        st.session_state.images = []  # Reset images
        st.session_state.pdfs = []  # Reset pdfs
        st.session_state.query_submitted = False  # Reset query state
        st.session_state.audio = False  # Reset audio state
        st.session_state.page = "home"  # Reset page
        st.rerun()  # Reload the app

    st.title("AI Inspired Smart Search Engine")

    # ---- Text query section ----
    st.subheader("Multilingual text search 🖊️")
    user_query = st.text_input("Enter your search query:")
    if user_query and st.button("Submit Query", key="submit_query"):
        # Reset previous results before processing new text query
        st.session_state.query_submitted = False
        st.session_state.audio = False
        cleanup_directories()
        results = None
        with st.spinner("Processing your query, please wait..."):
            # Looked up defensively: the module only binds `about_company`
            # when the description document exists.
            company_description = globals().get("about_company", "")
            enhanced_query, keywords = process_user_query(user_query, company_description)
            query_ok = bool(enhanced_query and keywords)
            if query_ok:
                results = _find_matches(enhanced_query, keywords)
        if query_ok:
            _publish_results(results, is_audio=False)
        else:
            st.error("Sorry, could not process your request, please try again later!")
    # Redraw the previous text results when no new query was submitted
    elif st.session_state.query_submitted and not st.session_state.audio:
        display_results(st.session_state.images, st.session_state.pdfs, "text")

    # ---- Audio query section ----
    st.markdown("<hr>", unsafe_allow_html=True)
    st.subheader("Multilingual Audio Search 🗣️")
    audio_value = st.audio_input("Record your query")
    if audio_value and st.button("Submit Audio", key="audio-button"):
        # Reset previous results before processing new audio query
        st.session_state.query_submitted = False
        st.session_state.audio = True  # Mark it as an audio query
        cleanup_directories()
        audio_file = None
        try:
            results = None
            query_ok = False
            with st.spinner("Processing your query, please wait..."):
                timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
                audio_file_path = os.path.join(temp_audio_folder, f"audio_query_{timestamp}.wav")
                # Save the audio input to the file
                with open(audio_file_path, "wb") as f:
                    f.write(audio_value.getvalue())
                audio_file = upload_audio_google(audio_file_path)
                if audio_file:
                    # Tolerate a bare None from the extractor.
                    extracted = extract_query_from_audio(audio_file) or (None, None)
                    audio_query, audio_keywords = extracted
                    query_ok = bool(audio_query and audio_keywords)
                    if query_ok:
                        results = _find_matches(audio_query, audio_keywords)
            if query_ok:
                _publish_results(results, is_audio=True)
            else:
                st.error("Sorry, could not process your request, please try again later!")
        finally:
            # Clean up even when the search raised: remove the uploaded audio
            # and the local temp files.
            _discard_audio(audio_file)
    # Redraw the previous audio results when no new recording was submitted
    elif st.session_state.query_submitted and st.session_state.audio:
        display_results(st.session_state.images, st.session_state.pdfs, "audio")