import tempfile import datetime import gradio as gr import janus_swi as janus from crewai import Agent, Task, Crew from crewai_tools import tool from crewai_tools import MDXSearchTool from crewai_tools import WebsiteSearchTool from langchain_anthropic import ChatAnthropic def add_today_date_to_string(input_string: str) -> str: """Adds today's date to the given string in a natural language format. Args: input_string (str): The original string to which the date will be added. Returns: str: The modified string with today's date added in a natural language format. """ today = formatted_date = today.strftime("%B %d, %Y") return f"# Date\nToday is {formatted_date}.\n# Query\n{input_string}" DOC_URL = '' MODEL_NAME = "claude-3-5-sonnet-20240620" llm = ChatAnthropic(model=MODEL_NAME, temperature=0.2, max_tokens=4096,) webs_tool = WebsiteSearchTool( website=DOC_URL, config=dict( llm=dict( provider="anthropic", config=dict( model=MODEL_NAME, temperature=0.2, # top_p=1, # stream=true, ), ), embedder=dict( provider="ollama", config=dict( model="mxbai-embed-large", # task_type="retrieval_document", # title="Embeddings", ), ), ) ) docs_tool = MDXSearchTool( mdx='', config=dict( llm=dict( provider="anthropic", config=dict( model=MODEL_NAME, temperature=0.2, # top_p=1, # stream=true, ), ), embedder=dict( provider="ollama", config=dict( model="mxbai-embed-large", # task_type="retrieval_document", # title="Embeddings", ), ), ) ) @tool("Prolog Query Engine") def prolog_query_engine(code: str, query: str) -> str: """Executes a Prolog query with additional Prolog code defining predicates and facts, and returns the results. Args: code: Prolog code defining predicates and facts. This code will be appended to knowledge base before executing the query. query: The Prolog query to execute. Returns: A string containing the results of the query, with each result on a new line. If the query fails, returns "False". """ janus.consult("") # Remove code block markers if present if '```' in code: code = code.split('```')[1].split('```')[0] # Write the provided Prolog code to a temporary file with open('', 'w') as f: f.write(code) # Consult the temporary file to load the provided Prolog code janus.consult("") # Execute the query and return the results results = janus.query(query) if results: return '\n'.join([str(r) for r in results]) else: return "False" # Define your agents with roles, goals, and tools programmer = Agent( role='Software Engineer', goal='Write Prolog code and a line of Prolog queries to answer user queries', backstory='''A software engineer with expertise in logic programming and experience using Prolog. Can translate user requests into Prolog code and execute queries to provide accurate results. Familiar with various Prolog concepts like recursion, backtracking, and unification.''', tools=[prolog_query_engine, docs_tool], llm=llm ) consultant = Agent( role='Consultant', goal='Answer user query and explain in simple English that even 8 year old kid can understand', backstory='''A friendly and patient consultant, skilled at explaining complex topics in a clear and simple way. Can understand the output of a software engineer and translate it into easy-to-understand explanations, even for someone as young as eight years old. Use simple words and examples to make learning fun and engaging.''', tools=[webs_tool], llm=llm ) # Define a task task1 = Task( name='Answer user query', description='Given a user query, write Prolog defining predicates and facts in query and build a Prolog query to access knowledge base and answer user query.\nUser query: {query}', agent=programmer, expected_output='''A report including:\n\ - User query\n\ - Prolog code with predicates and facts\n\ - Prolog query used to answer the user query\n\ - Result of running the Prolog query\n\ - A basic explanation of the result, clarifying how the Prolog query produced the answer''' ) # Define a task task2 = Task( name='Reply user query', description='Given answer to user query, improve the wordings in answer using your knowledge.', agent=consultant, expected_output='A clear, concise, and easy-to-understand explanation of the answer to the user query, suitable for an 8-year-old.' ) # Create a crew crew = Crew( agents=[programmer, consultant], tasks=[task1, task2], verbose=True) def yes_man(user_query, history): query = add_today_date_to_string(user_query) return crew.kickoff(inputs={"query": query}).raw gr.ChatInterface( yes_man, title="SSI/SSDI expert", description="Ask expert system any question", examples=["Is it eligible for a blind US citizen born in 1996 Jan 2 name John Doe to get SSI?"], ).queue().launch(auth=("tester", "8888p4ss"))