import io
import os
import pathlib
import re
import shutil
import subprocess
from typing import List

from loguru import logger

UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")

# Ensure the upload directory exists
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)

# Known stage names. NOTE(review): presumably listed in the order the pipeline
# runs them -- confirm against the consumer of this list.
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "multi_hop_question_generation",
    "answer_generation",
    "judge_answers",
]

# Pattern searched for in the captured subprocess output to detect stages that
# have finished; compiled once because the output is re-scanned on every poll.
_STAGE_COMPLETED_RE = re.compile(r"Successfully completed stage: (\w+)")


def save_files(files: List[pathlib.Path]) -> str:
    """Move uploaded files into UPLOAD_DIRECTORY and report what was saved.

    Each entry is moved (not copied) to ``UPLOAD_DIRECTORY / <file name>``;
    only the final path component of the source is kept. This is best effort:
    a missing source is skipped and any error raised while handling one entry
    is logged and does not stop the remaining entries from being processed.

    Args:
        files: Paths of the files to move. ``None`` or an empty list is
            treated as "nothing to save".

    Returns:
        ``"Files saved to: <comma-separated destinations>"`` when at least one
        file was moved, otherwise ``"No files were saved"``.
    """
    saved_paths = []
    for file in files or []:
        try:
            source_path = pathlib.Path(file)
            destination_path = UPLOAD_DIRECTORY / source_path.name

            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            # Deliberately broad: one bad entry must not abort the whole batch.
            logger.error(f"Error moving file {file}: {e}")

    return (
        f"Files saved to: {', '.join(saved_paths)}"
        if saved_paths
        else "No files were saved"
    )


class SubprocessManager:
    """Run one command as a subprocess and accumulate its combined output.

    The manager tracks at most one ``subprocess.Popen`` at a time. Output is
    read without blocking and appended to an in-memory buffer, so callers can
    poll ``read_and_get_output`` repeatedly while the command runs.
    """

    def __init__(self, command):
        """Store the command to run; nothing is started yet.

        Args:
            command: Passed unchanged as the first argument to
                ``subprocess.Popen`` when ``start_process`` is called.
        """
        self.command = command
        self.process = None
        self.output_stream = io.StringIO()

    def start_process(self, custom_env: dict | None = None):
        """Start the subprocess unless one is already running.

        Starting a new process discards any output captured from a previous
        run.

        Args:
            custom_env: Environment for the child process. ``None`` lets the
                child inherit the current process's environment.
        """
        if self.is_running():
            logger.info("Process is already running")
            return

        self.output_stream = io.StringIO()

        self.process = subprocess.Popen(
            self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
            bufsize=1,  # line-buffered (only meaningful in text mode)
            start_new_session=True,
            env=custom_env,
        )
        # Make the pipe non-blocking so polling for output never stalls the
        # caller while the child is quiet.
        os.set_blocking(self.process.stdout.fileno(), False)
        logger.info("Started the process")

    def read_and_get_output(self) -> tuple[str, list[str]]:
        """Drain available subprocess output and report completed stages.

        Returns:
            A ``(log, completed_stages)`` tuple. ``log`` is everything
            captured since the process was last started. ``completed_stages``
            holds each stage name found after the text
            ``"Successfully completed stage: "`` in the log, without
            duplicates and in order of first appearance.
        """
        if self.process and self.process.stdout:
            try:
                # readline() returns an empty string once nothing more is
                # available, which ends the loop.
                while line := self.process.stdout.readline():
                    self.output_stream.write(line)
            except BlockingIOError:
                # Non-blocking pipe had nothing ready; pick it up next call.
                pass

        current_output = self.output_stream.getvalue()
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # result is stable from one poll to the next (a set is not).
        completed_stages = list(
            dict.fromkeys(_STAGE_COMPLETED_RE.findall(current_output))
        )

        return current_output, completed_stages

    def stop_process(self):
        """Ask the subprocess to terminate and wait until it has exited.

        Does nothing (apart from logging) when no process is running.
        NOTE(review): the wait has no timeout, so this blocks for as long as
        the child takes to exit -- confirm that is acceptable for callers.
        """
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGTERM to the Process")
        self.process.terminate()
        exit_code = self.process.wait()  # Wait for process to terminate
        logger.info(f"Process stopped exit code {exit_code}")

    def kill_process(self):
        """Forcefully kill the subprocess and wait until it has exited.

        Does nothing (apart from logging) when no process is running.
        """
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGKILL to the Process")
        self.process.kill()
        exit_code = self.process.wait()  # Wait for process to be killed
        logger.info(f"Process killed exit code {exit_code}")

    def is_running(self) -> bool:
        """Return True if a subprocess was started and has not yet exited."""
        # poll() returns None while the child is still alive.
        return self.process is not None and self.process.poll() is None