Spaces:
Runtime error
Runtime error
import os | |
import configparser | |
# config = configparser.ConfigParser() | |
# config.read('./secrets.ini') | |
# openai_api_key = config['OPENAI']['OPENAI_API_KEY'] | |
# serper_api_key = config['SERPER']['SERPER_API_KEY'] | |
# serp_api_key = config['SERPAPI']['SERPAPI_API_KEY'] | |
# os.environ.update({'OPENAI_API_KEY': openai_api_key}) | |
# os.environ.update({'SERPER_API_KEY': serper_api_key}) | |
# os.environ.update({'SERPAPI_API_KEY': serp_api_key}) | |
from typing import List, Union | |
import re | |
import json | |
import pandas as pd | |
from langchain import SerpAPIWrapper, LLMChain | |
from langchain.agents import Tool, AgentType, AgentExecutor, LLMSingleActionAgent, AgentOutputParser | |
from langchain.chat_models import ChatOpenAI | |
from langchain.chains import LLMChain, SimpleSequentialChain | |
from langchain.chains.query_constructor.base import AttributeInfo | |
from langchain.document_loaders import DataFrameLoader, SeleniumURLLoader | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.indexes import VectorstoreIndexCreator | |
from langchain.prompts import PromptTemplate, StringPromptTemplate, load_prompt, BaseChatPromptTemplate | |
from langchain.llms import OpenAI | |
from langchain.retrievers.self_query.base import SelfQueryRetriever | |
from langchain.schema import AgentAction, AgentFinish, HumanMessage | |
from langchain.vectorstores import DocArrayInMemorySearch, Chroma | |
# --- Conversation-stage analyzer -------------------------------------------
# A temperature-0 chat model is wrapped in an LLMChain whose single output is
# published under the key "stage_number".
stage_analyzer_inception_prompt = load_prompt(
    "./templates/stage_analyzer_inception_prompt_template.json"
)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.0)
stage_analyzer_chain = LLMChain(
    llm=llm,
    prompt=stage_analyzer_inception_prompt,
    output_key="stage_number",
    verbose=False,
)

# --- Wine catalogue ---------------------------------------------------------
# The data file is read as line-delimited JSON; the "name" column is used as
# the page content of each loaded document.
df = pd.read_json("./data/unified_wine_data.json", encoding="utf-8", lines=True)
loader = DataFrameLoader(data_frame=df, page_content_column="name")
docs = loader.load()
embeddings = OpenAIEmbeddings()
# Metadata schema handed to the self-query retriever: one AttributeInfo per
# filterable field, with the value type the query constructor should assume.
# NOTE(review): the non-ASCII literals below appear mis-encoded in this copy
# of the file; confirm them against the original UTF-8 source.
metadata_field_info = [
    AttributeInfo(
        name="body",
        description="1-5 rating for the body of wine",
        type="int",
    ),
    AttributeInfo(
        name="sweetness",
        description="1-5 rating for the sweetness of wine",
        type="int",
    ),
    AttributeInfo(
        name="alcohol",
        description="1-5 rating for the alcohol of wine",
        type="int",
    ),
    AttributeInfo(
        name="price",
        description="The price of the wine",
        type="int",
    ),
    AttributeInfo(
        name="rating",
        description="1-5 rating for the wine",
        type="float",
    ),
    AttributeInfo(
        name="wine_type",
        description="The type of wine. It can be '๋ ๋', '๋ก์ ', '์คํํด๋ง', 'ํ์ดํธ', '๋์ ํธ', '์ฃผ์ ๊ฐํ'",
        type="string",
    ),
    AttributeInfo(
        name="country",
        description="The country of wine. It can be '๊ธฐํ ์ ๋๋ฅ', '๊ธฐํ๊ตฌ๋๋ฅ', '๋ด์ง๋๋', '๋ ์ผ', '๋ฏธ๊ตญ', '์คํ์ธ', '์๋ฅดํจํฐ๋', '์ดํ๋ฆฌ์', '์น ๋ ', 'ํฌ๋ฃจํฌ์นผ', 'ํ๋์ค', 'ํธ์ฃผ'",
        # The allowed values are country names, so this is a string field
        # (it was previously declared as "float", which contradicts the
        # description and the sibling wine_type field).
        type="string",
    ),
]
# Index the loaded wine documents in Chroma using the OpenAI embeddings.
vectorstore = Chroma.from_documents(docs, embeddings)
document_content_description = "Database of a wine"

# NOTE: this rebinds the module-level name `llm` to a completion model.
# `stage_analyzer_chain` was already constructed with the earlier ChatOpenAI
# object, so it is not affected by the rebinding.
llm = OpenAI(temperature=0)

# Self-querying retriever built from the LLM, the vector store, the content
# description and the metadata schema.
retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    document_contents=document_content_description,
    metadata_field_info=metadata_field_info,
    verbose=False,
)
def search_with_url(query):
    """Load a single web page with Selenium and return the loader's result.

    Args:
        query: The URL to load; it is passed as the only entry of the
            loader's ``urls`` list.

    Returns:
        Whatever ``SeleniumURLLoader.load()`` returns for that URL.
    """
    url_loader = SeleniumURLLoader(urls=[query])
    return url_loader.load()
# Second index over the same DataFrame loader, backed by an in-memory
# DocArray store; exposed to the agent through `index.query`.
_index_creator = VectorstoreIndexCreator(vectorstore_cls=DocArrayInMemorySearch)
index = _index_creator.from_loaders([loader])

# Web-search wrapper exposed to the agent through `search.run`.
search = SerpAPIWrapper()
# Usage notes shown to the agent for the self-query "Wine database" tool.
# NOTE(review): the non-ASCII text below appears mis-encoded in this copy of
# the file; confirm it against the original UTF-8 source.
_WINE_DATABASE_DESCRIPTION = """
Database about the wines in wine store. You can get information such as the price of the wine, purchase URL, features, rating information, and more.
You can search wines with the following attributes:
- body: 1-5 rating int for the body of wine. You have to specify greater than or less than. For example, if you want to search for wines with a body rating of less than 3, enter 'body: gt 0 lt 3'
- price: The price range of the wine. Please enter the price range in the form of range. For example, if you want to search for wines that cost less than 20,000 won, enter 'price: gt 0 lt20000'
- rating: 1-5 rating float for the wine. You have to specify greater than or less than. For example, if you want to search for wines with a rating of less than 3, enter 'rating: gt 0 lt 3'
- wine_type: The type of wine. It can be '๋ ๋', '๋ก์ ', '์คํํด๋ง', 'ํ์ดํธ', '๋์ ํธ', '์ฃผ์ ๊ฐํ'
- name: The name of wine. ์ ๋ ฅํ ๋๋ '์์ธ ์ด๋ฆ์ "๋น๋ ์กฐ์" ์ ๋๋ค' ์ด๋ฐ ์์ผ๋ก ์ ๋ ฅํด์ฃผ์ธ์.
"""

# Tools the agent may call; each name is what the model must write after
# "Action:" and each description is rendered into the prompt.
tools = [
    Tool(
        name="Wine database",
        func=retriever.get_relevant_documents,
        description=_WINE_DATABASE_DESCRIPTION,
    ),
    Tool(
        name="Search specific wine with url",
        func=search_with_url,
        description="Search specific wine with url. Query must be url",
    ),
    Tool(
        name="Wine database 2",
        func=index.query,
        description="Database about the wines in wine store. You can use this tool if you're having trouble getting information from the wine database tool above. Query must be in String",
    ),
    Tool(
        name="Search",
        func=search.run,
        description="Useful for when you need to ask with search. Search in English only.",
    ),
]
# Agent prompt. Placeholders filled at format time:
#   {tools}, {tool_names}      - rendered from the tool list
#   {conversation_stage}       - description of the current stage
#   {conversation_history}     - prior turns
#   {input}                    - the latest user message
#   {agent_scratchpad}         - accumulated Thought/Action/Observation text
# The final-answer line prefix in this template is the same marker the output
# parser looks for, so the two must be changed together.
# NOTE(review): the non-ASCII name below appears mis-encoded in this copy of
# the file; confirm it against the original UTF-8 source.
template = """
Your role is a chatbot that asks customers questions about wine and makes recommendations.
Never forget your name is "์ด์ฐ์ ".
Keep your responses in short length to retain the user's attention.
Only generate one response at a time! When you are done generating, end with '<END_OF_TURN>' to give the user a chance to respond.
Responses should be in Korean.
Complete the objective as best you can. You have access to the following tools:
{tools}
Use the following format:
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
(this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
์ด์ฐ์ : the final response to the user
You must respond according to the conversation stage within the triple backticks and conversation history within in '======'.
Current conversation stage:
```{conversation_stage}```
Conversation history:
=======
{conversation_history}
=======
Last user saying: {input}
{agent_scratchpad}
"""
# Stage number (as a one-character string) -> instruction text for that stage.
# The text is injected into the agent prompt and also listed in the UI.
conversation_stages_dict = {
    "1": (
        "Start: Start the conversation by introducing yourself. "
        "Be polite and respectful while maintaining a professional tone of conversation."
    ),
    "2": (
        "Analyze: Identify the user's preferences in order to make wine recommendations. "
        "Ask questions to understand the preferences of your users in order to make wine recommendations. "
        "Ask only one question at a time. "
        "The wine database tool is not available here."
    ),
    "3": (
        "Recommendation: Recommend the right wine based on the user's preferences identified. "
        "Recommendations must be limited to wines in wine database, and you can use tools to do this."
    ),
    "4": (
        "After recommendation: After making a wine recommendation, "
        "it asks if the user likes the wine you recommended, and if they do, it provides a link to it. "
        "Otherwise, it takes you back to the recommendation stage."
    ),
    "5": "Close: When you're done, say goodbye to the user.",
    "6": (
        "Question and Answering: This is where you answer the user's questions. "
        "To answer user question, you can use the search tool or the wine database tool."
    ),
    "7": (
        "Not in the given steps: "
        "This step is for when none of the steps between 1 and 6 apply."
    ),
}
# Prompt template used by the agent's LLM chain
class CustomPromptTemplate(StringPromptTemplate):
    """String prompt that injects stage text, scratchpad and tool listing.

    ``format`` consumes the ``stage_number`` and ``intermediate_steps``
    keyword arguments and replaces them with the ``conversation_stage``,
    ``agent_scratchpad``, ``tools`` and ``tool_names`` values that the
    template string refers to.
    """

    # Raw template text containing the ``{...}`` placeholders
    template: str
    # Tools whose names and descriptions are rendered into the prompt
    tools: List[Tool]

    def format(self, **kwargs) -> str:
        """Render the template.

        Args:
            **kwargs: Must contain ``stage_number`` (a key of
                ``conversation_stages_dict``) and ``intermediate_steps``
                (an iterable of ``(action, observation)`` pairs); every
                remaining entry is forwarded to ``str.format`` unchanged.

        Returns:
            The formatted prompt string.

        Raises:
            KeyError: If ``stage_number`` or ``intermediate_steps`` is
                missing, or the stage number is not a known stage.
        """
        # Replace the stage key with its full instruction text.
        stage_key = kwargs.pop("stage_number")
        kwargs["conversation_stage"] = conversation_stages_dict[stage_key]

        # Replay earlier tool calls: each action log is followed by its
        # observation and a fresh "Thought: " cue for the model to continue.
        steps = kwargs.pop("intermediate_steps")
        kwargs["agent_scratchpad"] = "".join(
            step_action.log + f"\nObservation: {step_observation}\nThought: "
            for step_action, step_observation in steps
        )

        # One "name: description" line per tool, plus a comma-separated
        # list of the bare names.
        tool_lines = [f"{tool.name}: {tool.description}" for tool in self.tools]
        kwargs["tools"] = "\n".join(tool_lines)
        kwargs["tool_names"] = ", ".join(tool.name for tool in self.tools)

        return self.template.format(**kwargs)
# Only the variables supplied by the caller/agent are declared here; the
# template's other placeholders (tools, tool_names, conversation_stage,
# agent_scratchpad) are produced inside CustomPromptTemplate.format.
prompt = CustomPromptTemplate(
    template=template,
    tools=tools,
    input_variables=[
        "input",
        "intermediate_steps",
        "conversation_history",
        "stage_number",
    ],
)
class CustomOutputParser(AgentOutputParser):
    """Turn raw LLM text into either a tool call or the final answer."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse one LLM completion.

        Args:
            llm_output: The text produced by the agent's LLM.

        Returns:
            ``AgentFinish`` when the final-answer marker is present (the
            output is the stripped text after its last occurrence);
            otherwise an ``AgentAction`` built from the ``Action`` /
            ``Action Input`` pair.

        Raises:
            ValueError: If neither the marker nor an action pair is found.
        """
        # Final-answer prefix; must stay identical to the one used in the
        # prompt template.
        final_marker = "์ด์ฐ์ : "
        if final_marker in llm_output:
            answer_text = llm_output.split(final_marker)[-1].strip()
            # Return values are a dictionary with a single `output` key.
            return AgentFinish(
                return_values={"output": answer_text},
                log=llm_output,
            )

        # Otherwise expect "Action: <tool>" followed by "Action Input: <input>".
        action_pattern = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        found = re.search(action_pattern, llm_output, re.DOTALL)
        if found is None:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")

        tool_name = found.group(1).strip()
        # Trim surrounding spaces first, then any wrapping double quotes.
        tool_input = found.group(2).strip(" ").strip('"')
        return AgentAction(tool=tool_name, tool_input=tool_input, log=llm_output)
# Wire the agent together: GPT-4 chain + custom parser, restricted to the
# names of the declared tools.
output_parser = CustomOutputParser()
llm_chain = LLMChain(
    llm=ChatOpenAI(model="gpt-4", temperature=0.0),
    prompt=prompt,
    verbose=False,
)
tool_names = [tool.name for tool in tools]
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    # Stop generating before the model writes an "Observation:" line itself.
    stop=["\nObservation:"],
    allowed_tools=tool_names,
)
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=False,
)
import gradio as gr | |
# user_response, stage_history, conversation_history, pre_conversation_history = "", "", """""", """""" | |
# Human-readable listing of every stage ("<number>.<text>", one per line)
# shown in the UI's read-only "stage description" box.
stage_description = "".join(
    f"{stage_key}.{stage_text}\n"
    for stage_key, stage_text in conversation_stages_dict.items()
)
# Initial content of the stage-history box; also used to restore the box
# after the clear button has emptied it.
STAGE_HISTORY_PREFIX = "stage history: "
# Stage used when the analyzer's reply contains no recognizable stage number
# (stage "7" is the catch-all entry of conversation_stages_dict).
FALLBACK_STAGE = "7"

# NOTE(review): the nesting below is reconstructed (the header HTML inside the
# Row, everything else directly under Blocks) — confirm against the original
# layout. The non-ASCII literals appear mis-encoded in this copy of the file;
# confirm them against the original UTF-8 source.
with gr.Blocks(css='#chatbot .overflow-y-auto{height:750px}') as demo:
    with gr.Row():
        gr.HTML("""<div style="text-align: center; max-width: 500px; margin: 0 auto;">
<div>
<h1>ChatWine</h1>
</div>
<p style="margin-bottom: 10px; font-size: 94%">
LinkedIn <a href="https://www.linkedin.com/company/audrey-ai/about/">Audrey.ai</a>
</p>
</div>""")

    chatbot = gr.Chatbot()
    msg = gr.Textbox(label='User input')
    stage_history = gr.Textbox(value=STAGE_HISTORY_PREFIX, interactive=False, label='stage history')
    submit_btn = gr.Button("์ ์ก")
    clear_btn = gr.ClearButton([msg, chatbot, stage_history])
    stage_info = gr.Textbox(value=stage_description, interactive=False, label='stage description')

    def answer(user_response, chat_history, stage_history):
        """Handle one user turn: pick the stage, run the agent, update the UI.

        Args:
            user_response: Text the user just submitted.
            chat_history: Chatbot value, a list of (user, bot) pairs, or an
                empty/None value when the chat has been cleared.
            stage_history: Current text of the stage-history box.

        Returns:
            A tuple ``("", chat_history, stage_history)``: an empty string to
            clear the input box, the chat history with the new pair appended,
            and the stage history with the new stage number appended.
        """
        chat_history = chat_history or []
        # A cleared box comes back empty; restore the prefix so the first
        # entry is not rendered with a leading ", ".
        stage_history = stage_history or STAGE_HISTORY_PREFIX

        # Transcript of the previous turns (without the current message).
        pre_conversation_history = "".join(
            f"User: {chat[0]} <END_OF_TURN>\n์ด์ฐ์ : {chat[1]} <END_OF_TURN>\n"
            for chat in chat_history
        )
        conversation_history = pre_conversation_history + f"User: {user_response} <END_OF_TURN>\n"

        raw_stage = stage_analyzer_chain.run(
            {'conversation_history': conversation_history, 'stage_history': stage_history}
        )
        # Take the last character of the reply that is a known stage key.
        # This equals the reply's last character whenever that is a valid
        # stage, and tolerates trailing punctuation or an empty reply.
        stage_number = next(
            (char for char in reversed(raw_stage) if char in conversation_stages_dict),
            FALLBACK_STAGE,
        )

        if stage_history == STAGE_HISTORY_PREFIX:
            stage_history += stage_number
        else:
            stage_history += ", " + stage_number

        response = agent_executor.run(
            {
                'input': user_response,
                'conversation_history': pre_conversation_history,
                'stage_number': stage_number,
            }
        )
        # Keep only the text before the end-of-turn marker.
        response = response.split('<END_OF_TURN>')[0]
        chat_history.append((user_response, response))
        return "", chat_history, stage_history

    def user(user_message, history):
        """Return a disabled, emptied textbox update and the history extended
        with the user's message paired with a pending (None) reply.

        Not bound to any event in this section.
        """
        return gr.update(value="", interactive=False), history + [[user_message, None]]

    def clear():
        """No-op callback; the ClearButton itself resets its components."""
        pass

    clear_btn.click(fn=clear)
    # Button click and Enter in the textbox trigger the same handler.
    submit_btn.click(answer, [msg, chatbot, stage_history], [msg, chatbot, stage_history])
    msg.submit(answer, [msg, chatbot, stage_history], [msg, chatbot, stage_history])

demo.launch()