AZILS's picture
Upload 119 files
054900e verified
from __future__ import annotations
import orjson
from aiohttp import ClientSession, ClientTimeout
from loguru import logger
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent
# Default URL that events are POSTed to; used when no ``base_url`` is passed to the logger.
AMPLITUDE_ENDPOINT = "https://api2.amplitude.com/2/httpapi"
class AmplitudeTelegramLogger(AbstractAnalyticsLogger):
    """Analytics logger that delivers events to Amplitude over HTTP.

    Every event is sent in its own POST request whose JSON body carries the
    API key and a one-element ``events`` list.
    """

    # Value of the ``code`` field in the response body that marks a successful upload.
    SUCCESS_STATUS_CODE = 200

    def __init__(self, api_token: str, base_url: str = AMPLITUDE_ENDPOINT) -> None:
        """Store the credentials and the request settings.

        Args:
            api_token: API key, sent as ``api_key`` in every request payload.
            base_url: URL the events are POSTed to.
        """
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {"Content-Type": "application/json", "Accept": "*/*"}
        self._timeout = ClientTimeout(total=15)

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> None:
        """POST a single event and validate the JSON reply.

        Args:
            event: Event to send; serialized through ``event.to_dict()``.

        Raises:
            ValueError: If the response body's ``code`` is not
                ``SUCCESS_STATUS_CODE`` (raised by ``_validate_response``).

        Exceptions raised by aiohttp (connection errors, timeouts, a response
        that is not ``application/json``) are not caught here and propagate.
        """
        data = {"api_key": self._api_token, "events": [event.to_dict()]}

        # NOTE(review): a new ClientSession is opened for every event; sharing
        # one session would reuse connections but needs an explicit close
        # hook on this class — confirm with the owner of the base interface.
        async with (
            ClientSession() as session,
            session.post(
                self._base_url,
                headers=self._headers,
                data=orjson.dumps(data),
                timeout=self._timeout,
            ) as response,
        ):
            json_response = await response.json(content_type="application/json")

        # The body is fully read above, so validation can run after the
        # connection has been released.
        self._validate_response(json_response)

    def _validate_response(self, response: dict[str, str | int]) -> None:
        """Check the decoded response body and log the outcome.

        Args:
            response: Decoded JSON body of the reply.

        Raises:
            ValueError: If ``response["code"]`` is missing or differs from
                ``SUCCESS_STATUS_CODE``.
        """
        code = response.get("code")
        if code != self.SUCCESS_STATUS_CODE:
            error = response.get("error")
            logger.error(f"get error from amplitude api | error: {error} | code: {code}")
            msg = f"Error in amplitude api call | error: {error} | code: {code}"
            raise ValueError(msg)

        # Use .get(): the field is only needed for the log line, so its absence
        # must not turn a successful upload into a KeyError.
        server_upload_time = response.get("server_upload_time")
        logger.info(f"successfully send to Amplitude | server_upload_time: {server_upload_time}")

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Send an event to Amplitude.

        Args:
            event: Event to deliver.

        Raises:
            ValueError: If the response reports a non-success ``code``.
        """
        await self._send_request(event)