File size: 2,032 Bytes
e35e6bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
from __future__ import annotations

from aiohttp import ClientSession, ClientTimeout
from loguru import logger

from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent

# Default Posthog base URL; used as the default ``base_url`` of PosthogTelegramLogger.
POSTHOG_ENDPOINT = "https://app.posthog.com"
class PosthogTelegramLogger(AbstractAnalyticsLogger):
    """Analytics logger that sends events to a Posthog instance over HTTP."""

    def __init__(self, api_token: str, base_url: str = POSTHOG_ENDPOINT) -> None:
        """Store the credentials and the per-request settings.

        Args:
            api_token: Posthog API key. It is sent both as a bearer token in
                the ``Authorization`` header and as the ``personal_api_key``
                query parameter.
            base_url: Root URL of the Posthog instance.
        """
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {
            # Interpolate the real token; a literal "${...}" placeholder is
            # never expanded by Python and would be sent verbatim.
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "Accept": "*/*",
        }
        # Total time budget for one request, in seconds.
        self._timeout = 15

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> dict:
        """POST a single event to Posthog and return the validated JSON reply.

        Args:
            event: Event to send; it is converted with ``dict(event)`` and
                submitted as the JSON request body.

        Returns:
            The decoded JSON response, after ``_validate_response`` accepted it.

        Raises:
            ValueError: If the response is not flagged as successful.
        """
        url = f"{self._base_url}/api/event/?personal_api_key={self._api_token}"
        params = dict(event)

        async with (
            ClientSession() as session,
            session.post(
                url,
                headers=self._headers,
                json=params,
                # aiohttp expects a ClientTimeout object; bare numbers are deprecated.
                timeout=ClientTimeout(total=self._timeout),
            ) as response,
        ):
            json_response = await response.json(content_type="application/json")

        logger.info("Send record to Posthog")
        logger.info(f"{json_response=}")
        return self._validate_response(json_response)

    @staticmethod
    def _validate_response(response: dict) -> dict:
        """Check the ``ok`` flag of a decoded response.

        NOTE(review): the ``ok`` / ``error`` / ``result`` envelope is kept from
        the original implementation; confirm it matches what the Posthog
        endpoint actually returns.

        Args:
            response: Decoded JSON body of the HTTP response.

        Returns:
            The same ``response`` object when its ``ok`` value is truthy.

        Raises:
            ValueError: If ``ok`` is missing or falsy.
        """
        if not response.get("ok"):
            # Tolerate a missing or malformed "error" entry so the caller always
            # gets the intended ValueError instead of an incidental KeyError.
            error = response.get("error")
            if not isinstance(error, dict):
                error = {}
            name = error.get("name")
            code = error.get("code")
            logger.error(f"get error from Posthog api | name: {name} | code: {code}")
            msg = f"Error in Posthog API call | name: {name} | code: {code}"
            raise ValueError(msg)
        logger.info(f"got response | ok: {response['ok']} | result: {response.get('result')}")
        return response

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Send an event to Posthog.

        Args:
            event: Event to record.

        Raises:
            ValueError: If Posthog's response is not flagged as successful.
        """
        await self._send_request(event)
|