Maharshi Gor
Squash merge dictify-states into main
9756440
raw
history blame
15.4 kB
import json
from typing import Any
import gradio as gr
import pandas as pd
from datasets import Dataset
from loguru import logger
from app_configs import UNSELECTED_PIPELINE_NAME
from components import commons
from components.model_pipeline.model_pipeline import PipelineInterface, PipelineState, PipelineUIState
from components.typed_dicts import PipelineStateDict
from display.formatting import styled_error
from submission import submit
from workflows.qb_agents import QuizBowlBonusAgent
from workflows.structs import ModelStep, Workflow
from . import populate
from .plotting import (
create_bonus_confidence_plot,
create_bonus_html,
create_scatter_pyplot,
update_tossup_plot,
)
from .utils import evaluate_prediction
def process_bonus_results(results: list[dict]) -> pd.DataFrame:
"""Process results from bonus mode and prepare visualization data."""
return pd.DataFrame(
[
{
"Part": f"Part {r['part_number']}",
"Correct?": "✅" if r["score"] == 1 else "❌",
"Confidence": r["confidence"],
"Prediction": r["answer"],
"Explanation": r["explanation"],
}
for r in results
]
)
def initialize_eval_interface(example: dict, model_outputs: list[dict]):
"""Initialize the interface with example text."""
try:
html_content = create_bonus_html(example["leadin"], example["parts"])
# Create confidence plot data
plot_data = create_bonus_confidence_plot(example["parts"], model_outputs)
# Store state
state = json.dumps({"parts": example["parts"], "outputs": model_outputs})
return html_content, plot_data, state
except Exception as e:
logger.exception(f"Error initializing interface: {e.args}")
return f"<div>Error initializing interface: {str(e)}</div>", pd.DataFrame(), "{}"
def validate_workflow(workflow: Workflow):
"""Validate that a workflow is properly configured for the bonus task."""
if not workflow.steps:
raise ValueError("Workflow must have at least one step")
# Ensure all steps are properly configured
for step_id, step in workflow.steps.items():
validate_model_step(step)
# Check that the workflow has the correct structure
input_vars = set(workflow.inputs)
if "leadin" not in input_vars or "part" not in input_vars:
raise ValueError("Workflow must have 'leadin' and 'part' as inputs")
output_vars = set(workflow.outputs)
if not all(var in output_vars for var in ["answer", "confidence", "explanation"]):
raise ValueError("Workflow must produce 'answer', 'confidence', and 'explanation' as outputs")
def validate_model_step(model_step: ModelStep):
"""Validate that a model step is properly configured for the bonus task."""
# Check required fields
if not model_step.model or not model_step.provider:
raise ValueError("Model step must have both model and provider specified")
if model_step.call_type != "llm":
raise ValueError("Model step must have call_type 'llm'")
# Validate temperature for LLM steps
if model_step.temperature is None:
raise ValueError("Temperature must be specified for LLM model steps")
if not (0.0 <= model_step.temperature <= 1.0):
raise ValueError(f"Temperature must be between 0.0 and 1.0, got {model_step.temperature}")
# Validate input fields
input_field_names = {field.name for field in model_step.input_fields}
if "leadin" not in input_field_names or "part" not in input_field_names:
raise ValueError("Model step must have 'leadin' and 'part' input fields")
# Validate output fields
output_field_names = {field.name for field in model_step.output_fields}
required_outputs = {"answer", "confidence", "explanation"}
if not all(out in output_field_names for out in required_outputs):
raise ValueError("Model step must have all required output fields: answer, confidence, explanation")
# Validate confidence output field is of type float
for field in model_step.output_fields:
if field.name == "confidence" and field.type != "float":
raise ValueError("The 'confidence' output field must be of type 'float'")
class BonusInterface:
"""Gradio interface for the Bonus mode."""
def __init__(self, app: gr.Blocks, dataset: Dataset, model_options: dict, defaults: dict):
"""Initialize the Bonus interface."""
logger.info(f"Initializing Bonus interface with dataset size: {len(dataset)}")
self.ds = dataset
self.model_options = model_options
self.app = app
self.defaults = defaults
self.output_state = gr.State(value="{}")
self.render()
def _render_pipeline_interface(self, workflow: Workflow, simple: bool = True):
"""Render the model interface."""
with gr.Row(elem_classes="bonus-header-row form-inline"):
self.pipeline_selector = commons.get_pipeline_selector([])
self.load_btn = gr.Button("⬇️ Import Pipeline", variant="secondary")
self.pipeline_interface = PipelineInterface(
self.app,
workflow,
simple=simple,
model_options=list(self.model_options.keys()),
)
def _render_qb_interface(self):
"""Render the quizbowl interface."""
with gr.Row(elem_classes="bonus-header-row form-inline"):
self.qid_selector = commons.get_qid_selector(len(self.ds))
self.run_btn = gr.Button("Run on Bonus Question", variant="secondary")
self.question_display = gr.HTML(label="Question", elem_id="bonus-question-display")
self.error_display = gr.HTML(label="Error", elem_id="bonus-error-display", visible=False)
self.results_table = gr.DataFrame(
label="Model Outputs",
value=pd.DataFrame(columns=["Part", "Correct?", "Confidence", "Prediction", "Explanation"]),
visible=False,
)
self.model_outputs_display = gr.JSON(label="Model Outputs", value="{}", show_indices=True, visible=False)
with gr.Row():
self.eval_btn = gr.Button("Evaluate", variant="primary")
with gr.Accordion("Model Submission", elem_classes="model-submission-accordion", open=True):
with gr.Row():
self.model_name_input = gr.Textbox(label="Model Name")
self.description_input = gr.Textbox(label="Description")
with gr.Row():
gr.LoginButton()
self.submit_btn = gr.Button("Submit", variant="primary")
self.submit_status = gr.HTML(label="Submission Status")
def render(self):
"""Create the Gradio interface."""
self.hidden_input = gr.Textbox(value="", visible=False, elem_id="hidden-index")
workflow = self.defaults["init_workflow"]
with gr.Row():
# Model Panel
with gr.Column(scale=1):
self._render_pipeline_interface(workflow, simple=self.defaults["simple_workflow"])
with gr.Column(scale=1):
self._render_qb_interface()
self._setup_event_listeners()
def get_new_question_html(self, question_id: int):
"""Get the HTML for a new question."""
if question_id is None:
logger.error("Question ID is None. Setting to 1")
question_id = 1
try:
question_id = int(question_id) - 1
if not self.ds or question_id < 0 or question_id >= len(self.ds):
return "Invalid question ID or dataset not loaded"
example = self.ds[question_id]
leadin = example["leadin"]
parts = example["parts"]
return create_bonus_html(leadin, parts)
except Exception as e:
return f"Error loading question: {str(e)}"
def get_model_outputs(self, example: dict, pipeline_state: PipelineState):
"""Get the model outputs for a given question ID."""
outputs = []
leadin = example["leadin"]
agent = QuizBowlBonusAgent(pipeline_state.workflow)
for i, part in enumerate(example["parts"]):
# Run model for each part
part_output = agent.run(leadin, part["part"])
# Add part number and evaluate score
part_output["part_number"] = i + 1
part_output["score"] = evaluate_prediction(part_output["answer"], part["clean_answers"])
outputs.append(part_output)
return outputs
def get_pipeline_names(self, profile: gr.OAuthProfile | None) -> list[str]:
names = [UNSELECTED_PIPELINE_NAME] + populate.get_pipeline_names("bonus", profile)
return gr.update(choices=names, value=UNSELECTED_PIPELINE_NAME)
def load_pipeline(
self, model_name: str, pipeline_change: bool, profile: gr.OAuthProfile | None
) -> tuple[str, PipelineStateDict, bool, dict]:
try:
workflow = populate.load_workflow("bonus", model_name, profile)
if workflow is None:
logger.warning(f"Could not load workflow for {model_name}")
return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=False)
pipeline_state_dict = PipelineState.from_workflow(workflow).model_dump()
return UNSELECTED_PIPELINE_NAME, pipeline_state_dict, not pipeline_change, gr.update(visible=True)
except Exception as e:
error_msg = styled_error(f"Error loading pipeline: {str(e)}")
return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg)
def single_run(
self,
question_id: int,
state_dict: PipelineStateDict,
) -> tuple[str, Any, Any]:
"""Run the agent in bonus mode."""
try:
pipeline_state = PipelineState(**state_dict)
question_id = int(question_id - 1)
if not self.ds or question_id < 0 or question_id >= len(self.ds):
return "Invalid question ID or dataset not loaded", None, None
example = self.ds[question_id]
outputs = self.get_model_outputs(example, pipeline_state)
# Process results and prepare visualization data
html_content, plot_data, output_state = initialize_eval_interface(example, outputs)
df = process_bonus_results(outputs)
step_outputs = [output["step_outputs"] for output in outputs]
return (
html_content,
gr.update(value=output_state),
gr.update(value=df, label=f"Model Outputs for Question {question_id + 1}", visible=True),
gr.update(value=step_outputs, label=f"Step Outputs for Question {question_id + 1}", visible=True),
gr.update(visible=False),
)
except Exception as e:
import traceback
error_msg = f"Error: {str(e)}\n{traceback.format_exc()}"
return (
gr.skip(),
gr.skip(),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=True, value=error_msg),
)
def evaluate(self, state_dict: PipelineStateDict, progress: gr.Progress = gr.Progress()):
"""Evaluate the bonus questions."""
try:
pipeline_state = PipelineState(**state_dict)
# Validate inputs
if not self.ds or not self.ds.num_rows:
return "No dataset loaded", None, None
total_correct = 0
total_parts = 0
part_scores = []
part_numbers = []
for example in progress.tqdm(self.ds, desc="Evaluating bonus questions"):
model_outputs = self.get_model_outputs(example, pipeline_state)
for output in model_outputs:
total_parts += 1
if output["score"] == 1:
total_correct += 1
part_scores.append(output["score"])
part_numbers.append(output["part_number"])
accuracy = total_correct / total_parts
df = pd.DataFrame(
[
{
"Part Accuracy": f"{accuracy:.2%}",
"Total Score": f"{total_correct}/{total_parts}",
"Questions Evaluated": len(self.ds),
}
]
)
# plot_data = create_scatter_pyplot(part_numbers, part_scores)
return (
gr.update(value=df, label="Scores on Sample Set"),
gr.update(visible=False),
)
except Exception as e:
error_msg = styled_error(f"Error evaluating bonus: {e.args}")
logger.exception(f"Error evaluating bonus: {e.args}")
return gr.skip(), gr.update(visible=True, value=error_msg)
def submit_model(
self,
model_name: str,
description: str,
state_dict: PipelineStateDict,
profile: gr.OAuthProfile = None,
):
"""Submit the model output."""
pipeline_state = PipelineState(**state_dict)
return submit.submit_model(model_name, description, pipeline_state.workflow, "bonus", profile)
def _setup_event_listeners(self):
# Initialize with the default question (ID 0)
gr.on(
triggers=[self.app.load, self.qid_selector.change],
fn=self.get_new_question_html,
inputs=[self.qid_selector],
outputs=[self.question_display],
)
gr.on(
triggers=[self.app.load],
fn=self.get_pipeline_names,
outputs=[self.pipeline_selector],
)
pipeline_state = self.pipeline_interface.pipeline_state
pipeline_change = self.pipeline_interface.pipeline_change
self.load_btn.click(
fn=self.load_pipeline,
inputs=[self.pipeline_selector, pipeline_change],
outputs=[self.pipeline_selector, pipeline_state, pipeline_change, self.error_display],
)
self.pipeline_interface.add_triggers_for_pipeline_export([pipeline_state.change], pipeline_state)
self.run_btn.click(
self.pipeline_interface.validate_workflow,
inputs=[self.pipeline_interface.pipeline_state],
outputs=[],
).success(
self.single_run,
inputs=[
self.qid_selector,
self.pipeline_interface.pipeline_state,
],
outputs=[
self.question_display,
self.output_state,
self.results_table,
self.model_outputs_display,
self.error_display,
],
)
self.eval_btn.click(
fn=self.evaluate,
inputs=[self.pipeline_interface.pipeline_state],
outputs=[self.results_table, self.error_display],
)
self.submit_btn.click(
fn=self.submit_model,
inputs=[
self.model_name_input,
self.description_input,
self.pipeline_interface.pipeline_state,
],
outputs=[self.submit_status],
)