|
from __future__ import annotations |
|
|
|
import orjson |
|
from aiohttp import ClientSession, ClientTimeout |
|
from loguru import logger |
|
|
|
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent |
|
|
|
# Default URL events are POSTed to; used as the default ``base_url`` of
# ``AmplitudeTelegramLogger``.
AMPLITUDE_ENDPOINT = "https://api2.amplitude.com/2/httpapi"
|
|
|
|
|
class AmplitudeTelegramLogger(AbstractAnalyticsLogger):
    """Analytics logger that sends events to the Amplitude HTTP API.

    Each event is serialized with ``orjson`` and POSTed to ``base_url``
    through an ``aiohttp`` session that is opened for that single request.
    """

    # Value of the ``code`` field in the response body that is treated as success.
    # Kept as a class-level constant; it is still readable via ``self``.
    SUCCESS_STATUS_CODE = 200

    def __init__(self, api_token: str, base_url: str = AMPLITUDE_ENDPOINT) -> None:
        """Store the credentials and request settings.

        Args:
            api_token: Sent as ``api_key`` in every request body.
            base_url: URL the events are POSTed to.
        """
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {"Content-Type": "application/json", "Accept": "*/*"}
        # Upper bound (seconds) for the whole request/response cycle.
        self._timeout = ClientTimeout(total=15)

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> None:
        """POST a single event and validate the JSON reply.

        Args:
            event: Event to send; its ``to_dict()`` output becomes the only
                entry of the ``events`` list in the request body.

        Raises:
            ValueError: Propagated from ``_validate_response`` when the reply
                does not carry the success code.
        """
        payload = {"api_key": self._api_token, "events": [event.to_dict()]}

        async with (
            ClientSession() as session,
            session.post(
                self._base_url,
                headers=self._headers,
                data=orjson.dumps(payload),
                timeout=self._timeout,
            ) as response,
        ):
            # Read the body while the connection is still open.
            json_response = await response.json(content_type="application/json")

        self._validate_response(json_response)

    def _validate_response(self, response: dict[str, str | int]) -> None:
        """Check the decoded reply and log the outcome.

        Args:
            response: Decoded JSON body returned by the API.

        Raises:
            ValueError: If ``response["code"]`` is missing or differs from
                ``SUCCESS_STATUS_CODE``.
        """
        code = response.get("code")
        if code != self.SUCCESS_STATUS_CODE:
            error = response.get("error")

            logger.error(f"get error from amplitude api | error: {error} | code: {code}")
            msg = f"Error in amplitude api call | error: {error} | code: {code}"
            raise ValueError(msg)

        # Use .get so that a successful reply lacking this field does not
        # raise KeyError and get mistaken for a failed upload.
        upload_time = response.get("server_upload_time")
        logger.info(f"successfully send to Amplitude | server_upload_time: {upload_time}")

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Send ``event`` to Amplitude.

        Args:
            event: Event to deliver.

        Raises:
            ValueError: If the API reply does not carry the success code.
        """
        await self._send_request(event)
|
|