lamhieu committed
Commit f67ddbd · 1 Parent(s): 4d45326

chore: update something

lightweight_embeddings/analytics.py CHANGED
@@ -1,7 +1,6 @@
 import logging
 import asyncio
-import redis
-import redis.exceptions
+from upstash_redis import Redis as UpstashRedis
 from datetime import datetime
 from collections import defaultdict
 from typing import Dict
@@ -11,21 +10,25 @@ logger = logging.getLogger(__name__)
 
 
 class Analytics:
-    def __init__(self, redis_url: str, sync_interval: int = 60, max_retries: int = 5):
+    def __init__(
+        self, url: str, token: str, sync_interval: int = 60, max_retries: int = 5
+    ):
         """
-        Initializes the Analytics class with a synchronous Redis client,
-        wrapped in asynchronous methods by using run_in_executor.
+        Initializes the Analytics class with an Upstash Redis client (HTTP-based),
+        wrapped in async methods by using run_in_executor.
 
         Parameters:
-        - redis_url (str): Redis connection URL (e.g., 'redis://localhost:6379/0').
+        - url (str): Upstash Redis REST URL.
+        - token (str): Upstash Redis token.
         - sync_interval (int): Interval in seconds for syncing with Redis.
         - max_retries (int): Maximum number of reconnection attempts to Redis.
         """
-        self.redis_url = redis_url
+        self.url = url
+        self.token = token
         self.sync_interval = sync_interval
         self.max_retries = max_retries
 
-        # Synchronous Redis client
+        # Upstash Redis client (synchronous over HTTP)
         self.redis_client = self._create_redis_client()
 
         # Local buffer stores cumulative data for two-way sync
@@ -40,19 +43,13 @@ class Analytics:
         # Initialize data from Redis, then start the periodic sync loop
         asyncio.create_task(self._initialize())
 
-        logger.info("Initialized Analytics with Redis connection: %s", redis_url)
+        logger.info("Initialized Analytics with Upstash Redis: %s", url)
 
-    def _create_redis_client(self) -> redis.Redis:
+    def _create_redis_client(self) -> UpstashRedis:
         """
-        Creates and returns a new synchronous Redis client.
+        Creates and returns a new Upstash Redis (synchronous) client.
         """
-        return redis.from_url(
-            self.redis_url,
-            decode_responses=True,
-            health_check_interval=10,
-            socket_connect_timeout=5,
-            socket_keepalive=True,
-        )
+        return UpstashRedis(url=self.url, token=self.token)
 
     async def _initialize(self):
         """
@@ -61,9 +58,9 @@ class Analytics:
         """
         try:
             await self._sync_from_redis()
-            logger.info("Initial sync from Redis to local buffer completed.")
+            logger.info("Initial sync from Upstash Redis to local buffer completed.")
         except Exception as e:
-            logger.error("Error during initial sync from Redis: %s", e)
+            logger.error("Error during initial sync from Upstash Redis: %s", e)
 
         # Launch the periodic sync task
         asyncio.create_task(self._start_sync_task())
@@ -122,7 +119,7 @@ class Analytics:
 
     async def _sync_from_redis(self):
         """
-        Pulls existing analytics data from Redis into the local buffer.
+        Pulls existing analytics data from Upstash Redis into the local buffer.
         Uses run_in_executor to avoid blocking the event loop.
         """
         loop = asyncio.get_running_loop()
@@ -131,7 +128,15 @@ class Analytics:
         # Scan 'access' keys
         cursor = 0
         while True:
-            cursor, keys = await loop.run_in_executor(
+            # Upstash doesn't provide a typical 'SCAN' the same way as standard Redis?
+            # We'll mimic it by searching for keys, or we can store a list of known periods if needed.
+            # If you only store certain known patterns, adapt accordingly.
+            # For demonstration, we do a naive approach, or assume we have a method that lists keys.
+            # Upstash doesn't always support the standard SCAN. We might store known keys in a set in Redis.
+
+            # If Upstash doesn't support SCAN at all, you need another approach (like maintaining a separate index).
+            # For now, let's assume it can handle the SCAN command similarly:
+            scan_result = await loop.run_in_executor(
                 None,
                 partial(
                     self.redis_client.scan,
@@ -140,6 +145,8 @@ class Analytics:
                     count=100,
                 ),
             )
+            cursor, keys = scan_result[0], scan_result[1]
+
             for key in keys:
                 # key is "analytics:access:<period>"
                 period = key.replace("analytics:access:", "")
@@ -148,13 +155,14 @@ class Analytics:
                 )
                 for model_id, count_str in data.items():
                     self.local_buffer["access"][period][model_id] += int(count_str)
+
             if cursor == 0:
                 break
 
         # Scan 'tokens' keys
         cursor = 0
         while True:
-            cursor, keys = await loop.run_in_executor(
+            scan_result = await loop.run_in_executor(
                 None,
                 partial(
                     self.redis_client.scan,
@@ -163,6 +171,8 @@ class Analytics:
                     count=100,
                 ),
             )
+            cursor, keys = scan_result[0], scan_result[1]
+
             for key in keys:
                 # key is "analytics:tokens:<period>"
                 period = key.replace("analytics:tokens:", "")
@@ -171,62 +181,74 @@ class Analytics:
                 )
                 for model_id, count_str in data.items():
                     self.local_buffer["tokens"][period][model_id] += int(count_str)
+
             if cursor == 0:
                 break
 
     async def _sync_to_redis(self):
         """
-        Pushes the local buffer data to Redis (local -> Redis).
-        Uses a pipeline to minimize round trips and run_in_executor to avoid blocking.
+        Pushes the local buffer data to Upstash Redis (local -> Redis).
+        Since Upstash does not support pipelining, we increment each field individually.
         """
         loop = asyncio.get_running_loop()
 
         async with self.lock:
             try:
-                pipeline = self.redis_client.pipeline(transaction=False)
-
-                # Push 'access' data
+                # For each (period, model_id, count), call hincrby
                 for period, models in self.local_buffer["access"].items():
                     redis_key = f"analytics:access:{period}"
                     for model_id, count in models.items():
-                        pipeline.hincrby(redis_key, model_id, count)
+                        if count != 0:
+                            # hincrby(key, field, amount)
+                            await loop.run_in_executor(
+                                None,
+                                partial(
+                                    self.redis_client.hincrby,
+                                    redis_key,
+                                    model_id,
+                                    count,
+                                ),
+                            )
 
-                # Push 'tokens' data
                 for period, models in self.local_buffer["tokens"].items():
                     redis_key = f"analytics:tokens:{period}"
                     for model_id, count in models.items():
-                        pipeline.hincrby(redis_key, model_id, count)
-
-                # Execute the pipeline in a separate thread
-                await loop.run_in_executor(None, pipeline.execute)
-
-                logger.info("Analytics data successfully synced to Redis.")
-            except redis.exceptions.ConnectionError as e:
-                logger.error("Redis connection error during sync: %s", e)
-                raise e
+                        if count != 0:
+                            await loop.run_in_executor(
+                                None,
+                                partial(
+                                    self.redis_client.hincrby,
+                                    redis_key,
+                                    model_id,
+                                    count,
+                                ),
+                            )
+
+                logger.info("Analytics data successfully synced to Upstash Redis.")
             except Exception as e:
-                logger.error("Unexpected error during Redis sync: %s", e)
+                logger.error("Unexpected error during Upstash Redis sync: %s", e)
                 raise e
 
     async def _start_sync_task(self):
         """
         Periodically runs _sync_to_redis at a configurable interval.
-        Also handles reconnections on ConnectionError.
+        Also attempts reconnection on any errors (though Upstash typically won't
+        behave exactly like a persistent TCP connection).
         """
         while True:
             await asyncio.sleep(self.sync_interval)
             try:
                 await self._sync_to_redis()
-            except redis.exceptions.ConnectionError as e:
-                logger.error("Redis connection error during scheduled sync: %s", e)
-                await self._handle_redis_reconnection()
             except Exception as e:
-                logger.error("Error during scheduled sync: %s", e)
-                # Handle other errors as appropriate
+                # Upstash might fail differently than standard Redis if there's a network issue
+                logger.error("Error during scheduled sync to Upstash Redis: %s", e)
+                await self._handle_redis_reconnection()
 
     async def _handle_redis_reconnection(self):
         """
-        Attempts to reconnect to Redis using exponential backoff.
+        Attempts to 'reconnect' to Upstash Redis.
+        Because Upstash uses HTTP, it's often stateless and doesn't require
+        the same approach as standard Redis. We simply recreate the client if needed.
         """
         loop = asyncio.get_running_loop()
         retry_count = 0
@@ -235,44 +257,46 @@ class Analytics:
         while retry_count < self.max_retries:
             try:
                 logger.info(
-                    "Attempting to reconnect to Redis (attempt %d)...", retry_count + 1
+                    "Attempting to reconnect to Upstash Redis (attempt %d)...",
+                    retry_count + 1,
                 )
-                # Close existing connection
+                # Recreate the client
                 await loop.run_in_executor(None, self.redis_client.close)
-                # Create a new client
                 self.redis_client = self._create_redis_client()
-                # Test the new connection
-                await loop.run_in_executor(None, self.redis_client.ping)
-                logger.info("Successfully reconnected to Redis.")
+                # Upstash doesn't necessarily have a direct 'PING' command, so optional:
+                # If you want to test, you could do e.g. redis_client.get("some_known_key") as a check
+                logger.info("Successfully reconnected to Upstash Redis.")
                 return
-            except redis.exceptions.ConnectionError as e:
+            except Exception as e:
                 logger.error("Reconnection attempt %d failed: %s", retry_count + 1, e)
                 retry_count += 1
                 await asyncio.sleep(delay)
                 delay *= 2  # exponential backoff
 
         logger.critical(
-            "Max reconnection attempts reached. Unable to reconnect to Redis."
+            "Max reconnection attempts reached. Unable to reconnect to Upstash Redis."
         )
 
-        # Optional: Keep retrying indefinitely instead of giving up.
+        # Optionally, keep trying indefinitely
         # while True:
         #     try:
-        #         logger.info("Retrying to reconnect to Redis...")
+        #         logger.info("Retrying to reconnect to Upstash Redis...")
        #         await loop.run_in_executor(None, self.redis_client.close)
        #         self.redis_client = self._create_redis_client()
-        #         await loop.run_in_executor(None, self.redis_client.ping)
-        #         logger.info("Reconnected to Redis after extended retries.")
+        #         logger.info(
+        #             "Successfully reconnected to Upstash Redis after extended retry."
+        #         )
        #         break
-        #     except redis.exceptions.ConnectionError as e:
+        #     except Exception as e:
        #         logger.error("Extended reconnection attempt failed: %s", e)
        #         await asyncio.sleep(delay)
-        #         delay = min(delay * 2, 60)  # Cap at 60 seconds or choose your own max
+        #         delay = min(delay * 2, 60)  # cap at 60s or choose another max
 
     async def close(self):
         """
-        Closes the Redis client connection. Still wrapped in an async method to avoid blocking.
+        Closes the Upstash Redis client (although Upstash uses stateless HTTP).
+        Still wrapped in async to avoid blocking the event loop.
         """
         loop = asyncio.get_running_loop()
         await loop.run_in_executor(None, self.redis_client.close)
-        logger.info("Closed Redis connection.")
+        logger.info("Closed Upstash Redis client.")
lightweight_embeddings/router.py CHANGED
@@ -145,7 +145,9 @@ service_config = ModelConfig()
 embeddings_service = EmbeddingsService(config=service_config)
 
 analytics = Analytics(
-    redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"), sync_interval=60
+    url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
+    token=os.environ.get("REDIS_TOKEN", "***"),
+    sync_interval=60,
 )
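One caveat on the defaults kept here: redis://localhost:6379/0 is a TCP-style connection URL, while the Upstash client built in _create_redis_client expects an HTTPS REST endpoint plus a token, so deployments must set both variables. A sketch of the environment this code now assumes (variable names from router.py; values are placeholders):

import os

# Placeholders; the names match what router.py reads at import time.
os.environ["REDIS_URL"] = "https://<your-db>.upstash.io"
os.environ["REDIS_TOKEN"] = "<rest-token>"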
 
requirements.txt CHANGED
@@ -9,4 +9,4 @@ sentence-transformers[onnx]==3.3.1
 sentencepiece==0.2.0
 torch==2.4.0
 transformers==4.45.0
-redis==5.2.1
+upstash_redis==1.2.0
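The dependency swap replaces the TCP redis client with Upstash's HTTP-based one; no other pins change. A minimal smoke test for the new pin, assuming valid Upstash credentials (placeholders below):

from upstash_redis import Redis

r = Redis(url="https://<your-db>.upstash.io", token="<rest-token>")
r.set("healthcheck", "ok")
print(r.get("healthcheck"))  # expect "ok"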