import json
import os
from typing import Dict

import boto3
from langchain.chains import ConversationChain
from langchain.schema import AIMessage, HumanMessage, SystemMessage
from langchain_aws import ChatBedrock
from langchain_aws.retrievers import AmazonKnowledgeBasesRetriever

# Labels the knowledge-base classifier may emit, in the order they are reported.
_KB_LABELS = ('eur lex', 'austrian law', 'german law')

_KB_CLASSIFIER_PROMPT = """You are a classifier determining the intent of llm responses.
The llm can decide to consult 3 databases: ['eur lex', 'austrian law', 'german law']
Determine the intent and output the relevant classes.

Examples:
#1 Output: ['eur lex']
#2 Output: ['eur lex', 'austrian law']
#3 Output: ['eur lex', 'german law']
"""

_EXCEPTION_CLASSIFIER_PROMPT = """You are a classifier determining the intent of llm responses.
The llm can decide if the legal case falls upon an exception, and if so can consult a database.
Determine the intent and output the relevant classes.

Examples:
#1 Output: 'no exception'
#2 Output: 'database lookup'
"""


def create_bedrock_client(
    service_type: str = 'bedrock-runtime',
    region_name: str = 'eu-west-2',
):
    """Create and return a boto3 client for a Bedrock service.

    No credentials are passed explicitly, so boto3's default credential
    resolution applies.

    Args:
        service_type: boto3 service name, e.g. 'bedrock-runtime' for model
            invocation or 'bedrock-agent-runtime' (used by ``run_pipeline``
            for knowledge-base retrieval).
        region_name: AWS region the client talks to.

    Returns:
        The boto3 client object.
    """
    return boto3.client(
        service_name=service_type,
        region_name=region_name,
    )


def initialize_claude_chain(client):
    """Build the Claude chat model used by the pipeline.

    Despite the function name, this returns the ``ChatBedrock`` model itself,
    not a ``ConversationChain``.

    Args:
        client: Bedrock runtime client, as returned by
            ``create_bedrock_client()``.

    Returns:
        A ``ChatBedrock`` instance configured for Claude 3 Sonnet.
    """
    llm = ChatBedrock(
        model='anthropic.claude-3-sonnet-20240229-v1:0',
        client=client,
        model_kwargs={
            'max_tokens': 1000,
            'temperature': 0.6,
            'top_p': 0.9,
        },
    )
    return llm


def chat_with_claude(conversation: ConversationChain, user_input: str) -> Dict:
    """Send a message through a conversation chain and wrap the outcome.

    Args:
        conversation: Chain whose ``predict`` method is called.
        user_input: Text passed as the ``input`` keyword.

    Returns:
        ``{'status': 'success', 'response': <reply>}`` on success, or
        ``{'status': 'error', 'error': <message>}`` if ``predict`` raised.
    """
    try:
        response = conversation.predict(input=user_input)
    except Exception as e:
        # Deliberate catch-all: failures are reported to the caller as data
        # rather than propagated.
        return {'status': 'error', 'error': str(e)}
    return {'status': 'success', 'response': response}


def classify_kb_response(llm, response: str) -> list[str]:
    """Ask the model which knowledge bases a piece of text wants to consult.

    Args:
        llm: Chat model exposing ``invoke(messages)``.
        response: Text to classify (``run_pipeline`` passes the model's
            answer to step 5 of its plan).

    Returns:
        The labels from ``_KB_LABELS`` that occur (case-insensitively) in the
        classifier's reply, in ``_KB_LABELS`` order.
    """
    messages = [
        SystemMessage(content=_KB_CLASSIFIER_PROMPT),
        HumanMessage(content=f'Classify the below context:{response}'),
    ]
    verdict = str(llm.invoke(messages).content).lower()
    # Substring matching: the reply is free text, not a parsed list.
    return [label for label in _KB_LABELS if label in verdict]


def classify_exception_response(llm, response: str) -> bool:
    """Ask the model whether a piece of text calls for an exception lookup.

    Args:
        llm: Chat model exposing ``invoke(messages)``.
        response: Text to classify.

    Returns:
        True if the classifier's reply contains 'database lookup'
        (case-insensitive), otherwise False.
    """
    messages = [
        SystemMessage(content=_EXCEPTION_CLASSIFIER_PROMPT),
        HumanMessage(content=f'Classify the below context:{response}'),
    ]
    verdict = str(llm.invoke(messages).content).lower()
    return 'database lookup' in verdict


def retrieve_knowledge_base_docs(
    client,
    knowledge_base_id: str,
    query: str,
    num_res: int = 25,
) -> list:
    """Retrieve documents from an Amazon Bedrock knowledge base.

    Args:
        client: Client handed to ``AmazonKnowledgeBasesRetriever``
            (``run_pipeline`` passes a 'bedrock-agent-runtime' client).
        knowledge_base_id: Identifier of the knowledge base to query.
        query: Search text.
        num_res: Value sent as ``numberOfResults`` in the vector search
            configuration.

    Returns:
        Whatever ``retriever.invoke(query)`` returns.
    """
    retriever = AmazonKnowledgeBasesRetriever(
        client=client,
        knowledge_base_id=knowledge_base_id,
        retrieval_config={
            'vectorSearchConfiguration': {'numberOfResults': num_res}
        },
        min_score_confidence=0.0,
    )
    return retriever.invoke(query)


case1 = """A new member is about to be appointed to the management body of a(n ECB) supervised institution in Austria.
The prospective member is already a member of the board in three other group companies.
The supervisor is concerned the potential board member does not have enough time to commit to this new position.
Can the ECB as supervisor oppose this appointment?
"""

case2 = """A shareholder in a German bank did not ask approval to the ECB for the acquisition of its qualifying holding (10% or more) in the German bank.
What can be the consequences?
"""

case3 = """A request for public access to ECB documents based on Decision ECB/2004/3 (2004/258/EC) aims at getting access to documents containing the proceedings and the outcome of the deliberations of a Governing Council meeting.
Can the documents be disclosed under the applicable legal framework?
"""

case4 = """In relation to the Corporate sector purchase programme (CSPP):
Is commercial paper an eligible asset under the CSPP? Under what conditions?
"""

SYSTEM = 'You are a lawyer, giving professional and accurate legal advice. Base your answers on relevant, up todate legal frameworks.'


def _ask(
    chat,
    messages: list,
    export: dict,
    export_key: str,
    banner: str | None = None,
) -> str:
    """Invoke the chat model on ``messages`` and record its reply.

    The reply text is printed (after ``banner``, if given), stored in
    ``export[export_key]`` and appended to ``messages`` as an ``AIMessage``.
    Returns the reply text.
    """
    reply = str(chat.invoke(messages).content)
    if banner is not None:
        print(banner)
    print(reply)
    export[export_key] = reply
    messages.append(AIMessage(content=reply))
    return reply


def _consult_knowledge_base(
    chat,
    retrieval_client,
    messages: list,
    export: dict,
    *,
    kb_id: str,
    query: str,
    source: str,
    lookup_key: str,
    eval_key: str,
) -> str:
    """Retrieve documents for ``query`` and have the model evaluate them.

    The stringified documents go to ``export[lookup_key]``; the model's
    evaluation goes to ``export[eval_key]``. Returns the evaluation text.
    """
    docs = retrieve_knowledge_base_docs(retrieval_client, kb_id, query)
    export[lookup_key] = [str(doc) for doc in docs]
    messages.append(
        HumanMessage(
            content=f'Determine which regulations apply (from {source}):\n{docs}'
        )
    )
    return _ask(chat, messages, export, eval_key)


def run_pipeline(case_text: str | None = None) -> dict:
    """Run the multi-step legal analysis for one case.

    Steps:
      1. Ask the model to work through a five-point plan for the case.
      2. Classify the answer to point 5 to decide which knowledge bases to
         consult.
      3. For each selected knowledge base, retrieve documents and ask the
         model which regulations apply.
      4. Ask for a final summary answer.
      5. Challenge the answer and ask for a complete explanation.

    Args:
        case_text: The legal question. When None, ``case1`` is used.

    Returns:
        Dict of every prompt, retrieval result and model reply produced
        along the way, keyed by step name.
    """
    client = create_bedrock_client()
    retrieval_client = create_bedrock_client('bedrock-agent-runtime')
    chat = initialize_claude_chain(client)

    export = {}
    case = case1 if case_text is None else case_text

    human_input = f"""
    {case}

    FOLLOW THE STEP BY STEP PLAN:
    1. Summarize the question
    2. Identify the main actors and their relationships (if applicable)
    3. Identify who is competent on the legal matter and reference the legal basis
    4. Identify the legal issue based on the inital question.
    5. Which knowledge bases are most likely to contain relevant information? (Eur lex, austrian law, german law)
    """
    export['human_1'] = human_input

    main_wf_messages = [
        SystemMessage(content=SYSTEM),
        HumanMessage(content=human_input),
    ]

    AUS_KB_ID = 'URGID9GFK8'
    GER_KB_ID = '19H00QAPZG'
    EU_KB_ID = 'ORMULTAIWL'

    first_reply = _ask(chat, main_wf_messages, export, 'ai_first_5')

    # Everything after the last '5. ' marker; if the marker is absent the
    # whole reply is classified.
    fifth_point = first_reply.split('5. ')[-1]
    print('------')
    print(fifth_point)
    print('------')

    classification = classify_kb_response(chat, fifth_point)
    export['ai_lookup_decision_1'] = classification
    print('CLASS', classification)

    # (label, knowledge base id, retrieval query, lookup key, evaluation key)
    # The EU lookup searches with the model's first reply; both national
    # lookups search with the bare case text, without the step-by-step plan.
    kb_lookups = (
        (
            'eur lex',
            EU_KB_ID,
            first_reply,
            'eur_lex_lookup',
            'ai_eur_lex_eval',
        ),
        (
            'austrian law',
            AUS_KB_ID,
            f'Determine which regulations apply (from austrian law):\n{case}',
            'aus_lookup',
            'ai_aus_eval',
        ),
        (
            'german law',
            GER_KB_ID,
            f'Determine which regulations apply (from german law):\n{case}',
            'ger_lookup',
            'ai_ger_eval',
        ),
    )
    for source, kb_id, query, lookup_key, eval_key in kb_lookups:
        if source not in classification:
            continue
        _consult_knowledge_base(
            chat,
            retrieval_client,
            main_wf_messages,
            export,
            kb_id=kb_id,
            query=query,
            source=source,
            lookup_key=lookup_key,
            eval_key=eval_key,
        )

    # Disabled step (commented out in the original): exception investigation.
    # EXCEPTION_PROMPT = """6. Determine if the case falls upon an exception and
    # retrieve the relevant regulation connected to the exception"""
    # export['human_exception'] = EXCEPTION_PROMPT
    # main_wf_messages.append(
    #     HumanMessage(
    #         content=EXCEPTION_PROMPT,
    #     )
    # )
    # response = chat.invoke(main_wf_messages)
    # print(response.content)
    # export['ai_exception_investigation'] = str(response.content)
    # main_wf_messages.append(
    #     AIMessage(content=str(response.content)),
    # )
    # is_exception = classify_exception_response(chat, str(response.content))
    # print('IS EXCEPTION CLASS:', is_exception)
    # if is_exception:
    #     eu_res = retrieve_knowledge_base_docs(
    #         retrieval_client,
    #         EU_KB_ID,
    #         str(response.content),
    #         10,
    #     )
    #     export['ai_exception_lookup'] = [str(r) for r in eu_res]
    #     main_wf_messages.append(
    #         HumanMessage(
    #             content=f'Determine if the regulations provide a strong legal basis for an exception:\n{eu_res}'
    #         )
    #     )
    #     response = chat.invoke(main_wf_messages)
    #     print(response.content)
    #     export['ai_exception_eval'] = str(response.content)
    #     main_wf_messages.append(
    #         AIMessage(content=str(response.content)),
    #     )
    #

    FINAL_SUMMARY_PROMPT = """6.Based on the actors, the competencies, the relevant legal frameworks as mentioned above draft an answer to the FULL original question(s).
    Mention the legal issue(s), the relevant articles used in both eur lex and national and the applicable regime set up, the conditions if applicable, conclusion."""
    export['human_summary'] = FINAL_SUMMARY_PROMPT
    main_wf_messages.append(HumanMessage(content=FINAL_SUMMARY_PROMPT))
    _ask(
        chat,
        main_wf_messages,
        export,
        'ai_final_summary',
        banner='----------FINAL RESPONSE------------',
    )

    # Adversarial follow-up: tell the model its answer was contested so it
    # re-derives and justifies competencies, laws and article references.
    main_wf_messages.append(
        HumanMessage(
            content="""You are a lawyer and you need explain your answer to the question(s).
            Your colleagues are saying you are wrong on at least one of these aspects: competencies, relevant laws, articles references (missing or hallucination).
            Draft a complete explanation of your reasoning, mention the legal issue(s), the correct complete relevant articles used in both eur lex and national and the applicable regime set up, the conditions if applicable, conclusions."""
        )
    )
    _ask(
        chat,
        main_wf_messages,
        export,
        'ai_challange',
        banner='----------Complementary------------',
    )

    return export


if __name__ == '__main__':
    export = run_pipeline()
    print(json.dumps(export, indent=4))
    # Explicit encoding so the output file does not depend on the platform
    # default.
    with open('export.json', 'w', encoding='utf-8') as f:
        json.dump(export, f, indent=4)