tgbs / bot /services /users.py
AZILS's picture
Upload 120 files
e35e6bc verified
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import func, select, update
from bot.cache.redis import build_key, cached, clear_cache
from bot.database.models import UserModel
if TYPE_CHECKING:
from aiogram.types import User
from sqlalchemy.ext.asyncio import AsyncSession
async def add_user(
session: AsyncSession,
user: User,
referrer: str | None,
) -> None:
"""Add a new user to the database."""
user_id: int = user.id
first_name: str = user.first_name
last_name: str | None = user.last_name
username: str | None = user.username
language_code: str | None = user.language_code
is_premium: bool = user.is_premium or False
new_user = UserModel(
id=user_id,
first_name=first_name,
last_name=last_name,
username=username,
language_code=language_code,
is_premium=is_premium,
referrer=referrer,
)
session.add(new_user)
await session.commit()
await clear_cache(user_exists, user_id)
@cached(key_builder=lambda session, user_id: build_key(user_id))
async def user_exists(session: AsyncSession, user_id: int) -> bool:
"""Checks if the user is in the database."""
query = select(UserModel.id).filter_by(id=user_id).limit(1)
result = await session.execute(query)
user = result.scalar_one_or_none()
return bool(user)
@cached(key_builder=lambda session, user_id: build_key(user_id))
async def get_first_name(session: AsyncSession, user_id: int) -> str:
query = select(UserModel.first_name).filter_by(id=user_id)
result = await session.execute(query)
first_name = result.scalar_one_or_none()
return first_name or ""
@cached(key_builder=lambda session, user_id: build_key(user_id))
async def get_language_code(session: AsyncSession, user_id: int) -> str:
query = select(UserModel.language_code).filter_by(id=user_id)
result = await session.execute(query)
language_code = result.scalar_one_or_none()
return language_code or ""
async def set_language_code(
session: AsyncSession,
user_id: int,
language_code: str,
) -> None:
stmt = update(UserModel).where(UserModel.id == user_id).values(language_code=language_code)
await session.execute(stmt)
await session.commit()
@cached(key_builder=lambda session, user_id: build_key(user_id))
async def is_admin(session: AsyncSession, user_id: int) -> bool:
query = select(UserModel.is_admin).filter_by(id=user_id)
result = await session.execute(query)
is_admin = result.scalar_one_or_none()
return bool(is_admin)
async def set_is_admin(session: AsyncSession, user_id: int, is_admin: bool) -> None:
stmt = update(UserModel).where(UserModel.id == user_id).values(is_admin=is_admin)
await session.execute(stmt)
await session.commit()
@cached(key_builder=lambda session: build_key())
async def get_all_users(session: AsyncSession) -> list[UserModel]:
query = select(UserModel)
result = await session.execute(query)
users = result.scalars()
return list(users)
@cached(key_builder=lambda session: build_key())
async def get_user_count(session: AsyncSession) -> int:
query = select(func.count()).select_from(UserModel)
result = await session.execute(query)
count = result.scalar_one_or_none() or 0
return int(count)