File size: 3,680 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import json
import re
from typing import Any, Dict, List, Optional

from swarms.structs.agent import Agent


# Helper functions for manager/corporate agents
def parse_tasks(
    task: str = None,
) -> Dict[str, Any]:
    """Parse per-agent assignments out of a multi-line task string.

    Every assignment must sit on its own line and have the shape
    ``<agent_id>AGENT_ID><TASK</agent_id>``: the text between the opening
    and closing tags is split on the first ``><`` into the agent id and
    the task for that agent. Lines that do not match this shape are
    ignored.

    Args:
        task (str, optional): The raw text containing zero or more
            assignment lines. Defaults to None, which is treated like an
            empty string.

    Returns:
        Dict[str, Any]: Mapping of agent id to its task text. If the same
            agent id appears more than once, the last line wins.
    """
    open_tag = "<agent_id>"
    close_tag = "</agent_id>"

    tasks = {}
    if not task:
        return tasks

    for raw_line in task.split("\n"):
        # Tolerate indentation and a trailing "\r" from CRLF input.
        line = raw_line.strip()
        if not (line.startswith(open_tag) and line.endswith(close_tag)):
            continue

        payload = line[len(open_tag) : -len(close_tag)]
        # Split on the first "><" only, so the task text itself may
        # contain that sequence without breaking the parse.
        agent_id, separator, agent_task = payload.partition("><")
        if not separator:
            # Tagged line without an id/task separator: nothing to assign.
            continue
        tasks[agent_id] = agent_task
    return tasks


def find_agent_by_id(
    agent_id: str = None,
    agents: List[Agent] = None,
    task: str = None,
    *args,
    **kwargs,
) -> Optional[Any]:
    """Look up an agent by id and optionally run a task on it.

    The first agent whose ``id`` attribute equals ``agent_id`` is used.

    Args:
        agent_id (str, optional): The id to search for. Defaults to None.
        agents (List[Agent], optional): The agents to search. Defaults to
            None, which is treated as an empty list.
        task (str, optional): If truthy, the matching agent is run with
            this task instead of being returned. Defaults to None.
        *args: Extra positional arguments forwarded to ``agent.run``.
        **kwargs: Extra keyword arguments forwarded to ``agent.run``.

    Returns:
        Optional[Any]: The matching agent when no task is given, the
            result of ``agent.run(task, *args, **kwargs)`` when a task is
            given, or None when no agent has the requested id.
    """
    # ``agents or []`` keeps the documented default (None) from raising.
    for agent in agents or []:
        if agent.id != agent_id:
            continue
        if task:
            return agent.run(task, *args, **kwargs)
        return agent

    return None


def distribute_tasks(
    task: str = None, agents: List[Agent] = None, *args, **kwargs
):
    """Distribute tasks to agents

    The task text is parsed with ``parse_tasks`` into an
    ``agent_id -> task`` mapping. For each entry the agent is looked up
    with ``find_agent_by_id`` and, when found, run on its task. Progress
    and agent outputs are printed; nothing is returned.

    Args:
        task (str, optional): Raw text containing the per-agent
            assignments understood by ``parse_tasks``. Defaults to None.
        agents (List[Agent], optional): Agents that may receive tasks.
            Defaults to None.
        *args: Extra positional arguments forwarded to ``agent.run``.
        **kwargs: Extra keyword arguments forwarded to ``agent.run``.
    """
    # Parse the task to extract tasks and agent id
    assignments = parse_tasks(task)

    # Distribute tasks to agents. The loop variable is deliberately not
    # named ``task`` so the function parameter is not overwritten.
    for agent_id, agent_task in assignments.items():
        assigned_agent = find_agent_by_id(agent_id, agents)
        if assigned_agent:
            print(f"Assigning task {agent_task} to agent {agent_id}")
            output = assigned_agent.run(agent_task, *args, **kwargs)
            print(f"Output from agent {agent_id}: {output}")
        else:
            print(
                f"No agent found with ID {agent_id}. Task '{agent_task}' is"
                " not assigned."
            )


def find_token_in_text(text: str, token: str = "<DONE>") -> bool:
    """
    Report whether a token occurs anywhere in a block of text.

    Args:
        text (str): The text to search.
        token (str): The substring to look for. Defaults to "<DONE>".

    Returns:
        bool: True if the token is found in the text, False otherwise.
    """
    # The membership test already yields a bool; no if/else needed.
    return token in text


def extract_key_from_json(
    json_response: str, key: str
) -> Optional[Any]:
    """
    Extract a specific key from a JSON response.

    Args:
        json_response (str): The JSON document to parse.
        key (str): The top-level key to extract.

    Returns:
        Optional[Any]: The value stored under the key, or None when the
            key is absent or the document is not a JSON object.

    Raises:
        json.JSONDecodeError: If ``json_response`` is not valid JSON.
    """
    parsed = json.loads(json_response)
    # Only JSON objects decode to dicts; arrays, numbers and strings have
    # no keys to look up, so they yield None rather than AttributeError.
    if not isinstance(parsed, dict):
        return None
    return parsed.get(key)


def extract_tokens_from_text(
    text: str, tokens: List[str]
) -> List[str]:
    """
    Filter a list of candidate tokens down to those present in a text.

    Args:
        text (str): The text to search.
        tokens (List[str]): Candidate tokens to look for.

    Returns:
        List[str]: The candidates that occur in the text, in the same
            order as they were given.
    """
    found = []
    for candidate in tokens:
        if candidate in text:
            found.append(candidate)
    return found


def detect_markdown(text: str) -> bool:
    """
    Tell whether a string holds a Markdown code span fenced by six backticks.

    Parameters
    ----------
    text : str
        The string to inspect.

    Returns
    -------
    bool
        True when an opening run of six backticks is followed later by a
        closing run of six backticks, False otherwise.
    """
    # [\s\S] matches any character including newlines; *? keeps it lazy.
    fence_regex = re.compile(r"``````[\s\S]*?``````")
    match = fence_regex.search(text)
    return match is not None