Spaces:
Sleeping
Sleeping
import json | |
import os | |
from langchain.document_loaders import CSVLoader, PyPDFLoader, Docx2txtLoader | |
from langgraph.graph import StateGraph, END | |
from langchain.prompts import PromptTemplate | |
from langchain.schema import Document, AIMessage | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
from pathlib import Path | |
from pydantic import BaseModel, Field | |
from qdrant_client import QdrantClient | |
from qdrant_client.models import Distance, VectorParams, PointStruct | |
from typing import List, Dict, Any | |
from pydantic import BaseModel, Field | |
from typing import Dict, Any | |
llm = ChatOpenAI(model_name="gpt-4o") | |
embeddings = OpenAIEmbeddings(model="text-embedding-3-small") | |
qdrant = QdrantClient(":memory:") # In-memory Qdrant instance | |
# Create collection | |
qdrant.create_collection( | |
collection_name="opportunities", | |
vectors_config=VectorParams(size=1536, distance=Distance.COSINE), | |
) | |
class State(BaseModel): | |
file_path: str | |
document_processed: str = "" | |
opportunity_evaluation: Dict[str, Any] = Field(default_factory=dict) | |
next_action: Dict[str, Any] = Field(default_factory=dict) | |
def dict_representation(self) -> Dict[str, Any]: | |
return { | |
"file_path": self.file_path, | |
"document_processed": self.document_processed, | |
"opportunity_evaluation": self.opportunity_evaluation, | |
"next_action": self.next_action | |
} | |
async def prep_opportunity_review(session_state): | |
file_path = prep_document() | |
structured_results = run_analysis(file_path) | |
opportunity_review_report = create_opportunity_review_report(structured_results) | |
session_state.opportunity_review_results = structured_results | |
session_state.opportunity_review_report = opportunity_review_report | |
def prep_document(): | |
file_path = "data/HSBC Opportunity Information.docx" | |
path = Path(file_path) | |
if path.exists(): | |
if path.is_file(): | |
print(f"File found: {path}") | |
print(f"File size: {path.stat().st_size / 1024:.2f} KB") | |
print(f"Last modified: {path.stat().st_mtime}") | |
print("File is ready for processing.") | |
if os.access(path, os.R_OK): | |
print("File is readable.") | |
else: | |
print("Warning: File exists but may not be readable. Check permissions.") | |
else: | |
print(f"Error: {path} exists but is not a file. It might be a directory.") | |
else: | |
print(f"Error: File not found at {path}") | |
print("Please check the following:") | |
print("1. Ensure the file path is correct.") | |
print("2. Verify that the file exists in the specified location.") | |
print("3. Check if you have the necessary permissions to access the file.") | |
parent = path.parent | |
if not parent.exists(): | |
print(f"Note: The directory {parent} does not exist.") | |
elif not parent.is_dir(): | |
print(f"Note: {parent} exists but is not a directory.") | |
file_path_for_processing = str(path) | |
return file_path_for_processing | |
def load_and_chunk_document(file_path: str) -> List[Document]: | |
"""Load and chunk the document based on file type.""" | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"File not found: {file_path}") | |
_, file_extension = os.path.splitext(file_path.lower()) | |
if file_extension == '.csv': | |
loader = CSVLoader(file_path) | |
elif file_extension == '.pdf': | |
loader = PyPDFLoader(file_path) | |
elif file_extension == '.docx': | |
loader = Docx2txtLoader(file_path) | |
else: | |
raise ValueError(f"Unsupported file type: {file_extension}") | |
documents = loader.load() | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
return text_splitter.split_documents(documents) | |
def agent_1(file_path: str) -> str: | |
"""Agent 1: Load, chunk, embed, and store document in Qdrant.""" | |
try: | |
chunks = load_and_chunk_document(file_path) | |
points = [] | |
for i, chunk in enumerate(chunks): | |
vector = embeddings.embed_query(chunk.page_content) | |
points.append(PointStruct(id=i, vector=vector, payload={"text": chunk.page_content})) | |
qdrant.upsert( | |
collection_name="opportunities", | |
points=points | |
) | |
return f"Document processed and stored in Qdrant. {len(chunks)} chunks created." | |
except Exception as e: | |
print(f"Error in agent_1: {str(e)}") | |
return f"Error processing document: {str(e)}" | |
def agent_2() -> Dict[str, Any]: | |
"""Agent 2: Evaluate opportunity based on MEDDIC criteria.""" | |
try: | |
results = qdrant.scroll(collection_name="opportunities", limit=100) | |
if not results or len(results[0]) == 0: | |
raise ValueError("No documents found in Qdrant") | |
full_text = " ".join([point.payload.get("text", "") for point in results[0]]) | |
meddic_template = """ | |
Analyze the following opportunity information using the MEDDIC sales methodology: | |
{opportunity_info} | |
Assign an overall opportunity score (1-100) with 100 means that the opportunity is a sure win. | |
Provide a Summary of the opportunity. | |
Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion: | |
1. Metrics | |
2. Economic Buyer | |
3. Decision Criteria | |
4. Decision Process | |
5. Identify Pain | |
6. Champion | |
Format your response as follows: | |
Summary: [Opportunity Summary] | |
Score: [Overall Opportunity Score between 1 to 100 based on MEDDIC criteria] | |
MEDDIC Evaluation: | |
- Metrics: [Score on Metrics, Evaluation on Metrics criterion] | |
- Economic Buyer: [Score on Economic Buyer, Evaluation on Economic Buyer criterion] | |
- Decision Criteria: [Score on Decision Criteria, Evaluation on Decision Criteria criterion] | |
- Decision Process: [Score on Decision Process, Evaluation on Decision Process criterion] | |
- Identify Pain: [Score on Identify Pain, Evaluation on Identify Pain criterion] | |
- Champion: [Score on Champion, Evaluation on Champion criterion] | |
""" | |
meddic_prompt = PromptTemplate(template=meddic_template, input_variables=["opportunity_info"]) | |
meddic_chain = meddic_prompt | llm | |
response = meddic_chain.invoke({"opportunity_info": full_text}) | |
if isinstance(response, AIMessage): | |
response_content = response.content | |
elif isinstance(response, str): | |
response_content = response | |
else: | |
raise ValueError(f"Unexpected response type: {type(response)}") | |
# Parse the response content | |
lines = response_content.split('\n') | |
summary = next((line.split('Summary:')[1].strip() for line in lines if line.startswith('Summary:')), 'N/A') | |
score = next((int(line.split('Score:')[1].strip()) for line in lines if line.startswith('Score:')), 0) | |
meddic_eval = {} | |
current_criterion = None | |
for line in lines: | |
if line.strip().startswith('-'): | |
parts = line.split(':', 1) | |
if len(parts) == 2: | |
current_criterion = parts[0].strip('- ') | |
meddic_eval[current_criterion] = parts[1].strip() | |
elif current_criterion and line.strip(): | |
meddic_eval[current_criterion] += ' ' + line.strip() | |
return { | |
'summary': summary, | |
'score': score, | |
'meddic_evaluation': meddic_eval | |
} | |
except Exception as e: | |
print(f"Error in agent_2: {str(e)}") | |
return { | |
'summary': "Error occurred during evaluation", | |
'score': 0, | |
'meddic_evaluation': str(e) | |
} | |
def agent_3(meddic_evaluation: Dict[str, Any]) -> Dict[str, Any]: | |
"""Agent 3: Suggest next best action and talking points.""" | |
try: | |
next_action_template = """ | |
Based on the following MEDDIC evaluation of an opportunity: | |
{meddic_evaluation} | |
Suggest the next best action for the upcoming customer meeting and provide the top 3 talking points. | |
Format your response as follows: | |
Next Action: [Your suggested action] | |
Talking Points: | |
1. [First talking point] | |
2. [Second talking point] | |
3. [Third talking point] | |
""" | |
next_action_prompt = PromptTemplate(template=next_action_template, input_variables=["meddic_evaluation"]) | |
next_action_chain = next_action_prompt | llm | |
response = next_action_chain.invoke({"meddic_evaluation": json.dumps(meddic_evaluation)}) | |
if isinstance(response, AIMessage): | |
response_content = response.content | |
elif isinstance(response, str): | |
response_content = response | |
else: | |
raise ValueError(f"Unexpected response type: {type(response)}") | |
# Parse the response content | |
lines = response_content.split('\n') | |
next_action = next((line.split('Next Action:')[1].strip() for line in lines if line.startswith('Next Action:')), 'N/A') | |
talking_points = [line.split('.')[1].strip() for line in lines if line.strip().startswith(('1.', '2.', '3.'))] | |
return { | |
'next_action': next_action, | |
'talking_points': talking_points | |
} | |
except Exception as e: | |
print(f"Error in agent_3: {str(e)}") | |
return { | |
'next_action': "Error occurred while suggesting next action", | |
'talking_points': [str(e)] | |
} | |
def process_document(state: State) -> State: | |
print("Agent 1: Processing document...") | |
file_path = state.file_path | |
result = agent_1(file_path) | |
return State(file_path=state.file_path, document_processed=result) | |
def evaluate_opportunity(state: State) -> State: | |
print("Agent 2: Evaluating opportunity...") | |
result = agent_2() | |
return State(file_path=state.file_path, document_processed=state.document_processed, opportunity_evaluation=result) | |
def suggest_next_action(state: State) -> State: | |
print("Agent 3: Suggesting next actions...") | |
result = agent_3(state.opportunity_evaluation) | |
return State(file_path=state.file_path, document_processed=state.document_processed, opportunity_evaluation=state.opportunity_evaluation, next_action=result) | |
def define_graph() -> StateGraph: | |
workflow = StateGraph(State) | |
workflow.add_node("process_document", process_document) | |
workflow.add_node("evaluate_opportunity", evaluate_opportunity) | |
workflow.add_node("suggest_next_action", suggest_next_action) | |
workflow.set_entry_point("process_document") | |
workflow.add_edge("process_document", "evaluate_opportunity") | |
workflow.add_edge("evaluate_opportunity", "suggest_next_action") | |
return workflow | |
def run_analysis(file_path: str) -> Dict[str, Any]: | |
if not os.path.exists(file_path): | |
return {"error": f"File not found: {file_path}"} | |
graph = define_graph() | |
initial_state = State(file_path=file_path) | |
try: | |
app = graph.compile() | |
final_state = app.invoke(initial_state) | |
# Convert the final state to a dictionary manually | |
structured_results = { | |
"file_path": final_state["file_path"], | |
"document_processed": final_state["document_processed"], | |
"opportunity_evaluation": final_state["opportunity_evaluation"], | |
"next_action": final_state["next_action"] | |
} | |
# Print a summary of the results | |
print("\n--- Analysis Results ---") | |
print(f"Document Processing: {'Successful' if 'Error' not in structured_results['document_processed'] else 'Failed'}") | |
print(f"Details: {structured_results['document_processed']}") | |
if isinstance(structured_results['opportunity_evaluation'], dict): | |
print("\nOpportunity Evaluation:") | |
print(f"Summary: {structured_results['opportunity_evaluation'].get('summary', 'N/A')}") | |
print(f"Score: {structured_results['opportunity_evaluation'].get('score', 'N/A')}") | |
print("MEDDIC Evaluation:") | |
for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items(): | |
print(f"{criterion}: {evaluation}") | |
else: | |
print("\nOpportunity Evaluation:") | |
print(f"Error: {structured_results['opportunity_evaluation']}") | |
if isinstance(structured_results['next_action'], dict): | |
print("\nNext Action:") | |
print(f"Action: {structured_results['next_action'].get('next_action', 'N/A')}") | |
print("Talking Points:") | |
for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1): | |
print(f" {i}. {point}") | |
else: | |
print("\nNext Action:") | |
print(f"Error: {structured_results['next_action']}") | |
return structured_results | |
except Exception as e: | |
print(f"An error occurred during analysis: {str(e)}") | |
return {"error": str(e)} | |
def create_opportunity_review_report(structured_results): | |
opportunity_review_report = "" | |
opportunity_review_report += "**Analysis Results**\n\n" | |
if 'Error' in structured_results['document_processed']: | |
opportunity_review_report += f"Opportunity Analysis Failed\n" | |
else: | |
if isinstance(structured_results['opportunity_evaluation'], dict): | |
opportunity_review_report += f"**Summary:** {structured_results['opportunity_evaluation'].get('summary', 'N/A')}\n\n" | |
opportunity_review_report += f"**Score:** {structured_results['opportunity_evaluation'].get('score', 'N/A')}\n\n" | |
opportunity_review_report += "**MEDDIC Evaluation:**\n\n" | |
for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items(): | |
opportunity_review_report += f"**{criterion}:** {evaluation}\n" | |
if isinstance(structured_results['next_action'], dict): | |
opportunity_review_report += "\n\n**Next Steps**\n\n" | |
opportunity_review_report += f"{structured_results['next_action'].get('next_action', 'N/A')}\n\n" | |
opportunity_review_report += "**Talking Points:**\n\n" | |
for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1): | |
opportunity_review_report += f" {i}. {point}\n" | |
file_path = "reports/HSBC Opportunity Review Report.md" | |
save_md_file(file_path, opportunity_review_report) | |
return opportunity_review_report | |
def save_md_file(file_path, file_content): | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
print(f"Existing file deleted: {file_path}") | |
with open(file_path, 'w', encoding='utf-8') as md_file: | |
md_file.write(file_content) |