amaye15 commited on
Commit
bc82930
·
1 Parent(s): b9c19b4

Feat - concurrent request update

Browse files
src/api/models/embedding_models.py CHANGED
@@ -8,7 +8,8 @@ class CreateEmbeddingRequest(BaseModel):
8
  target_column: str = "product_type"
9
  output_column: str = "embedding"
10
  model: str = "text-embedding-3-small"
11
- batch_size: int = 100
 
12
  dataset_name: str = "re-mind/product_type_embedding"
13
 
14
 
 
8
  target_column: str = "product_type"
9
  output_column: str = "embedding"
10
  model: str = "text-embedding-3-small"
11
+ batch_size: int = 10
12
+ max_concurrent_requests: int = 10
13
  dataset_name: str = "re-mind/product_type_embedding"
14
 
15
 
src/api/services/embedding_service.py CHANGED
@@ -1,3 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from openai import AsyncOpenAI
2
  import logging
3
  from typing import List, Dict
@@ -17,20 +80,23 @@ class EmbeddingService:
17
  self,
18
  openai_api_key: str,
19
  model: str = "text-embedding-3-small",
20
- batch_size: int = 100,
 
21
  ):
22
  self.client = AsyncOpenAI(api_key=openai_api_key)
23
  self.model = model
24
  self.batch_size = batch_size
 
25
 
26
  async def get_embedding(self, text: str) -> List[float]:
27
  """Generate embeddings for the given text using OpenAI."""
28
  text = text.replace("\n", " ")
29
  try:
30
- response = await self.client.embeddings.create(
31
- input=[text], model=self.model
32
- )
33
- return response.data[0].embedding
 
34
  except Exception as e:
35
  logger.error(f"Failed to generate embedding: {e}")
36
  raise OpenAIError(f"OpenAI API error: {e}")
 
1
+ # from openai import AsyncOpenAI
2
+ # import logging
3
+ # from typing import List, Dict
4
+ # import pandas as pd
5
+ # import asyncio
6
+ # from src.api.exceptions import OpenAIError
7
+
8
+ # # Set up structured logging
9
+ # logging.basicConfig(
10
+ # level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
11
+ # )
12
+ # logger = logging.getLogger(__name__)
13
+
14
+
15
+ # class EmbeddingService:
16
+ # def __init__(
17
+ # self,
18
+ # openai_api_key: str,
19
+ # model: str = "text-embedding-3-small",
20
+ # batch_size: int = 100,
21
+ # ):
22
+ # self.client = AsyncOpenAI(api_key=openai_api_key)
23
+ # self.model = model
24
+ # self.batch_size = batch_size
25
+
26
+ # async def get_embedding(self, text: str) -> List[float]:
27
+ # """Generate embeddings for the given text using OpenAI."""
28
+ # text = text.replace("\n", " ")
29
+ # try:
30
+ # response = await self.client.embeddings.create(
31
+ # input=[text], model=self.model
32
+ # )
33
+ # return response.data[0].embedding
34
+ # except Exception as e:
35
+ # logger.error(f"Failed to generate embedding: {e}")
36
+ # raise OpenAIError(f"OpenAI API error: {e}")
37
+
38
+ # async def create_embeddings(
39
+ # self, df: pd.DataFrame, target_column: str, output_column: str
40
+ # ) -> pd.DataFrame:
41
+ # """Create embeddings for the target column in the dataset."""
42
+ # logger.info("Generating embeddings...")
43
+ # batches = [
44
+ # df[i : i + self.batch_size] for i in range(0, len(df), self.batch_size)
45
+ # ]
46
+ # processed_batches = await asyncio.gather(
47
+ # *[
48
+ # self._process_batch(batch, target_column, output_column)
49
+ # for batch in batches
50
+ # ]
51
+ # )
52
+ # return pd.concat(processed_batches)
53
+
54
+ # async def _process_batch(
55
+ # self, df_batch: pd.DataFrame, target_column: str, output_column: str
56
+ # ) -> pd.DataFrame:
57
+ # """Process a batch of rows to generate embeddings."""
58
+ # embeddings = await asyncio.gather(
59
+ # *[self.get_embedding(row[target_column]) for _, row in df_batch.iterrows()]
60
+ # )
61
+ # df_batch[output_column] = embeddings
62
+ # return df_batch
63
+
64
  from openai import AsyncOpenAI
65
  import logging
66
  from typing import List, Dict
 
80
  self,
81
  openai_api_key: str,
82
  model: str = "text-embedding-3-small",
83
+ batch_size: int = 10,
84
+ max_concurrent_requests: int = 10, # Limit to 10 concurrent requests
85
  ):
86
  self.client = AsyncOpenAI(api_key=openai_api_key)
87
  self.model = model
88
  self.batch_size = batch_size
89
+ self.semaphore = asyncio.Semaphore(max_concurrent_requests) # Rate limiter
90
 
91
  async def get_embedding(self, text: str) -> List[float]:
92
  """Generate embeddings for the given text using OpenAI."""
93
  text = text.replace("\n", " ")
94
  try:
95
+ async with self.semaphore: # Acquire a semaphore slot
96
+ response = await self.client.embeddings.create(
97
+ input=[text], model=self.model
98
+ )
99
+ return response.data[0].embedding
100
  except Exception as e:
101
  logger.error(f"Failed to generate embedding: {e}")
102
  raise OpenAIError(f"OpenAI API error: {e}")