Maharshi Gor
First Working commit
193db9d
raw
history blame
12.2 kB
import json
import gradio as gr
import yaml
from components.model_pipeline.state_manager import (
ModelStepUIState,
PipelineState,
PipelineStateManager,
PipelineUIState,
)
from components.model_step.model_step import ModelStepComponent
from components.utils import make_state
from workflows.structs import ModelStep, Workflow
from workflows.validators import WorkflowValidator
def validate_simple_workflow(workflow: Workflow, required_output_variables: list[str]) -> Workflow:
"""Validate the workflow."""
step = next(iter(workflow.steps.values()))
if not step.output_fields:
raise ValueError("No output fields found in the workflow")
output_field_names = {output.name for output in step.output_fields}
if not set(required_output_variables) <= output_field_names:
missing_vars = required_output_variables - output_field_names
raise ValueError(f"Missing required output variables: {missing_vars}")
return workflow
def validate_complex_workflow(workflow: Workflow, required_output_variables: list[str]) -> Workflow:
"""Validate the workflow."""
print("Validating complex workflow.")
return workflow
step = next(iter(workflow.steps.values()))
if not step.output_fields:
raise ValueError("No output fields found in the workflow")
output_field_names = {output.name for output in step.output_fields}
if not output_field_names <= set(required_output_variables):
missing_vars = output_field_names - set(required_output_variables)
raise ValueError(f"Missing required output variables: {missing_vars}")
return workflow
def parse_yaml_workflow(yaml_str: str) -> Workflow:
"""Parse a YAML workflow."""
workflow = yaml.safe_load(yaml_str)
return Workflow(**workflow)
def update_workflow_from_code(yaml_str: str, ui_state: PipelineUIState) -> PipelineState:
"""Update a workflow from a YAML string."""
workflow = parse_yaml_workflow(yaml_str)
ui_state = PipelineUIState.from_workflow(workflow)
return PipelineState(workflow=workflow, ui_state=ui_state)
class PipelineInterface:
"""UI for the pipeline."""
def __init__(
self,
workflow: Workflow,
ui_state: PipelineUIState | None = None,
model_options: list[str] = None,
simple: bool = False,
):
self.model_options = model_options
self.simple = simple
if not ui_state:
ui_state = PipelineUIState.from_workflow(workflow)
self.ui_state = make_state(ui_state)
self.pipeline_state = make_state(PipelineState(workflow=workflow, ui_state=ui_state))
self.variables_state = make_state(workflow.get_available_variables())
self.sm = PipelineStateManager()
self.input_variables = workflow.inputs
self.required_output_variables = list(workflow.outputs.keys())
# UI elements
self.steps_container = None
self.components = []
# Render the pipeline UI
self.render()
def _render_step(
self,
model_step: ModelStep,
step_ui_state: ModelStepUIState,
available_variables: list[str],
position: int = 0,
):
with gr.Column(elem_classes="step-container"):
# Create the step component
step_interface = ModelStepComponent(
value=model_step,
ui_state=step_ui_state,
model_options=self.model_options,
input_variables=available_variables,
pipeline_state_manager=self.sm,
)
step_interface.on_model_step_change(
self.sm.update_model_step_state,
inputs=[self.pipeline_state, step_interface.model_step_state, step_interface.ui_state],
outputs=[self.pipeline_state, self.ui_state, self.variables_state],
)
step_interface.on_ui_change(
self.sm.update_model_step_ui,
inputs=[self.pipeline_state, step_interface.ui_state, gr.State(model_step.id)],
outputs=[self.pipeline_state, self.ui_state],
)
if self.simple:
return step_interface
# Add step controls below
with gr.Row(elem_classes="step-controls"):
up_button = gr.Button("⬆️ Move Up", elem_classes="step-control-btn")
down_button = gr.Button("⬇️ Move Down", elem_classes="step-control-btn")
remove_button = gr.Button("🗑️ Remove", elem_classes="step-control-btn")
buttons = (up_button, down_button, remove_button)
self._assign_step_controls(buttons, position)
return (step_interface, *buttons)
def _assign_step_controls(self, buttons: tuple[gr.Button, gr.Button, gr.Button], position: int):
up_button, down_button, remove_button = buttons
position = gr.State(position)
up_button.click(self.sm.move_up, inputs=[self.ui_state, position], outputs=self.ui_state)
down_button.click(self.sm.move_down, inputs=[self.ui_state, position], outputs=self.ui_state)
remove_button.click(
self.sm.remove_step,
inputs=[self.pipeline_state, position],
outputs=[self.pipeline_state, self.ui_state, self.variables_state],
)
def _render_add_step_button(self, position: int):
if position not in {0, -1}:
raise ValueError("Position must be 0 or -1")
row_class = "pipeline-header" if position == 0 else "pipeline-footer"
with gr.Row(elem_classes=row_class):
add_step_btn = gr.Button("➕ Add Step", elem_classes="add-step-button")
add_step_btn.click(
self.sm.add_step,
inputs=[self.pipeline_state, gr.State(position)],
outputs=[self.pipeline_state, self.ui_state, self.variables_state],
)
return add_step_btn
def _render_output_fields(self, available_variables: list[str], pipeline_state: PipelineState):
dropdowns = {}
UNSET_VALUE = "Choose variable..."
variable_options = [UNSET_VALUE] + [v for v in available_variables if v not in self.input_variables]
with gr.Column(elem_classes="step-accordion"):
with gr.Row(elem_classes="output-fields-header"):
gr.Markdown("#### Final output variables mapping:")
with gr.Row(elem_classes="output-fields-row"):
for output_field in self.required_output_variables:
value = pipeline_state.workflow.outputs[output_field]
if not value:
value = UNSET_VALUE
dropdown = gr.Dropdown(
label=output_field,
value=value,
choices=variable_options,
interactive=True,
elem_classes="output-field-variable",
# show_label=False,
)
dropdown.change(
self.sm.update_output_variables,
inputs=[self.pipeline_state, gr.State(output_field), dropdown],
outputs=[self.pipeline_state],
)
dropdowns[output_field] = dropdown
def update_choices(available_variables):
"""Update the choices for the dropdowns"""
return [
gr.update(choices=available_variables, value=None, selected=None) for dropdown in dropdowns.values()
]
self.variables_state.change(
update_choices,
inputs=[self.variables_state],
outputs=list(dropdowns.values()),
)
return dropdowns
def validate_workflow(self, state: PipelineState) -> PipelineState:
"""Validate the workflow."""
try:
if self.simple:
workflow = validate_simple_workflow(state.workflow, self.required_output_variables)
else:
workflow = validate_complex_workflow(state.workflow, self.required_output_variables)
state.workflow = workflow
return state
except ValueError as e:
raise gr.Error(e)
def _render_pipeline_header(self):
# Add Step button at top
input_variables_str = ", ".join([f"`{variable}`" for variable in self.input_variables])
output_variables_str = ", ".join([f"`{variable}`" for variable in self.required_output_variables])
if self.simple:
instruction = "Create a simple single LLM call pipeline that takes in the following input variables and outputs the following output variables:"
else:
instruction = "Create a pipeline that takes in the following input variables and outputs the following output variables:"
gr.Markdown(f"### {instruction}")
gr.Markdown(f"Input Variables: {input_variables_str}")
gr.Markdown(f"Output Variables: {output_variables_str}")
# if not self.simple:
# self._render_add_step_button(0)
def render(self):
"""Render the pipeline UI."""
# Create a placeholder for all the step components
self.all_components = []
# self.pipeline_state.change(
# lambda x, y: print(f"Pipeline state changed! UI:\n{x}\n\n Data:\n{y}"),
# inputs=[self.ui_state, self.pipeline_state],
# outputs=[],
# )
self._render_pipeline_header()
# Function to render all steps
@gr.render(inputs=[self.pipeline_state, self.ui_state])
def render_steps(state, ui_state):
"""Render all steps in the pipeline"""
workflow = state.workflow
print(f"\nRerender triggered! Current UI State:{ui_state}")
components = []
step_objects = [] # Reset step objects list
for i, step_id in enumerate(ui_state.step_ids):
step_data = workflow.steps[step_id]
step_ui_state = ui_state.steps[step_id]
available_variables = self.sm.get_all_variables(state, step_id)
sub_components = self._render_step(step_data, step_ui_state, available_variables, i)
step_objects.append(sub_components)
components.append(step_objects)
# Bottom buttons
if not self.simple:
self._render_add_step_button(-1)
@gr.render(inputs=[self.variables_state, self.pipeline_state])
def render_output_fields(available_variables, pipeline_state):
return self._render_output_fields(available_variables, pipeline_state)
export_btn = gr.Button("Export Pipeline", elem_classes="export-button")
# components.append(export_btn)
# Add a code box to display the workflow JSON
# with gr.Column(elem_classes="workflow-json-container"):
with gr.Accordion("Pipeline Preview", open=False, elem_classes="pipeline-preview") as config_accordion:
config_output = gr.Code(
label="Workflow Configuration",
language="yaml",
elem_classes="workflow-json",
interactive=True,
autocomplete=True,
)
# components.append(config_accordion)
config_output.blur(
fn=update_workflow_from_code,
inputs=[config_output, self.ui_state],
outputs=[self.pipeline_state],
)
# Connect the export button to show the workflow JSON
export_btn.click(self.validate_workflow, inputs=[self.pipeline_state], outputs=[self.pipeline_state]).success(
fn=lambda: gr.update(visible=True, open=True), outputs=[config_accordion]
)
export_btn.click(
fn=self.sm.get_formatted_config,
inputs=[self.pipeline_state, gr.State("yaml")],
outputs=[config_output],
js="() => {document.querySelector('.pipeline-preview').scrollIntoView({behavior: 'smooth'})}",
)
# self.all_components = components