# jayparmr's picture
# Upload 118 files
# 19b3da3
# raw
# history blame
# 1.84 kB
from functools import wraps
from time import sleep
from typing import Optional

import requests

from internals.data.task import Task
from internals.util.config import get_environment
class Slack:
    """Post task notifications and failure reports to Slack webhooks.

    Two webhooks are used: ``webhook_url`` receives per-task summaries sent
    by :meth:`send_alert`, and ``error_webhook`` receives failure reports
    sent by :meth:`error_alert`.

    Network errors raised by ``requests`` (including timeouts) are not
    caught here; they propagate to the caller.
    """

    # Seconds to wait for Slack before giving up. Without a timeout
    # ``requests.post`` may block the calling worker indefinitely.
    REQUEST_TIMEOUT = 10

    # Keys of the raw task payload that are removed before the summary
    # message is built.
    _HIDDEN_KEYS = ("queue_name", "attempt", "timestamp", "task_id", "maskImageUrl")

    def __init__(self):
        # NOTE(review): webhook URLs are credentials; consider loading them
        # from configuration or the environment instead of source control.
        # self.webhook_url = "https://hooks.slack.com/services/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"
        self.webhook_url = "https://hooks.slack.com/services/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"
        self.error_webhook = "https://hooks.slack.com/services/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"

    def send_alert(self, task: Task, args: Optional[dict]):
        """Send a summary of ``task`` to the main webhook.

        The summary is built from a shallow copy of ``task.get_raw()`` with
        the current environment added, bookkeeping keys removed, and
        ``args`` (when not ``None``) merged on top.

        Args:
            task: Task whose raw payload is summarised.
            args: Extra key/value pairs to include; they override payload
                entries with the same key. ``None`` adds nothing.
        """
        raw = task.get_raw().copy()
        raw["environment"] = get_environment()
        for key in self._HIDDEN_KEYS:
            raw.pop(key, None)

        if args is not None:
            raw.update(args)

        self._post(self.webhook_url, self._format_message(raw))

    def error_alert(self, task: Task, e: Exception):
        """Report a failed ``task`` and its exception ``e`` to the error webhook."""
        text = "Task failed:\n{} \n error is: \n {}".format(task.get_raw(), e)
        self._post(self.error_webhook, text)

    def auto_send_alert(self, func):
        """Decorate ``func`` so that its result is reported via ``send_alert``.

        The wrapper calls ``func``, forwards the first positional argument
        (expected to be the task) together with the return value to
        :meth:`send_alert`, and then returns that value unchanged. The
        wrapped function must therefore receive the task positionally.
        """

        @wraps(func)  # keep the wrapped function's name and docstring
        def inner(*args, **kwargs):
            rargs = func(*args, **kwargs)
            self.send_alert(args[0], rargs)
            return rargs

        return inner

    @staticmethod
    def _format_message(fields: dict) -> str:
        """Render ``fields`` as one ``*key*: value`` line per truthy entry.

        Falsy values (``None``, ``""``, ``0``, empty containers) are
        skipped. List values are joined with ``", "``; each item is
        converted with ``str`` first so non-string items do not raise.
        """
        lines = []
        for key, value in fields.items():
            if not value:
                continue
            if isinstance(value, list):
                value = ", ".join(map(str, value))
            lines.append(f"*{key}*: {value}\n")
        return "".join(lines)

    def _post(self, url: str, text: str) -> None:
        """POST ``text`` as a JSON ``{"text": ...}`` payload to ``url``."""
        requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json={"text": text},
            timeout=self.REQUEST_TIMEOUT,
        )