|
from __future__ import annotations |
|
|
|
from aiohttp import ClientSession |
|
from loguru import logger |
|
|
|
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent |
|
|
|
POSTHOG_ENDPOINT = "https://app.posthog.com" |
|
|
|
|
|
class PosthogTelegramLogger(AbstractAnalyticsLogger): |
|
def __init__(self, api_token: str, base_url: str = POSTHOG_ENDPOINT) -> None: |
|
self._api_token: str = api_token |
|
self._base_url: str = base_url |
|
self._headers = { |
|
"Authorization": "Bearer ${POSTHOG_PERSONAL_API_KEY}", |
|
"Content-Type": "application/json", |
|
"Accept": "*/*", |
|
} |
|
self._timeout = 15 |
|
|
|
async def _send_request( |
|
self, |
|
event: BaseEvent, |
|
) -> dict: |
|
url = f"{self._base_url}/api/event/?personal_api_key={self._api_token}" |
|
params = dict(event) |
|
|
|
async with ( |
|
ClientSession() as session, |
|
session.post( |
|
url, |
|
headers=self._headers, |
|
json=params, |
|
timeout=self._timeout, |
|
) as response, |
|
): |
|
json_response = await response.json(content_type="application/json") |
|
|
|
logger.info("Send record to Posthog") |
|
logger.info(f"{json_response=}") |
|
|
|
return self._validate_response(json_response) |
|
|
|
@staticmethod |
|
def _validate_response(response: dict) -> dict: |
|
"""Validate response.""" |
|
if not response.get("ok"): |
|
name = response["error"]["name"] |
|
code = response["error"]["code"] |
|
|
|
logger.error(f"get error from cryptopay api | name: {name} | code: {code}") |
|
msg = f"Error in CryptoPay API call | name: {name} | code: {code}" |
|
raise ValueError(msg) |
|
|
|
logger.info(f"got response | ok: {response['ok']} | result: {response['result']}") |
|
return response |
|
|
|
async def log_event( |
|
self, |
|
event: BaseEvent, |
|
) -> None: |
|
"""Use this method to sends event to Posthog.""" |
|
await self._send_request(event) |
|
|