|
|
|
import json |
|
from typing import Any |
|
|
|
import pydantic |
|
|
|
from llms import completion |
|
from workflows.errors import WorkflowError |
|
from workflows.structs import InputField, ModelStep, OutputField, Workflow |
|
from workflows.utils import create_dependency_graph, topological_sort |
|
|
|
""" |
|
Core workflow execution functionality. |
|
|
|
This module handles the execution of defined workflows, including input processing, |
|
dependency-based execution order, model calling, and output collection. It integrates |
|
with the litellm library to handle model interactions. |
|
|
|
Key components: |
|
- Utility functions for input/output transformation |
|
- Input processing and validation |
|
- Model step execution |
|
- Complete workflow execution with dependency resolution |
|
|
|
The module orchestrates the execution of steps in the correct order based on their |
|
dependencies and manages the flow of data between steps. |
|
""" |
|
|
|
|
|
def upper(x):
    """Return ``x`` upper-cased if it is a string; return any other value unchanged."""
    return x.upper() if isinstance(x, str) else x
|
|
|
|
|
def lower(x):
    """Return ``x`` lower-cased if it is a string; return any other value unchanged."""
    return x.lower() if isinstance(x, str) else x
|
|
|
|
|
# Type names that get_type resolves by direct lookup. Each key is the
# type's own ``__name__``: "str", "int", "float", "bool" (in that order).
TYPE_MAP: dict[str, type] = {py_type.__name__: py_type for py_type in (str, int, float, bool)}
|
|
|
# Named transformations that an input field's ``func`` attribute can refer to
# (looked up by create_processed_inputs before it falls back to eval).
FUNCTION_MAP: dict[str, Any] = dict(
    upper=upper,
    lower=lower,
    len=len,
    split=str.split,
)
|
|
|
|
|
def get_type(type_str: str) -> type:
    """
    Resolve a type name to a Python type.

    Names present in ``TYPE_MAP`` are returned directly. Any other string
    (for example ``"list[str]"``) is evaluated as a Python expression.

    Args:
        type_str (str): The type name or type expression to resolve.

    Returns:
        type: The resolved type object.

    Raises:
        Exception: Whatever ``eval`` raises when ``type_str`` is neither a
            known name nor a valid expression (e.g. ``NameError``,
            ``SyntaxError``).
    """
    known = TYPE_MAP.get(type_str)
    if known is not None:
        return known
    # Only evaluate on a miss: ``dict.get(key, eval(...))`` would run eval on
    # every call because the default argument is computed before the lookup.
    # NOTE(review): eval executes arbitrary code. This is only safe while
    # workflow definitions come from trusted sources; consider replacing it
    # with a restricted parser if they can be user-supplied.
    return eval(type_str)
|
|
|
|
|
def create_processed_inputs(model_step: ModelStep, available_vars: dict[str, Any]) -> dict[str, Any]:
    """
    Creates processed inputs for a model step.

    For each input field of the step, the referenced variable is read from
    ``available_vars`` and, when the field names a transformation in ``func``,
    that transformation is applied to the value.

    Args:
        model_step (ModelStep): The model step for which to create processed inputs.
        available_vars (dict[str, Any]): Dictionary of variables available for use as inputs.
            Keys are variable names, values are the variable values.

    Returns:
        dict[str, Any]: A dictionary of processed inputs ready for use by the model step.
            Keys are input field names, values are the processed input values.

    Raises:
        WorkflowError: If a required variable is not found in available_vars,
            or if a specified transformation function cannot be resolved.

    Example:
        >>> available_vars = {"step1.output": "Hello World"}
        >>> create_processed_inputs(model_step, available_vars)
        {"input_field_name": "HELLO WORLD"}  # If upper transformation was specified
    """
    processed_inputs: dict[str, Any] = {}
    for input_field in model_step.input_fields:
        var = input_field.variable
        try:
            value = available_vars[var]
        except KeyError as exc:
            # Surface the documented error type instead of a bare KeyError.
            raise WorkflowError(
                f"Variable '{var}' required by input field '{input_field.name}' is not available"
            ) from exc

        if input_field.func is not None:
            func = FUNCTION_MAP.get(input_field.func)
            if func is None:
                try:
                    # NOTE(review): eval executes arbitrary code. This fallback is
                    # kept for backward compatibility; it is only safe while
                    # workflow definitions come from trusted sources.
                    func = eval(input_field.func)
                except Exception as exc:
                    raise WorkflowError(
                        f"Transformation function '{input_field.func}' is not available"
                    ) from exc
            value = func(value)

        processed_inputs[input_field.name] = value
    return processed_inputs
|
|
|
|
|
|
|
def execute_model_step(
    model_step: ModelStep, available_vars: dict[str, Any], return_full_content: bool = False
) -> dict[str, Any] | tuple[dict[str, Any], str]:
    """
    Run a single model step and return its structured outputs.

    The step is executed as follows:
    1. Resolve the step's inputs from ``available_vars`` with
       ``create_processed_inputs``.
    2. Build the prompt: the step's system prompt, followed by `` | Inputs: ``
       and the inputs rendered as comma-separated ``name=value`` pairs.
    3. Build a pydantic model named ``ModelResponse`` with one required field
       per output field (type resolved by ``get_type``).
    4. Call ``completion`` with the model id ``"<provider>/<model>"``, the
       system prompt, the prompt and the response model.
    5. Read each declared output field from the response's ``"output"`` entry.

    Output fields' ``func`` values are not applied by this function.

    Args:
        model_step (ModelStep): The step to execute; supplies provider, model,
            system prompt and the input/output field definitions.
        available_vars (dict[str, Any]): All variables visible to this step
            (external inputs and outputs of earlier steps).
        return_full_content (bool): When True, also return the response's
            ``"content"`` entry.

    Returns:
        dict[str, Any]: Output values keyed by output field name; or, when
        ``return_full_content`` is True, a tuple of that dictionary and the
        response's ``"content"`` entry.

    Raises:
        Exception: Errors raised by ``create_processed_inputs``, ``get_type``,
            pydantic or ``completion`` propagate unchanged.

    Example:
        >>> step = ModelStep(
        ...     id="summarize",
        ...     model="gpt-3.5-turbo",
        ...     provider="openai",
        ...     call_type="llm",
        ...     system_prompt="Summarize the text",
        ...     input_fields=[InputField(name="text", variable="input_text", description="Text to summarize")],
        ...     output_fields=[OutputField(name="summary", type="str", description="Summary of the text")]
        ... )
        >>> execute_model_step(step, {"input_text": "Long text to be summarized..."})
        {"summary": "A concise summary of the text."}
    """
    inputs = create_processed_inputs(model_step, available_vars)

    # Render the inputs as "name=value" pairs appended to the system prompt.
    rendered_inputs = ", ".join(f"{name}={value}" for name, value in inputs.items())
    prompt = f"{model_step.system_prompt} | Inputs: {rendered_inputs}"

    # One required field per declared output; the model is created per call
    # because the field set depends on the step.
    response_fields = {}
    for out_field in model_step.output_fields:
        response_fields[out_field.name] = (
            get_type(out_field.type),
            pydantic.Field(..., description=out_field.description),
        )
    ModelResponse = pydantic.create_model("ModelResponse", **response_fields)

    api_response = completion(
        model=f"{model_step.provider}/{model_step.model}",
        system=model_step.system_prompt,
        prompt=prompt,
        response_format=ModelResponse,
    )

    structured = api_response["output"]
    outputs = {}
    for out_field in model_step.output_fields:
        outputs[out_field.name] = structured[out_field.name]

    if not return_full_content:
        return outputs
    return outputs, api_response["content"]
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: run one entity-extraction step directly.
    # This calls the real model through execute_model_step.
    entity_input_fields = [
        InputField(name="sentence", description="The sentence to process", variable="sentence", func=None),
        InputField(name="n", description="The number of entities to return", variable="n", func=None),
    ]
    entity_output_fields = [
        OutputField(
            name="entities",
            description="The first N entities in the string as a list of strings",
            type="list[str]",
            func=None,
        ),
        OutputField(name="count", description="The total count of entities in the string", type="int", func=None),
    ]
    demo_step = ModelStep(
        id="step1",
        model="gpt-4o-mini",
        provider="OpenAI",
        call_type="llm",
        system_prompt="You are a simple NLP tool that takes a string, and a number N, and return the first N entities in the string, and the total count of entities in the string.",
        input_fields=entity_input_fields,
        output_fields=entity_output_fields,
    )

    # These are the variables made available to the step (not yet processed).
    demo_vars = {"sentence": "Abdul Akbar is a good person, but Jesus is the son of God.", "n": 3}

    print(execute_model_step(demo_step, demo_vars))
|
|
|
|
|
|
|
def execute_workflow(
    workflow: Workflow, input_values: dict[str, Any], return_full_content: bool = False
) -> dict[str, Any] | tuple[dict[str, Any], str]:
    """
    Execute the given workflow as a computational graph.

    The workflow is run in four phases:

    1. Copy every name listed in ``workflow.inputs`` from ``input_values``,
       failing if one is missing.
    2. Build the step dependency graph with ``create_dependency_graph`` and
       order the steps with ``topological_sort``.
    3. Execute each step in that order with ``execute_model_step``. A step's
       outputs are stored under ``"<step_id>.<output_name>"`` so that later
       steps can reference them.
    4. Collect the final outputs: for each ``target -> variable`` pair in
       ``workflow.outputs``, look the variable up among the computed values.

    Args:
        workflow (Workflow): The workflow to execute, containing steps, the
            required input names and the output mapping.
        input_values (dict[str, Any]): External input values. Must contain
            every name in ``workflow.inputs``.
        return_full_content (bool): Accepted but currently not used by this
            function; the return value is always the outputs dictionary.

    Returns:
        dict[str, Any]: The workflow outputs, keyed by the keys of
        ``workflow.outputs``.

    Raises:
        WorkflowError: If a required workflow input is missing from
            ``input_values``, or if an output variable was not produced.
        Exception: Errors raised by ``create_dependency_graph``,
            ``topological_sort`` or ``execute_model_step`` (for example for
            unknown variables or cyclic dependencies; those error types are
            defined outside this module) propagate unchanged.

    Example:
        >>> workflow = Workflow(
        ...     steps={
        ...         "extract": ModelStep(...),  # A step that extracts entities
        ...         "analyze": ModelStep(...)   # A step that analyzes the entities
        ...     },
        ...     inputs=["text"],
        ...     outputs={"sentiment": "analyze.sentiment", "entities": "extract.entities"}
        ... )
        >>> result = execute_workflow(workflow, {"text": "Apple is launching a new product tomorrow."})
        >>> print(result["sentiment"])
        "positive"
        >>> print(result["entities"])
        ["Apple", "product"]
    """
    # NOTE(review): return_full_content is never read below, although the
    # return annotation advertises a tuple variant. Confirm the intended
    # behaviour before relying on it.

    # Phase 1: seed the variable store with the required external inputs.
    state: dict[str, Any] = {}
    for name in workflow.inputs:
        if name in input_values:
            state[name] = input_values[name]
            continue
        raise WorkflowError(f"Missing required workflow input: {name}")

    # Phase 2: derive an execution order that respects step dependencies.
    ordered_step_ids = topological_sort(create_dependency_graph(workflow, input_values))

    # Phase 3: run the steps, publishing each output under "<step_id>.<name>".
    for step_id in ordered_step_ids:
        step_outputs = execute_model_step(workflow.steps[step_id], state)
        for key, value in step_outputs.items():
            state[f"{step_id}.{key}"] = value

    # Phase 4: map the requested variables onto their output names.
    results: dict[str, Any] = {}
    for target, source in workflow.outputs.items():
        if source not in state:
            raise WorkflowError(f"Workflow output variable {source} was not produced")
        results[target] = state[source]

    return results
|
|
|
|
|
def run_examples():
    """
    Run three demonstration workflows and print the result of each.

    1. A linear two-step workflow meant to execute successfully.
    2. Two steps that consume each other's output (meant to show a cyclic
       dependency failure).
    3. A step that reads a variable nothing provides (meant to show an
       unknown variable failure).

    Each workflow is passed to ``execute_workflow``. Its outputs are printed,
    or the error message if a ``WorkflowError`` is raised.
    """
    print("Example 1: Successful Workflow Execution")

    from workflows.structs import ModelStep, Workflow

    def make_step(step_id, prompt, input_fields, output_fields):
        # All example steps share the same model configuration.
        return ModelStep(
            id=step_id,
            model="gpt-4o-mini",
            provider="OpenAI",
            call_type="llm",
            system_prompt=prompt,
            input_fields=input_fields,
            output_fields=output_fields,
        )

    def attempt(workflow, input_values):
        # Execute one workflow and report either its outputs or its error.
        try:
            outputs = execute_workflow(workflow, input_values)
            print("Workflow outputs:", outputs)
        except WorkflowError as e:
            print("Workflow failed with error:", e)

    workflow_success = Workflow(
        steps={
            "step1": make_step(
                "step1",
                "Step1 processing",
                [InputField(name="value", description="Input value", variable="input.value")],
                [OutputField(name="result", description="Processed result", type="str", func="upper")],
            ),
            "step2": make_step(
                "step2",
                "Step2 processing",
                [InputField(name="result", description="Result from step1", variable="step1.result")],
                [OutputField(name="final", description="Final output", type="str", func="lower")],
            ),
        },
        inputs=["input.value"],
        outputs={"final": "step2.final"},
    )
    attempt(workflow_success, {"input.value": "Hello, World!"})

    print("\nExample 2: Cyclic Dependency Workflow")

    # stepA reads stepB's output and stepB reads stepA's output.
    workflow_cycle = Workflow(
        steps={
            "stepA": make_step(
                "stepA",
                "StepA processing",
                [InputField(name="input", description="Input from stepB", variable="stepB.output", func="identity")],
                [OutputField(name="output", description="Output from A", type="str", func="upper")],
            ),
            "stepB": make_step(
                "stepB",
                "StepB processing",
                [InputField(name="input", description="Input from stepA", variable="stepA.output", func="identity")],
                [OutputField(name="output", description="Output from B", type="str", func="upper")],
            ),
        },
        inputs=[],
        outputs={"output": "stepB.output"},
    )
    attempt(workflow_cycle, {})

    print("\nExample 3: Unknown Variable Dependency Workflow")

    # stepX reads a variable that is neither an input nor a step output.
    workflow_unknown = Workflow(
        steps={
            "stepX": make_step(
                "stepX",
                "StepX processing",
                [
                    InputField(
                        name="input", description="Non-existent input", variable="nonexistent.value", func="identity"
                    )
                ],
                [OutputField(name="output", description="Output from X", type="str", func="upper")],
            )
        },
        inputs=[],
        outputs={"output": "stepX.output"},
    )
    attempt(workflow_unknown, {})
|
|
|
|
|
if __name__ == "__main__":
    # Show input processing for a single step (no model call), then run the
    # example workflows.
    entity_input_fields = [
        InputField(name="sentence", description="The sentence to process", variable="sentence", func=None),
        InputField(name="n", description="The number of entities to return", variable="n", func=None),
    ]
    entity_output_fields = [
        OutputField(
            name="entities",
            description="The first N entities in the string as a list of strings",
            type="list[str]",
            func=None,
        ),
        OutputField(name="count", description="The total count of entities in the string", type="int", func=None),
    ]
    example_step = ModelStep(
        id="step1",
        model="gpt-4o-mini",
        provider="OpenAI",
        call_type="llm",
        system_prompt="You are a simple NLP tool that takes a string, and a number N, and return the first N entities in the string, and the total count of entities in the string.",
        input_fields=entity_input_fields,
        output_fields=entity_output_fields,
    )

    raw_inputs = {"sentence": "Abdul Akbar is a good person, but Jesus is the son of God.", "n": 3}
    print(create_processed_inputs(example_step, raw_inputs))

    run_examples()
|
|
|
|
|
|