import json import os from langchain.document_loaders import CSVLoader, PyPDFLoader, Docx2txtLoader from langgraph.graph import StateGraph, END from langchain.prompts import PromptTemplate from langchain.schema import Document, AIMessage from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_openai import ChatOpenAI, OpenAIEmbeddings from pathlib import Path from pydantic import BaseModel, Field from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct from typing import List, Dict, Any from pydantic import BaseModel, Field from typing import Dict, Any llm = ChatOpenAI(model_name="gpt-4o") embeddings = OpenAIEmbeddings(model="text-embedding-3-small") qdrant = QdrantClient(":memory:") # In-memory Qdrant instance # Create collection qdrant.create_collection( collection_name="opportunities", vectors_config=VectorParams(size=1536, distance=Distance.COSINE), ) class State(BaseModel): file_path: str document_processed: str = "" opportunity_evaluation: Dict[str, Any] = Field(default_factory=dict) next_action: Dict[str, Any] = Field(default_factory=dict) def dict_representation(self) -> Dict[str, Any]: return { "file_path": self.file_path, "document_processed": self.document_processed, "opportunity_evaluation": self.opportunity_evaluation, "next_action": self.next_action } async def prep_opportunity_review(session_state): file_path = prep_document() structured_results = run_analysis(file_path) opportunity_review_report = create_opportunity_review_report(structured_results) session_state.opportunity_review_results = structured_results session_state.opportunity_review_report = opportunity_review_report def prep_document(): file_path = "data/HSBC Opportunity Information.docx" path = Path(file_path) if path.exists(): if path.is_file(): print(f"File found: {path}") print(f"File size: {path.stat().st_size / 1024:.2f} KB") print(f"Last modified: {path.stat().st_mtime}") print("File is ready for processing.") if os.access(path, os.R_OK): print("File is readable.") else: print("Warning: File exists but may not be readable. Check permissions.") else: print(f"Error: {path} exists but is not a file. It might be a directory.") else: print(f"Error: File not found at {path}") print("Please check the following:") print("1. Ensure the file path is correct.") print("2. Verify that the file exists in the specified location.") print("3. Check if you have the necessary permissions to access the file.") parent = path.parent if not parent.exists(): print(f"Note: The directory {parent} does not exist.") elif not parent.is_dir(): print(f"Note: {parent} exists but is not a directory.") file_path_for_processing = str(path) return file_path_for_processing def load_and_chunk_document(file_path: str) -> List[Document]: """Load and chunk the document based on file type.""" if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") _, file_extension = os.path.splitext(file_path.lower()) if file_extension == '.csv': loader = CSVLoader(file_path) elif file_extension == '.pdf': loader = PyPDFLoader(file_path) elif file_extension == '.docx': loader = Docx2txtLoader(file_path) else: raise ValueError(f"Unsupported file type: {file_extension}") documents = loader.load() text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) return text_splitter.split_documents(documents) def agent_1(file_path: str) -> str: """Agent 1: Load, chunk, embed, and store document in Qdrant.""" try: chunks = load_and_chunk_document(file_path) points = [] for i, chunk in enumerate(chunks): vector = embeddings.embed_query(chunk.page_content) points.append(PointStruct(id=i, vector=vector, payload={"text": chunk.page_content})) qdrant.upsert( collection_name="opportunities", points=points ) return f"Document processed and stored in Qdrant. {len(chunks)} chunks created." except Exception as e: print(f"Error in agent_1: {str(e)}") return f"Error processing document: {str(e)}" def agent_2() -> Dict[str, Any]: """Agent 2: Evaluate opportunity based on MEDDIC criteria.""" try: results = qdrant.scroll(collection_name="opportunities", limit=100) if not results or len(results[0]) == 0: raise ValueError("No documents found in Qdrant") full_text = " ".join([point.payload.get("text", "") for point in results[0]]) meddic_template = """ Analyze the following opportunity information using the MEDDIC sales methodology: {opportunity_info} Assign an overall opportunity score (1-100) with 100 means that the opportunity is a sure win. Provide a Summary of the opportunity. Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion: 1. Metrics 2. Economic Buyer 3. Decision Criteria 4. Decision Process 5. Identify Pain 6. Champion Format your response as follows: Summary: [Opportunity Summary] Score: [Overall Opportunity Score between 1 to 100 based on MEDDIC criteria] MEDDIC Evaluation: - Metrics: [Score on Metrics, Evaluation on Metrics criterion] - Economic Buyer: [Score on Economic Buyer, Evaluation on Economic Buyer criterion] - Decision Criteria: [Score on Decision Criteria, Evaluation on Decision Criteria criterion] - Decision Process: [Score on Decision Process, Evaluation on Decision Process criterion] - Identify Pain: [Score on Identify Pain, Evaluation on Identify Pain criterion] - Champion: [Score on Champion, Evaluation on Champion criterion] """ meddic_prompt = PromptTemplate(template=meddic_template, input_variables=["opportunity_info"]) meddic_chain = meddic_prompt | llm response = meddic_chain.invoke({"opportunity_info": full_text}) if isinstance(response, AIMessage): response_content = response.content elif isinstance(response, str): response_content = response else: raise ValueError(f"Unexpected response type: {type(response)}") # Parse the response content lines = response_content.split('\n') summary = next((line.split('Summary:')[1].strip() for line in lines if line.startswith('Summary:')), 'N/A') score = next((int(line.split('Score:')[1].strip()) for line in lines if line.startswith('Score:')), 0) meddic_eval = {} current_criterion = None for line in lines: if line.strip().startswith('-'): parts = line.split(':', 1) if len(parts) == 2: current_criterion = parts[0].strip('- ') meddic_eval[current_criterion] = parts[1].strip() elif current_criterion and line.strip(): meddic_eval[current_criterion] += ' ' + line.strip() return { 'summary': summary, 'score': score, 'meddic_evaluation': meddic_eval } except Exception as e: print(f"Error in agent_2: {str(e)}") return { 'summary': "Error occurred during evaluation", 'score': 0, 'meddic_evaluation': str(e) } def agent_3(meddic_evaluation: Dict[str, Any]) -> Dict[str, Any]: """Agent 3: Suggest next best action and talking points.""" try: next_action_template = """ Based on the following MEDDIC evaluation of an opportunity: {meddic_evaluation} Suggest the next best action for the upcoming customer meeting and provide the top 3 talking points. Format your response as follows: Next Action: [Your suggested action] Talking Points: 1. [First talking point] 2. [Second talking point] 3. [Third talking point] """ next_action_prompt = PromptTemplate(template=next_action_template, input_variables=["meddic_evaluation"]) next_action_chain = next_action_prompt | llm response = next_action_chain.invoke({"meddic_evaluation": json.dumps(meddic_evaluation)}) if isinstance(response, AIMessage): response_content = response.content elif isinstance(response, str): response_content = response else: raise ValueError(f"Unexpected response type: {type(response)}") # Parse the response content lines = response_content.split('\n') next_action = next((line.split('Next Action:')[1].strip() for line in lines if line.startswith('Next Action:')), 'N/A') talking_points = [line.split('.')[1].strip() for line in lines if line.strip().startswith(('1.', '2.', '3.'))] return { 'next_action': next_action, 'talking_points': talking_points } except Exception as e: print(f"Error in agent_3: {str(e)}") return { 'next_action': "Error occurred while suggesting next action", 'talking_points': [str(e)] } def process_document(state: State) -> State: print("Agent 1: Processing document...") file_path = state.file_path result = agent_1(file_path) return State(file_path=state.file_path, document_processed=result) def evaluate_opportunity(state: State) -> State: print("Agent 2: Evaluating opportunity...") result = agent_2() return State(file_path=state.file_path, document_processed=state.document_processed, opportunity_evaluation=result) def suggest_next_action(state: State) -> State: print("Agent 3: Suggesting next actions...") result = agent_3(state.opportunity_evaluation) return State(file_path=state.file_path, document_processed=state.document_processed, opportunity_evaluation=state.opportunity_evaluation, next_action=result) def define_graph() -> StateGraph: workflow = StateGraph(State) workflow.add_node("process_document", process_document) workflow.add_node("evaluate_opportunity", evaluate_opportunity) workflow.add_node("suggest_next_action", suggest_next_action) workflow.set_entry_point("process_document") workflow.add_edge("process_document", "evaluate_opportunity") workflow.add_edge("evaluate_opportunity", "suggest_next_action") return workflow def run_analysis(file_path: str) -> Dict[str, Any]: if not os.path.exists(file_path): return {"error": f"File not found: {file_path}"} graph = define_graph() initial_state = State(file_path=file_path) try: app = graph.compile() final_state = app.invoke(initial_state) # Convert the final state to a dictionary manually structured_results = { "file_path": final_state["file_path"], "document_processed": final_state["document_processed"], "opportunity_evaluation": final_state["opportunity_evaluation"], "next_action": final_state["next_action"] } # Print a summary of the results print("\n--- Analysis Results ---") print(f"Document Processing: {'Successful' if 'Error' not in structured_results['document_processed'] else 'Failed'}") print(f"Details: {structured_results['document_processed']}") if isinstance(structured_results['opportunity_evaluation'], dict): print("\nOpportunity Evaluation:") print(f"Summary: {structured_results['opportunity_evaluation'].get('summary', 'N/A')}") print(f"Score: {structured_results['opportunity_evaluation'].get('score', 'N/A')}") print("MEDDIC Evaluation:") for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items(): print(f"{criterion}: {evaluation}") else: print("\nOpportunity Evaluation:") print(f"Error: {structured_results['opportunity_evaluation']}") if isinstance(structured_results['next_action'], dict): print("\nNext Action:") print(f"Action: {structured_results['next_action'].get('next_action', 'N/A')}") print("Talking Points:") for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1): print(f" {i}. {point}") else: print("\nNext Action:") print(f"Error: {structured_results['next_action']}") return structured_results except Exception as e: print(f"An error occurred during analysis: {str(e)}") return {"error": str(e)} def create_opportunity_review_report(structured_results): opportunity_review_report = "" opportunity_review_report += "**Analysis Results**\n\n" if 'Error' in structured_results['document_processed']: opportunity_review_report += f"Opportunity Analysis Failed\n" else: if isinstance(structured_results['opportunity_evaluation'], dict): opportunity_review_report += f"**Summary:** {structured_results['opportunity_evaluation'].get('summary', 'N/A')}\n\n" opportunity_review_report += f"**Score:** {structured_results['opportunity_evaluation'].get('score', 'N/A')}\n\n" opportunity_review_report += "**MEDDIC Evaluation:**\n\n" for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items(): opportunity_review_report += f"**{criterion}:** {evaluation}\n" if isinstance(structured_results['next_action'], dict): opportunity_review_report += "\n\n**Next Steps**\n\n" opportunity_review_report += f"{structured_results['next_action'].get('next_action', 'N/A')}\n\n" opportunity_review_report += "**Talking Points:**\n\n" for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1): opportunity_review_report += f" {i}. {point}\n" file_path = "reports/HSBC Opportunity Review Report.md" save_md_file(file_path, opportunity_review_report) return opportunity_review_report def save_md_file(file_path, file_content): if os.path.exists(file_path): os.remove(file_path) print(f"Existing file deleted: {file_path}") with open(file_path, 'w', encoding='utf-8') as md_file: md_file.write(file_content)