import time from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock from typing import Any, Callable, Dict, List, Optional from pydantic import BaseModel, Field, ValidationError from swarms import Agent from swarms.utils.loguru_logger import logger class AgentConfigSchema(BaseModel): uuid: str = Field( ..., description="The unique identifier for the agent.", ) name: str = None description: str = None time_added: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), description="Time when the agent was added to the registry.", ) config: Dict[Any, Any] = None class AgentRegistrySchema(BaseModel): name: str description: str agents: List[AgentConfigSchema] time_registry_creatd: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), description="Time when the registry was created.", ) number_of_agents: int = Field( 0, description="The number of agents in the registry.", ) class AgentRegistry: """ A class for managing a registry of agents. Attributes: name (str): The name of the registry. description (str): A description of the registry. return_json (bool): Indicates whether to return data in JSON format. auto_save (bool): Indicates whether to automatically save changes to the registry. agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name. lock (Lock): A lock for thread-safe operations on the registry. agent_registry (AgentRegistrySchema): The schema for the agent registry. """ def __init__( self, name: str = "Agent Registry", description: str = "A registry for managing agents.", agents: Optional[List[Agent]] = None, return_json: bool = True, auto_save: bool = False, *args, **kwargs, ): """ Initializes the AgentRegistry. Args: name (str, optional): The name of the registry. Defaults to "Agent Registry". description (str, optional): A description of the registry. Defaults to "A registry for managing agents.". agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None. return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True. auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False. """ self.name = name self.description = description self.return_json = return_json self.auto_save = auto_save self.agents: Dict[str, Agent] = {} self.lock = Lock() # Initialize the agent registry self.agent_registry = AgentRegistrySchema( name=self.name, description=self.description, agents=[], number_of_agents=len(agents) if agents else 0, ) if agents: self.add_many(agents) def add(self, agent: Agent) -> None: """ Adds a new agent to the registry. Args: agent (Agent): The agent to add. Raises: ValueError: If the agent_name already exists in the registry. ValidationError: If the input data is invalid. """ name = agent.agent_name self.agent_to_py_model(agent) with self.lock: if name in self.agents: logger.error( f"Agent with name {name} already exists." ) raise ValueError( f"Agent with name {name} already exists." ) try: self.agents[name] = agent logger.info(f"Agent {name} added successfully.") except ValidationError as e: logger.error(f"Validation error: {e}") raise def add_many(self, agents: List[Agent]) -> None: """ Adds multiple agents to the registry. Args: agents (List[Agent]): The list of agents to add. Raises: ValueError: If any of the agent_names already exist in the registry. ValidationError: If the input data is invalid. """ with ThreadPoolExecutor() as executor: futures = { executor.submit(self.add, agent): agent for agent in agents } for future in as_completed(futures): try: future.result() except Exception as e: logger.error(f"Error adding agent: {e}") raise def delete(self, agent_name: str) -> None: """ Deletes an agent from the registry. Args: agent_name (str): The name of the agent to delete. Raises: KeyError: If the agent_name does not exist in the registry. """ with self.lock: try: del self.agents[agent_name] logger.info( f"Agent {agent_name} deleted successfully." ) except KeyError as e: logger.error(f"Error: {e}") raise def update_agent(self, agent_name: str, new_agent: Agent) -> None: """ Updates an existing agent in the registry. Args: agent_name (str): The name of the agent to update. new_agent (Agent): The new agent to replace the existing one. Raises: KeyError: If the agent_name does not exist in the registry. ValidationError: If the input data is invalid. """ with self.lock: if agent_name not in self.agents: logger.error( f"Agent with name {agent_name} does not exist." ) raise KeyError( f"Agent with name {agent_name} does not exist." ) try: self.agents[agent_name] = new_agent logger.info( f"Agent {agent_name} updated successfully." ) except ValidationError as e: logger.error(f"Validation error: {e}") raise def get(self, agent_name: str) -> Agent: """ Retrieves an agent from the registry. Args: agent_name (str): The name of the agent to retrieve. Returns: Agent: The agent associated with the given agent_name. Raises: KeyError: If the agent_name does not exist in the registry. """ with self.lock: try: agent = self.agents[agent_name] logger.info( f"Agent {agent_name} retrieved successfully." ) return agent except KeyError as e: logger.error(f"Error: {e}") raise def list_agents(self) -> List[str]: """ Lists all agent names in the registry. Returns: List[str]: A list of all agent names. """ try: with self.lock: agent_names = list(self.agents.keys()) logger.info("Listing all agents.") return agent_names except Exception as e: logger.error(f"Error: {e}") raise e def return_all_agents(self) -> List[Agent]: """ Returns all agents from the registry. Returns: List[Agent]: A list of all agents. """ try: with self.lock: agents = list(self.agents.values()) logger.info("Returning all agents.") return agents except Exception as e: logger.error(f"Error: {e}") raise e def query( self, condition: Optional[Callable[[Agent], bool]] = None ) -> List[Agent]: """ Queries agents based on a condition. Args: condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating whether the agent meets the condition. Returns: List[Agent]: A list of agents that meet the condition. """ try: with self.lock: if condition is None: agents = list(self.agents.values()) logger.info("Querying all agents.") return agents agents = [ agent for agent in self.agents.values() if condition(agent) ] logger.info("Querying agents with condition.") return agents except Exception as e: logger.error(f"Error: {e}") raise e def find_agent_by_name(self, agent_name: str) -> Optional[Agent]: """ Find an agent by its name. Args: agent_name (str): The name of the agent to find. Returns: Agent: The agent with the given name. """ try: with ThreadPoolExecutor() as executor: futures = { executor.submit(self.get, agent_name): agent_name for agent_name in self.agents.keys() } for future in as_completed(futures): agent = future.result() if agent.agent_name == agent_name: return agent except Exception as e: logger.error(f"Error: {e}") raise e def agent_to_py_model(self, agent: Agent): """ Converts an agent to a Pydantic model. Args: agent (Agent): The agent to convert. """ agent_name = agent.agent_name agent_description = ( agent.description if agent.description else "No description provided" ) schema = AgentConfigSchema( uuid=agent.id, name=agent_name, description=agent_description, config=agent.to_dict(), ) logger.info( f"Agent {agent_name} converted to Pydantic model." ) self.agent_registry.agents.append(schema)