|
from app_configs import CONFIGS |
|
from components.structs import PipelineState, TossupPipelineState |
|
from components.typed_dicts import PipelineStateDict, TossupPipelineStateDict |
|
from workflows.structs import TossupWorkflow, Workflow |
|
from workflows.validators import WorkflowValidator |
|
|
|
|
|
def validate_workflow(
    workflow: TossupWorkflow | Workflow, required_input_vars: list[str], required_output_vars: list[str]
):
    """
    Check that a workflow has steps and exposes the required variables.

    The checks run in a fixed order: steps, then required inputs, then
    required outputs, then the structural checks of ``WorkflowValidator``.
    The first failing check raises.

    Args:
        workflow (TossupWorkflow | Workflow): The workflow to validate
        required_input_vars (list[str]): Names that must be present in ``workflow.inputs``
        required_output_vars (list[str]): Names that must be present in ``workflow.outputs``

    Raises:
        ValueError: If the workflow has no steps, or if a required input or
            output name is missing (the first missing name, in list order,
            is the one reported)
    """
    if not workflow.steps:
        raise ValueError("Workflow must have at least one step")

    # Inputs are checked (and may raise) before workflow.outputs is touched.
    declared_inputs = frozenset(workflow.inputs)
    for name in required_input_vars:
        if name in declared_inputs:
            continue
        raise ValueError(f"Workflow must have '{name}' as an input")

    declared_outputs = frozenset(workflow.outputs)
    for name in required_output_vars:
        if name in declared_outputs:
            continue
        raise ValueError(f"Workflow must produce '{name}' as an output")

    # Hand off to the project's validator for whatever further checks it performs.
    validator = WorkflowValidator()
    validator.validate(workflow)
|
|
|
|
|
def validate_tossup_workflow(pipeline_state_dict: TossupPipelineStateDict) -> TossupPipelineState:
    """
    Build a tossup pipeline state from its dict form and validate its workflow.

    Args:
        pipeline_state_dict (TossupPipelineStateDict): Keyword arguments used
            to construct the ``TossupPipelineState``

    Returns:
        TossupPipelineState: The constructed state, returned only if its
        workflow passes validation

    Raises:
        ValueError: If ``validate_workflow`` rejects the workflow against the
            required input/output variables listed under ``CONFIGS["tossup"]``
    """
    state = TossupPipelineState(**pipeline_state_dict)

    workflow = state.workflow
    required_inputs = CONFIGS["tossup"]["required_input_vars"]
    required_outputs = CONFIGS["tossup"]["required_output_vars"]

    validate_workflow(workflow, required_inputs, required_outputs)
    return state
|
|
|
|
|
def validate_bonus_workflow(pipeline_state_dict: PipelineStateDict) -> PipelineState:
    """
    Build a bonus pipeline state from its dict form and validate its workflow.

    Args:
        pipeline_state_dict (PipelineStateDict): Keyword arguments used to
            construct the ``PipelineState``

    Returns:
        PipelineState: The constructed state, returned only if its workflow
        passes validation

    Raises:
        ValueError: If ``validate_workflow`` rejects the workflow against the
            required input/output variables listed under ``CONFIGS["bonus"]``
    """
    pipeline_state = PipelineState(**pipeline_state_dict)
    validate_workflow(
        pipeline_state.workflow,
        CONFIGS["bonus"]["required_input_vars"],
        CONFIGS["bonus"]["required_output_vars"],
    )
    return pipeline_state
|
|