import concurrent.futures
import requests
from pdf2image import convert_from_path
import base64
from pymongo import MongoClient
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.messages import HumanMessage
import os
import re
import json
import uuid
from dotenv import load_dotenv
import pinecone

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")

mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db['about_company']

model = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0,
    max_tokens=None,
    google_api_key=FLASH_API,
)

google_embeddings = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",  # Correct model name
    google_api_key="AIzaSyANNRKfEb-YnVIBaSAq6hQ38XpxxGwvaws"  # Your API key
)

pc = pinecone.Pinecone(
    api_key="4a80f293-ae6d-489a-a7d8-33ea3fcdd26b"  # Your Pinecone API key
)
index_name = "mospi"
index = pc.Index(index_name)

# Load the company description used to give the LLM context about each page.
about_company = ""
about_company_doc = collection2.find_one({"type": "about_company"})
if about_company_doc:
    about_company = about_company_doc.get('company_description', '')

# Temporary working directories for the downloaded PDF and its page images.
pdf_temp_dir = 'temp/pdf_files'
image_temp_dir = 'temp/page_images'
os.makedirs(pdf_temp_dir, exist_ok=True)
os.makedirs(image_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')


def download_and_split_pdf_to_image(url):
    """Download the PDF from `url` and save each page as a PNG image."""
    try:
        response = requests.get(url)
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
    except Exception as e:
        print(f"Error occurred while downloading PDF from object URL: {e}")
        return None

    try:
        images = convert_from_path(pdf_path)
        for i, image in enumerate(images):
            image_path = os.path.join(image_temp_dir, f'page_{i + 1}.png')
            image.save(image_path, 'PNG')
            print(f'Saved image: {image_path}')
        return True
    except Exception as e:
        print(f"Error occurred while converting PDF pages to images: {e}")
        return None


system_prompt_text = f"""Given is an image of a PDF page. Your task is to extract all the information from this image and give a detailed summary of the page. Do not miss out on any information, and include keywords or any terms mentioned in the PDF.
Given below is information about the company whose PDF page is given to you, to understand the context.
- About Company: {about_company}

Follow the expected output format given below:
Expected Output format: {{"description":"String"}}
"""


def process_image_using_llm(image, page_number, url):
    """Send a base64-encoded page image to the LLM and parse its JSON description."""
    try:
        message = HumanMessage(
            content=[
                {"type": "text", "text": system_prompt_text},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}},
            ],
        )
        response = model.invoke([message])
        print(f"LLM response for page {page_number}: {response}")

        # Extract JSON from the response content using regex
        match = re.search(r"\{.*\}", response.content.strip())
        if match:
            json_data = match.group(0)

            # Step 1: Locate the "description" field and escape all single quotes within it
            description_match = re.search(r"'description'\s*:\s*('.*?'|\".*?\")", json_data)
            if description_match:
                description_text = description_match.group(1)

                # Replace outer single quotes with double quotes if necessary
                if description_text.startswith("'") and description_text.endswith("'"):
                    description_text = f'"{description_text[1:-1]}"'
                elif description_text.startswith('"') and description_text.endswith('"'):
                    pass  # No change needed if already double quotes

                # Escape all single quotes within the description text
                description_text = description_text.replace("'", "\\'")

                # Replace the original match with the updated description text
                json_data = (
                    json_data[:description_match.start(1)]
                    + description_text
                    + json_data[description_match.end(1):]
                )

            # Step 2: Attempt to load the cleaned JSON string
            try:
                data = json.loads(json_data)  # Load as JSON
                description = data.get("description", "None").strip()
                can_find_description = description != "None"

                return {
                    "page_number": page_number,
                    "description": description if can_find_description else None,
                    "can_find_description": can_find_description
                }
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON for page {page_number}: {e}")
                return {
                    "page_number": page_number,
                    "description": None,
                    "can_find_description": False
                }
        else:
            print(f"No valid JSON found in the response for page {page_number}")
            return {
                "page_number": page_number,
                "description": None,
                "can_find_description": False
            }

    except Exception as e:
        print(f"Error processing page {page_number}: {e}")
        return {
            "page_number": page_number,
            "description": None,
            "can_find_description": False
        }


def create_embedding_for_pdf_chunks(page, description, url, tags, categories):
    """Embed a page description, upsert it into Pinecone, and record the result in MongoDB."""
    try:
        document = collection.find_one({'object_url': url})
        file_type = document.get("type")
        mongo_id = str(document.get('_id'))

        embedding = google_embeddings.embed_query(description)
        pinecone_id = str(uuid.uuid4())

        vectors = [{
            'id': pinecone_id,
            'values': embedding,
            'metadata': {
                'description': description,
                "url": url,
                "page_number": page,
                "tag": file_type,
                "mongo_id": mongo_id,  # Store MongoDB ID in metadata
                "tags": ','.join(tags),
                "categories": ','.join(categories)
            }
        }]
        index.upsert(vectors)
        print(f"Inserted: page {page} in Pinecone with MongoDB ID {mongo_id} in metadata")

        collection.update_one(
            {
                "_id": document["_id"],
                "chunks.page_number": page  # Match document and specific chunk by page number
            },
            {
                "$set": {
                    "chunks.$.pinecone_id": pinecone_id,
                    "chunks.$.successfully_embedding_created": True
                }
            }
        )
        return True

    except Exception as e:
        print(f"Error occurred while creating embedding for PDF page {page}: {e}")
        collection.update_one(
            {
                "_id": document["_id"],
                "chunks.page_number": page  # Match document and specific chunk by page number
            },
            {
                "$set": {
                    "chunks.$.successfully_embedding_created": False
                }
            }
        )
        return False


def process_image_and_create_embedding(page_number, image_path, url, tags, categories):
    """Describe a single page image with the LLM and, if a description is returned, embed it."""
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")

    # Process image using LLM to get description
    page_result = process_image_using_llm(image_data, page_number, url)

    # If description is available, create embedding
    if page_result.get("description"):
        create_embedding_for_pdf_chunks(page_number, page_result["description"], url, tags, categories)
    else:
        print(f"Skipping page {page_number} as description is None")

    return page_result


def cleanup_directory(directory_path):
    """Remove all files from a temporary working directory."""
    try:
        for filename in os.listdir(directory_path):
            file_path = os.path.join(directory_path, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
        print(f"Cleaned up files in {directory_path}")
    except Exception as e:
        print(f"Error cleaning up directory {directory_path}: {e}")


def process_pdf(url, tags, categories):
    """Download a PDF, describe and embed each page, and persist the per-page chunks in MongoDB."""
    print(f"Processing PDF with URL: {url}")

    if download_and_split_pdf_to_image(url):
        chunks = []
        image_files = sorted(
            os.listdir(image_temp_dir),
            key=lambda x: int(re.search(r'page_(\d+)', x).group(1))
        )

        # Use ThreadPoolExecutor to process each page in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    process_image_and_create_embedding,
                    count,
                    os.path.join(image_temp_dir, image_name),
                    url,
                    tags,
                    categories
                )
                for count, image_name in enumerate(image_files, start=1)
            ]

            # Collect results as each thread completes
            for future in concurrent.futures.as_completed(futures):
                try:
                    page_result = future.result()
                    chunks.append(page_result)
                except Exception as e:
                    print(f"Error processing page: {e}")

        # Results arrive in completion order, so sort by page number before saving
        chunks.sort(key=lambda chunk: chunk["page_number"])

        # Update MongoDB document with the collected chunks
        collection.update_one(
            {"object_url": url},
            {"$set": {"chunks": chunks}},
            upsert=True
        )
        print("Saved chunks to MongoDB.")

        # Cleanup directories
        cleanup_directory(pdf_temp_dir)
        cleanup_directory(image_temp_dir)

        # Check how many pages failed to create embeddings
        total_pages = len(chunks)
        failed_pages = sum(1 for chunk in chunks if not chunk.get("can_find_description"))

        return failed_pages < total_pages
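

# --- Example usage (illustrative sketch, not part of the original pipeline) ---
# A minimal way to drive process_pdf end to end. The URL, tags, and categories
# below are hypothetical placeholders; this assumes the MongoDB, Pinecone, and
# Google API credentials configured above are valid and the PDF URL is reachable.
if __name__ == "__main__":
    sample_url = "https://example.com/reports/annual_report.pdf"  # hypothetical object URL
    sample_tags = ["annual-report", "statistics"]                 # hypothetical tags
    sample_categories = ["publications"]                          # hypothetical categories

    succeeded = process_pdf(sample_url, sample_tags, sample_categories)
    print(f"process_pdf returned {succeeded} for {sample_url}")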