import functools
import json
import os
from pathlib import Path

from internals.data.dataAccessor import updateSource
from internals.data.task import Task
from internals.util.config import set_configs_from_task
from internals.util.slack import Slack


class FailureHandler:
    """Track the in-flight task on disk so an unfinished run can be flagged.

    ``handle`` stores the raw task as JSON, ``clear`` removes that file once
    a decorated callable returns a non-``None`` result, and ``register``
    reports any task file still present as ``"FAILED"``.
    """

    # Single on-disk slot holding the raw payload of the task being processed.
    __task_path = Path.home() / ".cache" / "inference" / "task.json"

    @staticmethod
    def _discard_task_file() -> None:
        """Delete the task file, tolerating the case where it is already gone."""
        try:
            os.remove(FailureHandler.__task_path)
        except FileNotFoundError:
            # Nothing to clean up; avoids the exists()/remove() race.
            pass

    @staticmethod
    def register() -> None:
        """Report a leftover task file as failed, then delete it.

        Ensures the cache directory exists. If a task file is present, it is
        parsed into a ``Task``, configs are applied through
        ``set_configs_from_task``, and the task's source is updated to
        ``"FAILED"`` via ``updateSource`` before the file is removed.

        Raises:
            json.JSONDecodeError: If the stored file is not valid JSON.
        """
        path = FailureHandler.__task_path
        path.parent.mkdir(parents=True, exist_ok=True)

        try:
            raw = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # No leftover task file: nothing to report.
            return

        task = Task(json.loads(raw))
        set_configs_from_task(task)
        # Slack().error_alert(task, Exception("CATASTROPHIC FAILURE"))
        updateSource(task.get_sourceId(), task.get_userId(), "FAILED")
        # Removed only after the update call returns, so an exception above
        # leaves the file in place for the next register() call.
        FailureHandler._discard_task_file()

    @staticmethod
    def clear(func):
        """Decorate ``func`` so a non-``None`` result removes the task file.

        The wrapped callable's return value is passed through unchanged. If
        ``func`` raises or returns ``None``, the task file is left in place.
        """

        @functools.wraps(func)  # keep the wrapped callable's name/docstring
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if result is not None:
                FailureHandler._discard_task_file()
            return result

        return wrapper

    @staticmethod
    def handle(task: Task) -> None:
        """Write ``task``'s raw payload to the task file as JSON.

        The parent directory is created on demand, so this works even when
        ``register`` has not been called first.
        """
        path = FailureHandler.__task_path
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(task.get_raw()), encoding="utf-8")