Maharshi Gor
First Working commit
193db9d
|
raw
history blame
3.11 kB

Workflows Subpackage

This subpackage provides a framework for defining, validating, and executing workflows composed of interconnected model steps with dependency management.

Overview

The workflows subpackage enables the creation and execution of workflows where multiple model steps can be combined, with outputs from earlier steps feeding into inputs of later steps. The package handles dependency resolution, execution order, and error handling.

Components

structs.py

Contains the core data structures used throughout the workflow system:

  • Field: Represents an input or output field with name and type information
  • ModelStep: Represents a single step in a workflow with input fields, output fields, and model details
  • Workflow: A collection of ModelSteps with their identifiers

utils.py

Provides utility functions for workflow operations:

  • _create_variable_step_mapping: Maps variables to the steps that produce them
  • create_dependency_graph: Builds a dependency graph representing the execution order constraints
  • topological_sort: Sorts steps in execution order based on their dependencies

workflow_executor.py

Handles the execution of workflows:

  • Processes inputs and outputs between steps
  • Coordinates the execution of model steps in the correct order
  • Integrates with external model providers (e.g., via litellm)

errors.py

Defines custom exceptions for workflow-related errors:

  • WorkflowError: Base class for workflow errors
  • CyclicDependencyError: Raised when detecting cycles in the workflow graph
  • UnknownVariableError: Raised when a step requires a variable that's not provided or produced

Usage Example

from workflows.structs import Field, ModelStep, Workflow

# Define a workflow with two steps
step1 = ModelStep(
    input_fields=[Field(name="query", type="string")],
    output_fields=[Field(name="summary", type="string")],
    model="gpt-3.5-turbo",
    system_prompt="Summarize the following text"
)

step2 = ModelStep(
    input_fields=[Field(name="summary", type="string", variable="step1.summary")],
    output_fields=[Field(name="key_points", type="array")],
    model="gpt-4",
    system_prompt="Extract key points from the summary"
)

workflow = Workflow(steps={"step1": step1, "step2": step2})

# Execute the workflow
from workflows.workflow_executor import execute_workflow

result = execute_workflow(
    workflow=workflow,
    input_values={"query": "Long text to summarize..."}
)

# Access results
summary = result["step1.summary"]
key_points = result["step2.key_points"]

Error Handling

The workflows system provides robust error handling:

  • Detects cyclic dependencies in workflow definitions
  • Validates input/output variable references
  • Ensures all required inputs are provided

Extending the Workflows System

To extend the workflows system:

  1. Add new model step types by extending the ModelStep class
  2. Create custom field types by extending validation in the execution logic
  3. Implement additional error types in errors.py for specialized error handling