import os import subprocess import random from huggingface_hub import InferenceClient import gradio as gr from safe_search import safe_search # You need to implement this from i_search import google from i_search import i_search as i_s from prompts import ( ACTION_PROMPT, ADD_PROMPT, COMPRESS_HISTORY_PROMPT, LOG_PROMPT, LOG_RESPONSE, MODIFY_PROMPT, PREFIX, SEARCH_QUERY, READ_PROMPT, TASK_PROMPT, UNDERSTAND_TEST_RESULTS_PROMPT, WEB_DEV, AI_SYSTEM_PROMPT, PYTHON_CODE_DEV, ) from utils import parse_action, parse_file_content, read_python_module_structure from datetime import datetime now = datetime.now() date_time_str = now.strftime("%Y-%m-%d %H:%M:%S") client = InferenceClient( "mistralai/Mixtral-8x7B-Instruct-v0.1" ) ############################################ VERBOSE = True MAX_HISTORY = 100 # Default values for inputs DEFAULT_AGENT = "WEB_DEV" DEFAULT_SYSTEM_PROMPT = "" DEFAULT_TEMPERATURE = 0.9 DEFAULT_MAX_TOKENS = 256 DEFAULT_TOP_P = 0.95 DEFAULT_REPETITION_PENALTY = 1.0 def format_prompt(message, history, system_prompt): prompt = "" prompt += f"[INST] {system_prompt}, {message} [/INST]" for user_prompt, bot_response in history: prompt += f"[INST] {user_prompt} [/INST]" prompt += f" {bot_response} " return prompt def run_gpt( prompt_template, stop_tokens, max_tokens, purpose, temperature, top_p, repetition_penalty, system_prompt, **prompt_kwargs, ): seed = random.randint(1, 1111111111111111) generate_kwargs = dict( temperature=temperature, max_new_tokens=max_tokens, top_p=top_p, repetition_penalty=repetition_penalty, do_sample=True, seed=seed, ) content = PREFIX.format( date_time_str=date_time_str, purpose=purpose, safe_search=safe_search, ) + prompt_template.format(**prompt_kwargs) if VERBOSE: print(LOG_PROMPT.format(content)) formatted_prompt = format_prompt(content, prompt_kwargs.get("history", []), system_prompt) stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) resp = "" for response in stream: resp += response.token.text if VERBOSE: print(LOG_RESPONSE.format(resp)) return resp def compress_history(purpose, task, history, directory): resp = run_gpt( COMPRESS_HISTORY_PROMPT, stop_tokens=["observation:", "task:", "action:", "thought:"], max_tokens=512, purpose=purpose, task=task, history=history, temperature=DEFAULT_TEMPERATURE, top_p=DEFAULT_TOP_P, repetition_penalty=DEFAULT_REPETITION_PENALTY, system_prompt=DEFAULT_SYSTEM_PROMPT, ) history = "observation: {}\n".format(resp) return history def call_search(purpose, task, history, directory, action_input): print("CALLING SEARCH") try: if "http" in action_input: if "<" in action_input: action_input = action_input.strip("<") if ">" in action_input: action_input = action_input.strip(">") response = i_s(action_input) print(response) history += "observation: search result is: {}\n".format(response) else: history += "observation: I need to provide a valid URL to 'action: SEARCH action_input=https://URL'\n" except Exception as e: history += "observation: {}'\n".format(e) return "MAIN", None, history, task def call_main(purpose, task, history, directory, action_input): resp = run_gpt( ACTION_PROMPT, stop_tokens=["observation:", "task:", "action:", "thought:"], max_tokens=2096, purpose=purpose, task=task, history=history, temperature=DEFAULT_TEMPERATURE, top_p=DEFAULT_TOP_P, repetition_penalty=DEFAULT_REPETITION_PENALTY, system_prompt=DEFAULT_SYSTEM_PROMPT, ) lines = resp.strip().strip("\n").split("\n") for line in lines: if line == "": continue if line.startswith("thought: "): history += "{}\n".format(line) elif line.startswith("action: "): action_name, action_input = parse_action(line) print(f"ACTION_NAME :: {action_name}") print(f"ACTION_INPUT :: {action_input}") history += "{}\n".format(line) if "COMPLETE" in action_name or "COMPLETE" in action_input: task = "END" return action_name, action_input, history, task else: return action_name, action_input, history, task else: history += "{}\n".format(line) return "MAIN", None, history, task def call_set_task(purpose, task, history, directory, action_input): task = run_gpt( TASK_PROMPT, stop_tokens=[], max_tokens=64, purpose=purpose, task=task, history=history, temperature=DEFAULT_TEMPERATURE, top_p=DEFAULT_TOP_P, repetition_penalty=DEFAULT_REPETITION_PENALTY, system_prompt=DEFAULT_SYSTEM_PROMPT, ).strip("\n") history += "observation: task has been updated to: {}\n".format(task) return "MAIN", None, history, task def end_fn(purpose, task, history, directory, action_input): task = "END" return "COMPLETE", "COMPLETE", history, task NAME_TO_FUNC = { "MAIN": call_main, "UPDATE-TASK": call_set_task, "SEARCH": call_search, "COMPLETE": end_fn, } def run_action(purpose, task, history, directory, action_name, action_input): print(f"action_name::{action_name}") try: if "RESPONSE" in action_name or "COMPLETE" in action_name: action_name = "COMPLETE" task = "END" return action_name, "COMPLETE", history, task # compress the history when it is long if len(history.split("\n")) > MAX_HISTORY: if VERBOSE: print("COMPRESSING HISTORY") history = compress_history(purpose, task, history, directory) if not action_name in NAME_TO_FUNC: action_name = "MAIN" if action_name == "" or action_name == None: action_name = "MAIN" assert action_name in NAME_TO_FUNC print("RUN: ", action_name, action_input) return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input) except Exception as e: history += "observation: the previous command did not produce any useful output, I need to check the commands syntax, or use a different command\n" return "MAIN", None, history, task def run(purpose, history, agent_name=DEFAULT_AGENT, system_prompt=DEFAULT_SYSTEM_PROMPT, temperature=DEFAULT_TEMPERATURE, max_tokens=DEFAULT_MAX_TOKENS, top_p=DEFAULT_TOP_P, repetition_penalty=DEFAULT_REPETITION_PENALTY): task = None directory = "./" if history: history = str(history).strip("[]") if not history: history = "" action_name = "UPDATE-TASK" if task is None else "MAIN" action_input = None while True: print("") print("") print("---") print("purpose:", purpose) print("task:", task) print("---") print(history) print("---") action_name, action_input, history, task = run_action( purpose, task, history, directory, action_name, action_input, ) yield (history) if task == "END": return (history) def process_input(user_input, history, chatbot, agent_name, system_prompt, temperature, max_tokens, top_p, repetition_penalty): """Processes user input and updates the chatbot.""" purpose = "General" history = history + [(user_input, "")] chatbot.append(user_input, "") for response in run( purpose, history, agent_name=agent_name, system_prompt=system_prompt, temperature=temperature, max_tokens=max_tokens, top_p=top_p, repetition_penalty=repetition_penalty, ): chatbot.append("", response) yield chatbot with gr.Blocks() as iface: with gr.Row(): chatbot = gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, layout="panel") with gr.Column(): msg = gr.Textbox(label="Enter your message") with gr.Row(): submit_b = gr.Button("Submit") clear = gr.ClearButton([msg, chatbot]) # Input fields for configuration with gr.Column(): agent_dropdown = gr.Dropdown( label="Agent", choices=[s for s in ["WEB_DEV", "AI_SYSTEM_PROMPT", "PYTHON_CODE_DEV"]], value=DEFAULT_AGENT, interactive=True, ) system_prompt_textbox = gr.Textbox( label="System Prompt", value=DEFAULT_SYSTEM_PROMPT, interactive=True, ) temperature_slider = gr.Slider( label="Temperature", value=DEFAULT_TEMPERATURE, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Higher values produce more diverse outputs", ) max_tokens_slider = gr.Slider( label="Max new tokens", value=DEFAULT_MAX_TOKENS, minimum=0, maximum=1048 * 10, step=64, interactive=True, info="The maximum numbers of new tokens", ) top_p_slider = gr.Slider( label="Top-p (nucleus sampling)", value=DEFAULT_TOP_P, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Higher values sample more low-probability tokens", ) repetition_penalty_slider = gr.Slider( label="Repetition penalty", value=DEFAULT_REPETITION_PENALTY, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Penalize repeated tokens", ) # Connect input fields to the processing function submit_b.click( process_input, [msg, chatbot, chatbot, agent_dropdown, system_prompt_textbox, temperature_slider, max_tokens_slider, top_p_slider, repetition_penalty_slider], chatbot, ) msg.submit( process_input, [msg, chatbot, chatbot, agent_dropdown, system_prompt_textbox, temperature_slider, max_tokens_slider, top_p_slider, repetition_penalty_slider], chatbot, ) iface.launch()