mhdzumair's picture
Release MediaFlow Proxy
1ebbb74
raw
history blame
5.98 kB
import logging
from fastapi import FastAPI, Request, Depends, Security, HTTPException
from fastapi.security import APIKeyQuery
from pydantic import HttpUrl
from mediaflow_proxy.configs import settings
from .handlers import handle_hls_stream_proxy, proxy_stream, get_manifest, get_playlist, get_segment, get_public_ip
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
app = FastAPI()
api_key_query = APIKeyQuery(name="api_password", auto_error=False)
async def verify_api_key(api_key: str = Security(api_key_query)):
"""
Verifies the API key for the request.
Args:
api_key (str): The API key to validate.
Raises:
HTTPException: If the API key is invalid.
"""
if api_key != settings.api_password:
raise HTTPException(status_code=403, detail="Could not validate credentials")
def get_proxy_headers(request: Request) -> dict:
"""
Extracts proxy headers from the request query parameters.
Args:
request (Request): The incoming HTTP request.
Returns:
dict: A dictionary of proxy headers.
"""
return {k[2:]: v for k, v in request.query_params.items() if k.startswith("h_")}
@app.head("/proxy/hls")
@app.get("/proxy/hls")
async def hls_stream_proxy(
request: Request,
d: HttpUrl,
headers: dict = Depends(get_proxy_headers),
key_url: HttpUrl | None = None,
_: str = Depends(verify_api_key),
):
"""
Proxify HLS stream requests, fetching and processing the m3u8 playlist or streaming the content.
Args:
request (Request): The incoming HTTP request.
d (HttpUrl): The destination URL to fetch the content from.
key_url (HttpUrl, optional): The HLS Key URL to replace the original key URL. Defaults to None. (Useful for bypassing some sneaky protection)
headers (dict): The headers to include in the request.
_ (str): The API key to validate.
Returns:
Response: The HTTP response with the processed m3u8 playlist or streamed content.
"""
destination = str(d)
return await handle_hls_stream_proxy(request, destination, headers, key_url)
@app.head("/proxy/stream")
@app.get("/proxy/stream")
async def proxy_stream_endpoint(
request: Request, d: HttpUrl, headers: dict = Depends(get_proxy_headers), _: str = Depends(verify_api_key)
):
"""
Proxies stream requests to the given video URL.
Args:
request (Request): The incoming HTTP request.
d (HttpUrl): The URL of the video to stream.
headers (dict): The headers to include in the request.
_: str: The API key to validate.
Returns:
Response: The HTTP response with the streamed content.
"""
headers.update({"range": headers.get("range", "bytes=0-")})
return await proxy_stream(request.method, str(d), headers)
@app.get("/proxy/mpd/manifest")
async def manifest_endpoint(
request: Request,
d: HttpUrl,
headers: dict = Depends(get_proxy_headers),
key_id: str = None,
key: str = None,
_: str = Depends(verify_api_key),
):
"""
Retrieves and processes the MPD manifest, converting it to an HLS manifest.
Args:
request (Request): The incoming HTTP request.
d (HttpUrl): The URL of the MPD manifest.
headers (dict): The headers to include in the request.
key_id (str, optional): The DRM key ID. Defaults to None.
key (str, optional): The DRM key. Defaults to None.
_: str: The API key to validate.
Returns:
Response: The HTTP response with the HLS manifest.
"""
return await get_manifest(request, str(d), headers, key_id, key)
@app.get("/proxy/mpd/playlist")
async def playlist_endpoint(
request: Request,
d: HttpUrl,
profile_id: str,
headers: dict = Depends(get_proxy_headers),
key_id: str = None,
key: str = None,
_: str = Depends(verify_api_key),
):
"""
Retrieves and processes the MPD manifest, converting it to an HLS playlist for a specific profile.
Args:
request (Request): The incoming HTTP request.
d (HttpUrl): The URL of the MPD manifest.
profile_id (str): The profile ID to generate the playlist for.
headers (dict): The headers to include in the request.
key_id (str, optional): The DRM key ID. Defaults to None.
key (str, optional): The DRM key. Defaults to None.
_: str: The API key to validate.
Returns:
Response: The HTTP response with the HLS playlist.
"""
return await get_playlist(request, str(d), profile_id, headers, key_id, key)
@app.get("/proxy/mpd/segment")
async def segment_endpoint(
init_url: HttpUrl,
segment_url: HttpUrl,
mime_type: str,
headers: dict = Depends(get_proxy_headers),
key_id: str = None,
key: str = None,
_: str = Depends(verify_api_key),
):
"""
Retrieves and processes a media segment, decrypting it if necessary.
Args:
init_url (HttpUrl): The URL of the initialization segment.
segment_url (HttpUrl): The URL of the media segment.
mime_type (str): The MIME type of the segment.
headers (dict): The headers to include in the request.
key_id (str, optional): The DRM key ID. Defaults to None.
key (str, optional): The DRM key. Defaults to None.
_: str: The API key to validate.
Returns:
Response: The HTTP response with the processed segment.
"""
return await get_segment(str(init_url), str(segment_url), mime_type, headers, key_id, key)
@app.get("/proxy/ip")
async def get_mediaflow_proxy_public_ip(_: str = Depends(verify_api_key)):
"""
Retrieves the public IP address of the MediaFlow proxy server.
Args:
_: str: The API key to validate.
Returns:
Response: The HTTP response with the public IP address in the form of a JSON object. {"ip": "xxx.xxx.xxx.xxx"}
"""
return await get_public_ip()