|
import pytest |
|
|
|
from workflows.errors import CyclicDependencyError, UnknownVariableError, WorkflowError |
|
from workflows.utils import _create_variable_step_mapping, create_dependency_graph, topological_sort |
|
|
|
|
|
|
|
class DummyField:
    """Lightweight field double exposing ``name``, ``type`` and ``variable``.

    Args:
        name: Field name.
        type: Free-form type label stored as given; defaults to ``"str"``.
        variable: Variable reference for the field. When ``None`` the
            field's own ``name`` is used instead.
    """

    def __init__(self, name, type="str", variable=None):
        # Only ``None`` triggers the fallback; any other value (including an
        # empty string) is stored unchanged.
        if variable is None:
            variable = name

        self.name = name
        self.type = type
        self.variable = variable
|
|
|
|
|
class DummyStep:
    """Step double that simply carries its input and output field lists.

    Args:
        input_fields: Fields the step consumes.
        output_fields: Fields the step produces.
    """

    def __init__(self, input_fields, output_fields):
        # Both collections are stored by reference, without copying.
        self.input_fields, self.output_fields = input_fields, output_fields
|
|
|
|
|
class DummyWorkflow:
    """Workflow double whose only state is the ``steps`` attribute.

    Args:
        steps: Collection of steps; the tests in this module pass a dict
            keyed by step name.
    """

    def __init__(self, steps):
        # Kept exactly as supplied; no validation or copying.
        self.steps = steps
|
|
|
|
|
|
|
|
|
|
|
def test_create_variable_step_mapping_success():
    """Two steps with distinct outputs map each ``<step>.<field>`` key to its step."""
    steps = {
        "A": DummyStep(input_fields=[], output_fields=[DummyField("out1")]),
        "B": DummyStep(input_fields=[], output_fields=[DummyField("out2")]),
    }

    mapping = _create_variable_step_mapping(DummyWorkflow(steps))

    expected = {"A.out1": "A", "B.out2": "B"}
    assert mapping == expected
|
|
|
|
|
def test_create_variable_step_mapping_duplicate():
    """A step declaring the same output name twice must raise WorkflowError."""
    duplicated_outputs = [DummyField("out"), DummyField("out")]
    workflow = DummyWorkflow(
        {"A": DummyStep(input_fields=[], output_fields=duplicated_outputs)}
    )

    with pytest.raises(WorkflowError):
        _create_variable_step_mapping(workflow)
|
|
|
|
|
def test_create_variable_step_mapping_empty():
    """A workflow without any steps yields an empty variable-to-step mapping."""
    no_steps = DummyWorkflow({})

    assert _create_variable_step_mapping(no_steps) == {}
|
|
|
|
|
def test_create_variable_step_mapping_multiple_outputs():
    """Every uniquely named output of a single step maps back to that step."""
    outputs = [DummyField("out1"), DummyField("out2")]
    workflow = DummyWorkflow(
        {"A": DummyStep(input_fields=[], output_fields=outputs)}
    )

    mapping = _create_variable_step_mapping(workflow)

    assert mapping == {"A.out1": "A", "A.out2": "A"}
|
|
|
|
|
|
|
|
|
|
|
def test_create_dependency_graph_success_with_dependency():
    """Step B reads ``A.out``, so B depends on A while A depends on nothing."""
    producer = DummyStep(input_fields=[], output_fields=[DummyField("out")])
    # The input's own name is irrelevant here; only its ``variable``
    # reference ("A.out") points at the producing step.
    consumer = DummyStep(
        input_fields=[DummyField("dummy", variable="A.out")],
        output_fields=[DummyField("result")],
    )
    workflow = DummyWorkflow({"A": producer, "B": consumer})

    deps = create_dependency_graph(workflow, input_values={})

    assert deps["B"] == {"A"}
    assert deps["A"] == set()
|
|
|
|
|
def test_create_dependency_graph_success_with_external_input():
    """An input satisfied by ``input_values`` creates no step dependency."""
    external_input = DummyField("param", variable="external_param")
    only_step = DummyStep(
        input_fields=[external_input],
        output_fields=[DummyField("result")],
    )
    workflow = DummyWorkflow({"B": only_step})

    # "external_param" is supplied by the caller rather than by another step.
    deps = create_dependency_graph(workflow, input_values={"external_param": 42})

    assert deps["B"] == set()
|
|
|
|
|
def test_create_dependency_graph_unknown_variable():
    """Referencing a variable that no step or input provides raises UnknownVariableError."""
    dangling_input = DummyField("param", variable="non_existent")
    workflow = DummyWorkflow(
        {
            "B": DummyStep(
                input_fields=[dangling_input],
                output_fields=[DummyField("result")],
            )
        }
    )

    # Nothing produces "non_existent" and input_values is empty.
    with pytest.raises(UnknownVariableError):
        create_dependency_graph(workflow, input_values={})
|
|
|
|
|
def test_create_dependency_graph_complex():
    """Test create_dependency_graph on a more complex workflow with multiple dependencies.

    Wiring exercised by this test::

        A: no inputs
        B: reads A.out
        C: reads B.out
        D: reads A.out and B.out
    """
    step_a = DummyStep(input_fields=[], output_fields=[DummyField("out")])
    step_b = DummyStep(
        input_fields=[DummyField("inp", variable="A.out")],
        output_fields=[DummyField("out")],
    )
    step_c = DummyStep(
        input_fields=[DummyField("inp", variable="B.out")],
        output_fields=[DummyField("result")],
    )
    step_d = DummyStep(
        input_fields=[
            DummyField("inp1", variable="A.out"),
            DummyField("inp2", variable="B.out"),
        ],
        output_fields=[DummyField("final")],
    )
    workflow = DummyWorkflow({"A": step_a, "B": step_b, "C": step_c, "D": step_d})

    deps = create_dependency_graph(workflow, input_values={})

    # The root step consumes nothing, so it must not pick up any dependency.
    assert deps["A"] == set()
    assert deps["B"] == {"A"}
    assert deps["C"] == {"B"}
    # D reads from two different steps and must depend on both.
    assert deps["D"] == {"A", "B"}
|
|
|
|
|
|
|
|
|
|
|
def test_topological_sort_success():
    """A linear chain A -> B -> C is ordered with each dependency first."""
    chain = {"A": set(), "B": {"A"}, "C": {"B"}}

    order = topological_sort(chain)

    # Each node must come after the node it depends on.
    assert order.index("A") < order.index("B")
    assert order.index("B") < order.index("C")
|
|
|
|
|
def test_topological_sort_cycle():
    """Two nodes depending on each other must raise CyclicDependencyError."""
    mutual = {"A": {"B"}, "B": {"A"}}

    with pytest.raises(CyclicDependencyError):
        topological_sort(mutual)
|
|
|
|
|
def test_topological_sort_single_node():
    """A graph with one dependency-free node sorts to a one-element list."""
    lone_node = {"A": set()}

    assert topological_sort(lone_node) == ["A"]
|
|
|
|
|
def test_topological_sort_disconnected():
    """Test topological_sort on a graph with disconnected nodes (no dependencies among them).

    With no edges any relative order is acceptable, but every node must be
    present exactly once in the result.
    """
    deps = {"A": set(), "B": set(), "C": set()}

    order = topological_sort(deps)

    # Comparing the sorted result (rather than a set) also rejects output
    # that repeats a node, which a plain set comparison would let through.
    assert sorted(order) == ["A", "B", "C"]
|
|