File size: 3,790 Bytes
5289522
 
089a447
6454c0e
5289522
 
 
7ccf9d4
6454c0e
 
 
5289522
7ccf9d4
 
bae4131
089a447
 
 
 
 
 
 
 
 
 
 
 
7ccf9d4
 
 
5289522
7ccf9d4
 
 
 
5289522
7ccf9d4
 
 
 
 
 
 
 
 
 
089a447
 
 
 
 
 
5289522
 
 
 
 
 
 
bae4131
5289522
 
 
 
 
 
 
 
 
 
 
 
 
089a447
5289522
 
 
 
 
089a447
5289522
 
 
 
 
089a447
5289522
 
 
 
089a447
 
 
 
 
 
5289522
 
 
 
 
 
 
 
089a447
5289522
089a447
5289522
 
7ccf9d4
5289522
 
 
 
 
089a447
5289522
089a447
5289522
 
7ccf9d4
5289522
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import io
import os
import re
import pathlib
import shutil
from loguru import logger
import subprocess
from typing import List

# Destination for files moved in by save_files().
UPLOAD_DIRECTORY: pathlib.Path = pathlib.Path("/app/uploaded_files")
# Location of the YAML configuration file used by the app.
CONFIG_PATH: pathlib.Path = pathlib.Path("/app/yourbench_config.yml")

# Create the upload directory (plus any missing parents) as soon as the module
# is imported; an already-existing directory is not treated as an error.
UPLOAD_DIRECTORY.mkdir(exist_ok=True, parents=True)

# Stage names recognised by the app; presumably listed in pipeline execution
# order — confirm against the pipeline configuration.
STAGES: list[str] = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "multi_hop_question_generation",
    "answer_generation",
    "judge_answers",
]


def save_files(files: List[pathlib.Path]) -> str:
    """Move uploaded files into ``UPLOAD_DIRECTORY``.

    Each entry is converted with ``pathlib.Path`` and moved (not copied) to
    ``UPLOAD_DIRECTORY / <file name>``. Entries that do not exist, or whose
    move fails, are logged and skipped so one bad file does not abort the
    whole batch.

    Args:
        files: Paths (or path strings) of the files to move. ``None`` is
            treated the same as an empty list.

    Returns:
        ``"Files saved to: <paths>"`` listing every destination path, or
        ``"No files were saved"`` when nothing was moved.
    """
    saved_paths = []

    # Treat None (presumably what an upload widget sends when nothing is
    # selected) as "no files" instead of failing with TypeError on iteration.
    for file in files or []:
        try:
            source_path = pathlib.Path(file)
            # Only the base name is kept, so a file with the same name that is
            # already in UPLOAD_DIRECTORY (or earlier in `files`) may be
            # overwritten by the move below.
            destination_path = UPLOAD_DIRECTORY / source_path.name

            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))

        except Exception as e:
            # Best-effort batch: report the failure and continue with the rest.
            logger.error(f"Error moving file {file}: {e}")

    return (
        f"Files saved to: {', '.join(saved_paths)}"
        if saved_paths
        else "No files were saved"
    )


class SubprocessManager:
    def __init__(self, command):
        self.command = command
        self.process = None
        self.output_stream = io.StringIO()

    def start_process(self, custom_env: dict | None):
        """Start the subprocess."""
        if self.is_running():
            logger.info("Process is already running")
            return

        self.output_stream = io.StringIO()
        self.process = subprocess.Popen(
            self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
            bufsize=1,
            start_new_session=True,
            env=custom_env,
        )
        os.set_blocking(self.process.stdout.fileno(), False)
        logger.info("Started the process")

    def read_and_get_output(self):
        """Read subprocess output, capture it, and return log and completed stages."""
        if self.process and self.process.stdout:
            try:
                while True:
                    line = self.process.stdout.readline()
                    if line:
                        self.output_stream.write(line)
                    else:
                        break
            except BlockingIOError:
                pass

        current_output = self.output_stream.getvalue()
        completed_stages = list(
            set(re.findall(r"Successfully completed stage: (\w+)", current_output))
        )
        return current_output, completed_stages

    def stop_process(self):
        """Terminate the subprocess."""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGTERM to the Process")
        self.process.terminate()
        exit_code = self.process.wait()  # Wait for process to terminate
        logger.info(f"Process stopped exit code {exit_code}")
        # return exit_code

    def kill_process(self):
        """Forcefully kill the subprocess"""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGKILL to the Process")
        self.process.kill()
        exit_code = self.process.wait()  # Wait for process to be killed
        logger.info(f"Process killed exit code {exit_code}")
        # return exit_code

    def is_running(self):
        """Check if the subprocess is still running"""
        return self.process and self.process.poll() is None