# NOTE: the following header was captured from the Hugging Face file-viewer
# during export and is kept here as a comment so the module stays valid Python.
# Commit 67741f2 by Clémentine: "added evaluation + leaderboard generation +
# reorg of the viz" (viewer chrome: raw / history blame / 6.76 kB)
import io
import os
import re
import pathlib
import shutil
import subprocess
import pandas as pd
from datasets import load_dataset, get_dataset_config_names
from loguru import logger
from typing import List
# Destination directory for files uploaded through the UI (see save_files()).
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
# Pipeline configuration file — presumably consumed by the yourbench
# subprocess; not referenced elsewhere in this chunk (TODO confirm).
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")
# Ensure the upload directory exists
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
# Ordered pipeline stage names; completion is detected by matching
# "Successfully completed stage: <name>" lines in the subprocess log
# (see SubprocessManager.read_and_get_output).
STAGES = [
"ingestion",
"upload_ingest_to_hub",
"summarization",
"chunking",
"single_shot_question_generation",
"answer_generation",
# Stages below are currently disabled:
#"evaluate_models",
#"create_leaderboard"
# "judge_answers", # to uncomment when fixed
]
def save_files(files: List[pathlib.Path]) -> str:
    """Move uploaded files into UPLOAD_DIRECTORY safely.

    Missing source files are skipped and per-file failures are logged;
    the function never raises for an individual file.

    Args:
        files: Paths of the uploaded temporary files to move.

    Returns:
        A human-readable summary of the saved destination paths, or
        "No files were saved" when nothing was moved (unchanged contract:
        callers display this string directly).
    """
    saved_paths: List[str] = []
    for file in files:
        try:
            source_path = pathlib.Path(file)
            destination_path = UPLOAD_DIRECTORY / source_path.name
            if not source_path.exists():
                # Consistency fix: log via loguru like the rest of this
                # module instead of bare print().
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files
            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            logger.error(f"Error moving file {file}: {e}")
    return (
        f"Files saved to: {', '.join(saved_paths)}"
        if saved_paths
        else "No files were saved"
    )
def update_dataset(stages, hf_org, hf_prefix):
    """Build 5-row preview DataFrames for each completed pipeline stage.

    Args:
        stages: Iterable of stage names that have completed so far.
        hf_org: Hugging Face organization of the dataset repo.
        hf_prefix: Dataset name within the organization.

    Returns:
        Tuple of four DataFrames ``(ingestion, summarization, single_hop,
        answers)``; a stage absent from ``stages`` yields an empty frame.
    """
    from itertools import islice

    # Construct dataset name from config
    dataset_name = f"{hf_org}/{hf_prefix}"
    # TODO: add cache dir
    # Will be able to group everything in one pass once the names get homogeneized

    def _preview(config_name: str, n: int = 5) -> pd.DataFrame:
        """Stream the first *n* rows of one dataset config into a DataFrame."""
        ds = load_dataset(dataset_name, name=config_name, split="train", streaming=True)
        # Bug fix: the previous code called next(iter(ds)) inside a list
        # comprehension, recreating the iterator on every pass and thus
        # collecting the FIRST row n times. islice advances a single
        # iterator, and also tolerates datasets shorter than n rows.
        return pd.DataFrame(list(islice(iter(ds), n)))

    # TODO: the ingestion stage writes its split under the config name
    # "ingested", which does not match the other stage names.
    ingestion_df = _preview("ingested") if "ingestion" in stages else pd.DataFrame()
    summarization_df = _preview("summarization") if "summarization" in stages else pd.DataFrame()
    single_hop_df = (
        _preview("single_shot_question_generation")
        if "single_shot_question_generation" in stages
        else pd.DataFrame()
    )
    answers_df = _preview("answer_generation") if "answer_generation" in stages else pd.DataFrame()
    return (ingestion_df, summarization_df, single_hop_df, answers_df)
class SubprocessManager:
def __init__(self, command):
self.command = command
self.process = None
self.output_stream = io.StringIO()
self.exit_code = None
def start_process(self, custom_env: dict | None):
"""Start the subprocess."""
if self.is_running():
logger.info("Process is already running")
return
self.output_stream = io.StringIO()
self.exit_code = None
try:
logger.info(f"Starting process with command: {' '.join(self.command)}")
self.process = subprocess.Popen(
self.command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Combine stderr with stdout
text=True,
bufsize=1,
start_new_session=True,
env=custom_env,
)
os.set_blocking(self.process.stdout.fileno(), False)
logger.info(f"Started process with PID: {self.process.pid}")
except Exception as e:
logger.error(f"Failed to start process: {str(e)}")
return
def read_and_get_output(self):
"""Read subprocess output, capture it, and return log and completed stages."""
current_output = ""
completed_stages = []
if self.process and self.process.stdout:
try:
while True:
line = self.process.stdout.readline()
if line:
self.output_stream.write(line)
else:
break
except BlockingIOError:
pass
current_output = self.output_stream.getvalue()
completed_stages = list(set(re.findall(r"Successfully completed stage: (\w+)", current_output)))
return current_output, completed_stages
def stop_process(self):
"""Terminate the subprocess."""
if not self.is_running():
logger.info("Process is not running")
return
logger.info("Sending SIGTERM to the Process")
try:
self.process.terminate()
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate
logger.info(f"Process terminated by user with exit code {self.exit_code}")
except subprocess.TimeoutExpired:
logger.warning("Process did not terminate within timeout, sending SIGKILL")
self.kill_process()
def kill_process(self):
"""Forcefully kill the subprocess"""
if not self.is_running():
logger.info("Process is not running")
return
logger.info("Sending SIGKILL to the Process")
try:
self.process.kill()
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed
logger.info(f"Process killed by user with exit code {self.exit_code}")
except subprocess.TimeoutExpired:
logger.error("Process could not be killed within timeout")
def is_running(self):
"""Check if the subprocess is still running"""
if self.process is None:
return False
return self.process.poll() is None
def get_exit_details(self):
"""Return exit code and reason if process has terminated"""
if self.process is None:
return None, "Process was never started"
if self.is_running():
return None, "Process is still running"
if not self.exit_code is None and self.exit_code != 0 :
return self.exit_code, "Process exited abnormaly"
return self.exit_code, "Process exited normaly"