File size: 1,954 Bytes
f9589f4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
from app_configs import CONFIGS
from components.structs import PipelineState, TossupPipelineState
from components.typed_dicts import PipelineStateDict, TossupPipelineStateDict
from workflows.structs import TossupWorkflow, Workflow
from workflows.validators import WorkflowValidator
def validate_workflow(
    workflow: TossupWorkflow | Workflow, required_input_vars: list[str], required_output_vars: list[str]
):
    """
    Check that a workflow has steps and exposes the required variables.

    The checks run in a fixed order and stop at the first failure: steps,
    then required inputs, then required outputs, and finally the
    ``WorkflowValidator`` pass.

    Args:
        workflow (TossupWorkflow | Workflow): The workflow to validate.
        required_input_vars (list[str]): Names that must appear in
            ``workflow.inputs``.
        required_output_vars (list[str]): Names that must appear in
            ``workflow.outputs``.

    Raises:
        ValueError: If the workflow has no steps, or if a required input
            or output variable is missing.

    Anything raised by ``WorkflowValidator().validate`` is not caught here
    and propagates to the caller.
    """
    if not workflow.steps:
        raise ValueError("Workflow must have at least one step")

    # Required inputs: fail on the first one the workflow does not declare.
    declared_inputs = set(workflow.inputs)
    for name in required_input_vars:
        if name in declared_inputs:
            continue
        raise ValueError(f"Workflow must have '{name}' as an input")

    # Required outputs: only inspected once every required input is present.
    declared_outputs = set(workflow.outputs)
    for name in required_output_vars:
        if name in declared_outputs:
            continue
        raise ValueError(f"Workflow must produce '{name}' as an output")

    # Remaining checks are delegated to the shared workflow validator.
    validator = WorkflowValidator()
    validator.validate(workflow)
def validate_tossup_workflow(pipeline_state_dict: TossupPipelineStateDict) -> TossupPipelineState:
    """
    Build a tossup pipeline state from its dict form and validate its workflow.

    Args:
        pipeline_state_dict (TossupPipelineStateDict): Keyword arguments used
            to construct a ``TossupPipelineState``.

    Returns:
        TossupPipelineState: The constructed state, returned once its workflow
        has passed ``validate_workflow``.

    Raises:
        ValueError: Propagated from ``validate_workflow`` when the workflow
            fails one of its checks.
    """
    state = TossupPipelineState(**pipeline_state_dict)
    tossup_workflow = state.workflow

    # The required variables for the tossup task come from the app config.
    tossup_config = CONFIGS["tossup"]
    validate_workflow(
        workflow=tossup_workflow,
        required_input_vars=tossup_config["required_input_vars"],
        required_output_vars=tossup_config["required_output_vars"],
    )
    return state
def validate_bonus_workflow(pipeline_state_dict: PipelineStateDict) -> PipelineState:
    """
    Build a bonus pipeline state from its dict form and validate its workflow.

    Args:
        pipeline_state_dict (PipelineStateDict): Keyword arguments used to
            construct a ``PipelineState``.

    Returns:
        PipelineState: The constructed state, returned once its workflow has
        passed ``validate_workflow``.

    Raises:
        ValueError: Propagated from ``validate_workflow`` when the workflow
            fails one of its checks.
    """
    pipeline_state = PipelineState(**pipeline_state_dict)
    # The required variables for the bonus task come from the app config.
    validate_workflow(
        pipeline_state.workflow,
        CONFIGS["bonus"]["required_input_vars"],
        CONFIGS["bonus"]["required_output_vars"],
    )
    return pipeline_state
|