Maharshi Gor
Squash merge dictify-states into main
9756440
raw
history blame
19 kB
import json
from typing import Any
import gradio as gr
import numpy as np
import pandas as pd
from datasets import Dataset
from loguru import logger
from app_configs import UNSELECTED_PIPELINE_NAME
from components import commons
from components.model_pipeline.model_pipeline import PipelineInterface, PipelineState, PipelineUIState
from components.model_pipeline.tossup_pipeline import TossupPipelineInterface, TossupPipelineState
from components.typed_dicts import PipelineStateDict, TossupPipelineStateDict
from display.formatting import styled_error
from submission import submit
from workflows.qb_agents import QuizBowlTossupAgent, TossupResult
from workflows.structs import ModelStep, TossupWorkflow
from . import populate
from .plotting import (
create_scatter_pyplot,
create_tossup_confidence_pyplot,
create_tossup_html,
update_tossup_plot,
)
from .utils import evaluate_prediction
# TODO: Add error handling to the run-tossup and evaluate-tossup paths and show accurate messages to the user.
# TODO: Apply the same error handling to Bonus.
class ScoredTossupResult(TossupResult):
    """A tossup result extended with an evaluation score and a token position.

    Declares the two fields that ``add_model_scores`` attaches to each model
    output after a run.
    """

    # Value returned by evaluate_prediction for the predicted answer;
    # the rest of this module treats a score of 1 as "correct".
    score: int
    # Run index + 1: how many question tokens the model had seen when it answered.
    token_position: int
def add_model_scores(model_outputs: list[dict], clean_answers: list[str], run_indices: list[int]) -> list[dict]:
    """Annotate each model output in place with its score and token position.

    Outputs are paired with run indices positionally; pairing stops at the
    shorter of the two sequences.

    Args:
        model_outputs: Model output dicts; each must contain an ``"answer"`` key.
        clean_answers: Acceptable answers passed to ``evaluate_prediction``.
        run_indices: Token index at which each output was produced.

    Returns:
        The same ``model_outputs`` list, with ``"score"`` and
        ``"token_position"`` set on every paired entry.
    """
    for result, run_idx in zip(model_outputs, run_indices):
        predicted = result["answer"]
        result["score"] = evaluate_prediction(predicted, clean_answers)
        # Run indices are token indices; +1 turns them into a token count.
        result["token_position"] = run_idx + 1
    return model_outputs
def prepare_buzz_evals(run_indices: list[int], model_outputs: list[dict]) -> list[tuple[int, dict]]:
    """Pair each run index with the model output produced at that index.

    Args:
        run_indices: Token indices at which the model was queried.
        model_outputs: Model outputs, positionally aligned with ``run_indices``.

    Returns:
        A list of ``(run_index, model_output)`` pairs, with the index coerced
        to a plain ``int``. Pairing stops at the shorter input. An empty list
        is returned (and a warning logged) when no run indices are given, so
        callers can always iterate the result as pairs.
    """
    if not run_indices:
        logger.warning("No run indices provided, returning empty results")
        # Same shape as the normal path: a (possibly empty) list of pairs.
        return []
    return [(int(run_idx), output) for run_idx, output in zip(run_indices, model_outputs)]
def initialize_eval_interface(example, model_outputs: list[dict]):
    """Build the question HTML, confidence plot and serialized state for one example.

    Args:
        example: Mapping with ``"question"``, ``"run_indices"``,
            ``"answer_primary"`` and ``"clean_answers"`` entries.
        model_outputs: Model outputs aligned with ``example["run_indices"]``;
            each is read for its ``"buzz"`` value.

    Returns:
        A ``(html, plot, state_json)`` triple. When the question has no tokens,
        or when anything raises, a placeholder ``<div>``, an empty DataFrame
        and ``"{}"`` are returned instead.
    """
    try:
        question_tokens = example["question"].split()
        run_indices = example["run_indices"]
        primary_answer = example["answer_primary"]
        accepted_answers = example["clean_answers"]
        eval_points = prepare_buzz_evals(run_indices, model_outputs)

        if not question_tokens:
            return "<div>No tokens found in the provided text.</div>", pd.DataFrame(), "{}"

        # Index of the first evaluation point where the model buzzed; -1 if it never did.
        buzz_index = -1
        for run_idx, output in eval_points:
            if output["buzz"] == 1:
                buzz_index = int(run_idx)
                break

        html_content = create_tossup_html(
            question_tokens, primary_answer, accepted_answers, run_indices, eval_points
        )
        plot_data = create_tossup_confidence_pyplot(question_tokens, eval_points, buzz_index)

        # Keep the tokens and evaluation points as JSON for later use.
        serialized_state = json.dumps({"tokens": question_tokens, "values": eval_points})
        return html_content, plot_data, serialized_state
    except Exception as e:
        logger.exception(f"Error initializing interface: {e.args}")
        return f"<div>Error initializing interface: {str(e)}</div>", pd.DataFrame(), "{}"
def process_tossup_results(results: list[dict], top_k_mode: bool = False) -> pd.DataFrame:
    """Turn scored tossup outputs into a table with one row per output.

    Args:
        results: Scored outputs; each must provide ``"token_position"``,
            ``"score"``, ``"confidence"`` and ``"answer"``.
        top_k_mode: Not supported for tossups; must be falsy.

    Returns:
        A DataFrame with the columns ``Token Position``, ``Correct?``,
        ``Confidence`` and ``Prediction``.

    Raises:
        ValueError: If ``top_k_mode`` is truthy.
    """
    if top_k_mode:
        raise ValueError("Top-k mode not supported for tossup mode")

    rows = []
    for result in results:
        rows.append(
            {
                "Token Position": result["token_position"],
                # Only a score of exactly 1 counts as correct.
                "Correct?": "✅" if result["score"] == 1 else "❌",
                "Confidence": result["confidence"],
                "Prediction": result["answer"],
            }
        )
    return pd.DataFrame(rows)
def validate_workflow(workflow: TossupWorkflow):
    """
    Check that a workflow can be used for the tossup task.

    The workflow must have at least one step, every step must pass
    ``validate_model_step``, ``"question"`` must be one of its inputs, and its
    outputs must include a name containing ``"answer"`` and a name containing
    ``"confidence"``.

    Args:
        workflow (TossupWorkflow): The workflow to validate

    Raises:
        ValueError: If any of the checks above fails
    """
    if not workflow.steps:
        raise ValueError("Workflow must have at least one step")

    # Every individual step has to be valid before the overall shape is checked.
    for step in workflow.steps.values():
        validate_model_step(step)

    input_vars = set(workflow.inputs)
    if "question" not in input_vars:
        raise ValueError("Workflow must have 'question' as an input")

    output_vars = set(workflow.outputs)
    # Substring match: an output name only needs to contain the keyword.
    required_outputs = (
        ("answer", "Workflow must produce an 'answer' as output"),
        ("confidence", "Workflow must produce a 'confidence' score as output"),
    )
    for keyword, message in required_outputs:
        if not any(keyword in out_var for out_var in output_vars):
            raise ValueError(message)
def validate_model_step(model_step: ModelStep):
    """
    Check that a single model step can be used for the tossup task.

    A valid step names both a model and a provider, has call type ``"llm"``,
    sets a temperature within [0.0, 1.0], declares a ``question`` input field,
    and declares ``answer`` and ``confidence`` output fields, the latter of
    type ``"float"``.

    Args:
        model_step (ModelStep): The model step to validate

    Raises:
        ValueError: If any of the checks above fails
    """
    # Required identity fields and call type.
    if not (model_step.model and model_step.provider):
        raise ValueError("Model step must have both model and provider specified")
    if model_step.call_type != "llm":
        raise ValueError("Model step must have call_type 'llm'")

    # Temperature must be present and within the accepted range.
    temperature = model_step.temperature
    if temperature is None:
        raise ValueError("Temperature must be specified for LLM model steps")
    if not (0.0 <= temperature <= 1.0):
        raise ValueError(f"Temperature must be between 0.0 and 1.0, got {model_step.temperature}")

    # Required input field.
    declared_inputs = {field.name for field in model_step.input_fields}
    if "question" not in declared_inputs:
        raise ValueError("Model step must have a 'question' input field")

    # Required output fields, checked in a fixed order.
    declared_outputs = {field.name for field in model_step.output_fields}
    required_outputs = (
        ("answer", "Model step must have an 'answer' output field"),
        ("confidence", "Model step must have a 'confidence' output field"),
    )
    for field_name, message in required_outputs:
        if field_name not in declared_outputs:
            raise ValueError(message)

    # Any 'confidence' output field has to be declared as a float.
    has_bad_confidence = any(
        field.name == "confidence" and field.type != "float" for field in model_step.output_fields
    )
    if has_bad_confidence:
        raise ValueError("The 'confidence' output field must be of type 'float'")
class TossupInterface:
    """Gradio interface for the Tossup mode.

    Renders a two-column layout (pipeline editor on the left, question runner
    and submission form on the right) and wires the event listeners that run,
    evaluate, import and submit a tossup pipeline.
    """

    def __init__(self, app: gr.Blocks, dataset: Dataset, model_options: dict, defaults: dict):
        """Store the configuration and render the interface immediately.

        Args:
            app: The enclosing Blocks app; its ``load`` event is used as a trigger.
            dataset: Tossup examples. Rows are read for ``"question"``,
                ``"run_indices"``, ``"answer_primary"`` and ``"clean_answers"``.
            model_options: Mapping whose keys are offered as model choices.
            defaults: Read for ``"init_workflow"``, ``"simple_workflow"`` and
                ``"early_stop"``; also forwarded to the pipeline interface.
        """
        logger.info(f"Initializing Tossup interface with dataset size: {len(dataset)}")
        self.ds = dataset
        self.model_options = model_options
        self.app = app
        self.defaults = defaults
        self.output_state = gr.State(value="{}")
        self.render()

    def _render_pipeline_interface(self, workflow: TossupWorkflow, simple: bool = True):
        """Render the pipeline selector, the import button and the pipeline editor."""
        with gr.Row(elem_classes="bonus-header-row form-inline"):
            self.pipeline_selector = commons.get_pipeline_selector([])
            self.load_btn = gr.Button("⬇️ Import Pipeline", variant="secondary")
        self.pipeline_interface = TossupPipelineInterface(
            self.app,
            workflow,
            simple=simple,
            model_options=list(self.model_options.keys()),
            defaults=self.defaults,
        )

    def _render_qb_interface(self):
        """Render the question runner, result displays and submission form."""
        # NOTE(review): the nesting of components inside rows below is
        # reconstructed from the statement order — confirm against the intended layout.
        with gr.Row(elem_classes="bonus-header-row form-inline"):
            self.qid_selector = commons.get_qid_selector(len(self.ds))
            self.early_stop_checkbox = gr.Checkbox(
                value=self.defaults["early_stop"],
                label="Early Stop",
                info="Stop if already buzzed",
                scale=0,
            )
            self.run_btn = gr.Button("Run on Tossup Question", variant="secondary")

        self.question_display = gr.HTML(label="Question", elem_id="tossup-question-display")
        # Hidden until a handler has an error to show.
        self.error_display = gr.HTML(label="Error", elem_id="tossup-error-display", visible=False)
        with gr.Row():
            self.confidence_plot = gr.Plot(
                label="Buzz Confidence",
                format="webp",
            )
        self.model_outputs_display = gr.JSON(label="Model Outputs", value="{}", show_indices=True, visible=False)
        self.results_table = gr.DataFrame(
            label="Model Outputs",
            value=pd.DataFrame(columns=["Token Position", "Correct?", "Confidence", "Prediction"]),
            visible=False,
        )
        with gr.Row():
            self.eval_btn = gr.Button("Evaluate", variant="primary")

        with gr.Accordion("Model Submission", elem_classes="model-submission-accordion", open=True):
            with gr.Row():
                self.model_name_input = gr.Textbox(label="Model Name")
                self.description_input = gr.Textbox(label="Description")
            with gr.Row():
                gr.LoginButton()
                self.submit_btn = gr.Button("Submit", variant="primary")
            self.submit_status = gr.HTML(label="Submission Status")

    def render(self):
        """Create the Gradio interface."""
        self.hidden_input = gr.Textbox(value="", visible=False, elem_id="hidden-index")
        workflow = self.defaults["init_workflow"]

        with gr.Row():
            # Model Panel
            with gr.Column(scale=1):
                self._render_pipeline_interface(workflow, simple=self.defaults["simple_workflow"])

            with gr.Column(scale=1):
                self._render_qb_interface()

        self._setup_event_listeners()

    def validate_workflow(self, state_dict: TossupPipelineStateDict):
        """Validate the workflow stored in a pipeline state dict.

        Raises:
            gr.Error: If the state cannot be parsed or the workflow is invalid.
                The original exception is chained as the cause.
        """
        try:
            pipeline_state = TossupPipelineState(**state_dict)
            validate_workflow(pipeline_state.workflow)
        except Exception as e:
            raise gr.Error(f"Error validating workflow: {str(e)}") from e

    def get_new_question_html(self, question_id: int) -> str:
        """Get the HTML for a new question.

        Args:
            question_id: 1-based question number; ``None`` falls back to 1.

        Returns:
            The rendered question HTML, or an error string if loading fails.
        """
        if question_id is None:
            logger.error("Question ID is None. Setting to 1")
            question_id = 1
        try:
            example = self.ds[question_id - 1]
            question_tokens = example["question"].split()
            return create_tossup_html(
                question_tokens, example["answer_primary"], example["clean_answers"], example["run_indices"]
            )
        except Exception as e:
            return f"Error loading question: {str(e)}"

    def get_model_outputs(
        self, example: dict, pipeline_state: PipelineState, early_stop: bool
    ) -> list[ScoredTossupResult]:
        """Run the pipeline on every question prefix of an example and score the outputs.

        One prefix is built per run index, containing the tokens up to and
        including that index. Each output is then annotated with its score and
        token position.
        """
        tokens = example["question"].split()
        question_runs = [" ".join(tokens[: run_idx + 1]) for run_idx in example["run_indices"]]
        agent = QuizBowlTossupAgent(pipeline_state.workflow)
        outputs = list(agent.run(question_runs, early_stop=early_stop))
        outputs = add_model_scores(outputs, example["clean_answers"], example["run_indices"])
        return outputs

    def get_pipeline_names(self, profile: gr.OAuthProfile | None) -> dict:
        """Return a dropdown update listing the available tossup pipelines.

        The "unselected" placeholder is always the first choice and the
        selected value.
        """
        names = [UNSELECTED_PIPELINE_NAME] + populate.get_pipeline_names("tossup", profile)
        return gr.update(choices=names, value=UNSELECTED_PIPELINE_NAME)

    def load_pipeline(
        self, model_name: str, pipeline_change: bool, profile: gr.OAuthProfile | None
    ) -> tuple[str, PipelineStateDict, bool, dict]:
        """Load a saved tossup pipeline into the editor.

        Returns:
            Updates for (pipeline selector, pipeline state, pipeline-change
            flag, error display). The selector is always reset to the
            "unselected" placeholder. On success the change flag is toggled;
            when loading fails the state and flag are left untouched.
        """
        try:
            workflow = populate.load_workflow("tossup", model_name, profile)
            if workflow is None:
                logger.warning(f"Could not load workflow for {model_name}")
                return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=False)
            pipeline_state_dict = TossupPipelineState.from_workflow(workflow).model_dump()
            return UNSELECTED_PIPELINE_NAME, pipeline_state_dict, not pipeline_change, gr.update(visible=True)
        except Exception as e:
            logger.exception(e)
            error_msg = styled_error(f"Error loading pipeline: {str(e)}")
            return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg)

    def single_run(
        self,
        question_id: int,
        state_dict: TossupPipelineStateDict,
        early_stop: bool = True,
    ) -> tuple[Any, ...]:
        """Run the pipeline on one tossup question and prepare the displays.

        Args:
            question_id: 1-based question number.
            state_dict: Serialized pipeline state.
            early_stop: Forwarded to the agent run.

        Returns:
            Six values matching the listener outputs, in order: question HTML,
            output state, confidence plot, results table, step outputs and
            error display. On failure the first three are skipped, the result
            displays are hidden and the error display is shown.
        """
        try:
            # Validate inputs
            question_id = int(question_id - 1)
            if not self.ds or question_id < 0 or question_id >= len(self.ds):
                # Every return path must supply all six listener outputs.
                error_msg = styled_error("Invalid question ID or dataset not loaded")
                return (
                    gr.skip(),
                    gr.skip(),
                    gr.skip(),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=True, value=error_msg),
                )
            example = self.ds[question_id]
            pipeline_state = TossupPipelineState(**state_dict)
            outputs = self.get_model_outputs(example, pipeline_state, early_stop)

            # Process results and prepare visualization data
            tokens_html, plot_data, output_state = initialize_eval_interface(example, outputs)
            df = process_tossup_results(outputs)
            step_outputs = [output["step_outputs"] for output in outputs]
            return (
                tokens_html,
                gr.update(value=output_state),
                gr.update(value=plot_data, label=f"Buzz Confidence on Question {question_id + 1}"),
                gr.update(value=df, label=f"Model Outputs for Question {question_id + 1}", visible=True),
                gr.update(value=step_outputs, label=f"Step Outputs for Question {question_id + 1}", visible=True),
                gr.update(visible=False),
            )
        except Exception as e:
            import traceback

            logger.exception(f"Error running tossup: {e.args}")
            error_msg = styled_error(f"Error: {str(e)}\n{traceback.format_exc()}")
            return (
                gr.skip(),
                gr.skip(),
                gr.skip(),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=True, value=error_msg),
            )

    def evaluate(self, state_dict: TossupPipelineStateDict, progress: gr.Progress = gr.Progress()):
        """Evaluate the pipeline on every tossup question in the dataset.

        Each question is run with early stopping and only its last output is
        counted.

        Returns:
            Three values matching the listener outputs, in order: confidence
            plot, results table and error display. On failure the plot is
            skipped, the table is hidden and the error display is shown.
        """
        try:
            # Validate inputs
            if not self.ds or not self.ds.num_rows:
                # Every return path must supply all three listener outputs.
                return (
                    gr.skip(),
                    gr.update(visible=False),
                    gr.update(visible=True, value=styled_error("No dataset loaded")),
                )
            pipeline_state = TossupPipelineState(**state_dict)
            buzz_counts = 0
            correct_buzzes = 0
            token_positions = []
            correctness = []
            for example in progress.tqdm(self.ds, desc="Evaluating tossup questions"):
                model_outputs = self.get_model_outputs(example, pipeline_state, early_stop=True)
                final_output = model_outputs[-1]
                if final_output["buzz"]:
                    buzz_counts += 1
                    # A correct answer only counts as a correct buzz if the model buzzed.
                    if final_output["score"] == 1:
                        correct_buzzes += 1
                token_positions.append(final_output["token_position"])
                correctness.append(final_output["score"])

            # The model may never buzz; report 0% instead of dividing by zero.
            buzz_accuracy = correct_buzzes / buzz_counts if buzz_counts else 0.0
            df = pd.DataFrame(
                [
                    {
                        "Avg Buzz Position": f"{np.mean(token_positions):.2f}",
                        "Buzz Accuracy": f"{buzz_accuracy:.2%}",
                        "Total Score": f"{correct_buzzes}/{len(self.ds)}",
                    }
                ]
            )
            plot_data = create_scatter_pyplot(token_positions, correctness)
            return (
                gr.update(value=plot_data, label="Buzz Positions on Sample Set"),
                gr.update(value=df, label="Scores on Sample Set", visible=True),
                gr.update(visible=False),
            )
        except Exception as e:
            import traceback

            logger.exception(f"Error evaluating tossups: {e.args}")
            return (
                gr.skip(),
                gr.update(visible=False),
                gr.update(visible=True, value=styled_error(f"Error: {str(e)}\n{traceback.format_exc()}")),
            )

    def submit_model(
        self,
        model_name: str,
        description: str,
        state_dict: TossupPipelineStateDict,
        # NOTE(review): annotated as non-optional although the default is None.
        # Gradio inspects this annotation when injecting the OAuth profile, so
        # confirm the intended logged-out behavior before changing it.
        profile: gr.OAuthProfile = None,
    ):
        """Submit the pipeline's workflow under the given name and description."""
        pipeline_state = TossupPipelineState(**state_dict)
        return submit.submit_model(model_name, description, pipeline_state.workflow, "tossup", profile)

    def _setup_event_listeners(self):
        """Wire component events to the handler methods."""
        # Show the selected question on app load and whenever the selection changes.
        gr.on(
            triggers=[self.app.load, self.qid_selector.change],
            fn=self.get_new_question_html,
            inputs=[self.qid_selector],
            outputs=[self.question_display],
        )

        # Fill the pipeline selector on app load.
        gr.on(
            triggers=[self.app.load],
            fn=self.get_pipeline_names,
            outputs=[self.pipeline_selector],
        )

        pipeline_state = self.pipeline_interface.pipeline_state
        pipeline_change = self.pipeline_interface.pipeline_change
        self.load_btn.click(
            fn=self.load_pipeline,
            inputs=[self.pipeline_selector, pipeline_change],
            outputs=[self.pipeline_selector, pipeline_state, pipeline_change, self.error_display],
        )
        self.pipeline_interface.add_triggers_for_pipeline_export([pipeline_state.change], pipeline_state)

        # Validate first; the run only starts if validation succeeds.
        self.run_btn.click(
            self.pipeline_interface.validate_workflow,
            inputs=[self.pipeline_interface.pipeline_state],
            outputs=[],
        ).success(
            self.single_run,
            inputs=[
                self.qid_selector,
                self.pipeline_interface.pipeline_state,
                self.early_stop_checkbox,
            ],
            outputs=[
                self.question_display,
                self.output_state,
                self.confidence_plot,
                self.results_table,
                self.model_outputs_display,
                self.error_display,
            ],
        )

        self.eval_btn.click(
            fn=self.evaluate,
            inputs=[self.pipeline_interface.pipeline_state],
            outputs=[self.confidence_plot, self.results_table, self.error_display],
        )

        self.submit_btn.click(
            fn=self.submit_model,
            inputs=[
                self.model_name_input,
                self.description_input,
                self.pipeline_interface.pipeline_state,
            ],
            outputs=[self.submit_status],
        )