import json
from unittest.mock import patch

import pytest

from workflows.errors import CyclicDependencyError, WorkflowError
from workflows.executors import (
    create_processed_inputs,
    execute_model_step,
    execute_workflow,
)
from workflows.structs import CallType, InputField, ModelStep, OutputField, Workflow
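
# NOTE: The tests below request `mock_completion` and `mock_execute_step` pytest
# fixtures that are not defined in this module; they are presumably provided by a
# conftest.py. A minimal sketch of such fixtures, assuming the executor module
# exposes `completion` and `execute_model_step` as the patch targets (an
# assumption, not confirmed by this file), might look like:
#
#     @pytest.fixture
#     def mock_completion():
#         with patch("workflows.executors.completion") as mock:
#             yield mock
#
#     @pytest.fixture
#     def mock_execute_step():
#         with patch("workflows.executors.execute_model_step") as mock:
#             yield mock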

# String transformation helpers mirroring the `func` names ("upper", "lower")
# referenced by input fields in the tests below.
lower = str.lower
upper = str.upper


# Shared assertion helpers
def assert_model_step_result(result: dict, expected_result: dict):
    # Verify the results
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == expected_result["outputs"]
    assert result["content"] == expected_result["content"]
    assert result["logprob"] == expected_result["logprob"]


def assert_workflow_output(output: dict, expected_output: dict):
    assert isinstance(output, dict)
    for key in ["final_outputs", "intermediate_outputs", "step_contents", "logprob"]:
        assert key in output
        assert output[key] == expected_output[key]


# Tests for create_processed_inputs
def test_create_processed_inputs_basic():
    """Test basic input processing without transformations."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}
    result = create_processed_inputs(step, available_vars)
    assert result == {"text": "Hello World"}


def test_create_processed_inputs_with_transformation():
    """Test input processing with transformation functions."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"),
            InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"),
        ],
        output_fields=[],
    )
    available_vars = {"input_text": "hello", "input_caps": "WORLD"}
    result = create_processed_inputs(step, available_vars)
    assert result == {"upper_text": "HELLO", "lower_text": "world"}


def test_create_processed_inputs_missing_var():
    """Test that appropriate error is raised when a variable is missing."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="missing_var")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}
    with pytest.raises(KeyError):
        create_processed_inputs(step, available_vars)


def test_create_processed_inputs_unknown_func():
    """Test that appropriate error is raised when an unknown function is specified."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}
    # This should raise an error when the function isn't found
    with pytest.raises(Exception):
        create_processed_inputs(step, available_vars)


# Tests for execute_model_step
def test_execute_model_step_success(mock_completion):
    """Test successful execution of a model step with mocked litellm response."""
    # Mock the litellm response
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."})

    # Verify the results
    assert_model_step_result(result, {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None})

    # Verify the litellm call was made correctly
    mock_completion.assert_called_once()
    args, kwargs = mock_completion.call_args
    assert kwargs["model"] == "OpenAI/gpt-3.5-turbo"
    assert "Summarize the text" in kwargs["system"]


def test_execute_model_step_with_full_content(mock_completion):
    """Test execution of a model step with full content returned."""
    # Mock the litellm response
    mock_response = {
        "content": "Full model response content",
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step with return_full_content=True
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, return_full_content=True)

    # Verify the results
    assert_model_step_result(
        result,
        {
            "outputs": {"summary": "This is a summary"},
            "content": "Full model response content",
            "logprob": None,
        },
    )


def test_execute_model_step_with_logprobs(mock_completion):
    """Test execution of a model step with log probabilities."""
    # Mock the litellm response with log probability
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
        "log_prob": -2.5,
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step with logprobs=True
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, logprobs=True)

    # Verify the results
    assert_model_step_result(result, {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5})


def test_execute_model_step_error(mock_completion):
    """Test handling of errors in model step execution."""
    # Make litellm raise an exception
    mock_completion.side_effect = Exception("API Error")

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step - should raise an exception
    with pytest.raises(Exception):
        execute_model_step(step, {"input_text": "Long text to be summarized..."})


# Tests for execute_workflow
def test_execute_workflow_simple(mock_execute_step):
    """Test execution of a simple workflow with a single step."""
    # Configure mock to return expected outputs
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None}
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow
    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."})

    # Verify the results
    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    # Verify execute_model_step was called correctly
    mock_execute_step.assert_called_once()


def test_execute_workflow_multi_step(mock_execute_step):
    """Test execution of a multi-step workflow with dependencies."""
    # Configure mock to return different values based on the step
    def side_effect(step, available_vars, return_full_content=False, logprobs=False):
        if step.id == "extract":
            return {"outputs": {"entities": ["Apple", "product"]}, "content": None, "logprob": None}
        elif step.id == "analyze":
            return {"outputs": {"sentiment": "positive"}, "content": None, "logprob": None}
        return {"outputs": {}, "content": None, "logprob": None}

    mock_execute_step.side_effect = side_effect

    # Create extract step
    extract_step = ModelStep(
        id="extract",
        name="Extract Entities",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Extract entities",
        input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")],
        output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")],
    )

    # Create analyze step that depends on extract step
    analyze_step = ModelStep(
        id="analyze",
        name="Analyze Sentiment",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Analyze sentiment",
        input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")],
        output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")],
    )
    workflow = Workflow(
        steps={"extract": extract_step, "analyze": analyze_step},
        inputs=["input_text"],
        outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"},
    )

    # Execute the workflow
    result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."})

    assert_workflow_output(
        result,
        {
            "final_outputs": {"entities": ["Apple", "product"], "sentiment": "positive"},
            "intermediate_outputs": {
                "input_text": "Apple is launching a new product tomorrow.",
                "extract.entities": ["Apple", "product"],
                "analyze.sentiment": "positive",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    # Verify execute_model_step was called twice (once for each step)
    assert mock_execute_step.call_count == 2


def test_execute_workflow_missing_input():
    """Test that an error is raised when a required input is missing."""
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute with missing input
    with pytest.raises(WorkflowError, match="Missing required workflow input"):
        execute_workflow(workflow, {})


def test_execute_workflow_cyclic_dependency():
    """Test that a cyclic dependency in the workflow raises an appropriate error."""
    # Build two steps that consume each other's outputs, forming a cycle
    step1 = ModelStep(
        id="t1",
        name="Test Step 1",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v1", description="", variable="t2.var")],
        output_fields=[OutputField(name="out", description="")],
    )
    step2 = ModelStep(
        id="t2",
        name="Test Step 2",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v2", description="", variable="t1.out")],
        output_fields=[OutputField(name="var", description="")],
    )
    workflow = Workflow(steps=[step1, step2], inputs=[], outputs={})

    # Dependency resolution should detect the cycle and raise CyclicDependencyError
    with pytest.raises(CyclicDependencyError):
        execute_workflow(workflow, {})


def test_execute_workflow_with_full_content(mock_execute_step):
    """Test execution of a workflow with return_full_content=True."""
    # Configure mock to return expected outputs and content
    mock_result = {
        "outputs": {"summary": "This is a summary"},
        "content": "Full model response content",
        "logprob": None,
    }
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps=[step], inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow with return_full_content=True
    inputs = {"input_text": "Long text to be summarized..."}
    result = execute_workflow(workflow, inputs, return_full_content=True)

    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {"summarize": "Full model response content"},
            "logprob": None,
        },
    )

    # Verify execute_model_step was called correctly with return_full_content=True
    mock_execute_step.assert_called_once_with(step, inputs, return_full_content=True, logprobs=False)


def test_execute_workflow_with_logprob(mock_execute_step):
    """Test execution of a workflow with logprob_step specified."""
    # Configure mock to return expected outputs with logprob
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5}
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow with logprob_step specified
    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."}, logprob_step="summarize")

    # Verify the results
    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
            "logprob": -2.5,
        },
    )

    # Verify execute_model_step was called with logprobs=True
    mock_execute_step.assert_called_once()
    args, kwargs = mock_execute_step.call_args
    assert kwargs["logprobs"] is True