Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| import pprint | |
| import uuid | |
| from abc import ABC | |
| from enum import Enum | |
| from typing import Any, Optional | |
| from swarms.artifacts.main import Artifact | |
| from pydantic import BaseModel, Field, StrictStr, conlist | |
| from swarms.artifacts.error_artifact import ErrorArtifact | |
| class BaseTask(ABC): | |
| class State(Enum): | |
| PENDING = 1 | |
| EXECUTING = 2 | |
| FINISHED = 3 | |
| def __init__(self): | |
| self.id = uuid.uuid4().hex | |
| self.state = self.State.PENDING | |
| self.parent_ids = [] | |
| self.child_ids = [] | |
| self.output = None | |
| self.structure = None | |
| # @abstractmethod | |
| def input(self): | |
| pass | |
| def parents(self): | |
| return [self.structure.find_task(parent_id) for parent_id in self.parent_ids] | |
| def children(self): | |
| return [self.structure.find_task(child_id) for child_id in self.child_ids] | |
| def __rshift__(self, child): | |
| return self.add_child(child) | |
| def __lshift__(self, child): | |
| return self.add_parent(child) | |
| def preprocess(self, structure): | |
| self.structure = structure | |
| return self | |
| def add_child(self, child): | |
| if self.structure: | |
| child.structure = self.structure | |
| elif child.structure: | |
| self.structure = child.structure | |
| if child not in self.structure.tasks: | |
| self.structure.tasks.append(child) | |
| if self not in self.structure.tasks: | |
| self.structure.tasks.append(self) | |
| if child.id not in self.child_ids: | |
| self.child_ids.append(child.id) | |
| if self.id not in child.parent_ids: | |
| child.parent_ids.append(self.id) | |
| return child | |
| def add_parent(self, parent): | |
| if self.structure: | |
| parent.structure = self.structure | |
| elif parent.structure: | |
| self.structure = parent.structure | |
| if parent not in self.structure.tasks: | |
| self.structure.tasks.append(parent) | |
| if self not in self.structure.tasks: | |
| self.structure.tasks.append(self) | |
| if parent.id not in self.parent_ids: | |
| self.parent_ids.append(parent.id) | |
| if self.id not in parent.child_ids: | |
| parent.child_ids.append(self.id) | |
| return parent | |
| def is_pending(self): | |
| return self.state == self.State.PENDING | |
| def is_finished(self): | |
| return self.state == self.State.FINISHED | |
| def is_executing(self): | |
| return self.state == self.State.EXECUTING | |
| def before_run(self): | |
| pass | |
| def after_run(self): | |
| pass | |
| def execute(self): | |
| try: | |
| self.state = self.State.EXECUTING | |
| self.before_run() | |
| self.output = self.run() | |
| self.after_run() | |
| except Exception as e: | |
| self.output = ErrorArtifact(str(e)) | |
| finally: | |
| self.state = self.State.FINISHED | |
| return self.output | |
| def can_execute(self): | |
| return self.state == self.State.PENDING and all(parent.is_finished() for parent in self.parents) | |
| def reset(self): | |
| self.state = self.State.PENDING | |
| self.output = None | |
| return self | |
| # @abstractmethod | |
| def run(self): | |
| pass | |
| class Task(BaseModel): | |
| input: Optional[StrictStr] = Field( | |
| None, | |
| description="Input prompt for the task" | |
| ) | |
| additional_input: Optional[Any] = Field( | |
| None, | |
| description="Input parameters for the task. Any value is allowed" | |
| ) | |
| task_id: StrictStr = Field( | |
| ..., | |
| description="ID of the task" | |
| ) | |
| artifacts: conlist(Artifact) = Field( | |
| ..., | |
| description="A list of artifacts that the task has been produced" | |
| ) | |
| __properties = ["input", "additional_input", "task_id", "artifact"] | |
| class Config: | |
| #pydantic config | |
| allow_population_by_field_name = True | |
| validate_assignment = True | |
| def to_str(self) -> str: | |
| """Returns the str representation of the model using alias""" | |
| return pprint.pformat(self.dict(by_alias=True)) | |
| def to_json(self) -> str: | |
| """Returns the JSON representation of the model using alias""" | |
| return json.dumps(self.to_dict()) | |
| def from_json(cls, json_str: str) -> Task: | |
| """Create an instance of Task from a json string""" | |
| return cls.from_dict(json.loads(json_str)) | |
| def to_dict(self): | |
| """Returns the dict representation of the model using alias""" | |
| _dict = self.dict(by_alias=True, exclude={}, exclude_none=True) | |
| _items =[] | |
| if self.artifacts: | |
| for _item in self.artifacts: | |
| if _item: | |
| _items.append(_item.to_dict()) | |
| _dict["artifacts"] = _items | |
| #set to None if additional input is None | |
| # and __fields__set contains the field | |
| if self.additional_input is None and "additional_input" in self.__fields__set__: | |
| _dict["additional_input"] = None | |
| return _dict | |
| def from_dict(cls, obj: dict) -> Task: | |
| """Create an instance of Task from dict""" | |
| if obj is None: | |
| return None | |
| if not isinstance(obj, dict): | |
| return Task.parse_obj(obj) | |
| _obj = Task.parse_obj( | |
| { | |
| "input": obj.get("input"), | |
| "additional_input": obj.get("additional_input"), | |
| "task_id": obj.get("task_id"), | |
| "artifacts": [ | |
| Artifact.from_dict(_item) for _item in obj.get("artifacts") | |
| ] | |
| if obj.get("artifacts") is not None | |
| else None, | |
| } | |
| ) |