# services/data_service.py
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from functools import lru_cache
from io import StringIO
import logging

import aiohttp
import faiss
import numpy as np
import pandas as pd

from config.config import settings

logger = logging.getLogger(__name__)


class DataService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.cache = {}
        self.last_update = None
        self.faiss_index = None
        self.data_cleaned = None
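    # Note: `model_service.embedder` is assumed to expose a
    # SentenceTransformer-style `encode()` API (accepting convert_to_tensor /
    # show_progress_bar keyword arguments and returning tensors that support
    # .cpu().detach().numpy()). This interface is inferred from how the
    # embedder is used below, not documented elsewhere in this file.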

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Fetch CSV data from URL with retry logic"""
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        if response.status == 200:
                            content = await response.text()
                            return pd.read_csv(StringIO(content), sep='|')
                        else:
                            logger.error(f"Failed to fetch data: HTTP {response.status}")
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        return pd.DataFrame()  # Return empty DataFrame if all attempts fail

    async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
        """Prepare data and create FAISS index with caching"""
        try:
            current_time = datetime.now()

            # Check cache validity (total_seconds, so intervals over a day are handled correctly)
            if (self.last_update and
                    (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION and
                    self.cache):
                return self.cache['data'], self.cache['index']

            data = await self.fetch_csv_data()
            if data.empty:
                logger.error("Failed to fetch data")
                return pd.DataFrame(), None

            # Data cleaning and preparation
            columns_to_keep = [
                'ID', 'Name', 'Description', 'Price',
                'ProductCategory', 'Grammage',
                'BasePriceText', 'Rating', 'RatingCount',
                'Ingredients', 'CreationDate', 'Keywords', 'Brand'
            ]
            self.data_cleaned = data[columns_to_keep].copy()

            # Clean description text
            self.data_cleaned['Description'] = self.data_cleaned['Description'].astype(str).str.replace(
                r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
            )

            # Combine text fields with weights
            self.data_cleaned['combined_text'] = self.data_cleaned.apply(
                lambda row: (
                    f"{row['Name']} {row['Name']} "  # Double weight for name
                    f"{str(row['Description'])} "
                    f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
                    f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
                ).strip(),
                axis=1
            )

            # Create FAISS index from the combined-text embeddings
            embeddings = self.embedder.encode(
                self.data_cleaned['combined_text'].tolist(),
                convert_to_tensor=True,
                show_progress_bar=True
            ).cpu().detach().numpy()

            d = embeddings.shape[1]
            self.faiss_index = faiss.IndexFlatL2(d)
            self.faiss_index.add(embeddings)

            # Update cache
            self.cache = {
                'data': self.data_cleaned,
                'index': self.faiss_index
            }
            self.last_update = current_time

            return self.data_cleaned, self.faiss_index

        except Exception as e:
            logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
            return pd.DataFrame(), None

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Search for products similar to the query"""
        try:
            if self.faiss_index is None:
                self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
                if self.faiss_index is None:
                    return []

            # Create query embedding
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            # Search in FAISS index
            distances, indices = self.faiss_index.search(query_embedding_np, top_k)

            # Prepare results
            results = []
            for i, idx in enumerate(indices[0]):
                try:
                    if idx < 0:  # FAISS pads with -1 when fewer than top_k vectors exist
                        continue
                    product = {}
                    row = self.data_cleaned.iloc[idx]
                    for column in self.data_cleaned.columns:
                        value = row[column]
                        # Convert numpy/pandas types to Python native types
                        if isinstance(value, (np.integer, np.floating)):
                            value = value.item()
                        elif isinstance(value, pd.Timestamp):
                            value = value.isoformat()
                        elif isinstance(value, np.bool_):
                            value = bool(value)
                        product[column] = value
                    product['score'] = float(distances[0][i])
                    results.append(product)
                except Exception as e:
                    logger.error(f"Error processing search result {i}: {e}", exc_info=True)
                    continue

            return results

        except Exception as e:
            logger.error(f"Error in search: {e}", exc_info=True)
            return []
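

# Minimal usage sketch (not part of the service itself). It assumes a model
# service wrapper that exposes an `embedder` attribute backed by
# sentence-transformers, and a `settings` object providing CSV_URL,
# MAX_RETRIES and CACHE_DURATION. The stub class, model name and query below
# are illustrative placeholders, not values taken from this repository.
if __name__ == "__main__":
    import asyncio

    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        """Hypothetical stand-in for the real model service (assumption)."""
        def __init__(self):
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo():
        # First search triggers prepare_data_and_index(), which fetches the
        # CSV from settings.CSV_URL and builds the FAISS index.
        service = DataService(_StubModelService())
        results = await service.search("vegan protein powder", top_k=3)
        for product in results:
            print(product.get("Name"), product.get("score"))

    asyncio.run(_demo())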