import json
from unittest.mock import patch

import pytest

from workflows.errors import CyclicDependencyError, WorkflowError
from workflows.executors import (
    create_processed_inputs,
    execute_model_step,
    execute_workflow,
)
from workflows.structs import CallType, InputField, ModelStep, OutputField, Workflow

# Tests for utility functions
lower = str.lower
upper = str.upper


# Tests for create_processed_inputs
def assert_model_step_result(result: dict, expected_result: dict):
    # Verify the results
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == expected_result["outputs"]
    assert result["content"] == expected_result["content"]
    assert result["logprob"] == expected_result["logprob"]


def assert_workflow_output(output: dict, expected_output: dict):
    assert isinstance(output, dict)
    for key in ["final_outputs", "intermediate_outputs", "step_contents", "logprob"]:
        assert key in output
        assert output[key] == expected_output[key]
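

# NOTE: the result shapes checked by these helpers are inferred from the assertions
# in this module: execute_model_step is assumed to return a dict of the form
#   {"outputs": dict, "content": str | None, "logprob": float | None}
# and execute_workflow a dict with the keys
#   "final_outputs", "intermediate_outputs", "step_contents", and "logprob".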


def test_create_processed_inputs_basic():
    """Test basic input processing without transformations."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    result = create_processed_inputs(step, available_vars)

    assert result == {"text": "Hello World"}


def test_create_processed_inputs_with_transformation():
    """Test input processing with transformation functions."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[
            InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"),
            InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"),
        ],
        output_fields=[],
    )
    available_vars = {"input_text": "hello", "input_caps": "WORLD"}

    result = create_processed_inputs(step, available_vars)

    assert result == {"upper_text": "HELLO", "lower_text": "world"}


def test_create_processed_inputs_missing_var():
    """Test that appropriate error is raised when a variable is missing."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="missing_var")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    with pytest.raises(KeyError):
        create_processed_inputs(step, available_vars)


def test_create_processed_inputs_unknown_func():
    """Test that appropriate error is raised when an unknown function is specified."""
    step = ModelStep(
        id="test_step",
        name="Test Step",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test prompt",
        input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")],
        output_fields=[],
    )
    available_vars = {"input_text": "Hello World"}

    # This should raise an error when the function isn't found
    with pytest.raises(Exception):
        create_processed_inputs(step, available_vars)


# Tests for execute_model_step
@patch("workflows.executors.completion")
def test_execute_model_step_success(mock_completion):
    """Test successful execution of a model step with mocked litellm response."""
    # Mock the litellm response
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."})

    # Verify the results
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == {"summary": "This is a summary"}
    assert result["content"] is None
    assert result["logprob"] is None

    # Verify the litellm call was made correctly
    mock_completion.assert_called_once()
    args, kwargs = mock_completion.call_args
    assert kwargs["model"] == "OpenAI/gpt-3.5-turbo"
    assert "Summarize the text" in kwargs["system"]


@patch("workflows.executors.completion")
def test_execute_model_step_with_full_content(mock_completion):
    """Test execution of a model step with full content returned."""
    # Mock the litellm response
    mock_response = {
        "content": "Full model response content",
        "output": {"summary": "This is a summary"},
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step with return_full_content=True
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, return_full_content=True)

    # Verify the results
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == {"summary": "This is a summary"}
    assert result["content"] == "Full model response content"
    assert result["logprob"] is None


@patch("workflows.executors.completion")
def test_execute_model_step_with_logprobs(mock_completion):
    """Test execution of a model step with log probabilities."""
    # Mock the litellm response with log probability
    mock_response = {
        "content": json.dumps({"summary": "This is a summary"}),
        "output": {"summary": "This is a summary"},
        "log_prob": -2.5,
    }
    mock_completion.return_value = mock_response

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step with logprobs=True
    result = execute_model_step(step, {"input_text": "Long text to be summarized..."}, logprobs=True)

    # Verify the results
    assert isinstance(result, dict)
    assert "outputs" in result
    assert "content" in result
    assert "logprob" in result
    assert result["outputs"] == {"summary": "This is a summary"}
    assert result["content"] is None
    assert result["logprob"] == -2.5


@patch("workflows.executors.completion")
def test_execute_model_step_error(mock_completion):
    """Test handling of errors in model step execution."""
    # Make litellm raise an exception
    mock_completion.side_effect = Exception("API Error")

    # Create a test step
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )

    # Execute the step - should raise an exception
    with pytest.raises(Exception):
        execute_model_step(step, {"input_text": "Long text to be summarized..."})


# Tests for execute_workflow
@patch("workflows.executors.execute_model_step")
def test_execute_workflow_simple(mock_execute_step):
    """Test execution of a simple workflow with a single step."""
    # Configure mock to return expected outputs
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": None}
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow
    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."})

    # Verify the results
    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    # Verify execute_model_step was called correctly
    mock_execute_step.assert_called_once()


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_multi_step(mock_execute_step):
    """Test execution of a multi-step workflow with dependencies."""
    # Configure mock to return different values based on the step
    def side_effect(step, available_vars, return_full_content=False, logprobs=False):
        if step.id == "extract":
            return {"outputs": {"entities": ["Apple", "product"]}, "content": None, "logprob": None}
        elif step.id == "analyze":
            return {"outputs": {"sentiment": "positive"}, "content": None, "logprob": None}
        return {"outputs": {}, "content": None, "logprob": None}

    mock_execute_step.side_effect = side_effect

    # Create extract step
    extract_step = ModelStep(
        id="extract",
        name="Extract Entities",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Extract entities",
        input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")],
        output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")],
    )

    # Create analyze step that depends on extract step
    analyze_step = ModelStep(
        id="analyze",
        name="Analyze Sentiment",
        model="gpt-4",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Analyze sentiment",
        input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")],
        output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")],
    )

    workflow = Workflow(
        steps={"extract": extract_step, "analyze": analyze_step},
        inputs=["input_text"],
        outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"},
    )

    # Execute the workflow
    result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."})

    assert_workflow_output(
        result,
        {
            "final_outputs": {"entities": ["Apple", "product"], "sentiment": "positive"},
            "intermediate_outputs": {
                "input_text": "Apple is launching a new product tomorrow.",
                "extract.entities": ["Apple", "product"],
                "analyze.sentiment": "positive",
            },
            "step_contents": {},
            "logprob": None,
        },
    )

    # Verify execute_model_step was called twice (once for each step)
    assert mock_execute_step.call_count == 2


def test_execute_workflow_missing_input():
    """Test that an error is raised when a required input is missing."""
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute with missing input
    with pytest.raises(WorkflowError, match="Missing required workflow input"):
        execute_workflow(workflow, {})


def test_execute_workflow_cyclic_dependency():
    """Test that a cyclic dependency in the workflow raises an appropriate error."""
    # Build two steps that reference each other's outputs so that
    # create_dependency_graph raises a CyclicDependencyError
    step1 = ModelStep(
        id="t1",
        name="Test Step 1",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v1", description="", variable="t2.var")],
        output_fields=[OutputField(name="out", description="")],
    )
    step2 = ModelStep(
        id="t2",
        name="Test Step 2",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Test",
        input_fields=[InputField(name="v2", description="", variable="t1.out")],
        output_fields=[OutputField(name="var", description="")],
    )
    workflow = Workflow(steps=[step1, step2], inputs=[], outputs={})

    # This should propagate the CyclicDependencyError
    with pytest.raises(CyclicDependencyError):
        execute_workflow(workflow, {})


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_full_content(mock_execute_step):
    """Test execution of a workflow with return_full_content=True."""
    # Configure mock to return expected outputs and content
    mock_result = {
        "outputs": {"summary": "This is a summary"},
        "content": "Full model response content",
        "logprob": None,
    }
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps=[step], inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow with return_full_content=True
    inputs = {"input_text": "Long text to be summarized..."}
    result = execute_workflow(workflow, inputs, return_full_content=True)

    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {"summarize": "Full model response content"},
            "logprob": None,
        },
    )
    # Verify execute_model_step was called correctly with return_full_content=True
    mock_execute_step.assert_called_once_with(step, inputs, return_full_content=True, logprobs=False)


@patch("workflows.executors.execute_model_step")
def test_execute_workflow_with_logprob(mock_execute_step):
    """Test execution of a workflow with logprob_step specified."""
    # Configure mock to return expected outputs with logprob
    mock_result = {"outputs": {"summary": "This is a summary"}, "content": None, "logprob": -2.5}
    mock_execute_step.return_value = mock_result

    # Create a simple workflow
    step = ModelStep(
        id="summarize",
        name="Summarize Text",
        model="gpt-3.5-turbo",
        provider="OpenAI",
        call_type=CallType.LLM,
        system_prompt="Summarize the text",
        input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")],
        output_fields=[OutputField(name="summary", description="Summary of the text", type="str")],
    )
    workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"})

    # Execute the workflow with logprob_step specified
    result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."}, logprob_step="summarize")

    # Verify the results
    assert_workflow_output(
        result,
        {
            "final_outputs": {"summary": "This is a summary"},
            "logprob": -2.5,
            "intermediate_outputs": {
                "input_text": "Long text to be summarized...",
                "summarize.summary": "This is a summary",
            },
            "step_contents": {},
        },
    )

    # Verify execute_model_step was called with logprobs=True
    mock_execute_step.assert_called_once()
    args, kwargs = mock_execute_step.call_args
    assert kwargs["logprobs"] is True