import json from unittest.mock import patch import pytest from workflows.errors import CyclicDependencyError, WorkflowError from workflows.executors import ( create_processed_inputs, execute_model_step, execute_workflow, lower, upper, ) from workflows.structs import InputField, ModelStep, OutputField, Workflow # Tests for utility functions def test_upper(): """Test the upper function with different input types.""" assert upper("hello") == "HELLO" assert upper("Hello World") == "HELLO WORLD" assert upper("") == "" # Non-string inputs should be returned unchanged assert upper(123) == 123 assert upper([1, 2, 3]) == [1, 2, 3] assert upper(None) is None def test_lower(): """Test the lower function with different input types.""" assert lower("HELLO") == "hello" assert lower("Hello World") == "hello world" assert lower("") == "" # Non-string inputs should be returned unchanged assert lower(123) == 123 assert lower([1, 2, 3]) == [1, 2, 3] assert lower(None) is None # Tests for create_processed_inputs def test_create_processed_inputs_basic(): """Test basic input processing without transformations.""" step = ModelStep( id="test_step", model="gpt-4", provider="openai", call_type="llm", system_prompt="Test prompt", input_fields=[InputField(name="text", description="Input text", variable="input_text")], output_fields=[], ) available_vars = {"input_text": "Hello World"} result = create_processed_inputs(step, available_vars) assert result == {"text": "Hello World"} def test_create_processed_inputs_with_transformation(): """Test input processing with transformation functions.""" step = ModelStep( id="test_step", model="gpt-4", provider="openai", call_type="llm", system_prompt="Test prompt", input_fields=[ InputField(name="upper_text", description="Uppercase text", variable="input_text", func="upper"), InputField(name="lower_text", description="Lowercase text", variable="input_caps", func="lower"), ], output_fields=[], ) available_vars = {"input_text": "hello", "input_caps": "WORLD"} result = create_processed_inputs(step, available_vars) assert result == {"upper_text": "HELLO", "lower_text": "world"} def test_create_processed_inputs_missing_var(): """Test that appropriate error is raised when a variable is missing.""" step = ModelStep( id="test_step", model="gpt-4", provider="openai", call_type="llm", system_prompt="Test prompt", input_fields=[InputField(name="text", description="Input text", variable="missing_var")], output_fields=[], ) available_vars = {"input_text": "Hello World"} with pytest.raises(KeyError): create_processed_inputs(step, available_vars) def test_create_processed_inputs_unknown_func(): """Test that appropriate error is raised when an unknown function is specified.""" step = ModelStep( id="test_step", model="gpt-4", provider="openai", call_type="llm", system_prompt="Test prompt", input_fields=[InputField(name="text", description="Input text", variable="input_text", func="unknown_func")], output_fields=[], ) available_vars = {"input_text": "Hello World"} # This should raise an error when the function isn't found with pytest.raises(Exception): create_processed_inputs(step, available_vars) # Tests for execute_model_step @patch("workflows.executors.litellm.completion") def test_execute_model_step_success(mock_completion): """Test successful execution of a model step with mocked litellm response.""" # Mock the litellm response mock_response = {"choices": [{"message": {"content": json.dumps({"summary": "This is a summary"})}}]} mock_completion.return_value = mock_response # Create a test step step = ModelStep( id="summarize", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Summarize the text", input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")], output_fields=[OutputField(name="summary", description="Summary of the text", type="str")], ) # Execute the step result = execute_model_step(step, {"input_text": "Long text to be summarized..."}) # Verify the results assert result == {"summary": "This is a summary"} # Verify the litellm call was made correctly mock_completion.assert_called_once() args, kwargs = mock_completion.call_args assert kwargs["model"] == "gpt-3.5-turbo" assert "Summarize the text" in kwargs["messages"][0]["content"] @patch("workflows.executors.litellm.completion") def test_execute_model_step_error(mock_completion): """Test handling of errors in model step execution.""" # Make litellm raise an exception mock_completion.side_effect = Exception("API Error") # Create a test step step = ModelStep( id="summarize", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Summarize the text", input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")], output_fields=[OutputField(name="summary", description="Summary of the text", type="str")], ) # Execute the step - should raise an exception with pytest.raises(Exception): execute_model_step(step, {"input_text": "Long text to be summarized..."}) # Tests for execute_workflow @patch("workflows.executors.execute_model_step") def test_execute_workflow_simple(mock_execute_step): """Test execution of a simple workflow with a single step.""" # Configure mock to return expected outputs mock_execute_step.return_value = {"summary": "This is a summary"} # Create a simple workflow step = ModelStep( id="summarize", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Summarize the text", input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")], output_fields=[OutputField(name="summary", description="Summary of the text", type="str")], ) workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"}) # Execute the workflow result = execute_workflow(workflow, {"input_text": "Long text to be summarized..."}) # Verify the results assert result == {"summary": "This is a summary"} # Verify execute_model_step was called correctly mock_execute_step.assert_called_once() @patch("workflows.executors.execute_model_step") def test_execute_workflow_multi_step(mock_execute_step): """Test execution of a multi-step workflow with dependencies.""" # Configure mock to return different values based on the step def side_effect(step, available_vars): if step.id == "extract": return {"entities": ["Apple", "product"]} elif step.id == "analyze": return {"sentiment": "positive"} return {} mock_execute_step.side_effect = side_effect # Create extract step extract_step = ModelStep( id="extract", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Extract entities", input_fields=[InputField(name="text", description="Text to analyze", variable="input_text")], output_fields=[OutputField(name="entities", description="Extracted entities", type="list[str]")], ) # Create analyze step that depends on extract step analyze_step = ModelStep( id="analyze", model="gpt-4", provider="openai", call_type="llm", system_prompt="Analyze sentiment", input_fields=[InputField(name="entities", description="Entities to analyze", variable="extract.entities")], output_fields=[OutputField(name="sentiment", description="Sentiment analysis", type="str")], ) workflow = Workflow( steps={"extract": extract_step, "analyze": analyze_step}, inputs=["input_text"], outputs={"entities": "extract.entities", "sentiment": "analyze.sentiment"}, ) # Execute the workflow result = execute_workflow(workflow, {"input_text": "Apple is launching a new product tomorrow."}) # Verify the results assert result == {"entities": ["Apple", "product"], "sentiment": "positive"} # Verify execute_model_step was called twice (once for each step) assert mock_execute_step.call_count == 2 def test_execute_workflow_missing_input(): """Test that an error is raised when a required input is missing.""" step = ModelStep( id="summarize", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Summarize the text", input_fields=[InputField(name="text", description="Text to summarize", variable="input_text")], output_fields=[OutputField(name="summary", description="Summary of the text", type="str")], ) workflow = Workflow(steps={"summarize": step}, inputs=["input_text"], outputs={"summary": "summarize.summary"}) # Execute with missing input with pytest.raises(WorkflowError, match="Missing required workflow input"): execute_workflow(workflow, {}) @patch("workflows.executors.create_dependency_graph") def test_execute_workflow_cyclic_dependency(mock_dependency_graph): """Test that a cyclic dependency in the workflow raises an appropriate error.""" # Make create_dependency_graph raise a CyclicDependencyError mock_dependency_graph.side_effect = CyclicDependencyError() step = ModelStep( id="test", model="gpt-3.5-turbo", provider="openai", call_type="llm", system_prompt="Test", input_fields=[], output_fields=[], ) workflow = Workflow(steps={"test": step}, inputs=[], outputs=[]) # This should propagate the CyclicDependencyError with pytest.raises(CyclicDependencyError): execute_workflow(workflow, {})