import uuid from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Union from doc_master import doc_master from pydantic import BaseModel, Field from tenacity import retry, stop_after_attempt, wait_fixed from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.structs.agent import Agent from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.swarm_matcher import swarm_matcher from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="swarm_router") SwarmType = Literal[ "AgentRearrange", "MixtureOfAgents", "SpreadSheetSwarm", "SequentialWorkflow", "ConcurrentWorkflow", "auto", ] class Document(BaseModel): file_path: str data: str class SwarmLog(BaseModel): """ A Pydantic model to capture log entries. """ id: str = Field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = Field(default_factory=datetime.utcnow) level: str message: str swarm_type: SwarmType task: str = "" metadata: Dict[str, Any] = Field(default_factory=dict) documents: List[Document] = [] class SwarmRouter: """ A class that dynamically routes tasks to different swarm types based on user selection or automatic matching. The SwarmRouter enables flexible task execution by either using a specified swarm type or automatically determining the most suitable swarm type for a given task. It handles task execution while managing logging, type validation, and metadata capture. Args: name (str, optional): Name identifier for the SwarmRouter instance. Defaults to "swarm-router". description (str, optional): Description of the SwarmRouter's purpose. Defaults to "Routes your task to the desired swarm". max_loops (int, optional): Maximum number of execution loops. Defaults to 1. agents (List[Union[Agent, Callable]], optional): List of Agent objects or callables to use. Defaults to empty list. swarm_type (SwarmType, optional): Type of swarm to use. Defaults to "SequentialWorkflow". autosave (bool, optional): Whether to enable autosaving. Defaults to False. flow (str, optional): Flow configuration string. Defaults to None. return_json (bool, optional): Whether to return results as JSON. Defaults to False. auto_generate_prompts (bool, optional): Whether to auto-generate agent prompts. Defaults to False. shared_memory_system (Any, optional): Shared memory system for agents. Defaults to None. rules (str, optional): Rules to inject into every agent. Defaults to None. documents (List[str], optional): List of document file paths to use. Defaults to empty list. output_type (str, optional): Output format type. Defaults to "string". Attributes: name (str): Name identifier for the SwarmRouter instance description (str): Description of the SwarmRouter's purpose max_loops (int): Maximum number of execution loops agents (List[Union[Agent, Callable]]): List of Agent objects or callables swarm_type (SwarmType): Type of swarm being used autosave (bool): Whether autosaving is enabled flow (str): Flow configuration string return_json (bool): Whether results are returned as JSON auto_generate_prompts (bool): Whether prompt auto-generation is enabled shared_memory_system (Any): Shared memory system for agents rules (str): Rules injected into every agent documents (List[str]): List of document file paths output_type (str): Output format type logs (List[SwarmLog]): List of execution logs swarm: The instantiated swarm object Available Swarm Types: - AgentRearrange: Optimizes agent arrangement for task execution - MixtureOfAgents: Combines multiple agent types for diverse tasks - SpreadSheetSwarm: Uses spreadsheet-like operations for task management - SequentialWorkflow: Executes tasks sequentially - ConcurrentWorkflow: Executes tasks in parallel - "auto": Automatically selects best swarm type via embedding search Methods: run(task: str, device: str = "cpu", all_cores: bool = False, all_gpus: bool = False, *args, **kwargs) -> Any: Executes a task using the configured swarm batch_run(tasks: List[str], *args, **kwargs) -> List[Any]: Executes multiple tasks in sequence threaded_run(task: str, *args, **kwargs) -> Any: Executes a task in a separate thread async_run(task: str, *args, **kwargs) -> Any: Executes a task asynchronously concurrent_run(task: str, *args, **kwargs) -> Any: Executes a task using concurrent execution concurrent_batch_run(tasks: List[str], *args, **kwargs) -> List[Any]: Executes multiple tasks concurrently get_logs() -> List[SwarmLog]: Retrieves execution logs """ def __init__( self, name: str = "swarm-router", description: str = "Routes your task to the desired swarm", max_loops: int = 1, agents: List[Union[Agent, Callable]] = [], swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto" autosave: bool = False, rearrange_flow: str = None, return_json: bool = False, auto_generate_prompts: bool = False, shared_memory_system: Any = None, rules: str = None, documents: List[str] = [], # A list of docs file paths output_type: str = "string", # Md, PDF, Txt, csv no_cluster_ops: bool = False, *args, **kwargs, ): self.name = name self.description = description self.max_loops = max_loops self.agents = agents self.swarm_type = swarm_type self.autosave = autosave self.rearrange_flow = rearrange_flow self.return_json = return_json self.auto_generate_prompts = auto_generate_prompts self.shared_memory_system = shared_memory_system self.rules = rules self.documents = documents self.output_type = output_type self.no_cluster_ops = no_cluster_ops self.logs = [] self.reliability_check() self._log( "info", f"SwarmRouter initialized with swarm type: {swarm_type}", ) # Handle Automated Prompt Engineering self.activate_ape() # Handle shared memory if self.shared_memory_system is not None: self.activate_shared_memory() # Handle rules if self.rules is not None: self.handle_rules() # if self.documents is not None: # self.handle_docs() # let's make a function that checks the agents parameter and disables clusterops def deactivate_clusterops(self): for agent in self.agents: agent.do_not_use_cluster_ops = True def handle_docs(self): # Process all documents in parallel using list comprehension data = "".join( [doc_master(file_path=doc) for doc in self.documents] ) # Update all agents' prompts at once doc_prompt = f"##### Documents Available ########## {data}" for agent in self.agents: agent.system_prompt += doc_prompt # Add documents to the logs # self.logs.append(Document(file_path=self.documents, data=data)) def activate_shared_memory(self): logger.info("Activating shared memory with all agents ") for agent in self.agents: agent.long_term_memory = self.shared_memory_system logger.info("All agents now have the same memory system") def handle_rules(self): logger.info("Injecting rules to every agent!") for agent in self.agents: agent.system_prompt += f"### Swarm Rules ### {self.rules}" logger.info("Finished injecting rules") def activate_ape(self): """Activate automatic prompt engineering for agents that support it""" try: logger.info("Activating automatic prompt engineering...") activated_count = 0 for agent in self.agents: if hasattr(agent, "auto_generate_prompt"): agent.auto_generate_prompt = ( self.auto_generate_prompts ) activated_count += 1 logger.debug( f"Activated APE for agent: {agent.name if hasattr(agent, 'name') else 'unnamed'}" ) logger.info( f"Successfully activated APE for {activated_count} agents" ) self._log( "info", f"Activated automatic prompt engineering for {activated_count} agents", ) except Exception as e: error_msg = f"Error activating automatic prompt engineering: {str(e)}" logger.error(error_msg) self._log("error", error_msg) raise RuntimeError(error_msg) from e @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) def reliability_check(self): logger.info("Initializing reliability checks") if not self.agents: raise ValueError("No agents provided for the swarm.") if self.swarm_type is None: raise ValueError("Swarm type cannot be 'none'.") if self.max_loops == 0: raise ValueError("max_loops cannot be 0.") logger.info( "Reliability checks completed your swarm is ready." ) def _create_swarm( self, task: str = None, *args, **kwargs ) -> Union[ AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow, ]: """ Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task. Args: task (str, optional): The task to be executed by the swarm. Defaults to None. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]: The instantiated swarm object. Raises: ValueError: If an invalid swarm type is provided. """ if self.swarm_type == "auto": self.swarm_type = str(swarm_matcher(task)) self._create_swarm(self.swarm_type) if self.no_cluster_ops: self.deactivate_clusterops() elif self.swarm_type == "AgentRearrange": return AgentRearrange( name=self.name, description=self.description, agents=self.agents, max_loops=self.max_loops, flow=self.rearrange_flow, return_json=self.return_json, output_type=self.output_type, *args, **kwargs, ) elif self.swarm_type == "MixtureOfAgents": return MixtureOfAgents( name=self.name, description=self.description, agents=self.agents, aggregator_system_prompt=aggregator_system_prompt.get_prompt(), aggregator_agent=self.agents[-1], layers=self.max_loops, *args, **kwargs, ) elif self.swarm_type == "SpreadSheetSwarm": return SpreadSheetSwarm( name=self.name, description=self.description, agents=self.agents, max_loops=self.max_loops, autosave_on=self.autosave, *args, **kwargs, ) elif self.swarm_type == "SequentialWorkflow": return SequentialWorkflow( name=self.name, description=self.description, agents=self.agents, max_loops=self.max_loops, shared_memory_system=self.shared_memory_system, output_type=self.output_type, return_json=self.return_json, *args, **kwargs, ) elif self.swarm_type == "ConcurrentWorkflow": return ConcurrentWorkflow( name=self.name, description=self.description, agents=self.agents, max_loops=self.max_loops, auto_save=self.autosave, return_str_on=self.return_json, *args, **kwargs, ) else: raise ValueError( f"Invalid swarm type: {self.swarm_type} try again with a valid swarm type such as 'SequentialWorkflow' or 'ConcurrentWorkflow' or 'auto' or 'AgentRearrange' or 'MixtureOfAgents' or 'SpreadSheetSwarm'" ) def _log( self, level: str, message: str, task: str = "", metadata: Dict[str, Any] = None, ): """ Create a log entry and add it to the logs list. Args: level (str): The log level (e.g., "info", "error"). message (str): The log message. task (str, optional): The task being performed. Defaults to "". metadata (Dict[str, Any], optional): Additional metadata. Defaults to None. """ log_entry = SwarmLog( level=level, message=message, swarm_type=self.swarm_type, task=task, metadata=metadata or {}, ) self.logs.append(log_entry) logger.log(level.upper(), message) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) def _run(self, task: str, img: str, *args, **kwargs) -> Any: """ Dynamically run the specified task on the selected or matched swarm type. Args: task (str): The task to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. Raises: Exception: If an error occurs during task execution. """ self.swarm = self._create_swarm(task, *args, **kwargs) try: self._log( "info", f"Running task on {self.swarm_type} swarm with task: {task}", ) result = self.swarm.run(task=task, *args, **kwargs) self._log( "success", f"Task completed successfully on {self.swarm_type} swarm", task=task, metadata={"result": str(result)}, ) return result except Exception as e: self._log( "error", f"Error occurred while running task on {self.swarm_type} swarm: {str(e)}", task=task, metadata={"error": str(e)}, ) raise def run( self, task: str, img: str = None, device: str = "cpu", all_cores: bool = True, all_gpus: bool = False, no_clusterops: bool = True, *args, **kwargs, ) -> Any: """ Execute a task on the selected swarm type with specified compute resources. Args: task (str): The task to be executed by the swarm. device (str, optional): Device to run on - "cpu" or "gpu". Defaults to "cpu". all_cores (bool, optional): Whether to use all CPU cores. Defaults to True. all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. Raises: Exception: If an error occurs during task execution. """ try: if no_clusterops: return self._run(task=task, img=img, *args, **kwargs) else: return exec_callable_with_clusterops( func=self._run, device=device, all_cores=all_cores, all_gpus=all_gpus, task=task, *args, **kwargs, ) except Exception as e: logger.error(f"Error executing task on swarm: {str(e)}") raise def __call__(self, task: str, *args, **kwargs) -> Any: """ Make the SwarmRouter instance callable. Args: task (str): The task to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. """ return self.run(task=task, *args, **kwargs) def batch_run( self, tasks: List[str], *args, **kwargs ) -> List[Any]: """ Execute a batch of tasks on the selected or matched swarm type. Args: tasks (List[str]): A list of tasks to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: List[Any]: A list of results from the swarm's execution. Raises: Exception: If an error occurs during task execution. """ results = [] for task in tasks: try: result = self.run(task, *args, **kwargs) results.append(result) except Exception as e: self._log( "error", f"Error occurred while running batch task on {self.swarm_type} swarm: {str(e)}", task=task, metadata={"error": str(e)}, ) raise return results def threaded_run(self, task: str, *args, **kwargs) -> Any: """ Execute a task on the selected or matched swarm type using threading. Args: task (str): The task to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. Raises: Exception: If an error occurs during task execution. """ from threading import Thread def run_in_thread(): try: result = self.run(task, *args, **kwargs) return result except Exception as e: self._log( "error", f"Error occurred while running task in thread on {self.swarm_type} swarm: {str(e)}", task=task, metadata={"error": str(e)}, ) raise thread = Thread(target=run_in_thread) thread.start() thread.join() return thread.result def async_run(self, task: str, *args, **kwargs) -> Any: """ Execute a task on the selected or matched swarm type asynchronously. Args: task (str): The task to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. Raises: Exception: If an error occurs during task execution. """ import asyncio async def run_async(): try: result = await asyncio.to_thread( self.run, task, *args, **kwargs ) return result except Exception as e: self._log( "error", f"Error occurred while running task asynchronously on {self.swarm_type} swarm: {str(e)}", task=task, metadata={"error": str(e)}, ) raise return asyncio.run(run_async()) def get_logs(self) -> List[SwarmLog]: """ Retrieve all logged entries. Returns: List[SwarmLog]: A list of all log entries. """ return self.logs def concurrent_run(self, task: str, *args, **kwargs) -> Any: """ Execute a task on the selected or matched swarm type concurrently. Args: task (str): The task to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: Any: The result of the swarm's execution. Raises: Exception: If an error occurs during task execution. """ from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as executor: future = executor.submit(self.run, task, *args, **kwargs) result = future.result() return result def concurrent_batch_run( self, tasks: List[str], *args, **kwargs ) -> List[Any]: """ Execute a batch of tasks on the selected or matched swarm type concurrently. Args: tasks (List[str]): A list of tasks to be executed by the swarm. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Returns: List[Any]: A list of results from the swarm's execution. Raises: Exception: If an error occurs during task execution. """ from concurrent.futures import ( ThreadPoolExecutor, as_completed, ) results = [] with ThreadPoolExecutor() as executor: # Submit all tasks to executor futures = [ executor.submit(self.run, task, *args, **kwargs) for task in tasks ] # Process results as they complete rather than waiting for all for future in as_completed(futures): try: result = future.result() results.append(result) except Exception as e: logger.error(f"Task execution failed: {str(e)}") results.append(None) return results def swarm_router( name: str = "swarm-router", description: str = "Routes your task to the desired swarm", max_loops: int = 1, agents: List[Union[Agent, Callable]] = [], swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto" autosave: bool = False, flow: str = None, return_json: bool = True, auto_generate_prompts: bool = False, task: str = None, rules: str = None, *args, **kwargs, ) -> SwarmRouter: """ Create and run a SwarmRouter instance with the given configuration. Args: name (str, optional): Name of the swarm router. Defaults to "swarm-router". description (str, optional): Description of the router. Defaults to "Routes your task to the desired swarm". max_loops (int, optional): Maximum number of execution loops. Defaults to 1. agents (List[Union[Agent, Callable]], optional): List of agents or callables. Defaults to []. swarm_type (SwarmType, optional): Type of swarm to use. Defaults to "SequentialWorkflow". autosave (bool, optional): Whether to autosave results. Defaults to False. flow (str, optional): Flow configuration. Defaults to None. return_json (bool, optional): Whether to return results as JSON. Defaults to True. auto_generate_prompts (bool, optional): Whether to auto-generate prompts. Defaults to False. task (str, optional): Task to execute. Defaults to None. *args: Additional positional arguments passed to SwarmRouter.run() **kwargs: Additional keyword arguments passed to SwarmRouter.run() Returns: Any: Result from executing the swarm router Raises: ValueError: If invalid arguments are provided Exception: If an error occurs during router creation or task execution """ try: logger.info( f"Creating SwarmRouter with name: {name}, swarm_type: {swarm_type}" ) if not agents: logger.warning( "No agents provided, router may have limited functionality" ) if task is None: logger.warning("No task provided") swarm_router = SwarmRouter( name=name, description=description, max_loops=max_loops, agents=agents, swarm_type=swarm_type, autosave=autosave, flow=flow, return_json=return_json, auto_generate_prompts=auto_generate_prompts, rules=rules, ) logger.info(f"Executing task with SwarmRouter: {task}") result = swarm_router.run(task, *args, **kwargs) logger.info( f"Task execution completed successfully: {result}" ) return result except ValueError as e: logger.error( f"Invalid arguments provided to swarm_router: {str(e)}" ) raise except Exception as e: logger.error(f"Error in swarm_router execution: {str(e)}") raise