"""Helpers for the Gradio app: upload handling, dataset previews and per-session subprocess management."""

import io
import itertools
import os
import pathlib
import re
import shutil
import subprocess
from collections import defaultdict
from typing import List, Optional, Union

import gradio as gr
import pandas as pd
from datasets import load_dataset
from loguru import logger

STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "answer_generation",
    # Disabled stages, kept here for reference:
    # "evaluate_models",
    # "create_leaderboard"
    # "judge_answers", # to uncomment when fixed
]

# Pattern matched against the subprocess log to detect finished stages.
_COMPLETED_STAGE_RE = re.compile(r"Successfully completed stage: (\w+)")


def is_running_locally() -> bool:
    """
    Returns True if Gradio is running locally, False if it's running in a Hugging Face Space.
    """
    return os.getenv("SPACE_ID") is None  # SPACE_ID is set in Hugging Face Spaces


def save_files(
    oauth_token: gr.OAuthToken | None,
    session_state: gr.State,
    files: List[pathlib.Path],
) -> str | None:
    """Move uploaded files into ``/app/<session uuid>/uploaded_files``.

    Args:
        oauth_token: Login token; may be ``None``. A ``None`` token is only
            accepted when the app runs locally.
        session_state: Gradio state whose ``.value`` is used as the session
            directory name.
        files: Uploaded file objects; each one's ``.name`` attribute is used as
            the source path. ``None`` or an empty list is treated as "nothing
            to save".

    Returns:
        A human-readable status message, or ``None`` when the user is not
        logged in (a Gradio warning is emitted in that case).
    """
    if oauth_token is None and not is_running_locally():
        gr.Warning('You need to log in to use this Space')
        return None

    upload_directory_uuid = pathlib.Path(f"/app/{session_state.value}/uploaded_files")
    source_names = [uploaded.name for uploaded in (files or [])]

    saved_paths = []
    for file in source_names:
        try:
            source_path = pathlib.Path(file)
            # Ensure the upload directory exists
            upload_directory_uuid.mkdir(parents=True, exist_ok=True)
            # Only the base name is kept, so the destination always stays
            # inside the session's upload directory.
            destination_path = upload_directory_uuid / source_path.name

            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            # Best effort: one failing file must not prevent the others from being saved.
            logger.error(f"Error moving file {file}: {e}")

    if saved_paths:
        return f"Files saved to: {', '.join(saved_paths)}"
    return "No files were saved"


def _preview_rows(dataset, n_rows: int) -> pd.DataFrame:
    """Return the first ``n_rows`` rows of an iterable dataset as a DataFrame.

    A single iterator is consumed, so the rows are distinct consecutive rows.
    Fewer rows (possibly none) are returned if the dataset is shorter.
    """
    return pd.DataFrame(list(itertools.islice(dataset, n_rows)))


def update_dataset(stages: list, hf_org: str, hf_prefix: str, oauth_token: gr.OAuthToken):
    """
    Updates the dataset based on the provided stages and dataset configuration.

    Args:
        stages: Names of the stages whose output should be previewed.
        hf_org: Hub organisation (or user) owning the dataset.
        hf_prefix: Dataset name inside ``hf_org``.
        oauth_token: Login token whose ``.token`` is forwarded to
            ``load_dataset``. ``None`` is tolerated and forwarded as
            ``token=None``.

    Returns:
        A tuple ``(ingestion_df, summarization_df, single_hop_df, answers_df)``.
        A frame is empty when its stage is not listed in ``stages``.
    """
    # Construct dataset name from config
    dataset_name = f"{hf_org}/{hf_prefix}"
    token = oauth_token.token if oauth_token is not None else None

    def _stream(config_name: str):
        # Streaming avoids downloading the full split just to show a few rows.
        return load_dataset(dataset_name, name=config_name, split="train", streaming=True, token=token)

    ingestion_df = pd.DataFrame()
    summarization_df = pd.DataFrame()
    single_hop_df = pd.DataFrame()
    answers_df = pd.DataFrame()

    if "ingestion" in stages:
        # TODO: why is the key "ingested" and not "ingestion"? (does not match the other splits)
        ingestion_ds = _stream("ingested").select_columns("document_text")
        ingestion_df = _preview_rows(ingestion_ds, 1)  # only one row

    if "summarization" in stages:
        summarization_ds = _stream("summarization").select_columns(
            ['raw_document_summary', 'document_summary', 'summarization_model']
        )
        summarization_df = _preview_rows(summarization_ds, 1)

    if "single_shot_question_generation" in stages:
        single_hop_df = _preview_rows(_stream("single_shot_question_generation"), 5)

    if "answer_generation" in stages:
        answers_df = _preview_rows(_stream("answer_generation"), 5)

    return (ingestion_df, summarization_df, single_hop_df, answers_df)


class SubprocessManagerGroup:
    """Instantiates one manager per user (should be used as a singleton class)"""

    def __init__(self):
        # Maps a session uuid to the manager that owns that session's subprocess.
        self.managers: "dict[str, SubprocessManager]" = {}

    @staticmethod
    def grab_uuid(uid: Union[str, gr.State]):
        """If a gradio session state is provided, we pull the uuid from its value - else we assume the str is the uuid"""
        if isinstance(uid, gr.State):
            uid = uid.value
        return uid

    def create(self, uid: Union[str, gr.State]):
        """Create a manager for ``uid``, replacing any existing entry for it."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid] = SubprocessManager(uid)

    def get(self, uid: Union[str, gr.State]) -> Optional["SubprocessManager"]:
        """Return the manager registered for ``uid``, or ``None`` if there is none."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        return self.managers.get(uid)

    def remove(self, uid: Union[str, gr.State]):
        """Stop the process of ``uid``'s manager and unregister it; no-op for unknown uids."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        if manager := self.managers.get(uid):
            manager.stop_process()
            del self.managers[uid]

    def start_process(self, uid: Union[str, gr.State], custom_env: dict | None):
        """Start the subprocess of ``uid``'s manager. Raises ``KeyError`` for unknown uids."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].start_process(custom_env=custom_env)

    def stop_process(self, uid: Union[str, gr.State]):
        """Terminate the subprocess of ``uid``'s manager. Raises ``KeyError`` for unknown uids."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].stop_process()

    def kill_process(self, uid: Union[str, gr.State]):
        """Kill the subprocess of ``uid``'s manager. Raises ``KeyError`` for unknown uids."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].kill_process()

    def read_and_get_output(self, uid: Union[str, gr.State]):
        """Return ``(log_text, completed_stages)`` for ``uid``.

        Returns ``("", [])`` when ``uid`` is ``None`` or has no registered
        manager, so callers can poll before a manager exists.
        """
        if uid is None:
            return "", []
        uid = SubprocessManagerGroup.grab_uuid(uid)
        manager = self.managers.get(uid)
        if manager is None:
            return "", []
        return manager.read_and_get_output()

    def is_running(self, uid: Union[str, gr.State]) -> bool:
        """Return True if ``uid`` has a manager whose subprocess is still running."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        if manager := self.managers.get(uid):
            return manager.is_running()
        return False


class SubprocessManager:
    """Owns one subprocess for a session and captures its combined output."""

    def __init__(self, session_uid: str):
        self.session_uid = session_uid
        self.path = pathlib.Path(f"/app/{session_uid}")
        self.path.mkdir(parents=True, exist_ok=True)
        self.config_path = pathlib.Path(f"{self.path}/config.yml")
        self.command = ["uv", "run", "yourbench", "--config", str(self.config_path)]
        self.process = None
        # Accumulates everything read from the subprocess so far.
        self.output_stream = io.StringIO()
        self.exit_code = None

    def start_process(self, custom_env: dict | None):
        """Start the subprocess.

        Does nothing if a process is already running. Failures to start are
        logged and not re-raised.

        Args:
            custom_env: Environment passed to ``subprocess.Popen`` as ``env``.
        """
        if self.is_running():
            logger.info("Process is already running")
            return

        # Each run starts with a fresh log and no recorded exit code.
        self.output_stream = io.StringIO()
        self.exit_code = None

        try:
            logger.info(f"Starting process with command: {' '.join(self.command)}")

            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # Combine stderr with stdout
                text=True,
                bufsize=1,
                start_new_session=True,
                env=custom_env,
            )
            # Non-blocking stdout so read_and_get_output() never waits for new output.
            os.set_blocking(self.process.stdout.fileno(), False)
            logger.info(f"Started process with PID: {self.process.pid}")
        except Exception as e:
            logger.error(f"Failed to start process: {str(e)}")
            return

    def read_and_get_output(self):
        """Read subprocess output, capture it, and return log and completed stages.

        Returns:
            ``(current_output, completed_stages)``: the full log captured so
            far and the unique stage names found in it, in order of first
            appearance.
        """
        current_output = ""
        completed_stages = []

        if self.process and self.process.stdout:
            try:
                # Drain whatever is available right now; an empty read means
                # there is nothing more to read at the moment.
                while True:
                    line = self.process.stdout.readline()
                    if line:
                        self.output_stream.write(line)
                    else:
                        break
            except BlockingIOError:
                pass

            current_output = self.output_stream.getvalue()
            # dict.fromkeys de-duplicates while keeping a deterministic order.
            completed_stages = list(dict.fromkeys(_COMPLETED_STAGE_RE.findall(current_output)))

        return current_output, completed_stages

    def stop_process(self):
        """Terminate the subprocess.

        Waits up to 5 seconds after ``terminate()`` and falls back to
        ``kill_process()`` if the process has not exited by then.
        """
        if not self.is_running():
            logger.info("Process is not running")
            return

        logger.info("Sending SIGTERM to the Process")
        try:
            self.process.terminate()
            self.exit_code = self.process.wait(timeout=5)  # Wait up to 5 seconds for process to terminate
            logger.info(f"Process terminated by user with exit code {self.exit_code}")
        except subprocess.TimeoutExpired:
            logger.warning("Process did not terminate within timeout, sending SIGKILL")
            self.kill_process()

    def kill_process(self):
        """Forcefully kill the subprocess"""
        if not self.is_running():
            logger.info("Process is not running")
            return

        logger.info("Sending SIGKILL to the Process")
        try:
            self.process.kill()
            self.exit_code = self.process.wait(timeout=5)  # Wait up to 5 seconds for process to be killed
            logger.info(f"Process killed by user with exit code {self.exit_code}")
        except subprocess.TimeoutExpired:
            logger.error("Process could not be killed within timeout")

    def is_running(self):
        """Check if the subprocess is still running"""
        if self.process is None:
            return False

        return self.process.poll() is None

    def get_exit_details(self):
        """Return exit code and reason if process has terminated

        Returns:
            ``(exit_code, reason)``. ``exit_code`` is ``None`` while the
            process has not been started or is still running.
        """
        if self.process is None:
            return None, "Process was never started"

        if self.is_running():
            return None, "Process is still running"

        if self.exit_code is None:
            # The process ended on its own (not through stop/kill), so the
            # code was never recorded; is_running() just polled it, which
            # makes returncode available.
            self.exit_code = self.process.returncode

        # NOTE: message spelling is kept unchanged in case callers compare against it.
        if self.exit_code is not None and self.exit_code != 0:
            return self.exit_code, "Process exited abnormaly"

        return self.exit_code, "Process exited normaly"

    def __del__(self):
        """Stop the process when object is deleted"""
        # getattr: __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is not None and process.poll() is None:
            process.kill()