Workflows Subpackage
This subpackage provides a framework for defining, validating, and executing workflows composed of interconnected model steps with dependency management.
Overview
The workflows subpackage enables the creation and execution of workflows where multiple model steps can be combined, with outputs from earlier steps feeding into inputs of later steps. The package handles dependency resolution, execution order, and error handling.
Components
structs.py
Contains the core data structures used throughout the workflow system:
- Field: Represents an input or output field with name and type information
- ModelStep: Represents a single step in a workflow with input fields, output fields, and model details
- Workflow: A collection of ModelSteps with their identifiers
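To make the relationship between these structures concrete, here is a minimal sketch that models them as plain dataclasses. The attribute names are taken from the usage example in this document. The use of dataclasses, the default values, and the optional variable attribute are assumptions for illustration and may not match the actual definitions in structs.py.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Field:
    """An input or output field (hypothetical shape, not the real class)."""
    name: str
    type: str
    # Assumed: optional reference to a variable produced elsewhere,
    # written as "<step_id>.<output_name>".
    variable: Optional[str] = None


@dataclass
class ModelStep:
    """A single step: what it consumes, what it produces, which model runs it."""
    input_fields: List[Field]
    output_fields: List[Field]
    model: str
    system_prompt: str = ""


@dataclass
class Workflow:
    """A collection of steps keyed by their identifiers."""
    steps: Dict[str, ModelStep] = field(default_factory=dict)
```

Keying the steps by identifier is what lets a later step refer to an earlier output by a name such as step1.summary.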
utils.py
Provides utility functions for workflow operations:
- _create_variable_step_mapping: Maps variables to the steps that produce them
- create_dependency_graph: Builds a dependency graph representing the execution order constraints
- topological_sort: Sorts steps in execution order based on their dependencies
workflow_executor.py
Handles the execution of workflows:
- Processes inputs and outputs between steps
- Coordinates the execution of model steps in the correct order
- Integrates with external model providers (e.g., via litellm)
errors.py
Defines custom exceptions for workflow-related errors:
- WorkflowError: Base class for workflow errors
- CyclicDependencyError: Raised when detecting cycles in the workflow graph
- UnknownVariableError: Raised when a step requires a variable that's not provided or produced
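A hierarchy like the one described above might look roughly like this. The class names and their relationship come from this document. The constructor arguments, the stored variable attribute, and the message wording are assumptions and may differ from errors.py.

```python
class WorkflowError(Exception):
    """Base class for workflow errors."""


class CyclicDependencyError(WorkflowError):
    """Raised when the workflow graph contains a cycle."""

    def __init__(self, message: str = "Cyclic dependency detected in workflow"):
        super().__init__(message)


class UnknownVariableError(WorkflowError):
    """Raised when a step requires a variable that is neither provided nor produced."""

    def __init__(self, variable: str):
        # Assumed: keep the offending name so callers can inspect it.
        self.variable = variable
        super().__init__(f"Unknown variable referenced: {variable}")


# Catching the base class handles every workflow-specific failure in one place.
try:
    raise UnknownVariableError("step1.missing")
except WorkflowError as exc:
    print(exc)  # Unknown variable referenced: step1.missing
```

A shared base class lets callers choose between handling one specific failure and handling anything the workflow system raises.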
Usage Example
from workflows.structs import Field, ModelStep, Workflow

# Define a workflow with two steps
step1 = ModelStep(
    input_fields=[Field(name="query", type="string")],
    output_fields=[Field(name="summary", type="string")],
    model="gpt-3.5-turbo",
    system_prompt="Summarize the following text"
)

step2 = ModelStep(
    input_fields=[Field(name="summary", type="string", variable="step1.summary")],
    output_fields=[Field(name="key_points", type="array")],
    model="gpt-4",
    system_prompt="Extract key points from the summary"
)

workflow = Workflow(steps={"step1": step1, "step2": step2})

# Execute the workflow
from workflows.workflow_executor import execute_workflow

result = execute_workflow(
    workflow=workflow,
    input_values={"query": "Long text to summarize..."}
)

# Access results
summary = result["step1.summary"]
key_points = result["step2.key_points"]
Error Handling
The workflows system performs the following checks:
- Detects cyclic dependencies in workflow definitions
- Validates input/output variable references
- Ensures all required inputs are provided
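The reference and input checks can be illustrated with a small standalone function. This is a hedged sketch, not the package's validation code: find_unresolved_variables and its dict-based arguments are hypothetical, and it assumes produced variables are named as step id, dot, output name, as in the usage example.

```python
from typing import Dict, Iterable, List, Set


def find_unresolved_variables(
    step_inputs: Dict[str, List[str]],
    step_outputs: Dict[str, List[str]],
    provided: Iterable[str],
) -> Dict[str, List[str]]:
    """Return step id -> variables that are neither provided nor produced."""
    # Everything a step may legally consume: caller inputs plus all step outputs.
    available: Set[str] = set(provided)
    for step_id, outputs in step_outputs.items():
        available.update(f"{step_id}.{name}" for name in outputs)

    unresolved: Dict[str, List[str]] = {}
    for step_id, variables in step_inputs.items():
        missing = [v for v in variables if v not in available]
        if missing:
            unresolved[step_id] = missing
    return unresolved


step_inputs = {"step1": ["query"], "step2": ["step1.summary", "step1.title"]}
step_outputs = {"step1": ["summary"], "step2": ["key_points"]}
problems = find_unresolved_variables(step_inputs, step_outputs, provided=["query"])
print(problems)  # {'step2': ['step1.title']}
```

Collecting every unresolved reference before reporting lets a workflow author fix all of them in one pass, whereas raising on the first one reveals problems one at a time.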
Extending the Workflows System
To extend the workflows system:
- Add new model step types by extending the ModelStep class
- Create custom field types by extending validation in the execution logic
- Implement additional error types in errors.py for specialized error handling