File size: 1,839 Bytes
19b3da3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
from time import sleep
from typing import Optional
import requests
from internals.data.task import Task
from internals.util.config import get_environment
class Slack:
def __init__(self):
# self.webhook_url = "https://hooks.slack.com/services/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"
self.webhook_url = "https://hooks.slack.com/services/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"
self.error_webhook = "https://hooks.slack.com/services/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"
def send_alert(self, task: Task, args: Optional[dict]):
raw = task.get_raw().copy()
raw["environment"] = get_environment()
raw.pop("queue_name", None)
raw.pop("attempt", None)
raw.pop("timestamp", None)
raw.pop("task_id", None)
raw.pop("maskImageUrl", None)
if args is not None:
raw.update(args.items())
message = ""
for key, value in raw.items():
if value:
if type(value) == list:
message += f"*{key}*: {', '.join(value)}\n"
else:
message += f"*{key}*: {value}\n"
requests.post(
self.webhook_url,
headers={"Content-Type": "application/json"},
json={"text": message},
)
def error_alert(self, task: Task, e: Exception):
requests.post(
self.error_webhook,
headers={"Content-Type": "application/json"},
json={
"text": "Task failed:\n{} \n error is: \n {}".format(task.get_raw(), e)
},
)
def auto_send_alert(self, func):
def inner(*args, **kwargs):
rargs = func(*args, **kwargs)
self.send_alert(args[0], rargs)
return rargs
return inner
|