Spaces:
Sleeping
Sleeping
File size: 11,629 Bytes
2649124 54d66e1 4125626 c634ddd 2649124 c634ddd 41f933e c634ddd 2649124 3f815a2 c634ddd 2649124 c634ddd 2649124 c634ddd 2649124 c634ddd 2649124 c634ddd 2649124 c634ddd 2649124 e3479f5 2649124 54d66e1 2649124 54d66e1 15af633 2649124 c634ddd 54d66e1 2649124 3a8cc44 0342ce4 2649124 c634ddd 2649124 c634ddd c885d38 2649124 c634ddd e3ac915 c634ddd e3ac915 56e14a4 e3ac915 c634ddd 29ff717 15af633 c634ddd ecf6658 15af633 3f815a2 2649124 c634ddd 2649124 c885d38 e1f0dec c634ddd 2649124 c634ddd 2649124 c634ddd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
from openai import OpenAI
import json_repair
from transformers import AutoTokenizer
from openai import RateLimitError
import time
from prompts import *
import re
class ChatbotSimulation:
    """Drive a text-based simulation of an app whose pages are rendered by a chat model.

    Each round validates the user's action against the buttons listed in the
    latest assistant message, asks the model for an updated user state, moves
    through the page history accordingly, and asks the model to render the
    page the user is now on.
    """

    # Keys every user-state dict must carry after a model update.
    _REQUIRED_STATE_KEYS = ('current_page', 'task_completed', 'back')
    # Upper bound on "please reformat this as JSON" follow-up requests.
    _MAX_STATE_REPAIR_ATTEMPTS = 3
    # Matches the retry hint in a rate-limit message, e.g. "try again in 20s",
    # "try again in 500ms" or "try again in 1m30s".
    _RETRY_AFTER_RE = re.compile(
        r"try again in\s+(?:(?P<minutes>\d+)m(?!s))?"
        r"(?:(?P<value>\d+(?:\.\d+)?)\s*(?P<unit>ms|s))?",
        re.IGNORECASE,
    )

    def __init__(self, app_name, site_map, page_details, user_state, system_data, user_data, task, solution,
                 log_location, openai_api_key, agent='human',
                 max_steps=50, max_tokens=8192, buffer_tokens=500):
        """Store the simulation inputs and create the OpenAI client and tokenizer.

        Args:
            app_name: Name of the simulated app, used in messages and prompts.
            site_map: Mapping of page name to a dict that carries a 'uid' entry.
            page_details: Mapping of page uid to the details of that page.
            user_state: Initial user state; a shallow copy is taken and seeded
                with 'current_page', 'task_completed' and 'back'.
            system_data: Passed through to the system prompt.
            user_data: Passed through to the system prompt.
            task: Task description shown to the user and passed to prompts.
            solution: Passed to the state-update prompt.
            log_location: Stored on the instance; not used by this class itself.
            openai_api_key: API key handed to the OpenAI client.
            agent: 'human' or 'llm' (case-insensitive).
            max_steps: Stored on the instance; not enforced by this class itself.
            max_tokens: Token budget used when trimming the conversation.
            buffer_tokens: Completion token limit; twice this amount is kept
                free when trimming the conversation.

        Raises:
            ValueError: If ``agent`` is neither 'human' nor 'llm'.
        """
        self.app_name = app_name
        self.sitemap = site_map
        self.page_details = page_details
        # Copy so that seeding the bookkeeping keys does not mutate the caller's dict.
        self.user_state = dict(user_state)
        self.user_state['current_page'] = 'Home'  # Initialize current page
        self.user_state['task_completed'] = 'False'
        self.user_state['back'] = 'False'
        self.system_data = system_data
        self.user_data = user_data
        self.task = task
        self.solution = solution
        self.log_location = log_location
        self.agent = agent.lower()
        if self.agent not in ['human', 'llm']:
            raise ValueError("Invalid agent type. Expected 'Human' or 'llm'.")
        self.max_steps = max_steps
        self.max_tokens = max_tokens
        self.buffer_tokens = buffer_tokens
        self.conversation = []  # Stores recent conversation snippets
        self.prompt_count = 0
        self.client = OpenAI(api_key=openai_api_key)
        self.actions = []
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2", clean_up_tokenization_spaces=True)
        # Stack of visited pages; supports the back button.
        self.page_history = ['Home']

    def _current_page(self):
        """Return the page on top of the history stack, or 'Home' if the stack is empty."""
        return self.page_history[-1] if self.page_history else "Home"

    def _get_page_uid(self, page_name):
        """Retrieve the UID of the given page from the sitemap (None if unknown)."""
        return self.sitemap.get(page_name, {}).get('uid')

    def _get_page_details(self, page_name):
        """Retrieve the page details using its UID ({} if the page is unknown)."""
        uid = self._get_page_uid(page_name)
        return self.page_details.get(uid, {})

    def _generate_system_prompt(self):
        """Create a dynamic system prompt based on the current state."""
        current_page = self._current_page()
        last_page = self.page_history[-2] if len(self.page_history) > 1 else "Home"
        page_info = self._get_page_details(current_page)
        return get_system_prompt(app_name=self.app_name,
                                 system_data=self.system_data,
                                 task=self.task,
                                 user_data=self.user_data,
                                 current_page=current_page,
                                 last_page=last_page,
                                 actions=self.actions,
                                 user_state=self.user_state,
                                 page_info=page_info
                                 )

    @classmethod
    def _parse_retry_delay(cls, error):
        """Return the retry delay in seconds hinted at in a rate-limit error, or None.

        The delay is read from the error text rather than from ``error.response``:
        the visible code treated the response as a dict, which fails when the
        response object is not subscriptable.
        """
        message = str(getattr(error, "message", None) or error)
        match = cls._RETRY_AFTER_RE.search(message)
        if match is None:
            return None
        minutes, value, unit = match.group("minutes", "value", "unit")
        if minutes is None and value is None:
            return None
        seconds = 60.0 * int(minutes) if minutes else 0.0
        if value is not None:
            amount = float(value)
            seconds += amount / 1000.0 if unit.lower() == "ms" else amount
        return seconds

    def _get_openai_response(self, prompt):
        """Fetch a completion for ``prompt`` (a list of chat messages).

        The stored conversation is trimmed first. Rate-limit errors are retried
        indefinitely after sleeping for the hinted delay (5 seconds when no
        hint can be parsed). Returns the message text, or '' when the model
        returned no content.
        """
        self._trim_conversation()
        while True:
            try:
                response = self.client.chat.completions.create(
                    model="gpt-4",
                    messages=prompt,
                    max_tokens=self.buffer_tokens,  # Adjusted max_tokens if needed
                    temperature=0.7,
                )
            except RateLimitError as e:
                wait_time = self._parse_retry_delay(e)
                if wait_time is None:
                    wait_time = 5
                    print("Could not parse wait time from error message. Defaulting to 5 seconds.")
                print(f"Rate limit reached. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                # Content can be None; normalise so callers can treat it as text.
                return response.choices[0].message.content or ""

    def _count_entry_tokens(self, entry):
        """Return the tokenizer's token count for one conversation entry."""
        text = entry.get('content') or ''
        return len(self.tokenizer.encode(text, truncation=False, add_special_tokens=False))

    def _calculate_token_count(self, conversation):
        """Estimate the conversation's token count with the configured tokenizer.

        NOTE(review): the tokenizer is loaded as "gpt2" while requests go to
        "gpt-4", so this is an approximation of the model's own count.
        """
        return sum(self._count_entry_tokens(entry) for entry in conversation)

    def _trim_conversation(self):
        """Drop the oldest messages until the conversation fits the token budget.

        The list is modified in place, because callers pass this same list
        object as the request prompt. The newest message is always kept so
        that a request is never left with an empty message list.
        """
        limit = self.max_tokens - self.buffer_tokens * 2
        # Tokenize each entry once instead of re-tokenizing the whole history per pop.
        counts = [self._count_entry_tokens(entry) for entry in self.conversation]
        total = sum(counts)
        drop = 0
        while drop < len(counts) - 1 and total >= limit:
            total -= counts[drop]
            drop += 1
        if drop:
            del self.conversation[:drop]

    def _format_action(self, user_input):
        """Describe an action together with the page it was performed on."""
        return f'{user_input} on {self.user_state["current_page"]} page'

    @staticmethod
    def _extract_state_payload(response):
        """Return the text after the first 'UPDATED' marker, or the whole reply if absent."""
        _, marker, tail = response.partition("UPDATED")
        return (tail if marker else response).strip()

    def _fill_required_state(self, state):
        """Add any missing required key to ``state`` (in place) and return it."""
        for key in self._REQUIRED_STATE_KEYS:
            if key not in state:
                state[key] = self._current_page() if key == 'current_page' else False
        return state

    def _request_state_update(self, user_input):
        """Ask the model for the user state that results from ``user_input``.

        Returns a dict that is guaranteed to contain every required key.
        """
        update_prompt = get_user_state_update_prompt(user_input=user_input,
                                                     current_page=self._current_page(),
                                                     task=self.task,
                                                     solution=self.solution,
                                                     user_state=self.user_state,
                                                     sitemap=self.sitemap)
        prompt_entry = {"role": "assistant", "content": update_prompt}
        self.conversation.append(prompt_entry)
        try:
            response = self._get_openai_response(self.conversation)
        finally:
            # The update prompt must not stay in the history. Remove it by
            # identity so this holds even if the call raised or trimming
            # changed the list.
            self.conversation[:] = [entry for entry in self.conversation if entry is not prompt_entry]

        updated_state = json_repair.loads(self._extract_state_payload(response))

        # Force the reply into a dict, but give up after a few attempts rather
        # than issuing requests forever.
        attempts = 0
        while not isinstance(updated_state, dict) and attempts < self._MAX_STATE_REPAIR_ATTEMPTS:
            attempts += 1
            transform_prompt = f"""
            Transform {updated_state} to a properly formatted JSON file.
            Example Output Format:
            {{
              'current_page': 'Home',
              'task_completed': False,
              'back': False
            }}
            """
            updated_state = self._get_openai_response([{"role": "system", "content": transform_prompt}])
            updated_state = json_repair.loads(updated_state)
        if not isinstance(updated_state, dict):
            # Fall back to "nothing changed": the required keys are filled below.
            updated_state = {}

        return self._fill_required_state(updated_state)

    def _render_current_page(self):
        """Ask the model to render the current page and record the exchange."""
        system_prompt = self._generate_system_prompt()
        # GPT generates the page instructions
        self.conversation.append({"role": "system", "content": system_prompt})
        gpt_instruction = self._get_openai_response(self.conversation)
        self.conversation.append({"role": "assistant", "content": gpt_instruction})
        return gpt_instruction

    def one_conversation_round(self, user_input):
        """Conduct one round of conversation between the user and the assistant.

        Returns an "Invalid input" message when ``user_input`` fails validation,
        a completion message when the model reports the task as completed, and
        otherwise the model's rendering of the page the user is now on.
        """
        is_valid, reason = self._is_valid_input(user_input)
        if not is_valid:
            return f"\n{self.app_name}: Invalid input. {reason}"

        user_input = user_input.strip()
        self.actions.append(self._format_action(user_input))
        self.conversation.append({"role": "user", "content": user_input})
        self.prompt_count += 1

        # Update user state using GPT's response
        updated_state = self._request_state_update(user_input)
        if str(updated_state['task_completed']).lower() == 'true':
            return f"Task completed! You took {self.prompt_count} steps."

        self.user_state = updated_state
        if str(updated_state['back']).lower() == 'false':
            self.page_history.append(updated_state['current_page'])
        elif self.page_history:
            self.page_history.pop()

        # No need to keep the old system prompt once a new one is generated.
        self.conversation = [entry for entry in self.conversation if entry["role"] != "system"]
        return self._render_current_page()

    def start_conversation(self):
        """Render the first page and return it prefixed with a greeting."""
        greeting = f'\nWelcome to {self.app_name} simulator! Your task is: {self.task}. \n'
        return greeting + self._render_current_page()

    def _extract_buttons(self):
        """Extract buttons and their action types from the latest assistant message.

        Returns a dict of lower-cased button name to lower-cased action type.
        The dict is empty when there is no conversation yet, when the last
        message is not from the assistant, or when it lists no options.
        """
        if not self.conversation:
            return {}
        last_message = self.conversation[-1]
        if last_message.get("role") != "assistant":
            return {}
        message_content = last_message.get("content") or ""
        # Split case-insensitively on the phrase that introduces the options.
        options_split = re.split(r"you have the following options:", message_content, flags=re.IGNORECASE)
        if len(options_split) < 2:
            return {}
        # Button definitions look like "1. button name: action_type".
        button_section = options_split[1]
        pattern = r"\d+\.\s+(.*?):\s+([a-zA-Z_]+)"
        buttons = re.findall(pattern, button_section)
        return {name.strip().lower(): action_type.strip().lower() for name, action_type in buttons}

    def _is_valid_input(self, user_input):
        """Validate user input format.

        Returns ``[True, 'Pass']`` for a valid action, otherwise
        ``[False, reason]``. Surrounding whitespace is ignored.
        """
        format_error = [False, "Your input doesn't match the format: action_type(button name), OR if type, use type(button name, query)"]
        if not isinstance(user_input, str):
            return format_error
        valid_buttons = self._extract_buttons()
        # Validate input format
        pattern = r"^(?P<action_type>\w+)\((?P<button_name>[^,]+)(?:,\s*(?P<query>.+))?\)$"
        match = re.match(pattern, user_input.strip())
        if not match:
            return format_error
        # Extract parsed components
        action_type = match.group("action_type").lower()
        button_name = match.group("button_name").strip().lower()
        query = match.group("query")  # Optional query for `type`
        # Button name must match exactly (case insensitive)
        if button_name not in valid_buttons:
            return [False,
                    "Invalid Button name! Recall: Each button is in the format: `number. button name: action_type`"]
        # Action type must match the button's specified type
        if action_type != valid_buttons[button_name]:
            return [False,
                    "Invalid action type! Recall: Each button is in the format: `number. button name: action_type`"]
        # `type` action requires a query
        if action_type == "type" and query is None:
            return [False,
                    "Missing Query for action type 'type'! Recall: use the format: `type(button name, query)`"]
        # Non-`type` actions must not have a query
        if action_type != "type" and query is not None:
            return [False,
                    "Non-`type` action_type cannot take query!"]
        return [True, 'Pass']
|