Spaces:
Running
Running
File size: 4,944 Bytes
c54a701 86d6248 c54a701 86d6248 c54a701 86d6248 aaf7e4c 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 86d6248 073aa83 c54a701 86d6248 c54a701 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import asyncio
import logging
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict

import redis.asyncio as redis
import redis.exceptions
logger = logging.getLogger(__name__)
class Analytics:
    """In-memory usage analytics, periodically flushed to Redis.

    Tracks per-model access counts and token usage, bucketed by day, week
    (``%U`` week number), month, year, and an all-time ``"total"`` bucket.
    Counters accumulate in a local buffer guarded by an asyncio lock; a
    background task flushes them to Redis hashes
    (``analytics:access:<period>`` / ``analytics:tokens:<period>``) via
    HINCRBY every ``sync_interval`` seconds, so a crash loses at most one
    unflushed window.
    """

    #: Metric names used as top-level keys of ``local_buffer`` and in the
    #: Redis key namespace.
    _METRICS = ("access", "tokens")

    def __init__(self, redis_url: str, sync_interval: int = 60):
        """
        Initialize the async Redis client, the local buffer, and start the
        background sync task.

        Parameters:
        - redis_url: Redis connection URL (e.g., 'redis://localhost:6379/0').
        - sync_interval: Interval in seconds between flushes to Redis.

        Note: must be called from within a running event loop, because the
        background sync task is scheduled here (``asyncio.create_task``
        raises RuntimeError otherwise).
        """
        self.redis_client = redis.from_url(
            redis_url,
            decode_responses=True,
            health_check_interval=10,
            socket_connect_timeout=5,
            retry_on_timeout=True,
            socket_keepalive=True,
        )
        # Shape: {metric: {period: {model_id: count}}}.
        self.local_buffer = {
            metric: defaultdict(lambda: defaultdict(int))
            for metric in self._METRICS
        }
        self.sync_interval = sync_interval
        self.lock = asyncio.Lock()  # serializes buffer access across coroutines
        # Keep a strong reference to the task: the event loop holds only a
        # weak reference, so an unreferenced task may be garbage-collected
        # before it finishes (see asyncio.create_task docs).
        self._sync_task = asyncio.create_task(self._start_sync_task())

    def _get_period_keys(self) -> tuple:
        """
        Return (day, week, month, year) bucket keys for the current UTC time.

        e.g. ('2024-07-01', '2024-W26', '2024-07', '2024').
        """
        # datetime.utcnow() is deprecated; an aware UTC datetime produces
        # identical strftime output, so existing Redis keys stay valid.
        now = datetime.now(timezone.utc)
        day_key = now.strftime("%Y-%m-%d")
        week_key = f"{now.year}-W{now.strftime('%U')}"
        month_key = now.strftime("%Y-%m")
        year_key = now.strftime("%Y")
        return day_key, week_key, month_key, year_key

    async def access(self, model_id: str, tokens: int):
        """
        Record one access and its token usage for ``model_id``.

        Parameters:
        - model_id: The ID of the model being accessed.
        - tokens: Number of tokens used in this access.
        """
        periods = (*self._get_period_keys(), "total")
        async with self.lock:
            for period in periods:
                self.local_buffer["access"][period][model_id] += 1
                self.local_buffer["tokens"][period][model_id] += tokens

    async def stats(self) -> Dict[str, Dict[str, Dict[str, int]]]:
        """
        Return a snapshot of the local (not-yet-flushed) buffer.

        Returns:
        - {'access': {period: {model_id: count}},
           'tokens': {period: {model_id: count}}} as plain dicts, detached
          from the live buffer so callers can't mutate it.
        """
        async with self.lock:
            return {
                metric: {
                    period: dict(models)
                    for period, models in self.local_buffer[metric].items()
                }
                for metric in self._METRICS
            }

    async def _sync_to_redis(self):
        """
        Flush buffered counters to Redis with HINCRBY via a pipeline.

        The buffer is swapped out under the lock and the network round-trip
        happens outside it, so a slow or hung Redis call never blocks
        ``access()``. On failure the drained counts are merged back into the
        buffer (nothing is dropped) and the exception is re-raised.
        """
        async with self.lock:
            drained = self.local_buffer
            self.local_buffer = {
                metric: defaultdict(lambda: defaultdict(int))
                for metric in self._METRICS
            }
        if not any(drained[metric] for metric in self._METRICS):
            return  # nothing to flush this cycle
        try:
            pipeline = self.redis_client.pipeline()
            for metric in self._METRICS:
                for period, models in drained[metric].items():
                    redis_key = f"analytics:{metric}:{period}"
                    for model_id, count in models.items():
                        pipeline.hincrby(redis_key, model_id, count)
            await pipeline.execute()
        except Exception:
            # Restore the drained counts so they are retried next cycle
            # instead of being silently lost.
            async with self.lock:
                for metric in self._METRICS:
                    for period, models in drained[metric].items():
                        for model_id, count in models.items():
                            self.local_buffer[metric][period][model_id] += count
            raise
        logger.info("Synced analytics data to Redis.")

    async def _start_sync_task(self):
        """
        Background loop: flush the buffer to Redis every ``sync_interval``
        seconds. Survives transient errors; cancellation propagates normally.
        """
        while True:
            await asyncio.sleep(self.sync_interval)
            try:
                await self._sync_to_redis()
            except redis.exceptions.ConnectionError as e:
                logger.error("Redis connection error: %s", e)
                await asyncio.sleep(5)  # brief back-off before retrying
            except Exception:
                # Any other failure must not kill the loop; _sync_to_redis
                # already merged the drained counts back into the buffer.
                logger.exception("Unexpected error while syncing analytics")
|