AZILS's picture
Upload 120 files
e35e6bc verified
from __future__ import annotations
from aiohttp import ClientSession
from loguru import logger
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent
POSTHOG_ENDPOINT = "https://app.posthog.com"
class PosthogTelegramLogger(AbstractAnalyticsLogger):
def __init__(self, api_token: str, base_url: str = POSTHOG_ENDPOINT) -> None:
self._api_token: str = api_token
self._base_url: str = base_url
self._headers = {
"Authorization": "Bearer ${POSTHOG_PERSONAL_API_KEY}",
"Content-Type": "application/json",
"Accept": "*/*",
}
self._timeout = 15
async def _send_request(
self,
event: BaseEvent,
) -> dict:
url = f"{self._base_url}/api/event/?personal_api_key={self._api_token}"
params = dict(event)
async with (
ClientSession() as session,
session.post(
url,
headers=self._headers,
json=params,
timeout=self._timeout,
) as response,
):
json_response = await response.json(content_type="application/json")
logger.info("Send record to Posthog")
logger.info(f"{json_response=}")
return self._validate_response(json_response)
@staticmethod
def _validate_response(response: dict) -> dict:
"""Validate response."""
if not response.get("ok"):
name = response["error"]["name"]
code = response["error"]["code"]
logger.error(f"get error from cryptopay api | name: {name} | code: {code}")
msg = f"Error in CryptoPay API call | name: {name} | code: {code}"
raise ValueError(msg)
logger.info(f"got response | ok: {response['ok']} | result: {response['result']}")
return response
async def log_event(
self,
event: BaseEvent,
) -> None:
"""Use this method to sends event to Posthog."""
await self._send_request(event)