Spaces:
Runtime error
Runtime error
import sys | |
sys.path.append("../../agents") | |
import os | |
from gradio_base import WebUI, UIHelper, PORT, HOST, Client | |
from gradio_config import GradioConfig as gc | |
from typing import List, Tuple, Any | |
import gradio as gr | |
from Agent import Agent | |
import time | |
import json | |
from utils import cos_sim | |
from design_states import get_desgin_states, get_cot_result, gen_coder_task | |
from gen_utils import * | |
import openai | |
import torch | |
def get_embedding(sentence,api_key): | |
openai.api_key = api_key | |
embedding_model = openai.Embedding | |
embed = embedding_model.create( | |
model="text-embedding-ada-002", | |
input=sentence | |
) | |
embed = embed["data"][0]["embedding"] | |
embed = torch.tensor(embed,dtype=torch.float32) | |
if len(embed.shape)==1: | |
embed = embed.unsqueeze(0) | |
return embed | |
class CodeUI(WebUI): | |
def render_and_register_ui(self): | |
self.agent_name:list = [self.cache["agents_name"]] if isinstance(self.cache["agents_name"], str) else self.cache['agents_name'] | |
gc.add_agent(self.agent_name) | |
def __init__( | |
self, | |
client_cmd: list, | |
socket_host: str = HOST, | |
socket_port: int = PORT, | |
bufsize: int = 1024, | |
ui_name: str = "CodeUI" | |
): | |
super(CodeUI, self).__init__(client_cmd, socket_host, socket_port, bufsize, ui_name) | |
self.first_recieve_from_client() | |
self.data_history = list() | |
self.caller = 0 | |
def load_sop_fn(self,sop): | |
return sop.name | |
def generate_sop_stage_1(self,api_key,proxy,target): | |
os.environ["API_KEY"] = api_key | |
# os.environ["PROXY"] = proxy | |
self.software = "You are a software,aim to write a snake game with python" | |
self.debate = "Simulate a debate competition" | |
self.ecological_environment = "Simulate the interactions and competition among different organisms within an ecosystem" | |
self.software = get_embedding(self.software,api_key) | |
self.debate = get_embedding(self.debate,api_key) | |
self.ecological_environment = get_embedding(self.ecological_environment,api_key) | |
self.embeddings = torch.cat([self.software,self.debate,self.ecological_environment],dim = 0) | |
self.SOP["config"]["API_KEY"] = api_key | |
# self.SOP["config"]["PROXY"] = proxy | |
target_tensor = get_embedding(target,api_key) | |
sim_scores = cos_sim(target_tensor, self.embeddings)[0] | |
top_k_score, top_k_idx = torch.topk(sim_scores,k = 1) | |
if top_k_score > 0.7: | |
self.index = top_k_idx | |
else: | |
self.index = 0 | |
target_processed = get_cot_result(target) | |
print("finished!!!!") | |
return target_processed,self.target_finish_flag.update(visible = True) | |
def generate_sop_stage_2(self,target): | |
design_states = get_desgin_states(target,self.index) | |
root = design_states[0]["state_name"] | |
self.SOP["root"] = root | |
return design_states,self.state_finish_flag.update(visible = True) | |
def generate_sop_stage_3(self,design_states): | |
agents = get_agents(design_states,self.index) | |
relations = get_relations(design_states) | |
self.SOP["relations"] = relations | |
self.SOP["agents"] = agents | |
return agents, self.agent_relation_finish_flag.update(visible = True),self.reminder.update(visible = True) | |
def generate_sop_stage_4(self,agents, need_coder,design_states): | |
states = gen_states(design_states,self.index) | |
if "Coder" in need_coder: | |
agents["coder"] = {"style": "professional", "roles": {}} | |
for state_name, state in states.items(): | |
if state_name != "end_state": | |
agents["coder"]["roles"][state_name] = "coder" | |
state["roles"].append("coder") | |
task = gen_coder_task(state["environment_prompt"]) | |
now_coder = self.coder.copy() | |
now_coder["task"]["task"] = task | |
state["agent_states"]["coder"] = now_coder | |
state["controller"]["max_chat_nums"] = str( | |
int(state["controller"]["max_chat_nums"])+2) | |
for name, agent in state["agent_states"].items(): | |
if name != "coder": | |
agent["rule"]["rule"] += "\nEvaluate the code of the coder and provide feedback and response as concise as possible.It is best not to exceed 100 words" | |
agent["task"]["task"] += "\nEvaluate the code of the coder and provide feedback." | |
self.SOP["states"] = states | |
# 将字典写入JSON文件 | |
file_name = 'generated_sop.json' | |
with open(file_name, "w") as json_file: | |
json.dump(self.SOP, json_file ,indent=4,ensure_ascii=False) | |
return file_name | |
def construct_ui(self): | |
with gr.Blocks(css=gc.CSS) as demo: | |
with gr.Tab(label="SOP generation") as tab1: | |
self.coder = { | |
"task": { | |
"task": "" | |
}, | |
"rule": {"rule": "1.write code that conforms to standards like PEP8, is modular, easy to read, and maintainable.\n 2.The output strictly follows the following format:<title>{the file name}</title>\n<python>{the target code}</python>\n3.Please carefully modify the code based on feedback from others.\n4.Output the code only."}, | |
"last": { | |
"last_prompt": "The output strictly follows the following format:<title>{the file name}</title>\n<python>{the target code}</python>,Output the code only." | |
} | |
} | |
self.SOP = { | |
"config": { | |
"API_KEY": "sk-*********", | |
"MAX_CHAT_HISTORY": "5", | |
"User_Names": '["User"]', | |
}, | |
"root": "state1", | |
"relations": { | |
"state1": {"0": "state1", "1": "state2"}, | |
"state2": {"0": "state2", "1": "end_state"}, | |
}, | |
"agents": None, | |
"states": None, | |
} | |
gr.Markdown("""# Generate Agent""") | |
with gr.Row(): | |
api_key = gr.Textbox(label="api_key") | |
proxy = gr.Textbox(label="proxy",visible=False) | |
with gr.Row(): | |
requirement = gr.Textbox(value ="A software company aim to write a mine sweeping game",label="requirement") | |
with gr.Row(): | |
need_coder = gr.CheckboxGroup(["Coder"],label="Please check this option if your multi-agent system aims to produce code.") | |
with gr.Row(): | |
self.target_finish_flag = gr.Label(value = "The process of completing requirement handling is finished.",visible=False) | |
with gr.Row(): | |
self.state_finish_flag = gr.Label(value = "The process of determining the state is completed.",visible=False) | |
with gr.Row(): | |
self.agent_relation_finish_flag = gr.Label(value = "The process of initializing the agent and relation is completed.",visible=False) | |
with gr.Row(): | |
self.reminder = gr.Markdown("""Generating SOP...""",visible=False) | |
generated_sop = gr.File(label="generated_file") | |
generate_button = gr.Button(label="Generate") | |
target_processed = gr.State() | |
design_states = gr.State() | |
agents = gr.State() | |
generate_button.click(self.generate_sop_stage_1,[api_key,proxy,requirement],[target_processed,self.target_finish_flag]).then( | |
self.generate_sop_stage_2, [target_processed], [design_states,self.state_finish_flag]).then( | |
self.generate_sop_stage_3, [design_states], [agents,self.agent_relation_finish_flag,self.reminder]).then( | |
self.generate_sop_stage_4, [agents, need_coder,design_states], [generated_sop]) | |
with gr.Tab(label="Chat") as tab2: | |
uploaded_sop = gr.State() | |
with gr.Row(): | |
sop = gr.File(label="upload your custmized SOP") | |
load_sop_btn = gr.Button(value="Load SOP") | |
load_sop_btn.click(self.load_sop_fn, sop,uploaded_sop) | |
with gr.Row(): | |
with gr.Column(): | |
self.text_api = gr.Textbox( | |
value = self.cache["api_key"], | |
placeholder="openai key", | |
label="Please input valid openai key for gpt-3.5-turbo-16k." | |
) | |
self.radio_mode = gr.Radio( | |
[Client.SINGLE_MODE], | |
value=Client.SINGLE_MODE, | |
interactive=True, | |
label = Client.MODE_LABEL, | |
info = Client.MODE_INFO | |
) | |
self.chatbot = gr.Chatbot( | |
elem_id="chatbot1" | |
) | |
self.btn_next = gr.Button( | |
value="Next Agent", | |
visible=False, elem_id="btn" | |
) | |
with gr.Row(): | |
self.text_requirement = gr.Textbox( | |
value=self.cache['requirement'], | |
placeholder="Please enter your content", | |
scale=9, | |
) | |
self.btn_start = gr.Button( | |
value="Start!", | |
scale=1 | |
) | |
self.btn_reset = gr.Button( | |
value="Restart", | |
visible=False | |
) | |
with gr.Column(): | |
self.file = gr.File(visible=False) | |
self.chat_code_show = gr.Chatbot( | |
elem_id="chatbot1", | |
visible=False | |
) | |
self.btn_start.click( | |
fn=self.btn_send_when_click, | |
inputs=[self.chatbot, self.text_requirement, self.radio_mode, self.text_api,uploaded_sop], | |
outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
).then( | |
fn=self.btn_send_after_click, | |
inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
) | |
self.text_requirement.submit( | |
fn=self.btn_send_when_click, | |
inputs=[self.chatbot, self.text_requirement, self.text_api,uploaded_sop], | |
outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
).then( | |
fn=self.btn_send_after_click, | |
inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
) | |
self.btn_reset.click( | |
fn=self.btn_reset_when_click, | |
inputs=[], | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
).then( | |
fn=self.btn_reset_after_click, | |
inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
) | |
self.file.select( | |
fn=self.file_when_select, | |
inputs=[self.file], | |
outputs=[self.chat_code_show] | |
) | |
self.btn_next.click( | |
fn = self.btn_next_when_click, | |
inputs=[], | |
outputs=[self.btn_next] | |
).then( | |
fn=self.btn_send_after_click, | |
inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
) | |
self.demo = demo | |
def handle_message(self, history:list, state, agent_name, token, node_name): | |
if state % 10 == 0: | |
self.data_history.append({agent_name: token}) | |
elif state % 10 == 1: | |
# Same state. Need to add new bubble in same bubble. | |
self.data_history[-1][agent_name] += token | |
elif state % 10 == 2: | |
# New state. Need to add new bubble. | |
history.append([None, ""]) | |
self.data_history.clear() | |
self.data_history.append({agent_name: token}) | |
else: | |
assert False, "Invalid state." | |
render_data = self.render_bubble(history, self.data_history, node_name, render_node_name=True) | |
return render_data | |
def btn_send_when_click(self, chatbot, text_requirement, mode, api, sop): | |
""" | |
inputs=[self.chatbot, self.text_requirement, radio, text_api], | |
outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
""" | |
chatbot = [[UIHelper.wrap_css(content=text_requirement, name="User"), None]] | |
yield chatbot,\ | |
gr.Button.update(visible=True, interactive=False, value="Running"),\ | |
gr.Textbox.update(visible=True, interactive=False, value=""),\ | |
gr.Button.update(visible=False, interactive=False) | |
self.send_start_cmd({'requirement': text_requirement, "mode": mode, "api_key": api ,"uploaded_sop": sop}) | |
agents,roles_to_names,names_to_roles = Agent.from_config(str(sop)) | |
agents_name = [] | |
for i in names_to_roles : | |
for j in names_to_roles[i]: | |
agents_name.append(j+"("+names_to_roles[i][j]+")") | |
self.new_render_and_register_ui(agents_name) | |
return | |
def new_render_and_register_ui(self,agent_names): | |
gc.add_agent(agent_names, 0) | |
def btn_send_after_click( | |
self, | |
file, | |
history, | |
show_code, | |
btn_send, | |
btn_reset, | |
text_requirement | |
): | |
""" | |
outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
""" | |
if self.caller == 0: | |
self.data_history = list() | |
self.caller = 0 | |
receive_server = self.receive_server | |
while True: | |
data_list: List = receive_server.send(None) | |
for item in data_list: | |
data = eval(item) | |
assert isinstance(data, list) | |
state, agent_name, token, node_name = data | |
assert isinstance(state, int) | |
assert state in [10, 11, 12, 99, 98] | |
if state == 99: | |
# finish | |
fs = [self.cache['pwd']+'/output_code/'+_ for _ in os.listdir(self.cache['pwd']+'/output_code')] | |
yield gr.File.update(value=fs, visible=True, interactive=True),\ | |
history, \ | |
gr.Chatbot.update(visible=True),\ | |
gr.Button.update(visible=True, interactive=True, value="Start"),\ | |
gr.Button.update(visible=True, interactive=True),\ | |
gr.Textbox.update(visible=True, interactive=True, placeholder="Please input your requirement", value=""),\ | |
gr.Button.update(visible=False) | |
return | |
elif state == 98: | |
yield gr.File.update(visible=False),\ | |
history, \ | |
gr.Chatbot.update(visible=False),\ | |
gr.Button.update(visible=True, interactive=False),\ | |
gr.Button.update(visible=True, interactive=True),\ | |
gr.Textbox.update(visible=True, interactive=False),\ | |
gr.Button.update(visible=True, value=f"Next Agent: 🤖{agent_name} | Next Node: ⭕{node_name}") | |
return | |
history = self.handle_message(history, state, agent_name, token, node_name) | |
yield gr.File.update(visible=False),\ | |
history, \ | |
gr.Chatbot.update(visible=False),\ | |
gr.Button.update(visible=True, interactive=False),\ | |
gr.Button.update(visible=False, interactive=False),\ | |
gr.Textbox.update(visible=True, interactive=False),\ | |
gr.Button.update(visible=False) | |
def btn_reset_when_click(self): | |
""" | |
inputs = [] | |
outputs = [self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
""" | |
return gr.File.update(visible=False),\ | |
None, None, gr.Button.update(value="Restarting...", interactive=False),\ | |
gr.Button.update(value="Restarting...", interactive=False),\ | |
gr.Textbox.update(value="Restarting", interactive=False),\ | |
gr.Button.update(visible=False) | |
def btn_reset_after_click( | |
self, | |
file, | |
chatbot, | |
show_code, | |
btn_send, | |
btn_reset, | |
text_requirement | |
): | |
self.reset() | |
self.first_recieve_from_client(reset_mode=True) | |
return gr.File.update(value=None, visible=False),\ | |
gr.Chatbot.update(value=None, visible=True),\ | |
gr.Chatbot.update(value=None, visible=False),\ | |
gr.Button.update(value="Start", visible=True, interactive=True),\ | |
gr.Button.update(value="Restart", interactive=False, visible=False),\ | |
gr.Textbox.update(value=self.cache['requirement'], interactive=True, visible=True),\ | |
gr.Button.update(visible=False) | |
def file_when_select(self, file): | |
CODE_PREFIX = "```python\n{}\n```" | |
with open(file.name, "r", encoding='utf-8') as f: | |
contents = f.readlines() | |
codes = "".join(contents) | |
return [[CODE_PREFIX.format(codes),None]] | |
def btn_next_when_click(self): | |
self.caller = 1 # it will remain the value in self.data_history | |
self.send_message("nothing") | |
time.sleep(0.5) | |
yield gr.Button.update(visible=False) | |
return | |
if __name__ == '__main__': | |
ui = CodeUI(client_cmd=["python3","gradio_backend.py"]) | |
ui.construct_ui() | |
ui.run(share=True) | |