Alina Lozovskaya
Simplify Setup tab
7ccf9d4
raw
history blame
3.35 kB
import io
import os
import pathlib
import shutil
from loguru import logger
import subprocess
from typing import List
# Directory that save_files() moves uploaded files into.
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
# Location of the yourbench_config.yml file; not referenced by the code
# visible in this part of the module.
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")
# Ensure the upload directory exists. This runs at import time, creates any
# missing parent directories, and is a no-op if the directory already exists.
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
def save_files(files: List[pathlib.Path]) -> str:
    """Move uploaded files into UPLOAD_DIRECTORY and report the result.

    Each entry is moved to ``UPLOAD_DIRECTORY / <file name>``; any directory
    component of the source path is dropped. Entries that do not exist or
    that fail to move are logged and skipped, so one bad file does not abort
    the rest of the batch.

    Args:
        files: Paths of the files to move. ``None`` or an empty list is
            treated as "nothing to save".

    Returns:
        A message listing the destination paths of the files that were
        moved, or ``"No files were saved"`` if none were.
    """
    saved_paths = []

    # Guard against None so an empty submission does not raise TypeError.
    for file in files or []:
        try:
            source_path = pathlib.Path(file)
            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            destination_path = UPLOAD_DIRECTORY / source_path.name
            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            # Best effort: keep going with the remaining files.
            logger.error(f"Error moving file {file}: {e}")

    if not saved_paths:
        return "No files were saved"
    return f"Files saved to: {', '.join(saved_paths)}"
class SubprocessManager:
def __init__(self, command):
self.command = command
self.process = None
self.output_stream = io.StringIO()
def start_process(self, custom_env: dict | None):
"""Start the subprocess."""
if self.is_running():
logger.info("Process is already running")
return
self.output_stream = io.StringIO()
self.process = subprocess.Popen(
self.command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Combine stderr with stdout
text=True,
bufsize=1,
start_new_session=True,
env=custom_env
)
os.set_blocking(self.process.stdout.fileno(), False)
logger.info("Started the process")
def read_and_get_output(self):
"""Read available subprocess output and return the captured output"""
if self.process and self.process.stdout:
try:
while True:
line = self.process.stdout.readline()
if line:
self.output_stream.write(line) # Capture in StringIO
else:
break
except BlockingIOError:
pass
return self.output_stream.getvalue()
def stop_process(self):
"""Terminate the subprocess."""
if not self.is_running():
logger.info("Process is not running")
return
logger.info("Sending SIGTERM to the Process")
self.process.terminate()
exit_code = self.process.wait() # Wait for process to terminate
logger.info(f"Process stopped exit code {exit_code}")
#return exit_code
def kill_process(self):
"""Forcefully kill the subprocess"""
if not self.is_running():
logger.info("Process is not running")
return
logger.info("Sending SIGKILL to the Process")
self.process.kill()
exit_code = self.process.wait() # Wait for process to be killed
logger.info(f"Process killed exit code {exit_code}")
#return exit_code
def is_running(self):
"""Check if the subprocess is still running"""
return self.process and self.process.poll() is None