Spaces:
Sleeping
Sleeping
File size: 3,680 Bytes
d8d14f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import json
import re
from typing import Any, Dict, List, Optional
from swarms.structs.agent import Agent
# Helper functions for manager/corporate agents
def parse_tasks(
    task: str = None,
) -> Dict[str, Any]:
    """Parse per-agent task assignments out of a block of text.

    Every assignment is expected on its own line, written as
    ``<agent_id>AGENT_ID><TASK TEXT</agent_id>``: the payload between the
    opening and closing tags is split on the first ``><`` into the agent id
    and the task for that agent. Lines that do not follow this shape are
    ignored.

    Args:
        task (str, optional): Text containing zero or more assignment
            lines. Defaults to None, which is treated like empty text.

    Returns:
        Dict[str, Any]: Mapping of agent id to task text. If the same agent
            id appears on several lines, the last one wins.
    """
    open_tag = "<agent_id>"
    close_tag = "</agent_id>"
    tasks: Dict[str, Any] = {}

    # Nothing to parse for None or an empty string.
    if not task:
        return tasks

    for raw_line in task.split("\n"):
        # Tolerate indentation and a trailing "\r" from Windows line endings.
        line = raw_line.strip()
        if not (line.startswith(open_tag) and line.endswith(close_tag)):
            continue

        payload = line[len(open_tag):-len(close_tag)]
        # Split on the first "><" only, so the task text may itself
        # contain that sequence.
        agent_id, separator, agent_task = payload.partition("><")
        if not separator:
            # Tagged line without an id/task separator: skip, don't crash.
            continue
        tasks[agent_id] = agent_task

    return tasks
def find_agent_by_id(
    agent_id: str = None,
    agents: List[Agent] = None,
    task: str = None,
    *args,
    **kwargs,
) -> Any:
    """Find an agent by id and optionally run a task on it.

    Args:
        agent_id (str, optional): Id compared against each agent's ``id``
            attribute. Defaults to None.
        agents (List[Agent], optional): Agents to search. Defaults to None,
            which is treated as an empty list.
        task (str, optional): When truthy, the matching agent's ``run`` is
            called with this task. Defaults to None.
        *args: Extra positional arguments forwarded to ``agent.run``.
        **kwargs: Extra keyword arguments forwarded to ``agent.run``.

    Returns:
        Any: The first agent whose ``id`` equals ``agent_id`` when no task
            is given; the result of ``agent.run(task, *args, **kwargs)``
            when a task is given; None when no agent matches.
    """
    # `agents` defaults to None; treat that as "no agents" instead of
    # failing with a TypeError when iterating.
    for agent in agents or []:
        if agent.id != agent_id:
            continue
        if task:
            return agent.run(task, *args, **kwargs)
        return agent
    return None
def distribute_tasks(
    task: str = None, agents: List[Agent] = None, *args, **kwargs
):
    """Distribute tasks to agents.

    The text is parsed with ``parse_tasks`` into an agent-id -> task
    mapping. Each task is then run on the agent returned by
    ``find_agent_by_id``, and progress is reported with ``print``.

    Args:
        task (str, optional): Text containing the per-agent assignments.
            Defaults to None, in which case nothing is done.
        agents (List[Agent], optional): Agents available for assignment.
            Defaults to None, which is treated as an empty list.
        *args: Extra positional arguments forwarded to each ``agent.run``.
        **kwargs: Extra keyword arguments forwarded to each ``agent.run``.
    """
    # No text means no assignments to hand out.
    if task is None:
        return

    # Parse the task to extract tasks and agent ids
    tasks = parse_tasks(task)

    # Distribute tasks to agents. The loop variable has its own name so the
    # `task` parameter is not overwritten while iterating.
    for agent_id, agent_task in tasks.items():
        assigned_agent = find_agent_by_id(agent_id, agents or [])
        if assigned_agent:
            print(f"Assigning task {agent_task} to agent {agent_id}")
            output = assigned_agent.run(agent_task, *args, **kwargs)
            print(f"Output from agent {agent_id}: {output}")
        else:
            print(
                f"No agent found with ID {agent_id}. Task '{agent_task}' is"
                " not assigned."
            )
def find_token_in_text(text: str, token: str = "<DONE>") -> bool:
    """
    Report whether a token occurs anywhere in a block of text.

    Args:
        text (str): The text to search.
        token (str): The token to look for. Defaults to "<DONE>".

    Returns:
        bool: True if the token is found in the text, False otherwise.
    """
    # Membership on strings is a substring test and already yields a bool.
    return token in text
def extract_key_from_json(
    json_response: str, key: str
) -> Optional[Any]:
    """
    Extract a specific top-level key from a JSON response.

    Args:
        json_response (str): The JSON document to parse.
        key (str): The key to extract.

    Returns:
        Optional[Any]: The value stored under the key (whatever JSON type
            it has) if the document is a JSON object containing it, None
            otherwise.

    Raises:
        json.JSONDecodeError: If ``json_response`` is not valid JSON.
    """
    parsed = json.loads(json_response)
    # Valid JSON is not necessarily an object (it may be an array, number,
    # string, ...); only objects can be looked up by key.
    if not isinstance(parsed, dict):
        return None
    return parsed.get(key)
def extract_tokens_from_text(
    text: str, tokens: List[str]
) -> List[str]:
    """
    Select the tokens that occur in a text response.

    Args:
        text (str): The text to search.
        tokens (List[str]): The candidate tokens to look for.

    Returns:
        List[str]: The candidates that were found in the text, in the
            order they appear in ``tokens``.
    """
    found: List[str] = []
    for candidate in tokens:
        if candidate in text:
            found.append(candidate)
    return found
def detect_markdown(text: str) -> bool:
    """
    Tell whether a string holds a block fenced by six backticks.

    A block is a run of six consecutive backticks, any content (possibly
    empty, possibly spanning several lines), and another run of six
    consecutive backticks.

    Parameters
    ----------
    text : str
        The text to check.

    Returns
    -------
    bool
        True if the text contains a block enclosed in six backticks,
        False otherwise.
    """
    fenced_block = re.compile(r"``````[\s\S]*?``````")
    return fenced_block.search(text) is not None
|