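"""Unit tests for workflows.executors.

Covers create_processed_inputs, execute_model_step, and execute_workflow, with
the underlying litellm call mocked via workflows.executors.completion.
"""
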
import json
from unittest.mock import patch
import pytest
from workflows.errors import CyclicDependencyError, WorkflowError
from workflows.executors import (
create_processed_inputs,
execute_model_step,
execute_workflow,
)
from workflows.structs import CallType, InputField, ModelStep, OutputField, Workflow


# Simple string transformations; the same names are used as InputField.func values in the tests below
lower = str.lower
upper = str.upper


# Shared assertion helpers
def assert_model_step_result(result: dict, expected_result: dict):
# Verify the results
assert isinstance(result, dict)
assert "outputs" in result
assert "content" in result
assert "logprob" in result
assert result["outputs"] == expected_result["outputs"]
assert result["content"] == expected_result["content"]
assert result["logprob"] == expected_result["logprob"]


def assert_workflow_output(output: dict, expected_output: dict):
assert isinstance(output, dict)
for key in ["final_outputs", "intermediate_outputs", "step_contents", "logprob"]:
assert key in output
assert output[key] == expected_output[key]


# Tests for create_processed_inputs
def test_create_processed_inputs_basic():
"""Test basic input processing without transformations."""
step = ModelStep(
id="test_step",
name="Test Step",
model="gpt-4",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test prompt",
input_fields=[InputField(name="text", description="Input text", variable="input_text")],
output_fields=[],
)
available_vars = {"input_text": "Hello World"}
result = create_processed_inputs(step, available_vars)
assert result == {"text": "Hello World"}


def test_create_processed_inputs_with_transformation():
"""Test input processing with transformation functions."""
step = ModelStep(
id="test_step",
name="Test Step",
model="gpt-4",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test prompt",
input_fields=[
InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"),
InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"),
],
output_fields=[],
)
available_vars = {"input_text": "hello", "input_caps": "WORLD"}
result = create_processed_inputs(step, available_vars)
assert result == {"upper_text": "HELLO", "lower_text": "world"}


def test_create_processed_inputs_missing_var():
"""Test that appropriate error is raised when a variable is missing."""
step = ModelStep(
id="test_step",
name="Test Step",
model="gpt-4",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test prompt",
input_fields=[InputField(name="text", description="Input text", variable="missing_var")],
output_fields=[],
)
available_vars = {"input_text": "Hello World"}
with pytest.raises(KeyError):
create_processed_inputs(step, available_vars)


def test_create_processed_inputs_unknown_func():
"""Test that appropriate error is raised when an unknown function is specified."""
step = ModelStep(
id="test_step",
name="Test Step",
model="gpt-4",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test prompt",
input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")],
output_fields=[],
)
available_vars = {"input_text": "Hello World"}
# This should raise an error when the function isn't found
with pytest.raises(Exception):
create_processed_inputs(step, available_vars)
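

# A small additional sketch, not part of the original suite: assuming each InputField is
# resolved independently (as the basic and transformation tests above demonstrate),
# two fields should be able to read from the same source variable.
def test_create_processed_inputs_shared_variable():
    """Test that multiple input fields can draw from the same available variable."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="raw", description="Unchanged text", variable="input_text"),
            InputField(name="shout", description="Uppercase text", variable="input_text", func="upper"),
        ],
        output_fields=[],
    )
    result = create_processed_inputs(step, {"input_text": "hello"})
    assert result == {"raw": "hello", "shout": "HELLO"}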


# Tests for execute_model_step
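# These tests patch workflows.executors.completion. Judging by the mocks and assertions
# below, the patched call is expected to return a dict with "content", "output", and
# (optionally) "log_prob" keys, which execute_model_step exposes as "content",
# "outputs", and "logprob" in its result.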
@patch("workflows.executors.completion")
def test_execute_model_step_success(mock_completion):
"""Test successful execution of a model step with mocked litellm response."""
# Mock the litellm response
mock_response = {
"content": json.dumps({"summary": "This is a summary"}),
"output": {"summary": "This is a summary"},
}
mock_completion.return_value = mock_response
# Create a test step
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
# Execute the step
result = execute_model_step(step, {"input_text": "Long text to be summarized..."})
# Verify the results
    assert_model_step_result(
        result,
        {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None},
    )
# Verify the litellm call was made correctly
mock_completion.assert_called_once()
args, kwargs = mock_completion.call_args
assert kwargs["model"] == "OpenAI/gpt-3.5-turbo"
assert "Summarize the text" in kwargs["system"]
@patch("workflows.executors.completion")
def test_execute_model_step_with_full_content(mock_completion):
"""Test execution of a model step with full content returned."""
# Mock the litellm response
mock_response = {
"content": "Full model response content",
"output": {"summary": "This is a summary"},
}
mock_completion.return_value = mock_response
# Create a test step
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
# Execute the step with return_full_content=True
result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, return_full_content=True)
# Verify the results
    assert_model_step_result(
        result,
        {
            "outputs": {"summary": "This is a summary"},
            "content": "Full model response content",
            "logprob": None,
        },
    )
@patch("workflows.executors.completion")
def test_execute_model_step_with_logprobs(mock_completion):
"""Test execution of a model step with log probabilities."""
# Mock the litellm response with log probability
mock_response = {
"content": json.dumps({"summary": "This is a summary"}),
"output": {"summary": "This is a summary"},
"log_prob": -2.5,
}
mock_completion.return_value = mock_response
# Create a test step
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
# Execute the step with logprobs=True
result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, logprobs=True)
# Verify the results
    assert_model_step_result(
        result,
        {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5},
    )
@patch("workflows.executors.completion")
def test_execute_model_step_error(mock_completion):
"""Test handling of errors in model step execution."""
# Make litellm raise an exception
mock_completion.side_effect = Exception("API Error")
# Create a test step
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
# Execute the step - should raise an exception
with pytest.raises(Exception):
execute_model_step(step, {"input_text": "Long text to be summarized..."})


# Tests for execute_workflow
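# Based on assert_workflow_output and the expected dicts below, execute_workflow is
# assumed to return "final_outputs" keyed by workflow output names, "intermediate_outputs"
# keyed by the raw workflow inputs plus "step_id.field_name" entries, per-step
# "step_contents" (populated only when full content is requested), and a "logprob".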
@patch("workflows.executors.execute_model_step")
def test_execute_workflow_simple(mock_execute_step):
"""Test execution of a simple workflow with a single step."""
# Configure mock to return expected outputs
mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None}
mock_execute_step.return_value = mock_result
# Create a simple workflow
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})
# Execute the workflow
result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."})
# Verify the results
assert_workflow_output(
result,
{
"final_outputs": {"summary": "This is a summary"},
"intermediate_outputs": {
"input_text": "Long text to be summarized...",
"summarize.summary": "This is a summary",
},
"step_contents": {},
"logprob": None,
},
)
# Verify execute_model_step was called correctly
mock_execute_step.assert_called_once()
@patch("workflows.executors.execute_model_step")
def test_execute_workflow_multi_step(mock_execute_step):
"""Test execution of a multi-step workflow with dependencies."""
# Configure mock to return different values based on the step
def side_effect(step, available_vars, return_full_content=False, logprobs=False):
if step.id == "extract":
return {"outputs": {"entities": ["Apple", "product"]}, "content": None, "logprob": None}
elif step.id == "analyze":
return {"outputs": {"sentiment": "positive"}, "content": None, "logprob": None}
return {"outputs": {}, "content": None, "logprob": None}
mock_execute_step.side_effect = side_effect
# Create extract step
extract_step = ModelStep(
id="extract",
name="Extract Entities",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Extract entities",
input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")],
output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")],
)
# Create analyze step that depends on extract step
analyze_step = ModelStep(
id="analyze",
name="Analyze Sentiment",
model="gpt-4",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Analyze sentiment",
input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")],
output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")],
)
workflow = Workflow(
steps={"extract": extract_step, "analyze": analyze_step},
inputs=["input_text"],
outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"},
)
# Execute the workflow
result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."})
assert_workflow_output(
result,
{
"final_outputs": {"entities": ["Apple", "product"], "sentiment": "positive"},
"intermediate_outputs": {
"input_text": "Apple is launching a new product tomorrow.",
"extract.entities": ["Apple", "product"],
"analyze.sentiment": "positive",
},
"step_contents": {},
"logprob": None,
},
)
# Verify execute_model_step was called twice (once for each step)
assert mock_execute_step.call_count == 2


def test_execute_workflow_missing_input():
"""Test that an error is raised when a required input is missing."""
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})
# Execute with missing input
with pytest.raises(WorkflowError, match="Missing required workflow input"):
execute_workflow(workflow, {})


def test_execute_workflow_cyclic_dependency():
"""Test that a cyclic dependency in the workflow raises an appropriate error."""
    # Build two steps whose inputs reference each other's outputs, forming a cycle
step1 = ModelStep(
id="t1",
name="Test Step 1",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test",
input_fields=[InputField(name="v1", description="", variable="t2.var")],
output_fields=[OutputField(name="out", description="")],
)
step2 = ModelStep(
id="t2",
name="Test Step 2",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Test",
input_fields=[InputField(name="v2", description="", variable="t1.out")],
output_fields=[OutputField(name="var", description="")],
)
    workflow = Workflow(steps={"t1": step1, "t2": step2}, inputs=[], outputs={})
# This should propagate the CyclicDependencyError
with pytest.raises(CyclicDependencyError):
execute_workflow(workflow, {})
@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_full_content(mock_execute_step):
"""Test execution of a workflow with return_full_content=True."""
# Configure mock to return expected outputs and content
mock_result = {
"outputs": {"summary": "This is a summary"},
"content": "Full model response content",
"logprob": None,
}
mock_execute_step.return_value = mock_result
# Create a simple workflow
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})
# Execute the workflow with return_full_content=True
inputs = {"input_text": "Long text to be summarized..."}
result = execute_workflow(workflow, inputs, return_full_content=True)
assert_workflow_output(
result,
{
"final_outputs": {"summary": "This is a summary"},
"intermediate_outputs": {
"input_text": "Long text to be summarized...",
"summarize.summary": "This is a summary",
},
"step_contents": {"summarize": "Full model response content"},
"logprob": None,
},
)
# Verify execute_model_step was called correctly with return_full_content=True
mock_execute_step.assert_called_once_with(step, inputs, return_full_content=True, logprobs=False)
@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_logprob(mock_execute_step):
"""Test execution of a workflow with logprob_step specified."""
# Configure mock to return expected outputs with logprob
mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5}
mock_execute_step.return_value = mock_result
# Create a simple workflow
step = ModelStep(
id="summarize",
name="Summarize Text",
model="gpt-3.5-turbo",
provider="OpenAI",
call_type=CallType.LLM,
system_prompt="Summarize the text",
input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
)
workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})
# Execute the workflow with logprob_step specified
result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."}, logprob_step="summarize")
# Verify the results
assert_workflow_output(
result,
{
"final_outputs": {"summary": "This is a summary"},
"logprob": -2.5,
"intermediate_outputs": {
"input_text": "Long text to be summarized...",
"summarize.summary": "This is a summary",
},
"step_contents": {},
},
)
# Verify execute_model_step was called with logprobs=True
mock_execute_step.assert_called_once()
args, kwargs = mock_execute_step.call_args
assert kwargs["logprobs"] is True