Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import io | |
import os | |
import re | |
import pathlib | |
import shutil | |
from loguru import logger | |
import subprocess | |
from typing import List | |
# Absolute locations used by the app.
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")

# Create the upload directory (and any missing parents) as soon as the
# module is imported, so later file moves have somewhere to land.
UPLOAD_DIRECTORY.mkdir(exist_ok=True, parents=True)

# Stage names known to this module, in the order they are listed here.
# NOTE(review): presumably these are the names reported in the subprocess
# log as "Successfully completed stage: <name>" -- confirm against the caller.
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "multi_hop_question_generation",
    "answer_generation",
    "judge_answers",
]
def save_files(files: List[pathlib.Path]) -> str: | |
"""Save uploaded files to the UPLOAD_DIRECTORY safely""" | |
saved_paths = [] | |
for file in files: | |
try: | |
source_path = pathlib.Path(file) | |
destination_path = UPLOAD_DIRECTORY / source_path.name | |
if not source_path.exists(): | |
print(f"File not found: {source_path}") | |
continue # Skip missing files | |
shutil.move(str(source_path), str(destination_path)) | |
saved_paths.append(str(destination_path)) | |
except Exception as e: | |
print(f"Error moving file {file}: {e}") | |
return ( | |
f"Files saved to: {', '.join(saved_paths)}" | |
if saved_paths | |
else "No files were saved" | |
) | |
class SubprocessManager: | |
def __init__(self, command): | |
self.command = command | |
self.process = None | |
self.output_stream = io.StringIO() | |
def start_process(self, custom_env: dict | None): | |
"""Start the subprocess.""" | |
if self.is_running(): | |
logger.info("Process is already running") | |
return | |
self.output_stream = io.StringIO() | |
self.process = subprocess.Popen( | |
self.command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, # Combine stderr with stdout | |
text=True, | |
bufsize=1, | |
start_new_session=True, | |
env=custom_env, | |
) | |
os.set_blocking(self.process.stdout.fileno(), False) | |
logger.info("Started the process") | |
def read_and_get_output(self): | |
"""Read subprocess output, capture it, and return log and completed stages.""" | |
if self.process and self.process.stdout: | |
try: | |
while True: | |
line = self.process.stdout.readline() | |
if line: | |
self.output_stream.write(line) | |
else: | |
break | |
except BlockingIOError: | |
pass | |
current_output = self.output_stream.getvalue() | |
completed_stages = list( | |
set(re.findall(r"Successfully completed stage: (\w+)", current_output)) | |
) | |
return current_output, completed_stages | |
def stop_process(self): | |
"""Terminate the subprocess.""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGTERM to the Process") | |
self.process.terminate() | |
exit_code = self.process.wait() # Wait for process to terminate | |
logger.info(f"Process stopped exit code {exit_code}") | |
# return exit_code | |
def kill_process(self): | |
"""Forcefully kill the subprocess""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGKILL to the Process") | |
self.process.kill() | |
exit_code = self.process.wait() # Wait for process to be killed | |
logger.info(f"Process killed exit code {exit_code}") | |
# return exit_code | |
def is_running(self): | |
"""Check if the subprocess is still running""" | |
return self.process and self.process.poll() is None | |