|
import json
from unittest.mock import patch

import pytest

from workflows.errors import CyclicDependencyError, WorkflowError
from workflows.executors import (
    create_processed_inputs,
    execute_model_step,
    execute_workflow,
    lower,
    upper,
)
from workflows.structs import InputField, ModelStep, OutputField, Workflow
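
# Orientation note (added for readability): the tests below exercise the
# string helpers (upper/lower), input processing for model steps, and
# workflow execution. litellm and the step executor are mocked throughout,
# so no real API calls are made.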
|
|
|
|
|
|
|
|
|
def test_upper():
    """Test the upper function with different input types."""
    assert upper("hello") == "HELLO"
    assert upper("Hello World") == "HELLO WORLD"
    assert upper("") == ""

    # Non-string inputs should be passed through unchanged.
    assert upper(123) == 123
    assert upper([1, 2, 3]) == [1, 2, 3]
    assert upper(None) is None


def test_lower():
    """Test the lower function with different input types."""
    assert lower("HELLO") == "hello"
    assert lower("Hello World") == "hello world"
    assert lower("") == ""

    # Non-string inputs should be passed through unchanged.
    assert lower(123) == 123
    assert lower([1, 2, 3]) == [1, 2, 3]
    assert lower(None) is None
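
# Illustrative addition (not part of the original suite): a small composition
# check building on the string and passthrough behavior asserted above.
def test_upper_lower_composition():
    assert lower(upper("Mixed Case")) == "mixed case"
    assert upper(lower(None)) is None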
|
|
|
|
|
|
|
|
|
|
|
def test_create_processed_inputs_basic():
    """Test basic input processing without transformations."""
    step = ModelStep(
        id="test_step",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    result = create_processed_inputs(step, available_vars)
    assert result == {"text": "Hello World"}


def test_create_processed_inputs_with_transformation():
    """Test input processing with transformation functions."""
    step = ModelStep(
        id="test_step",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"),
            InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"),
        ],
        output_fields=[],
    )
    available_vars = {"input_text": "hello", "input_caps": "WORLD"}

    result = create_processed_inputs(step, available_vars)
    assert result == {"upper_text": "HELLO", "lower_text": "world"}
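
# Illustrative addition (not part of the original suite): combines a
# transformed field with an untransformed one, assuming, per the two tests
# above, that omitting func leaves the value unchanged.
def test_create_processed_inputs_mixed():
    step = ModelStep(
        id="test_step",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="raw", description="Raw text", variable="input_text"),
            InputField(name="shout", description="Uppercase text", variable="input_text", func="upper"),
        ],
        output_fields=[],
    )

    result = create_processed_inputs(step, {"input_text": "hello"})
    assert result == {"raw": "hello", "shout": "HELLO"}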
|
|
|
|
|
def test_create_processed_inputs_missing_var():
    """Test that a KeyError is raised when an input field references a missing variable."""
    step = ModelStep(
        id="test_step",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="missing_var")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    with pytest.raises(KeyError):
        create_processed_inputs(step, available_vars)


def test_create_processed_inputs_unknown_func():
    """Test that an error is raised when an unknown transformation function is specified."""
    step = ModelStep(
        id="test_step",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    # The exact exception type depends on how the executor resolves
    # transformation functions, so only assert that some exception is raised.
    with pytest.raises(Exception):
        create_processed_inputs(step, available_vars)


@patch("workflows.executors.litellm.completion")
def test_execute_model_step_success(mock_completion):
    """Test successful execution of a model step with a mocked litellm response."""
    # Mock the litellm response: a single choice whose message content is the
    # JSON-encoded output fields.
    mock_response = {"choices": [{"message": {"content": json.dumps({"summary": "This is a summary"})}}]}
    mock_completion.return_value = mock_response

    step = ModelStep(
        id="summarize",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    result = execute_model_step(step, {"input_text": "Long text to be summarized..."})
    assert result == {"summary": "This is a summary"}

    # Verify the LLM was called once with the expected model and system prompt.
    mock_completion.assert_called_once()
    args, kwargs = mock_completion.call_args
    assert kwargs["model"] == "gpt-3.5-turbo"
    assert "Summarize the text" in kwargs["messages"][0]["content"]


@patch("workflows.executors.litellm.completion")
def test_execute_model_step_error(mock_completion):
    """Test handling of errors in model step execution."""
    # Simulate an API failure from litellm.
    mock_completion.side_effect = Exception("API Error")

    step = ModelStep(
        id="summarize",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    with pytest.raises(Exception):
        execute_model_step(step, {"input_text": "Long text to be summarized..."})


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_simple(mock_execute_step):
    """Test execution of a simple workflow with a single step."""
    mock_execute_step.return_value = {"summary": "This is a summary"}

    step = ModelStep(
        id="summarize",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."})

    # The workflow output is resolved from the step's mocked result.
    assert result == {"summary": "This is a summary"}
    mock_execute_step.assert_called_once()


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_multi_step(mock_execute_step):
    """Test execution of a multi-step workflow with dependencies."""

    # Return step-specific outputs so each step's result can be traced.
    def side_effect(step, available_vars):
        if step.id == "extract":
            return {"entities": ["Apple", "product"]}
        elif step.id == "analyze":
            return {"sentiment": "positive"}
        return {}

    mock_execute_step.side_effect = side_effect

    extract_step = ModelStep(
        id="extract",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Extract entities",
        input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")],
        output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")],
    )

    # The analyze step consumes the extract step's output, creating a dependency.
    analyze_step = ModelStep(
        id="analyze",
        model="gpt-4",
        provider="openai",
        call_type="llm",
        system_prompt="Analyze sentiment",
        input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")],
        output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")],
    )

    workflow = Workflow(
        steps={"extract": extract_step, "analyze": analyze_step},
        inputs=["input_text"],
        outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"},
    )

    result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."})

    assert result == {"entities": ["Apple", "product"], "sentiment": "positive"}
    assert mock_execute_step.call_count == 2


def test_execute_workflow_missing_input():
    """Test that an error is raised when a required input is missing."""
    step = ModelStep(
        id="summarize",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    with pytest.raises(WorkflowError, match="Missing required workflow input"):
        execute_workflow(workflow, {})


@patch("workflows.executors.create_dependency_graph")
def test_execute_workflow_cyclic_dependency(mock_dependency_graph):
    """Test that a cyclic dependency in the workflow raises an appropriate error."""
    mock_dependency_graph.side_effect = CyclicDependencyError()

    step = ModelStep(
        id="test",
        model="gpt-3.5-turbo",
        provider="openai",
        call_type="llm",
        system_prompt="Test",
        input_fields=[],
        output_fields=[],
    )

    # outputs is a mapping, consistent with the other Workflow fixtures above.
    workflow = Workflow(steps={"test": step}, inputs=[], outputs={})

    with pytest.raises(CyclicDependencyError):
        execute_workflow(workflow, {})