import pandas as pd from surprise import Dataset, Reader, SVD from surprise.model_selection import train_test_split from datetime import datetime, timedelta import pickle import random from sqlalchemy.orm import Session from sqlalchemy import func from typing import List from orders.models import Order, Meal, RecommendationModel from users.models import User class MealRecommender: def __init__(self, db: Session): self.db = db self.retrain_interval = timedelta(days=1) self.algo = self.load_or_train_model() def fetch_data(self): # Fetch data in batches to handle large datasets batch_size = 1000 offset = 0 data = [] while True: batch = self.db.query(Order.user_id, Order.meal_id, Order.quantity).offset(offset).limit(batch_size).all() if not batch: break data.extend(batch) offset += batch_size return pd.DataFrame(data, columns=['user_id', 'meal_id', 'quantity']) def train_model(self): data = self.fetch_data() if data.empty: return None reader = Reader(rating_scale=(1, 5)) dataset = Dataset.load_from_df(data[['user_id', 'meal_id', 'quantity']], reader) trainset = dataset.build_full_trainset() algo = SVD() algo.fit(trainset) # Save model to database model_binary = pickle.dumps(algo) model_record = RecommendationModel(model=model_binary, created_at=datetime.now()) self.db.add(model_record) self.db.commit() return algo def load_or_train_model(self): latest_model = self.db.query(RecommendationModel).order_by(RecommendationModel.created_at.desc()).first() if latest_model and datetime.now() - latest_model.created_at <= self.retrain_interval: return pickle.loads(latest_model.model) else: return self.train_model() def get_recommendations(self, user: User): if self.algo is None: return self.get_random_recommendations() all_meals = self.db.query(Meal).all() meal_ids = [meal.id for meal in all_meals] predictions = [self.algo.predict(str(user.id), str(meal_id)) for meal_id in meal_ids] sorted_predictions = sorted(predictions, key=lambda x: x.est, reverse=True) top_recommendations = self.db.query(Meal).filter(Meal.id.in_([int(pred.iid) for pred in sorted_predictions[:20]])).all() top_recommendations = self.adjust_for_preferences(user, top_recommendations) return top_recommendations[:5] # Return top 5 recommendations def adjust_for_preferences(self, user: User, recommendations: List[Meal]) -> List[Meal]: preferences = user.preferences if user.preferences else [] preference_scores = {meal.id: 0 for meal in recommendations} for meal in recommendations: for preferred in preferences: if preferred.lower() in meal.name.lower() or preferred.lower() in meal.description.lower(): preference_scores[meal.id] += 1 sorted_recommendations = sorted(recommendations, key=lambda meal: preference_scores[meal.id], reverse=True) return sorted_recommendations def get_random_recommendations(self): all_meals = self.db.query(Meal).all() return random.sample(all_meals, min(5, len(all_meals)))