File size: 12,472 Bytes
193db9d e00ec4e 193db9d e00ec4e 193db9d e00ec4e 193db9d e00ec4e 193db9d e00ec4e 193db9d e00ec4e 193db9d e00ec4e 193db9d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
import gradio as gr
import yaml
from loguru import logger
from components.model_pipeline.state_manager import (
ModelStepUIState,
PipelineState,
PipelineStateManager,
PipelineUIState,
)
from components.model_step.model_step import ModelStepComponent
from components.utils import make_state
from workflows.structs import ModelStep, Workflow
from workflows.validators import WorkflowValidator
def validate_simple_workflow(workflow: Workflow, required_output_variables: list[str]) -> Workflow:
    """Validate a single-step workflow against the required output variables.

    Only the first step found in ``workflow.steps`` is inspected.

    Args:
        workflow: Workflow whose first step is checked.
        required_output_variables: Names that must all appear among the
            first step's output field names.

    Returns:
        The same ``workflow`` object, unmodified.

    Raises:
        ValueError: If the workflow has no steps, the first step declares no
            output fields, or any required output variable is missing.
    """
    # Use a default so an empty workflow surfaces as ValueError (which the
    # caller handles) rather than a bare StopIteration.
    step = next(iter(workflow.steps.values()), None)
    if step is None:
        raise ValueError("No steps found in the workflow")
    if not step.output_fields:
        raise ValueError("No output fields found in the workflow")

    output_field_names = {output.name for output in step.output_fields}
    # Convert to a set before subtracting: ``list - set`` raises TypeError.
    missing_vars = set(required_output_variables) - output_field_names
    if missing_vars:
        # Sorted so the message is deterministic.
        raise ValueError(f"Missing required output variables: {sorted(missing_vars)}")
    return workflow
def validate_complex_workflow(workflow: Workflow, required_output_variables: list[str]) -> Workflow:
    """Accept a multi-step workflow without performing any checks.

    This function is currently a pass-through: it prints a notice and returns
    its input. No validation of steps or output fields is carried out.

    Args:
        workflow: Workflow to accept.
        required_output_variables: Currently unused; kept so the signature
            matches ``validate_simple_workflow``.

    Returns:
        The same ``workflow`` object, unmodified.
    """
    print("Validating complex workflow.")
    return workflow
def parse_yaml_workflow(yaml_str: str) -> Workflow:
    """Parse a YAML document into a ``Workflow``.

    Args:
        yaml_str: YAML text whose top level is a mapping of ``Workflow``
            constructor fields.

    Returns:
        A ``Workflow`` built by passing the parsed mapping as keyword
        arguments.

    Raises:
        ValueError: If the parsed document is not a mapping (for example the
            text is empty, or its top level is a list or scalar).
    """
    data = yaml.safe_load(yaml_str)
    # safe_load yields None for empty input and may yield a list or scalar;
    # ``Workflow(**data)`` would then fail with an unhelpful TypeError.
    if not isinstance(data, dict):
        raise ValueError(
            f"Workflow YAML must be a mapping at the top level, got {type(data).__name__}"
        )
    return Workflow(**data)
def update_workflow_from_code(yaml_str: str, ui_state: PipelineUIState) -> PipelineState:
    """Build a new pipeline state from YAML text.

    The ``ui_state`` argument is accepted but not consulted: the UI state in
    the returned object is always derived from the parsed workflow.

    Args:
        yaml_str: YAML text describing the workflow.
        ui_state: Ignored.

    Returns:
        A ``PipelineState`` holding the parsed workflow and a UI state
        created from it via ``PipelineUIState.from_workflow``.
    """
    parsed_workflow = parse_yaml_workflow(yaml_str)
    derived_ui_state = PipelineUIState.from_workflow(parsed_workflow)
    return PipelineState(workflow=parsed_workflow, ui_state=derived_ui_state)
class PipelineInterface:
    """Gradio UI for viewing and editing a workflow pipeline.

    Constructing an instance immediately renders the UI (``render()`` is
    called from ``__init__``).
    """

    def __init__(
        self,
        workflow: Workflow,
        ui_state: PipelineUIState | None = None,
        model_options: list[str] | None = None,
        simple: bool = False,
    ):
        """Create the state holders and render the pipeline UI.

        Args:
            workflow: Workflow to display and edit.
            ui_state: Initial UI state; derived from ``workflow`` when falsy.
            model_options: Passed through to every ``ModelStepComponent``.
            simple: When true, step controls and the "Add Step" button are
                not rendered and the simple validator is used on export.
        """
        self.model_options = model_options
        self.simple = simple
        if not ui_state:
            ui_state = PipelineUIState.from_workflow(workflow)
        self.ui_state = make_state(ui_state)
        self.pipeline_state = make_state(PipelineState(workflow=workflow, ui_state=ui_state))
        self.variables_state = make_state(workflow.get_available_variables())

        self.sm = PipelineStateManager()
        self.input_variables = workflow.inputs
        self.required_output_variables = list(workflow.outputs.keys())

        # UI elements
        self.steps_container = None
        self.components = []

        # Render the pipeline UI
        self.render()

    def _render_step(
        self,
        model_step: ModelStep,
        step_ui_state: ModelStepUIState,
        available_variables: list[str],
        position: int = 0,
        n_steps: int = 1,
    ):
        """Render one step and wire its change events to the state manager.

        Args:
            model_step: Step data shown by the component.
            step_ui_state: UI state for this step.
            available_variables: Variables offered as inputs to the step.
            position: Index of the step, forwarded to the step controls.
            n_steps: Total number of steps; controls are only visible and
                interactive when this is greater than one.

        Returns:
            The step component alone in simple mode; otherwise a tuple of
            ``(step_component, up_button, down_button, remove_button)``.
        """
        with gr.Column(elem_classes="step-container"):
            # Create the step component
            step_interface = ModelStepComponent(
                value=model_step,
                ui_state=step_ui_state,
                model_options=self.model_options,
                input_variables=available_variables,
                pipeline_state_manager=self.sm,
            )

            step_interface.on_model_step_change(
                self.sm.update_model_step_state,
                inputs=[self.pipeline_state, step_interface.model_step_state, step_interface.ui_state],
                outputs=[self.pipeline_state, self.ui_state, self.variables_state],
            )

            step_interface.on_ui_change(
                self.sm.update_model_step_ui,
                inputs=[self.pipeline_state, step_interface.ui_state, gr.State(model_step.id)],
                outputs=[self.pipeline_state, self.ui_state],
            )

            # Simple mode has no reordering/removal controls.
            if self.simple:
                return step_interface

            is_multi_step = n_steps > 1
            logger.debug(f"Rendering step {position} of {n_steps}")

            # Add step controls below
            with gr.Row(elem_classes="step-controls", visible=is_multi_step):
                up_button = gr.Button("⬆️ Move Up", elem_classes="step-control-btn", interactive=is_multi_step)
                down_button = gr.Button("⬇️ Move Down", elem_classes="step-control-btn", interactive=is_multi_step)
                remove_button = gr.Button("🗑️ Remove", elem_classes="step-control-btn", interactive=is_multi_step)

            buttons = (up_button, down_button, remove_button)
            self._assign_step_controls(buttons, position)

        return (step_interface, *buttons)

    def _assign_step_controls(self, buttons: tuple[gr.Button, gr.Button, gr.Button], position: int):
        """Attach move-up, move-down and remove handlers to a step's buttons.

        Args:
            buttons: ``(up_button, down_button, remove_button)``.
            position: Index of the step the buttons act on.
        """
        up_button, down_button, remove_button = buttons
        # Wrap the index in a State so it can be passed as an event input.
        position = gr.State(position)
        up_button.click(self.sm.move_up, inputs=[self.ui_state, position], outputs=self.ui_state)
        down_button.click(self.sm.move_down, inputs=[self.ui_state, position], outputs=self.ui_state)
        remove_button.click(
            self.sm.remove_step,
            inputs=[self.pipeline_state, position],
            outputs=[self.pipeline_state, self.ui_state, self.variables_state],
        )

    def _render_add_step_button(self, position: int):
        """Render an "Add Step" button in a header (0) or footer (-1) row.

        Args:
            position: ``0`` for the header row, ``-1`` for the footer row.
                The value is also forwarded to ``self.sm.add_step``.

        Returns:
            The created button.

        Raises:
            ValueError: If ``position`` is neither ``0`` nor ``-1``.
        """
        if position not in {0, -1}:
            raise ValueError("Position must be 0 or -1")
        row_class = "pipeline-header" if position == 0 else "pipeline-footer"
        with gr.Row(elem_classes=row_class):
            add_step_btn = gr.Button("➕ Add Step", elem_classes="add-step-button")
            add_step_btn.click(
                self.sm.add_step,
                inputs=[self.pipeline_state, gr.State(position)],
                outputs=[self.pipeline_state, self.ui_state, self.variables_state],
            )
        return add_step_btn

    def _render_output_fields(self, available_variables: list[str], pipeline_state: PipelineState):
        """Render one dropdown per required output variable.

        Args:
            available_variables: Candidate variable names; those that are
                pipeline inputs are excluded from the initial choices.
            pipeline_state: State whose ``workflow.outputs`` supplies each
                dropdown's initial value.

        Returns:
            Dict mapping output field name to its dropdown.
        """
        dropdowns = {}
        UNSET_VALUE = "Choose variable..."
        variable_options = [UNSET_VALUE] + [v for v in available_variables if v not in self.input_variables]
        with gr.Column(elem_classes="step-accordion"):
            with gr.Row(elem_classes="output-fields-header"):
                gr.Markdown("#### Final output variables mapping:")
            with gr.Row(elem_classes="output-fields-row"):
                for output_field in self.required_output_variables:
                    value = pipeline_state.workflow.outputs[output_field]
                    if not value:
                        value = UNSET_VALUE
                    dropdown = gr.Dropdown(
                        label=output_field,
                        value=value,
                        choices=variable_options,
                        interactive=True,
                        elem_classes="output-field-variable",
                    )
                    dropdown.change(
                        self.sm.update_output_variables,
                        inputs=[self.pipeline_state, gr.State(output_field), dropdown],
                        outputs=[self.pipeline_state],
                    )
                    dropdowns[output_field] = dropdown

        def update_choices(available_variables):
            """Update the choices for the dropdowns"""
            # Only properties that Dropdown defines are sent; the previous
            # ``selected`` keyword is not a Dropdown property.
            # NOTE(review): these choices are unfiltered (no placeholder, input
            # variables included), unlike the initial choices above - confirm
            # whether that is intended.
            return [gr.update(choices=available_variables, value=None) for _ in dropdowns]

        self.variables_state.change(
            update_choices,
            inputs=[self.variables_state],
            outputs=list(dropdowns.values()),
        )
        return dropdowns

    def validate_workflow(self, state: PipelineState) -> PipelineState:
        """Validate the workflow held in ``state``.

        Uses ``validate_simple_workflow`` in simple mode and
        ``validate_complex_workflow`` otherwise.

        Args:
            state: Pipeline state whose workflow is validated.

        Returns:
            ``state``, with ``state.workflow`` set to the validator's result.

        Raises:
            gr.Error: If the validator raises ``ValueError``; the original
                exception is chained as the cause.
        """
        try:
            if self.simple:
                workflow = validate_simple_workflow(state.workflow, self.required_output_variables)
            else:
                workflow = validate_complex_workflow(state.workflow, self.required_output_variables)
            state.workflow = workflow
            return state
        except ValueError as e:
            # gr.Error expects a message string; chain the cause for tracebacks.
            raise gr.Error(str(e)) from e

    def _render_pipeline_header(self):
        """Render the instruction heading and the input/output variable lists."""
        input_variables_str = ", ".join([f"`{variable}`" for variable in self.input_variables])
        output_variables_str = ", ".join([f"`{variable}`" for variable in self.required_output_variables])
        if self.simple:
            instruction = "Create a simple single LLM call pipeline that takes in the following input variables and outputs the following output variables:"
        else:
            instruction = "Create a pipeline that takes in the following input variables and outputs the following output variables:"
        gr.Markdown(f"### {instruction}")
        gr.Markdown(f"Input Variables: {input_variables_str}")
        gr.Markdown(f"Output Variables: {output_variables_str}")

    def render(self):
        """Render the pipeline UI."""
        # Create a placeholder for all the step components
        self.all_components = []

        self._render_pipeline_header()

        # Function to render all steps
        @gr.render(inputs=[self.pipeline_state, self.ui_state])
        def render_steps(state: PipelineState, ui_state: PipelineUIState):
            """Render all steps in the pipeline"""
            logger.info(f"\nRerender triggered! Current UI State:{ui_state.model_dump()}")
            workflow = state.workflow

            for i, step_id in enumerate(ui_state.step_ids):
                step_data = workflow.steps[step_id]
                step_ui_state = ui_state.steps[step_id]
                available_variables = self.sm.get_all_variables(state, step_id)
                self._render_step(step_data, step_ui_state, available_variables, i, ui_state.n_steps)

            # Bottom buttons
            if not self.simple:
                self._render_add_step_button(-1)

        @gr.render(inputs=[self.variables_state, self.pipeline_state])
        def render_output_fields(available_variables, pipeline_state):
            return self._render_output_fields(available_variables, pipeline_state)

        export_btn = gr.Button("Export Pipeline", elem_classes="export-button")

        # Add a code box to display the workflow configuration
        with gr.Accordion("Pipeline Preview", open=False, elem_classes="pipeline-preview") as config_accordion:
            config_output = gr.Code(
                label="Workflow Configuration",
                language="yaml",
                elem_classes="workflow-json",
                interactive=True,
                autocomplete=True,
            )

        # Re-parse the edited YAML when the code box loses focus.
        config_output.blur(
            fn=update_workflow_from_code,
            inputs=[config_output, self.ui_state],
            outputs=[self.pipeline_state],
        )

        # Connect the export button: validate first and open the preview on success ...
        export_btn.click(self.validate_workflow, inputs=[self.pipeline_state], outputs=[self.pipeline_state]).success(
            fn=lambda: gr.update(visible=True, open=True), outputs=[config_accordion]
        )

        # ... and, as a separate handler, fill the code box and scroll it into view.
        export_btn.click(
            fn=self.sm.get_formatted_config,
            inputs=[self.pipeline_state, gr.State("yaml")],
            outputs=[config_output],
            js="() => {document.querySelector('.pipeline-preview').scrollIntoView({behavior: 'smooth'})}",
        )
|