# Workflows Subpackage

This subpackage provides a framework for defining, validating, and executing workflows composed of interconnected model steps with dependency management.

## Overview

The workflows subpackage lets you build and run workflows that combine multiple model steps, with outputs from earlier steps feeding into inputs of later steps. The package handles dependency resolution, execution order, and error handling.

## Components

### `structs.py`

Contains the core data structures used throughout the workflow system:

- `Field`: Represents an input or output field with name and type information
- `ModelStep`: Represents a single step in a workflow with input fields, output fields, and model details
- `Workflow`: A collection of `ModelStep`s keyed by their identifiers
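
As a rough illustration of how these three structures relate, here is a minimal sketch using standard-library dataclasses. The attribute names follow the usage example in this README, but the base class, defaults, and the exact meaning of `variable` are assumptions; the real definitions in `structs.py` may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Field:
    """An input or output field with a name and a type label."""
    name: str
    type: str
    # Assumed: for inputs, the "<step_id>.<field>" variable feeding this field.
    variable: Optional[str] = None


@dataclass
class ModelStep:
    """One step: its input fields, output fields, and model details."""
    input_fields: List[Field]
    output_fields: List[Field]
    model: str
    system_prompt: str = ""


@dataclass
class Workflow:
    """A collection of steps keyed by step identifier."""
    steps: Dict[str, ModelStep] = field(default_factory=dict)


step = ModelStep(
    input_fields=[Field(name="query", type="string")],
    output_fields=[Field(name="summary", type="string")],
    model="gpt-3.5-turbo",
)
workflow = Workflow(steps={"step1": step})
```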

### `utils.py`

Provides utility functions for workflow operations:

- `_create_variable_step_mapping`: Maps variables to the steps that produce them
- `create_dependency_graph`: Builds a dependency graph representing the execution order constraints
- `topological_sort`: Sorts steps in execution order based on their dependencies
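
The three utilities above can be sketched as follows. The function names mirror the list, but the signatures, the simplified step representation (plain dicts of input variables and output names), and the use of Kahn's algorithm are assumptions made for illustration, not the package's actual code. Variables that no step produces are treated here as external inputs.

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical simplified step: the variables it reads and the fields it writes.
StepIO = Dict[str, List[str]]


def _create_variable_step_mapping(steps: Dict[str, StepIO]) -> Dict[str, str]:
    """Map each produced variable ("<step_id>.<field>") to its producing step."""
    return {
        f"{step_id}.{name}": step_id
        for step_id, io in steps.items()
        for name in io["outputs"]
    }


def create_dependency_graph(steps: Dict[str, StepIO]) -> Dict[str, Set[str]]:
    """Return {step_id: set of step ids it depends on}."""
    producers = _create_variable_step_mapping(steps)
    graph: Dict[str, Set[str]] = {step_id: set() for step_id in steps}
    for step_id, io in steps.items():
        for var in io["inputs"]:
            # Variables without a producer are assumed to be external inputs.
            if var in producers:
                graph[step_id].add(producers[var])
    return graph


def topological_sort(graph: Dict[str, Set[str]]) -> List[str]:
    """Kahn's algorithm; raises ValueError if the graph contains a cycle."""
    remaining = {node: len(deps) for node, deps in graph.items()}
    dependents: Dict[str, List[str]] = {node: [] for node in graph}
    for node, deps in graph.items():
        for dep in deps:
            dependents[dep].append(node)
    ready = deque(sorted(n for n, count in remaining.items() if count == 0))
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in dependents[node]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(graph):
        raise ValueError("cyclic dependency detected")
    return order


steps = {
    "step2": {"inputs": ["step1.summary"], "outputs": ["key_points"]},
    "step1": {"inputs": ["query"], "outputs": ["summary"]},
}
graph = create_dependency_graph(steps)
order = topological_sort(graph)
```

In this sketch a cycle leaves some steps with unmet dependencies, so the final length check is what detects it; the real package reports cycles with `CyclicDependencyError` rather than `ValueError`.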

### `workflow_executor.py`

Handles the execution of workflows:

- Processes inputs and outputs between steps
- Coordinates the execution of model steps in the correct order
- Integrates with external model providers (e.g., via litellm)
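
To make the data flow between steps concrete, the following sketch runs steps in a precomputed order and threads each step's outputs into the inputs of later steps. It is an assumption-laden illustration: `run_steps`, the dict-based step format, and the `call_model` callback are hypothetical, and a deterministic stub stands in for the provider call so the example runs offline.

```python
from typing import Callable, Dict, List

# Hypothetical simplified step: the variables it reads and the fields it writes.
Step = Dict[str, List[str]]
# Stand-in for a provider call: takes the step id and resolved inputs,
# returns a value for each output field.
ModelFn = Callable[[str, Dict[str, str]], Dict[str, str]]


def run_steps(
    steps: Dict[str, Step],
    order: List[str],
    input_values: Dict[str, str],
    call_model: ModelFn,
) -> Dict[str, str]:
    """Run steps in the given order, feeding outputs into later inputs."""
    variables = dict(input_values)  # external inputs plus produced outputs
    results: Dict[str, str] = {}
    for step_id in order:
        step = steps[step_id]
        # Resolve each input from the pool of known variables.
        step_inputs = {var: variables[var] for var in step["inputs"]}
        outputs = call_model(step_id, step_inputs)
        for name in step["outputs"]:
            key = f"{step_id}.{name}"
            variables[key] = outputs[name]
            results[key] = outputs[name]
    return results


def fake_model(step_id: str, inputs: Dict[str, str]) -> Dict[str, str]:
    # Deterministic stub so the sketch runs without a model provider.
    joined = " ".join(inputs.values())
    return {"summary": joined.upper(), "key_points": joined[:5]}


steps = {
    "step1": {"inputs": ["query"], "outputs": ["summary"]},
    "step2": {"inputs": ["step1.summary"], "outputs": ["key_points"]},
}
results = run_steps(steps, ["step1", "step2"], {"query": "hello world"}, fake_model)
```

Passing the model call in as a function keeps the ordering logic testable without network access; a real executor would presumably make the provider call at that point instead.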

### `errors.py`

Defines custom exceptions for workflow-related errors:

- `WorkflowError`: Base class for workflow errors
- `CyclicDependencyError`: Raised when a cycle is detected in the workflow graph
- `UnknownVariableError`: Raised when a step requires a variable that is neither provided nor produced
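
A hierarchy like the one listed above might be declared as shown below. Only the class names and their parent/child relationship come from this README; the constructors and messages are assumptions. The point of a shared base class is that callers can catch `WorkflowError` to handle every workflow-specific failure in one place.

```python
class WorkflowError(Exception):
    """Base class for workflow errors."""


class CyclicDependencyError(WorkflowError):
    """Raised when the workflow graph contains a cycle."""


class UnknownVariableError(WorkflowError):
    """Raised when a step needs a variable that is neither provided nor produced."""


# Catching the base class also catches the more specific errors.
try:
    raise UnknownVariableError("step2 requires unknown variable 'step1.summary'")
except WorkflowError as exc:
    caught = type(exc).__name__
```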

## Usage Example

```python
from workflows.structs import Field, ModelStep, Workflow
from workflows.workflow_executor import execute_workflow

# Define a workflow with two steps
step1 = ModelStep(
    input_fields=[Field(name="query", type="string")],
    output_fields=[Field(name="summary", type="string")],
    model="gpt-3.5-turbo",
    system_prompt="Summarize the following text",
)

# step2 reads the summary produced by step1 via the "step1.summary" variable
step2 = ModelStep(
    input_fields=[Field(name="summary", type="string", variable="step1.summary")],
    output_fields=[Field(name="key_points", type="array")],
    model="gpt-4",
    system_prompt="Extract key points from the summary",
)

workflow = Workflow(steps={"step1": step1, "step2": step2})

# Execute the workflow
result = execute_workflow(
    workflow=workflow,
    input_values={"query": "Long text to summarize..."},
)

# Access results; outputs are keyed as "<step_id>.<field_name>"
summary = result["step1.summary"]
key_points = result["step2.key_points"]
```

## Error Handling

The workflows system checks workflows for common problems:

- Detects cyclic dependencies in workflow definitions
- Validates input/output variable references
- Ensures all required inputs are provided
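
The reference and input checks above can be illustrated with a small validation pass. This is a sketch under stated assumptions: `find_missing_variables` and the dict-based step format are hypothetical, and the function returns the problems it finds instead of raising, whereas the package itself reports such cases with `UnknownVariableError`.

```python
from typing import Dict, List, Set


def find_missing_variables(
    steps: Dict[str, Dict[str, List[str]]],
    provided: Set[str],
) -> Dict[str, List[str]]:
    """Return {step_id: [variables neither provided nor produced by any step]}."""
    produced = {
        f"{step_id}.{name}"
        for step_id, io in steps.items()
        for name in io["outputs"]
    }
    missing: Dict[str, List[str]] = {}
    for step_id, io in steps.items():
        unknown = [v for v in io["inputs"] if v not in provided and v not in produced]
        if unknown:
            missing[step_id] = unknown
    return missing


steps = {
    "step1": {"inputs": ["query"], "outputs": ["summary"]},
    # Typo in the reference: "sumary" is not produced by any step.
    "step2": {"inputs": ["step1.sumary"], "outputs": ["key_points"]},
}
missing = find_missing_variables(steps, provided={"query"})
```

Collecting every unresolved reference before execution lets a caller report all definition mistakes at once, before any model call is made.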

## Extending the Workflows System

To extend the workflows system:

1. Add new model step types by extending the `ModelStep` class
2. Create custom field types by extending validation in the execution logic
3. Implement additional error types in `errors.py` for specialized error handling
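
As a hedged illustration of points 1 and 3, the sketch below subclasses a stand-in `ModelStep` and adds a specialized error. Everything here is hypothetical: the stand-in classes only approximate the real ones (which may not be dataclasses), and `RetryingModelStep`, `max_retries`, and `StepRetryExhaustedError` are invented names that do not exist in the package.

```python
from dataclasses import dataclass
from typing import List


class WorkflowError(Exception):
    """Stand-in for the base exception described in errors.py."""


@dataclass
class ModelStep:
    """Stand-in for the real ModelStep; field names follow the usage example."""
    input_fields: List[str]
    output_fields: List[str]
    model: str
    system_prompt: str = ""


@dataclass
class RetryingModelStep(ModelStep):
    """Hypothetical step type that carries a retry budget."""
    max_retries: int = 3


class StepRetryExhaustedError(WorkflowError):
    """Hypothetical error for a step that failed on every attempt."""

    def __init__(self, step_id: str, attempts: int):
        super().__init__(f"step '{step_id}' failed after {attempts} attempts")
        self.step_id = step_id
        self.attempts = attempts


step = RetryingModelStep(
    input_fields=["query"],
    output_fields=["summary"],
    model="gpt-4",
    max_retries=2,
)
error = StepRetryExhaustedError("step1", step.max_retries)
```

Deriving the new error from `WorkflowError` means existing handlers that catch the base class keep working without changes.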