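"""Tests for workflows.executors: create_processed_inputs, execute_model_step, and execute_workflow."""
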
import json
from unittest.mock import patch

import pytest

from workflows.errors import CyclicDependencyError, WorkflowError
from workflows.executors import (
    create_processed_inputs,
    execute_model_step,
    execute_workflow,
)
from workflows.structs import CallType, InputField, ModelStep, OutputField, Workflow


lower = str.lower
upper = str.upper
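# NOTE: These aliases mirror the transformations exercised via InputField(func="upper"/"lower")
# below; create_processed_inputs is assumed to resolve those func names to str.upper / str.lower.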


def assert_model_step_result(result: dict, expected_result: dict):
    """Assert that a model-step result has the expected shape and values."""
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == expected_result["outputs"]
    assert result["content"] == expected_result["content"]
    assert result["logprob"] == expected_result["logprob"]


def assert_workflow_output(output: dict, expected_output: dict):
    """Assert that a workflow result contains the expected keys and values."""
    assert isinstance(output, dict)
    for key in ["final_outputs", "intermediate_outputs", "step_contents", "logprob"]:
        assert key in output
        assert output[key] == expected_output[key]


def test_create_processed_inputs_basic():
    """Test basic input processing without transformations."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    result = create_processed_inputs(step, available_vars)
    assert result == {"text": "Hello World"}


def test_create_processed_inputs_with_transformation():
    """Test input processing with transformation functions."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"),
            InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"),
        ],
        output_fields=[],
    )
    available_vars = {"input_text": "hello", "input_caps": "WORLD"}

    result = create_processed_inputs(step, available_vars)
    assert result == {"upper_text": "HELLO", "lower_text": "world"}


def test_create_processed_inputs_missing_var():
    """Test that an appropriate error is raised when a variable is missing."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="missing_var")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    with pytest.raises(KeyError):
        create_processed_inputs(step, available_vars)


def test_create_processed_inputs_unknown_func():
    """Test that an appropriate error is raised when an unknown function is specified."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    with pytest.raises(Exception):
        create_processed_inputs(step, available_vars)
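

# The execute_model_step tests below mock workflows.executors.completion; its return value is
# assumed to be a dict with "content" (raw model text), "output" (parsed fields), and optionally
# "log_prob", which execute_model_step maps onto the "content", "outputs", and "logprob" keys.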


@patch("workflows.executors.completion")
def test_execute_model_step_success(mock_completion):
    """Test successful execution of a model step with mocked litellm response."""
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    result = execute_model_step(step, {"input_text": "Long text to be summarized..."})

    assert_model_step_result(
        result,
        {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None},
    )

    mock_completion.assert_called_once()
    args, kwargs = mock_completion.call_args
    assert kwargs["model"] == "OpenAI/gpt-3.5-turbo"
    assert "Summarize the text" in kwargs["system"]


@patch("workflows.executors.completion")
def test_execute_model_step_with_full_content(mock_completion):
    """Test execution of a model step with full content returned."""
    mock_response = {
        "content": "Full model response content",
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, return_full_content=True)

    assert_model_step_result(
        result,
        {
            "outputs": {"summary": "This is a summary"},
            "content": "Full model response content",
            "logprob": None,
        },
    )


@patch("workflows.executors.completion")
def test_execute_model_step_with_logprobs(mock_completion):
    """Test execution of a model step with log probabilities."""
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
        "log_prob": -2.5,
    }
    mock_completion.return_value = mock_response

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, logprobs=True)

    assert_model_step_result(
        result,
        {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5},
    )


@patch("workflows.executors.completion")
def test_execute_model_step_error(mock_completion):
    """Test handling of errors in model step execution."""
    mock_completion.side_effect = Exception("API Error")

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    with pytest.raises(Exception):
        execute_model_step(step, {"input_text": "Long text to be summarized..."})


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_simple(mock_execute_step):
    """Test execution of a simple workflow with a single step."""
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None}
    mock_execute_step.return_value = mock_result

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."})

    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    mock_execute_step.assert_called_once()


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_multi_step(mock_execute_step):
    """Test execution of a multi-step workflow with dependencies."""

    def side_effect(step, available_vars, return_full_content=False, logprobs=False):
        if step.id == "extract":
            return {"outputs": {"entities": ["Apple", "product"]}, "content": None, "logprob": None}
        elif step.id == "analyze":
            return {"outputs": {"sentiment": "positive"}, "content": None, "logprob": None}
        return {"outputs": {}, "content": None, "logprob": None}

    mock_execute_step.side_effect = side_effect

    extract_step = ModelStep(
        id="extract",
        name="Extract Entities",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Extract entities",
        input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")],
        output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")],
    )

    analyze_step = ModelStep(
        id="analyze",
        name="Analyze Sentiment",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Analyze sentiment",
        input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")],
        output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")],
    )

    workflow = Workflow(
        steps={"extract": extract_step, "analyze": analyze_step},
        inputs=["input_text"],
        outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"},
    )

    result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."})

    assert_workflow_output(
        result,
        {
            "final_outputs": {"entities": ["Apple", "product"], "sentiment": "positive"},
            "intermediate_outputs": {
                "input_text": "Apple is launching a new product tomorrow.",
                "extract.entities": ["Apple", "product"],
                "analyze.sentiment": "positive",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    assert mock_execute_step.call_count == 2


def test_execute_workflow_missing_input():
    """Test that an error is raised when a required input is missing."""
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    with pytest.raises(WorkflowError, match="Missing required workflow input"):
        execute_workflow(workflow, {})


def test_execute_workflow_cyclic_dependency():
    """Test that a cyclic dependency in the workflow raises an appropriate error."""
    # t1 consumes t2.var while t2 consumes t1.out, so the two steps form a cycle.
    step1 = ModelStep(
        id="t1",
        name="Test Step 1",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v1", description="", variable="t2.var")],
        output_fields=[OutputField(name="out", description="")],
    )
    step2 = ModelStep(
        id="t2",
        name="Test Step 2",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v2", description="", variable="t1.out")],
        output_fields=[OutputField(name="var", description="")],
    )

    workflow = Workflow(steps=[step1, step2], inputs=[], outputs={})

    with pytest.raises(CyclicDependencyError):
        execute_workflow(workflow, {})


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_full_content(mock_execute_step):
    """Test execution of a workflow with return_full_content=True."""
    mock_result = {
        "outputs": {"summary": "This is a summary"},
        "content": "Full model response content",
        "logprob": None,
    }
    mock_execute_step.return_value = mock_result

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps=[step], inputs=["input_text"], outputs={"summary": "summarize.summary"})

    inputs = {"input_text": "Long text to be summarized..."}
    result = execute_workflow(workflow, inputs, return_full_content=True)

    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {"summarize": "Full model response content"},
            "logprob": None,
        },
    )

    mock_execute_step.assert_called_once_with(step, inputs, return_full_content=True, logprobs=False)


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_logprob(mock_execute_step):
    """Test execution of a workflow with logprob_step specified."""
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5}
    mock_execute_step.return_value = mock_result

    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."}, logprob_step="summarize")

    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "logprob": -2.5,
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
        },
    )

    mock_execute_step.assert_called_once()
    args, kwargs = mock_execute_step.call_args
    assert kwargs["logprobs"] is True