Swarms / swarms /structs /agent_registry.py
harshalmore31's picture
Synced repo using 'sync_with_huggingface' Github Action
d8d14f1 verified
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, Field, ValidationError
from swarms import Agent
from swarms.utils.loguru_logger import logger
class AgentConfigSchema(BaseModel):
uuid: str = Field(
...,
description="The unique identifier for the agent.",
)
name: str = None
description: str = None
time_added: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the agent was added to the registry.",
)
config: Dict[Any, Any] = None
class AgentRegistrySchema(BaseModel):
name: str
description: str
agents: List[AgentConfigSchema]
time_registry_creatd: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the registry was created.",
)
number_of_agents: int = Field(
0,
description="The number of agents in the registry.",
)
class AgentRegistry:
"""
A class for managing a registry of agents.
Attributes:
name (str): The name of the registry.
description (str): A description of the registry.
return_json (bool): Indicates whether to return data in JSON format.
auto_save (bool): Indicates whether to automatically save changes to the registry.
agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name.
lock (Lock): A lock for thread-safe operations on the registry.
agent_registry (AgentRegistrySchema): The schema for the agent registry.
"""
def __init__(
self,
name: str = "Agent Registry",
description: str = "A registry for managing agents.",
agents: Optional[List[Agent]] = None,
return_json: bool = True,
auto_save: bool = False,
*args,
**kwargs,
):
"""
Initializes the AgentRegistry.
Args:
name (str, optional): The name of the registry. Defaults to "Agent Registry".
description (str, optional): A description of the registry. Defaults to "A registry for managing agents.".
agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None.
return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True.
auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False.
"""
self.name = name
self.description = description
self.return_json = return_json
self.auto_save = auto_save
self.agents: Dict[str, Agent] = {}
self.lock = Lock()
# Initialize the agent registry
self.agent_registry = AgentRegistrySchema(
name=self.name,
description=self.description,
agents=[],
number_of_agents=len(agents) if agents else 0,
)
if agents:
self.add_many(agents)
def add(self, agent: Agent) -> None:
"""
Adds a new agent to the registry.
Args:
agent (Agent): The agent to add.
Raises:
ValueError: If the agent_name already exists in the registry.
ValidationError: If the input data is invalid.
"""
name = agent.agent_name
self.agent_to_py_model(agent)
with self.lock:
if name in self.agents:
logger.error(
f"Agent with name {name} already exists."
)
raise ValueError(
f"Agent with name {name} already exists."
)
try:
self.agents[name] = agent
logger.info(f"Agent {name} added successfully.")
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
def add_many(self, agents: List[Agent]) -> None:
"""
Adds multiple agents to the registry.
Args:
agents (List[Agent]): The list of agents to add.
Raises:
ValueError: If any of the agent_names already exist in the registry.
ValidationError: If the input data is invalid.
"""
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.add, agent): agent
for agent in agents
}
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"Error adding agent: {e}")
raise
def delete(self, agent_name: str) -> None:
"""
Deletes an agent from the registry.
Args:
agent_name (str): The name of the agent to delete.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
del self.agents[agent_name]
logger.info(
f"Agent {agent_name} deleted successfully."
)
except KeyError as e:
logger.error(f"Error: {e}")
raise
def update_agent(self, agent_name: str, new_agent: Agent) -> None:
"""
Updates an existing agent in the registry.
Args:
agent_name (str): The name of the agent to update.
new_agent (Agent): The new agent to replace the existing one.
Raises:
KeyError: If the agent_name does not exist in the registry.
ValidationError: If the input data is invalid.
"""
with self.lock:
if agent_name not in self.agents:
logger.error(
f"Agent with name {agent_name} does not exist."
)
raise KeyError(
f"Agent with name {agent_name} does not exist."
)
try:
self.agents[agent_name] = new_agent
logger.info(
f"Agent {agent_name} updated successfully."
)
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
def get(self, agent_name: str) -> Agent:
"""
Retrieves an agent from the registry.
Args:
agent_name (str): The name of the agent to retrieve.
Returns:
Agent: The agent associated with the given agent_name.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
agent = self.agents[agent_name]
logger.info(
f"Agent {agent_name} retrieved successfully."
)
return agent
except KeyError as e:
logger.error(f"Error: {e}")
raise
def list_agents(self) -> List[str]:
"""
Lists all agent names in the registry.
Returns:
List[str]: A list of all agent names.
"""
try:
with self.lock:
agent_names = list(self.agents.keys())
logger.info("Listing all agents.")
return agent_names
except Exception as e:
logger.error(f"Error: {e}")
raise e
def return_all_agents(self) -> List[Agent]:
"""
Returns all agents from the registry.
Returns:
List[Agent]: A list of all agents.
"""
try:
with self.lock:
agents = list(self.agents.values())
logger.info("Returning all agents.")
return agents
except Exception as e:
logger.error(f"Error: {e}")
raise e
def query(
self, condition: Optional[Callable[[Agent], bool]] = None
) -> List[Agent]:
"""
Queries agents based on a condition.
Args:
condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating
whether the agent meets the condition.
Returns:
List[Agent]: A list of agents that meet the condition.
"""
try:
with self.lock:
if condition is None:
agents = list(self.agents.values())
logger.info("Querying all agents.")
return agents
agents = [
agent
for agent in self.agents.values()
if condition(agent)
]
logger.info("Querying agents with condition.")
return agents
except Exception as e:
logger.error(f"Error: {e}")
raise e
def find_agent_by_name(self, agent_name: str) -> Optional[Agent]:
"""
Find an agent by its name.
Args:
agent_name (str): The name of the agent to find.
Returns:
Agent: The agent with the given name.
"""
try:
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.get, agent_name): agent_name
for agent_name in self.agents.keys()
}
for future in as_completed(futures):
agent = future.result()
if agent.agent_name == agent_name:
return agent
except Exception as e:
logger.error(f"Error: {e}")
raise e
def agent_to_py_model(self, agent: Agent):
"""
Converts an agent to a Pydantic model.
Args:
agent (Agent): The agent to convert.
"""
agent_name = agent.agent_name
agent_description = (
agent.description
if agent.description
else "No description provided"
)
schema = AgentConfigSchema(
uuid=agent.id,
name=agent_name,
description=agent_description,
config=agent.to_dict(),
)
logger.info(
f"Agent {agent_name} converted to Pydantic model."
)
self.agent_registry.agents.append(schema)