import os import configparser # config = configparser.ConfigParser() # config.read('./secrets.ini') # openai_api_key = config['OPENAI']['OPENAI_API_KEY'] # serper_api_key = config['SERPER']['SERPER_API_KEY'] # serp_api_key = config['SERPAPI']['SERPAPI_API_KEY'] # os.environ.update({'OPENAI_API_KEY': openai_api_key}) # os.environ.update({'SERPER_API_KEY': serper_api_key}) # os.environ.update({'SERPAPI_API_KEY': serp_api_key}) from typing import List, Union import re import json import pandas as pd from langchain import SerpAPIWrapper, LLMChain from langchain.agents import Tool, AgentType, AgentExecutor, LLMSingleActionAgent, AgentOutputParser from langchain.chat_models import ChatOpenAI from langchain.chains import LLMChain, SimpleSequentialChain from langchain.chains.query_constructor.base import AttributeInfo from langchain.document_loaders import DataFrameLoader, SeleniumURLLoader from langchain.embeddings import OpenAIEmbeddings from langchain.indexes import VectorstoreIndexCreator from langchain.prompts import PromptTemplate, StringPromptTemplate, load_prompt, BaseChatPromptTemplate from langchain.llms import OpenAI from langchain.retrievers.self_query.base import SelfQueryRetriever from langchain.schema import AgentAction, AgentFinish, HumanMessage from langchain.vectorstores import DocArrayInMemorySearch, Chroma stage_analyzer_inception_prompt = load_prompt("./templates/stage_analyzer_inception_prompt_template.json") llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0.0) stage_analyzer_chain = LLMChain( llm=llm, prompt=stage_analyzer_inception_prompt, verbose=False, output_key="stage_number") user_response_prompt = load_prompt("./templates/user_response_prompt.json") llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0.0) user_response_chain = LLMChain( llm=llm, prompt=user_response_prompt, verbose=False, # 과정을 출력할지 output_key="user_responses" ) df = pd.read_json('./data/unified_wine_data.json', encoding='utf-8', lines=True) loader =DataFrameLoader(data_frame=df, page_content_column='name') docs = loader.load() embeddings = OpenAIEmbeddings() metadata_field_info = [ AttributeInfo( name="body", description="1-5 rating for the body of wine", type="int", ), AttributeInfo( name="sweetness", description="1-5 rating for the sweetness of wine", type="int", ), AttributeInfo( name="alcohol", description="1-5 rating for the alcohol of wine", type="int", ), AttributeInfo( name="price", description="The price of the wine", type="int", ), AttributeInfo( name="rating", description="1-5 rating for the wine", type="float" ), AttributeInfo( name="wine_type", description="The type of wine. It can be '레드', '로제', '스파클링', '화이트', '디저트', '주정강화'", type="string" ), AttributeInfo( name="country", description="The country of wine. It can be '기타 신대륙', '기타구대륙', '뉴질랜드', '독일', '미국', '스페인', '아르헨티나', '이탈리아', '칠레', '포루투칼', '프랑스', '호주'", type="float" ), ] vectorstore = Chroma.from_documents(docs, embeddings) document_content_description = "Database of a wine" llm = OpenAI(temperature=0) retriever = SelfQueryRetriever.from_llm( llm, vectorstore, document_content_description, metadata_field_info, verbose=False ) # Added missing closing parenthesis def search_with_url(query): return SeleniumURLLoader(urls=[query]).load() index = VectorstoreIndexCreator( vectorstore_cls=DocArrayInMemorySearch ).from_loaders([loader]) search = SerpAPIWrapper() tools = [ Tool( name="Wine database", func=retriever.get_relevant_documents, description=""" Database about the wines in wine store. You can get information such as the price of the wine, purchase URL, features, rating information, and more. You can search wines with the following attributes: - body: 1-5 rating int for the body of wine. You have to specify greater than or less than. For example, if you want to search for wines with a body rating of less than 3, enter 'body: gt 0 lt 3' - price: The price range of the wine. Please enter the price range in the form of range. For example, if you want to search for wines that cost less than 20,000 won, enter 'price: gt 0 lt20000' - rating: 1-5 rating float for the wine. You have to specify greater than or less than. For example, if you want to search for wines with a rating of less than 3, enter 'rating: gt 0 lt 3' - wine_type: The type of wine. It can be '레드', '로제', '스파클링', '화이트', '디저트', '주정강화' - name: The name of wine. 입력할 때는 '와인 이름은 "비냐 조잘" 입니다' 이런 식으로 입력해주세요. """ ), Tool( name = "Search specific wine with url", func=search_with_url, description="Search specific wine with url. Query must be url" ), Tool( name = "Wine database 2", func=index.query, description="Database about the wines in wine store. You can use this tool if you're having trouble getting information from the wine database tool above. Query must be in String" ), Tool( name = "Search", func=search.run, description="Useful for when you need to ask with search. Search in English only." ), ] template = """ Your role is a chatbot that asks customers questions about wine and makes recommendations. Never forget your name is "이우선". Keep your responses in short length to retain the user's attention. Only generate one response at a time! When you are done generating, end with '' to give the user a chance to respond. Responses should be in Korean. Complete the objective as best you can. You have access to the following tools: {tools} Use the following format: Thought: you should always think about what to do Action: the action to take, should be one of [{tool_names}] Action Input: the input to the action Observation: the result of the action ... (this Thought/Action/Action Input/Observation can repeat N times) Thought: I now know the final answer 이우선: the final response to the user You must respond according to the conversation stage within the triple backticks and conversation history within in '======'. Current conversation stage: ```{conversation_stage}``` Conversation history: ======= {conversation_history} ======= Last user saying: {input} {agent_scratchpad} """ conversation_stages_dict = { "1": "Start: Start the conversation by introducing yourself. Be polite and respectful while maintaining a professional tone of conversation.", "2": "Analyze: Identify the user's preferences in order to make wine recommendations. Ask questions to understand the preferences of your users in order to make wine recommendations. Ask only one question at a time. The wine database tool is not available here.", "3": "Recommendation: Recommend the right wine based on the user's preferences identified. Recommendations must be limited to wines in wine database, and you can use tools to do this.", "4": "After recommendation: After making a wine recommendation, it asks if the user likes the wine you recommended, and if they do, it provides a link and image to it. Otherwise, it takes you back to the recommendation stage.", "5": "Close: When you're done, say goodbye to the user.", "6": "Question and Answering: This is where you answer the user's questions. To answer user question, you can use the search tool or the wine database tool.", "7": "Not in the given steps: This step is for when none of the steps between 1 and 6 apply.", } # Set up a prompt template class CustomPromptTemplate(StringPromptTemplate): # The template to use template: str # The list of tools available tools: List[Tool] def format(self, **kwargs) -> str: stage_number = kwargs.pop("stage_number") kwargs["conversation_stage"] = conversation_stages_dict[stage_number] # Get the intermediate steps (AgentAction, Observation tuples) # Format them in a particular way intermediate_steps = kwargs.pop("intermediate_steps") thoughts = "" for action, observation in intermediate_steps: thoughts += action.log thoughts += f"\nObservation: {observation}\nThought: " # Set the agent_scratchpad variable to that value kwargs["agent_scratchpad"] = thoughts # Create a tools variable from the list of tools provided kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools]) # Create a list of tool names for the tools provided kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools]) return self.template.format(**kwargs) prompt = CustomPromptTemplate( template=template, tools=tools, input_variables=["input", "intermediate_steps", "conversation_history", "stage_number"] ) class CustomOutputParser(AgentOutputParser): def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]: # Check if agent should finish if "이우선: " in llm_output: return AgentFinish( # Return values is generally always a dictionary with a single `output` key # It is not recommended to try anything else at the moment :) return_values={"output": llm_output.split("이우선: ")[-1].strip()}, log=llm_output, ) # Parse out the action and action input regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)" match = re.search(regex, llm_output, re.DOTALL) if not match: raise ValueError(f"Could not parse LLM output: `{llm_output}`") action = match.group(1).strip() action_input = match.group(2) # Return the action and action input return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output) output_parser = CustomOutputParser() llm_chain = LLMChain(llm=ChatOpenAI(model='gpt-4', temperature=0.0), prompt=prompt, verbose=False,) tool_names = [tool.name for tool in tools] agent = LLMSingleActionAgent( llm_chain=llm_chain, output_parser=output_parser, stop=["\nObservation:"], allowed_tools=tool_names ) agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True) import gradio as gr # user_response, stage_history, conversation_history, pre_conversation_history = "", "", """""", """""" stage_description = "" for key, value in conversation_stages_dict.items(): stage_description += f"{key}.{value}\n" with gr.Blocks(css='#chatbot .overflow-y-auto{height:750px}') as demo: with gr.Row(): gr.HTML("""

ChatWine

LinkedIn Audrey.ai

""") chatbot = gr.Chatbot() msg = gr.Textbox(label='User input') samples = [["이번 주에 친구들과 모임이 있는데, 훌륭한 와인 한 병을 추천해줄래?"], ["입문자에게 좋은 와인을 추천해줄래?"], ["보르도와 부르고뉴 와인의 차이점은 뭐야?"]] user_response_examples = gr.Dataset(samples=samples, components=[msg], type="index") stage_history = gr.Textbox(value="stage history: ", interactive=False, label='stage history') submit_btn = gr.Button("전송") clear_btn = gr.ClearButton([msg, chatbot, stage_history]) stage_info = gr.Textbox(value=stage_description, interactive=False, label='stage description') def load_example(example_id): global samples return samples[example_id][0] def answer(user_response, chat_history, stage_history): global samples chat_history = chat_history or [] stage_history = stage_history or "" pre_conversation_history = "" for idx, chat in enumerate(chat_history): pre_conversation_history += f"User: {chat[0]} \n" pre_conversation_history += f"이우선: {chat[1]} \n" conversation_history = pre_conversation_history + f"User: {user_response} \n" stage_number = stage_analyzer_chain.run({'conversation_history': conversation_history, 'stage_history': stage_history}) stage_number = stage_number[-1] stage_history += stage_number if stage_history == "stage history: " else ", " + stage_number response = agent_executor.run({'input':user_response, 'conversation_history': pre_conversation_history, 'stage_number': stage_number}) conversation_history += "이우선: " + response + "\n" response = response.split('')[0] chat_history.append((user_response, response)) user_response_examples = [] for user_response_example in user_response_chain.run({'conversation_history': conversation_history}).split('|'): user_response_examples.append([user_response_example]) samples = user_response_examples return "", chat_history, stage_history, gr.Dataset.update(samples=samples) def clear(user_response_examples): global samples samples = [["이번 주에 친구들과 모임이 있는데, 훌륭한 와인 한 병을 추천해줄래?"], ["입문자에게 좋은 와인을 추천해줄래?"], ["보르도와 부르고뉴 와인의 차이점은 뭐야?"]] return gr.Dataset.update(samples=samples) clear_btn.click(fn=clear, inputs=[user_response_examples], outputs=[user_response_examples]) user_response_examples.click(load_example, inputs=[user_response_examples], outputs=[msg]) submit_btn.click(answer, [msg, chatbot, stage_history], [msg, chatbot, stage_history, user_response_examples]) msg.submit(answer, [msg, chatbot, stage_history], [msg, chatbot, stage_history, user_response_examples]) demo.launch()