from datetime import timedelta, datetime import anyio from fastapi import Depends, HTTPException from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import jwt, JWTError from passlib.context import CryptContext from trauma.api.account.model import AccountModel from trauma.core.config import settings def verify_password(plain_password, hashed_password) -> bool: result = CryptContext(schemes=["bcrypt"], deprecated="auto").verify(plain_password, hashed_password) return result def create_access_token(email: str, account_id: str): payload = { "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email, "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id, "accountId": account_id, "iss": settings.Issuer, "aud": settings.Audience, "exp": datetime.utcnow() + timedelta(days=30) } encoded_jwt = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") return encoded_jwt class PermissionDependency: def __init__(self, is_public: bool = False): self.is_public = is_public def __call__( self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)) ) -> AccountModel | None: if credentials is None: if self.is_public: return None else: raise HTTPException(status_code=403, detail="Permission denied") try: account_id = self.authenticate_jwt_token(credentials.credentials) account_data = anyio.from_thread.run(self.get_account_by_id, account_id) self.check_account_health(account_data) return account_data except JWTError: raise HTTPException(status_code=403, detail="Permission denied") async def get_account_by_id(self, account_id: str) -> AccountModel: account = await settings.DB_CLIENT.accounts.find_one({"id": account_id}) return AccountModel.from_mongo(account) @staticmethod def check_account_health(account: AccountModel): if not account: raise HTTPException(status_code=403, detail="Permission denied") def authenticate_jwt_token(self, token: str) -> str: payload = jwt.decode(token, settings.SECRET_KEY, algorithms="HS256", audience=settings.Audience) email: str = payload.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name") account_id = payload.get("accountId") if email is None or account_id is None: raise HTTPException(status_code=403, detail="Permission denied") return account_id