CM2000112 / internals /util /failure_hander.py
jayparmr's picture
Upload 118 files
19b3da3
raw
history blame
1.23 kB
import json
import os
from pathlib import Path
from internals.data.dataAccessor import updateSource
from internals.data.task import Task
from internals.util.config import set_configs_from_task
from internals.util.slack import Slack
class FailureHandler:
__task_path = Path.home() / ".cache" / "inference" / "task.json"
@staticmethod
def register():
path = FailureHandler.__task_path
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists():
task = Task(json.loads(path.read_text()))
set_configs_from_task(task)
# Slack().error_alert(task, Exception("CATASTROPHIC FAILURE"))
updateSource(task.get_sourceId(), task.get_userId(), "FAILED")
os.remove(path)
@staticmethod
def clear(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if result is not None:
path = FailureHandler.__task_path
if path.exists():
os.remove(path)
return result
return wrapper
@staticmethod
def handle(task: Task):
path = FailureHandler.__task_path
path.write_text(json.dumps(task.get_raw()))