"""CRUD helpers for ``Meal`` and ``Order`` rows on top of a SQLAlchemy session."""

from typing import List, Optional

from sqlalchemy.orm import Session

from orders.models import Meal, Order
from orders.schemas import MealCreate, MealUpdate, OrderCreate


def _commit_or_rollback(db: Session) -> None:
    """Commit the session's transaction; roll back and re-raise on failure.

    A session whose commit (or the flush it triggers) failed has to be rolled
    back before it can be used again, so the rollback is done here instead of
    being left to every caller. The original exception propagates unchanged.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


def get_meals(db: Session, skip: int = 0, limit: int = 100) -> List[Meal]:
    """Return one page of meals ordered by ``Meal.id``.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        The matching ``Meal`` instances; an empty list when there are none.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the id
    # column makes paging deterministic.
    return db.query(Meal).order_by(Meal.id).offset(skip).limit(limit).all()


def create_meal(db: Session, meal: MealCreate) -> Meal:
    """Insert a new meal built from ``meal.dict()`` and return it.

    Args:
        db: Session used for the duplicate check and the insert.
        meal: Schema object whose ``dict()`` supplies the ``Meal`` fields.

    Returns:
        The newly added ``Meal``, refreshed from the database after commit.

    Raises:
        ValueError: If a meal with the same name is already stored.
    """
    db_meal = Meal(**meal.dict())

    # Check-then-insert is not atomic: a concurrent insert can still slip in
    # between this query and the commit below. In that case the commit raises
    # and the session is rolled back by _commit_or_rollback.
    existing = db.query(Meal).filter(Meal.name == db_meal.name).first()
    if existing:
        raise ValueError("Meal with this name already exists")

    db.add(db_meal)
    _commit_or_rollback(db)
    db.refresh(db_meal)
    return db_meal


def get_meal(db: Session, meal_id: int) -> Optional[Meal]:
    """Return the meal whose id equals ``meal_id``, or ``None`` if not found."""
    return db.query(Meal).filter(Meal.id == meal_id).first()


def update_meal(db: Session, meal_id: int, meal: MealUpdate) -> Optional[Meal]:
    """Apply the entries of ``meal.dict(exclude_unset=True)`` to a stored meal.

    Args:
        db: Session used for the lookup and the update.
        meal_id: Id of the meal to modify.
        meal: Schema object providing the attribute values to assign.

    Returns:
        The updated ``Meal`` refreshed from the database, or ``None`` when no
        meal with ``meal_id`` exists (nothing is committed in that case).
    """
    db_meal = get_meal(db, meal_id)
    if not db_meal:
        return None

    # NOTE(review): a changed name is not checked against existing meals here,
    # unlike create_meal -- confirm whether that is intended.
    for field, value in meal.dict(exclude_unset=True).items():
        setattr(db_meal, field, value)

    _commit_or_rollback(db)
    db.refresh(db_meal)
    return db_meal


def delete_meal(db: Session, meal_id: int) -> Optional[Meal]:
    """Delete the meal with ``meal_id`` and return the deleted instance.

    Args:
        db: Session used for the lookup and the delete.
        meal_id: Id of the meal to remove.

    Returns:
        The ``Meal`` object that was deleted (its row is gone once the commit
        succeeds), or ``None`` when no meal with ``meal_id`` exists.
    """
    db_meal = get_meal(db, meal_id)
    if not db_meal:
        return None

    db.delete(db_meal)
    _commit_or_rollback(db)
    return db_meal


def create_user_order(db: Session, order: OrderCreate, user_id: int) -> Order:
    """Insert an order built from ``order.dict()`` plus ``user_id``.

    Args:
        db: Session used for the insert.
        order: Schema object whose ``dict()`` supplies the ``Order`` fields.
        user_id: Value stored in the new order's ``user_id`` attribute.

    Returns:
        The newly added ``Order``, refreshed from the database after commit.
    """
    db_order = Order(**order.dict(), user_id=user_id)
    db.add(db_order)
    _commit_or_rollback(db)
    db.refresh(db_order)
    return db_order


def get_user_orders(
    db: Session, user_id: int, skip: int = 0, limit: int = 100
) -> List[Order]:
    """Return one page of the orders whose ``user_id`` equals ``user_id``.

    Args:
        db: Session used to run the query.
        user_id: Value to match against ``Order.user_id``.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        The matching ``Order`` instances; an empty list when there are none.
    """
    # NOTE(review): no ORDER BY is applied, so page contents depend on the
    # database's row order. Add an order_by on the Order key column once it is
    # confirmed (presumably ``Order.id`` -- not visible from this module).
    user_orders = db.query(Order).filter(Order.user_id == user_id)
    return user_orders.offset(skip).limit(limit).all()