hamzabouajila committed
Commit 2f1e30c · 1 Parent(s): 34052ff

feat: enhance evaluation queue reliability and add stale job recovery


- Add timeout mechanism for stalled RUNNING evaluations (see the sketch after this list)
- Implement automatic reset of stale RUNNING jobs to PENDING status
- Increase sleep interval between evaluation cycles to 3 minutes
- Add pydantic dependency for better data validation
- Improve error handling and logging in submission process
- Update repository references to use TunisianLLMLeaderBoard
- Add debug logging for file uploads in submission flow
- Fix timezone handling for evaluation timestamps
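
The timeout mechanism and the timezone fix both come down to comparing a stored ISO-8601 `submitted_time` against the current UTC time using timezone-aware datetimes. A minimal sketch of that check, assuming the timestamp format used in the queue files (`is_stale` is a hypothetical helper; the actual logic lives in `reset_stale_running_eval` in src/evaluator/evaluate.py below, with the same 10-second default):

    # Illustrative only: detect a RUNNING job older than `timeout_interval` seconds.
    from datetime import datetime, timedelta, timezone

    def is_stale(submitted_time_iso: str, timeout_interval: int = 10) -> bool:
        started = datetime.fromisoformat(submitted_time_iso)
        if started.tzinfo is None:  # assumption: treat naive timestamps as UTC
            started = started.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) - started > timedelta(seconds=timeout_interval)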

app.py CHANGED
@@ -1,5 +1,5 @@
 from dotenv import load_dotenv
-
+import sys
 load_dotenv()
 
 import gradio as gr
@@ -30,14 +30,12 @@ from src.display.utils import (
 from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN
 from src.populate import get_evaluation_queue_df, get_leaderboard_df
 from src.submission.submit import add_new_eval
-from src.evaluator.evaluate import process_evaluation_queue
-import threading
-import time
 
 
 def restart_space():
     try:
-        API.restart_space(repo_id=REPO_ID)
+        print("Restarting space...")
+        API.restart_space(repo_id=REPO_ID,token=TOKEN)
     except Exception as e:
         print(f"Error restarting space: {str(e)}")
         try:
@@ -53,21 +51,6 @@ def restart_space():
 
 
 
-
-
-def run_evaluator():
-    print("Starting evaluator service...")
-    while True:
-        try:
-            process_evaluation_queue()
-            print("Evaluation queue processed. Sleeping for 5 minutes...")
-            time.sleep(10) # Sleep for 5 minutes
-        except Exception as e:
-            print(f"Error in evaluation process: {e}")
-            print("Retrying in 5 minutes...")
-            time.sleep(10)
-
-
 def init_leaderboard(dataframe):
     if dataframe is None:
         raise ValueError("Leaderboard DataFrame is empty or None.")
@@ -93,27 +76,14 @@ def init_leaderboard(dataframe):
 
 
 
-def evaluate_and_update(model_name, revision, precision, weight_type):
-    """Add a model evaluation request to the queue"""
-    try:
-        add_new_eval(
-            model_name=model_name,
-            revision=revision,
-            precision=precision,
-            weight_type=weight_type,
-            model_type="LLM",
-        )
-        get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
-        return "Evaluation request added to queue! Check the leaderboard for updates."
-    except Exception as e:
-        print(f"Error in evaluate_and_update: {str(e)}")
-        print(f"Full traceback: {traceback.format_exc()}")
-        return f"Error adding evaluation request: {str(e)}"
-
+# API.delete_files(repo_id=QUEUE_REPO, token=TOKEN,delete_patterns=["*"],commit_message="Clearing queue",repo_type="dataset")
+# API.delete_files(repo_id=RESULTS_REPO, token=TOKEN,delete_patterns=["*"],commit_message="Clearing results",repo_type="dataset")
+# sys.exit(0)
 
 ### Space initialisation
 try:
     print(f"\n=== Starting space initialization ===")
+
     print(f"EVAL_REQUESTS_PATH: {EVAL_REQUESTS_PATH}")
     print(f"EVAL_RESULTS_PATH: {EVAL_RESULTS_PATH}")
     print(f"QUEUE_REPO: {QUEUE_REPO}")
@@ -144,12 +114,8 @@ try:
 except Exception as e:
     print(f"\n=== Error during space initialization ===")
     print(f"Error: {str(e)}")
-    restart_space()
-
-
+    # restart_space()
 
-evaluator_thread = threading.Thread(target=run_evaluator, daemon=True)
-evaluator_thread.start()
 
 LEADERBOARD_DF = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
 finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
@@ -270,6 +236,6 @@ with demo:
     )
 
 scheduler = BackgroundScheduler()
-scheduler.add_job(restart_space, "interval", seconds=1800)
+scheduler.add_job(restart_space, "interval", seconds=300)
 scheduler.start()
 demo.queue(default_concurrency_limit=40).launch()
pyproject.toml CHANGED
@@ -17,6 +17,7 @@ dependencies = [
     "matplotlib>=3.10.3",
     "numpy>=2.3.1",
     "pandas>=2.3.0",
+    "pydantic>=2.11.7",
     "python-dateutil>=2.9.0.post0",
     "python-dotenv>=1.1.1",
     "scikit-learn>=1.7.0",
src/envs.py CHANGED
@@ -9,7 +9,7 @@ TOKEN = os.environ.get("HF_TOKEN") # A read/write token for your org
 OWNER = "hamzabouajila" # Change to your org - don't forget to create a results and request dataset, with the correct format!
 # ----------------------------------
 
-REPO_ID = f"{OWNER}/leaderboard"
+REPO_ID = f"{OWNER}/TunisianLLMLeaderBoard"
 QUEUE_REPO = f"{OWNER}/requests"
 RESULTS_REPO = f"{OWNER}/results"
 
src/evaluator/evaluate.py CHANGED
@@ -1,19 +1,16 @@
 import json
 import os
-import time
-from typing import Dict, Any
+from datetime import datetime,timedelta,timezone
+from typing import Dict
 from dataclasses import dataclass
 from enum import Enum
-from datetime import datetime
 import torch
 from transformers import AutoModelForSequenceClassification, AutoTokenizer
 import traceback
 
-
-from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO, QUEUE_REPO
+from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO, QUEUE_REPO,TOKEN
 from src.evaluator.tunisian_corpus_coverage import evaluate_tunisian_corpus_coverage
 from src.evaluator.tsac import evaluate_tsac_sentiment
-from huggingface_hub import snapshot_download
 
 
 class EvaluationStatus(Enum):
@@ -121,7 +118,34 @@ def evaluate_model(model_name: str, revision: str, precision: str, weight_type:
         error=error_msg
     )
 
+def reset_stale_running_eval(eval_entry,root ,file_path ,filename ,timeout_interval=10):
+    submission = eval_entry.get("submitted_time")
+    try:
+        started = datetime.fromisoformat(submission) # aware datetime
+    except Exception as e:
+        print("Invalid submitted_time format:", submission, e)
+
+    now_utc = datetime.now(timezone.utc)
 
+    if now_utc - started > timedelta(seconds=timeout_interval):
+        print(f"Timeout detected — resetting {eval_entry['model']} to PENDING")
+        eval_entry["status"] = EvaluationStatus.PENDING.value
+        eval_entry["submitted_time"] = now_utc.isoformat()
+        with open(file_path, 'w') as f:
+            json.dump(eval_entry, f, indent=2)
+        API.upload_file(
+            path_or_fileobj=file_path,
+            path_in_repo=os.path.join(os.path.basename(root), filename),
+            repo_id=QUEUE_REPO,
+            repo_type="dataset",
+            commit_message=f"Update status to PENDING for {eval_entry['model']} (timeout)",
+            token=TOKEN
+        )
+    return
+
+
+
+
 def process_evaluation_queue():
     """
     Processes all pending evaluations in the queue.
@@ -130,22 +154,6 @@
     """
     print(f"\n=== Starting evaluation queue processing ===")
     print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # --- NEW STEP: Download the latest queue from Hugging Face Hub ---
-    try:
-        print(f"Downloading evaluation requests from: {QUEUE_REPO}")
-        snapshot_download(
-            repo_id=QUEUE_REPO,
-            repo_type="dataset",
-            local_dir=EVAL_REQUESTS_PATH,
-            local_dir_use_symlinks=False,
-            token=API.token
-        )
-        print("Successfully downloaded evaluation queue.")
-    except Exception as e:
-        print(f"Error downloading evaluation queue: {str(e)}")
-        print(f"Full traceback: {traceback.format_exc()}")
-        return
 
     print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")
 
@@ -163,7 +171,8 @@
                 with open(file_path, 'r') as f:
                     eval_entry = json.load(f)
 
-                status = eval_entry.get('status', '')
+                status = eval_entry.get('status', '')
+
 
                 if status == EvaluationStatus.PENDING.value:
                     print(f"Found pending evaluation for model: {eval_entry['model']}")
@@ -194,6 +203,7 @@
                         precision=eval_entry['precision'],
                         weight_type=eval_entry['weight_type']
                     )
+
                     print("\n=== Evaluation completed ===")
 
                     # --- Step 3: Update file with final status and results locally ---
@@ -238,7 +248,9 @@
                         print(f"Final status for {eval_entry['model']} updated in the queue repository.")
                     except Exception as status_update_error:
                         print(f"Error updating status in queue: {str(status_update_error)}")
-
+                elif status == EvaluationStatus.RUNNING.value:
+                    print("Found Running evaluation for model: ", eval_entry['model'])
+                    reset_stale_running_eval(eval_entry, root, file_path, filename)
                 else:
                     print(f"Skipping file with status: {status}")
             except Exception as e:
src/evaluator/run_evaluator.py CHANGED
@@ -16,12 +16,12 @@ def main():
     while True:
         try:
             process_evaluation_queue()
-            print("Evaluation queue processed. Sleeping for 5 minutes...")
-            time.sleep(20) # Sleep for 5 minutes
+            print("Evaluation queue processed. Sleeping for 3 minutes...")
+            time.sleep(180) # Sleep for 3 minutes
         except Exception as e:
             print(f"Error in evaluation process: {e}")
-            print("Retrying in 5 minutes...")
-            time.sleep(20)
+            print("Retrying in 3 minutes...")
+            time.sleep(180)
 
 if __name__ == "__main__":
     main()
src/submission/submit.py CHANGED
@@ -61,11 +61,14 @@ def _create_eval_request(
     # Use a try-finally block to ensure the local file is always removed
     try:
         with open(local_path, 'w') as f:
+            print(request_data)
             json.dump(request_data, f, indent=2)
 
         # Upload the request file to the Hugging Face queue repository
         print(f"Uploading evaluation request to {QUEUE_REPO}")
         path_in_repo = os.path.join(user_name, request_filename)
+        print(path_in_repo)
+        print(local_path)
         API.upload_file(
             path_or_fileobj=local_path,
             path_in_repo=path_in_repo,
@@ -127,6 +130,7 @@ def add_new_eval(model: str, base_model: str, revision: str, precision: str, wei
         print(f"Error reading queue file: {e}")
         print(f"Full traceback:\n{traceback.format_exc()}")
         return styled_warning("Error checking model status. Please try again later.")
+
     print(f"No existing submission found for key: {model_key} or previous submission had a FAILED status.")
 
     # --- Step 2: Validate model type and existence on the Hub ---