lamhieu committed
Commit: b8fd9fb (parent: b6b230c)

chore: update something

lightweight_embeddings/analytics.py CHANGED
@@ -6,27 +6,22 @@ from datetime import datetime
 from collections import defaultdict
 from typing import Dict
 
-
 logger = logging.getLogger(__name__)
 
-
 class Analytics:
-    def __init__(self, redis_url: str, sync_interval: int = 60):
+    def __init__(self, redis_url: str, sync_interval: int = 60, max_retries: int = 5):
         """
         Initializes the Analytics class with an async Redis connection and sync interval.
 
         Parameters:
         - redis_url: Redis connection URL (e.g., 'redis://localhost:6379/0')
         - sync_interval: Interval in seconds for syncing with Redis.
+        - max_retries: Maximum number of retries for reconnecting to Redis.
         """
-        self.redis_client = redis.from_url(
-            redis_url,
-            decode_responses=True,
-            health_check_interval=10,
-            socket_connect_timeout=5,
-            retry_on_timeout=True,
-            socket_keepalive=True,
-        )
+        self.redis_url = redis_url
+        self.sync_interval = sync_interval
+        self.max_retries = max_retries
+        self.redis_client = self._create_redis_client()
         self.local_buffer = {
             "access": defaultdict(
                 lambda: defaultdict(int)
@@ -35,12 +30,24 @@ class Analytics:
                 lambda: defaultdict(int)
             ), # {period: {model_id: tokens_count}}
         }
-        self.sync_interval = sync_interval
         self.lock = asyncio.Lock() # Async lock for thread-safe updates
         asyncio.create_task(self._start_sync_task())
-
+
         logger.info("Initialized Analytics with Redis connection: %s", redis_url)
 
+    def _create_redis_client(self) -> redis.Redis:
+        """
+        Creates and returns a new Redis client.
+        """
+        return redis.from_url(
+            self.redis_url,
+            decode_responses=True,
+            health_check_interval=10,
+            socket_connect_timeout=5,
+            retry_on_timeout=True,
+            socket_keepalive=True,
+        )
+
     def _get_period_keys(self) -> tuple:
         """
         Returns keys for day, week, month, and year based on the current date.
@@ -101,33 +108,93 @@ class Analytics:
         Synchronizes local buffer data with Redis.
         """
         async with self.lock:
-            pipeline = self.redis_client.pipeline()
+            try:
+                pipeline = self.redis_client.pipeline()
 
-            # Sync access counts
-            for period, models in self.local_buffer["access"].items():
-                for model_id, count in models.items():
-                    redis_key = f"analytics:access:{period}"
-                    pipeline.hincrby(redis_key, model_id, count)
+                # Sync access counts
+                for period, models in self.local_buffer["access"].items():
+                    for model_id, count in models.items():
+                        redis_key = f"analytics:access:{period}"
+                        pipeline.hincrby(redis_key, model_id, count)
 
-            # Sync token counts
-            for period, models in self.local_buffer["tokens"].items():
-                for model_id, count in models.items():
-                    redis_key = f"analytics:tokens:{period}"
-                    pipeline.hincrby(redis_key, model_id, count)
+                # Sync token counts
+                for period, models in self.local_buffer["tokens"].items():
+                    for model_id, count in models.items():
+                        redis_key = f"analytics:tokens:{period}"
+                        pipeline.hincrby(redis_key, model_id, count)
 
-            pipeline.execute()
-            self.local_buffer["access"].clear() # Clear access buffer after sync
-            self.local_buffer["tokens"].clear() # Clear tokens buffer after sync
-            logger.info("Synced analytics data to Redis.")
+                pipeline.execute()
+                self.local_buffer["access"].clear() # Clear access buffer after sync
+                self.local_buffer["tokens"].clear() # Clear tokens buffer after sync
+                logger.info("Synced analytics data to Redis.")
+
+            except redis.exceptions.ConnectionError as e:
+                logger.error("Redis connection error during sync: %s", e)
+                raise e
+            except Exception as e:
+                logger.error("Unexpected error during Redis sync: %s", e)
+                raise e
 
     async def _start_sync_task(self):
         """
         Starts a background task that periodically syncs data to Redis.
+        Implements retry logic with exponential backoff on connection failures.
         """
+        retry_delay = 1 # Initial retry delay in seconds
+
         while True:
             await asyncio.sleep(self.sync_interval)
             try:
                 await self._sync_to_redis()
+                retry_delay = 1 # Reset retry delay after successful sync
             except redis.exceptions.ConnectionError as e:
                 logger.error("Redis connection error: %s", e)
-                await asyncio.sleep(5)
+                await self._handle_redis_reconnection()
+            except Exception as e:
+                logger.error("Error during sync: %s", e)
+                # Depending on the error, you might want to handle differently
+
+    async def _handle_redis_reconnection(self):
+        """
+        Handles Redis reconnection with exponential backoff.
+        """
+        retry_count = 0
+        delay = 1 # Start with 1 second delay
+
+        while retry_count < self.max_retries:
+            try:
+                logger.info("Attempting to reconnect to Redis (Attempt %d)...", retry_count + 1)
+                self.redis_client.close()
+                self.redis_client = self._create_redis_client()
+                # Optionally, perform a simple command to check connection
+                self.redis_client.ping()
+                logger.info("Successfully reconnected to Redis.")
+                return
+            except redis.exceptions.ConnectionError as e:
+                logger.error("Reconnection attempt %d failed: %s", retry_count + 1, e)
+                retry_count += 1
+                await asyncio.sleep(delay)
+                delay *= 2 # Exponential backoff
+
+        logger.critical("Max reconnection attempts reached. Unable to reconnect to Redis.")
+        # Depending on your application's requirements, you might choose to exit or keep retrying indefinitely
+        # For example, to keep retrying:
+        while True:
+            try:
+                logger.info("Retrying to reconnect to Redis...")
+                self.redis_client.close()
+                self.redis_client = self._create_redis_client()
+                self.redis_client.ping()
+                logger.info("Successfully reconnected to Redis.")
+                break
+            except redis.exceptions.ConnectionError as e:
+                logger.error("Reconnection attempt failed: %s", e)
+                await asyncio.sleep(delay)
+                delay = min(delay * 2, 60) # Cap the delay to 60 seconds
+
+    async def close(self):
+        """
+        Closes the Redis connection gracefully.
+        """
+        self.redis_client.close()
+        logger.info("Closed Redis connection.")
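For orientation, here is a minimal usage sketch of the class as it stands after this commit. It is not part of the diff: it assumes redis-py's asyncio client and a running event loop, which __init__ requires because it schedules the background loop via asyncio.create_task. Note that with max_retries=5, _handle_redis_reconnection waits 1, 2, 4, 8, and 16 seconds between attempts before switching to the indefinite retry loop capped at 60 seconds.

# Hypothetical usage sketch (not from the repository).
import asyncio

from lightweight_embeddings.analytics import Analytics


async def main():
    # Must be constructed inside a running loop: __init__ calls
    # asyncio.create_task(self._start_sync_task()).
    analytics = Analytics(
        redis_url="redis://localhost:6379/0",  # assumed local Redis
        sync_interval=60,
        max_retries=5,
    )
    await asyncio.sleep(120)  # let at least one sync cycle run
    await analytics.close()  # close() is a coroutine in this commit


asyncio.run(main())
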
lightweight_embeddings/service.py CHANGED
@@ -155,7 +155,7 @@ class EmbeddingsService:
     """
 
     def __init__(self, config: Optional[ModelConfig] = None):
-        self.lru_cache = LRUCache(maxsize=50_000) # Approximate for ~500MB usage
+        self.lru_cache = LRUCache(maxsize=10_000) # Approximate for ~100MB usage
 
         self.device = "cuda" if torch.cuda.is_available() else "cpu"
         self.config = config or ModelConfig()
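
As a rough sanity check on the new bound (a sketch; the per-entry figure is an assumption, not stated in the source): if each cached value is on the order of 10 KB, e.g. a ~4 KB float32 embedding plus key and object overhead, then 10_000 entries is roughly 100 MB and 50_000 roughly 500 MB, matching the updated comment. Assuming LRUCache here is cachetools.LRUCache, which this call signature matches, maxsize counts entries rather than bytes, so the memory figure is only as good as the per-entry estimate.

# Back-of-envelope sizing sketch (hypothetical numbers).
from cachetools import LRUCache
import numpy as np

BYTES_PER_ENTRY = 10 * 1024  # assumed: ~4 KB float32 vector + key + overhead
MAXSIZE = 10_000

print(f"approx. footprint: {MAXSIZE * BYTES_PER_ENTRY / 1024**2:.0f} MB")  # ~98 MB

cache = LRUCache(maxsize=MAXSIZE)  # evicts least-recently-used entries beyond MAXSIZE
cache["model-id:text-hash"] = np.zeros(1024, dtype=np.float32)  # example entry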