Spaces:
Running
Running
import binascii | |
from base64 import b64decode | |
from typing import Optional | |
from fastapi.exceptions import HTTPException | |
from fastapi.openapi.models import HTTPBase as HTTPBaseModel | |
from fastapi.openapi.models import HTTPBearer as HTTPBearerModel | |
from fastapi.security.base import SecurityBase | |
from fastapi.security.utils import get_authorization_scheme_param | |
from pydantic import BaseModel | |
from starlette.requests import Request | |
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN | |
from typing_extensions import Annotated, Doc | |
class HTTPBasicCredentials(BaseModel): | |
""" | |
The HTTP Basic credentials given as the result of using `HTTPBasic` in a | |
dependency. | |
Read more about it in the | |
[FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/). | |
""" | |
username: Annotated[str, Doc("The HTTP Basic username.")] | |
password: Annotated[str, Doc("The HTTP Basic password.")] | |
class HTTPAuthorizationCredentials(BaseModel): | |
""" | |
The HTTP authorization credentials in the result of using `HTTPBearer` or | |
`HTTPDigest` in a dependency. | |
The HTTP authorization header value is split by the first space. | |
The first part is the `scheme`, the second part is the `credentials`. | |
For example, in an HTTP Bearer token scheme, the client will send a header | |
like: | |
``` | |
Authorization: Bearer deadbeef12346 | |
``` | |
In this case: | |
* `scheme` will have the value `"Bearer"` | |
* `credentials` will have the value `"deadbeef12346"` | |
""" | |
scheme: Annotated[ | |
str, | |
Doc( | |
""" | |
The HTTP authorization scheme extracted from the header value. | |
""" | |
), | |
] | |
credentials: Annotated[ | |
str, | |
Doc( | |
""" | |
The HTTP authorization credentials extracted from the header value. | |
""" | |
), | |
] | |
class HTTPBase(SecurityBase): | |
def __init__( | |
self, | |
*, | |
scheme: str, | |
scheme_name: Optional[str] = None, | |
description: Optional[str] = None, | |
auto_error: bool = True, | |
): | |
self.model = HTTPBaseModel(scheme=scheme, description=description) | |
self.scheme_name = scheme_name or self.__class__.__name__ | |
self.auto_error = auto_error | |
async def __call__( | |
self, request: Request | |
) -> Optional[HTTPAuthorizationCredentials]: | |
authorization = request.headers.get("Authorization") | |
scheme, credentials = get_authorization_scheme_param(authorization) | |
if not (authorization and scheme and credentials): | |
if self.auto_error: | |
raise HTTPException( | |
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" | |
) | |
else: | |
return None | |
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) | |
class HTTPBasic(HTTPBase): | |
""" | |
HTTP Basic authentication. | |
## Usage | |
Create an instance object and use that object as the dependency in `Depends()`. | |
The dependency result will be an `HTTPBasicCredentials` object containing the | |
`username` and the `password`. | |
Read more about it in the | |
[FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/). | |
## Example | |
```python | |
from typing import Annotated | |
from fastapi import Depends, FastAPI | |
from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
app = FastAPI() | |
security = HTTPBasic() | |
@app.get("/users/me") | |
def read_current_user(credentials: Annotated[HTTPBasicCredentials, Depends(security)]): | |
return {"username": credentials.username, "password": credentials.password} | |
``` | |
""" | |
def __init__( | |
self, | |
*, | |
scheme_name: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme name. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
realm: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
HTTP Basic authentication realm. | |
""" | |
), | |
] = None, | |
description: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme description. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
auto_error: Annotated[ | |
bool, | |
Doc( | |
""" | |
By default, if the HTTP Basic authentication is not provided (a | |
header), `HTTPBasic` will automatically cancel the request and send the | |
client an error. | |
If `auto_error` is set to `False`, when the HTTP Basic authentication | |
is not available, instead of erroring out, the dependency result will | |
be `None`. | |
This is useful when you want to have optional authentication. | |
It is also useful when you want to have authentication that can be | |
provided in one of multiple optional ways (for example, in HTTP Basic | |
authentication or in an HTTP Bearer token). | |
""" | |
), | |
] = True, | |
): | |
self.model = HTTPBaseModel(scheme="basic", description=description) | |
self.scheme_name = scheme_name or self.__class__.__name__ | |
self.realm = realm | |
self.auto_error = auto_error | |
async def __call__( # type: ignore | |
self, request: Request | |
) -> Optional[HTTPBasicCredentials]: | |
authorization = request.headers.get("Authorization") | |
scheme, param = get_authorization_scheme_param(authorization) | |
if self.realm: | |
unauthorized_headers = {"WWW-Authenticate": f'Basic realm="{self.realm}"'} | |
else: | |
unauthorized_headers = {"WWW-Authenticate": "Basic"} | |
if not authorization or scheme.lower() != "basic": | |
if self.auto_error: | |
raise HTTPException( | |
status_code=HTTP_401_UNAUTHORIZED, | |
detail="Not authenticated", | |
headers=unauthorized_headers, | |
) | |
else: | |
return None | |
invalid_user_credentials_exc = HTTPException( | |
status_code=HTTP_401_UNAUTHORIZED, | |
detail="Invalid authentication credentials", | |
headers=unauthorized_headers, | |
) | |
try: | |
data = b64decode(param).decode("ascii") | |
except (ValueError, UnicodeDecodeError, binascii.Error): | |
raise invalid_user_credentials_exc # noqa: B904 | |
username, separator, password = data.partition(":") | |
if not separator: | |
raise invalid_user_credentials_exc | |
return HTTPBasicCredentials(username=username, password=password) | |
class HTTPBearer(HTTPBase): | |
""" | |
HTTP Bearer token authentication. | |
## Usage | |
Create an instance object and use that object as the dependency in `Depends()`. | |
The dependency result will be an `HTTPAuthorizationCredentials` object containing | |
the `scheme` and the `credentials`. | |
## Example | |
```python | |
from typing import Annotated | |
from fastapi import Depends, FastAPI | |
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
app = FastAPI() | |
security = HTTPBearer() | |
@app.get("/users/me") | |
def read_current_user( | |
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)] | |
): | |
return {"scheme": credentials.scheme, "credentials": credentials.credentials} | |
``` | |
""" | |
def __init__( | |
self, | |
*, | |
bearerFormat: Annotated[Optional[str], Doc("Bearer token format.")] = None, | |
scheme_name: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme name. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
description: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme description. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
auto_error: Annotated[ | |
bool, | |
Doc( | |
""" | |
By default, if the HTTP Bearer token not provided (in an | |
`Authorization` header), `HTTPBearer` will automatically cancel the | |
request and send the client an error. | |
If `auto_error` is set to `False`, when the HTTP Bearer token | |
is not available, instead of erroring out, the dependency result will | |
be `None`. | |
This is useful when you want to have optional authentication. | |
It is also useful when you want to have authentication that can be | |
provided in one of multiple optional ways (for example, in an HTTP | |
Bearer token or in a cookie). | |
""" | |
), | |
] = True, | |
): | |
self.model = HTTPBearerModel(bearerFormat=bearerFormat, description=description) | |
self.scheme_name = scheme_name or self.__class__.__name__ | |
self.auto_error = auto_error | |
async def __call__( | |
self, request: Request | |
) -> Optional[HTTPAuthorizationCredentials]: | |
authorization = request.headers.get("Authorization") | |
scheme, credentials = get_authorization_scheme_param(authorization) | |
if not (authorization and scheme and credentials): | |
if self.auto_error: | |
raise HTTPException( | |
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" | |
) | |
else: | |
return None | |
if scheme.lower() != "bearer": | |
if self.auto_error: | |
raise HTTPException( | |
status_code=HTTP_403_FORBIDDEN, | |
detail="Invalid authentication credentials", | |
) | |
else: | |
return None | |
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) | |
class HTTPDigest(HTTPBase): | |
""" | |
HTTP Digest authentication. | |
## Usage | |
Create an instance object and use that object as the dependency in `Depends()`. | |
The dependency result will be an `HTTPAuthorizationCredentials` object containing | |
the `scheme` and the `credentials`. | |
## Example | |
```python | |
from typing import Annotated | |
from fastapi import Depends, FastAPI | |
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest | |
app = FastAPI() | |
security = HTTPDigest() | |
@app.get("/users/me") | |
def read_current_user( | |
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)] | |
): | |
return {"scheme": credentials.scheme, "credentials": credentials.credentials} | |
``` | |
""" | |
def __init__( | |
self, | |
*, | |
scheme_name: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme name. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
description: Annotated[ | |
Optional[str], | |
Doc( | |
""" | |
Security scheme description. | |
It will be included in the generated OpenAPI (e.g. visible at `/docs`). | |
""" | |
), | |
] = None, | |
auto_error: Annotated[ | |
bool, | |
Doc( | |
""" | |
By default, if the HTTP Digest not provided, `HTTPDigest` will | |
automatically cancel the request and send the client an error. | |
If `auto_error` is set to `False`, when the HTTP Digest is not | |
available, instead of erroring out, the dependency result will | |
be `None`. | |
This is useful when you want to have optional authentication. | |
It is also useful when you want to have authentication that can be | |
provided in one of multiple optional ways (for example, in HTTP | |
Digest or in a cookie). | |
""" | |
), | |
] = True, | |
): | |
self.model = HTTPBaseModel(scheme="digest", description=description) | |
self.scheme_name = scheme_name or self.__class__.__name__ | |
self.auto_error = auto_error | |
async def __call__( | |
self, request: Request | |
) -> Optional[HTTPAuthorizationCredentials]: | |
authorization = request.headers.get("Authorization") | |
scheme, credentials = get_authorization_scheme_param(authorization) | |
if not (authorization and scheme and credentials): | |
if self.auto_error: | |
raise HTTPException( | |
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" | |
) | |
else: | |
return None | |
if scheme.lower() != "digest": | |
raise HTTPException( | |
status_code=HTTP_403_FORBIDDEN, | |
detail="Invalid authentication credentials", | |
) | |
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) | |