mediaflow-proxy / mediaflow_proxy /mpd_processor.py
mhdzumair's picture
Remove #EXT-X-PROGRAM-DATE-TIME tags, live stream delay due to playback not supported in ExoPlayer
418a6d3
import logging
import math
import time
from fastapi import Request, Response, HTTPException
from mediaflow_proxy.drm.decrypter import decrypt_segment
from mediaflow_proxy.utils.crypto_utils import encryption_handler
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme, ProxyRequestHeaders
logger = logging.getLogger(__name__)
async def process_manifest(
    request: Request, mpd_dict: dict, proxy_headers: ProxyRequestHeaders, key_id: str = None, key: str = None
) -> Response:
    """
    Convert a parsed MPD manifest into an HLS master manifest response.

    Args:
        request (Request): The incoming HTTP request.
        mpd_dict (dict): The parsed MPD manifest data.
        proxy_headers (ProxyRequestHeaders): Header container; its ``response``
            attribute is attached to the outgoing response.
        key_id (str, optional): The DRM key ID, passed through to ``build_hls``. Defaults to None.
        key (str, optional): The DRM key, passed through to ``build_hls``. Defaults to None.

    Returns:
        Response: The HLS manifest, served as ``application/vnd.apple.mpegurl``.
    """
    return Response(
        content=build_hls(mpd_dict, request, key_id, key),
        media_type="application/vnd.apple.mpegurl",
        headers=proxy_headers.response,
    )
async def process_playlist(
    request: Request, mpd_dict: dict, profile_id: str, proxy_headers: ProxyRequestHeaders
) -> Response:
    """
    Build the HLS media playlist for one profile of a parsed MPD manifest.

    Args:
        request (Request): The incoming HTTP request.
        mpd_dict (dict): The parsed MPD manifest data; its ``"profiles"`` list is searched.
        profile_id (str): The ID of the profile whose playlist is requested.
        proxy_headers (ProxyRequestHeaders): Header container; its ``response``
            attribute is attached to the outgoing response.

    Returns:
        Response: The HLS playlist, served as ``application/vnd.apple.mpegurl``.

    Raises:
        HTTPException: 404 when no profile in the manifest has the requested ID.
    """
    # Every profile sharing the requested ID is kept, not just the first one.
    selected = [candidate for candidate in mpd_dict["profiles"] if candidate["id"] == profile_id]
    if selected:
        playlist = build_hls_playlist(mpd_dict, selected, request)
        return Response(
            content=playlist,
            media_type="application/vnd.apple.mpegurl",
            headers=proxy_headers.response,
        )
    raise HTTPException(status_code=404, detail="Profile not found")
async def process_segment(
    init_content: bytes,
    segment_content: bytes,
    mimetype: str,
    proxy_headers: ProxyRequestHeaders,
    key_id: str = None,
    key: str = None,
) -> Response:
    """
    Processes and, when DRM keys are supplied, decrypts a media segment.

    Args:
        init_content (bytes): The initialization segment content.
        segment_content (bytes): The media segment content.
        mimetype (str): The MIME type of the segment; used as the response media type.
        proxy_headers (ProxyRequestHeaders): Header container; its ``response``
            attribute is attached to the outgoing response.
        key_id (str, optional): The DRM key ID. Defaults to None.
        key (str, optional): The DRM key. Defaults to None.

    Returns:
        Response: The decrypted segment (or init + segment bytes when no key
        pair is given) as an HTTP response.
    """
    if key_id and key:
        # For DRM protected content.
        # perf_counter() is monotonic, so the measured duration cannot be skewed
        # (or go negative) if the wall clock is adjusted while decrypting.
        # NOTE(review): decrypt_segment is called synchronously inside this
        # coroutine; if it is CPU-heavy it will hold the event loop - confirm.
        started = time.perf_counter()
        decrypted_content = decrypt_segment(init_content, segment_content, key_id, key)
        logger.info(
            "Decryption of %s segment took %.4f seconds",
            mimetype,
            time.perf_counter() - started,
        )
    else:
        # For non-DRM protected content, we just concatenate init and segment content
        decrypted_content = init_content + segment_content
    return Response(content=decrypted_content, media_type=mimetype, headers=proxy_headers.response)
def build_hls(mpd_dict: dict, request: Request, key_id: str = None, key: str = None) -> str:
    """
    Builds an HLS master manifest from the MPD manifest.

    One ``#EXT-X-MEDIA`` entry is written per audio profile and one
    ``#EXT-X-STREAM-INF`` entry per video profile. Profiles whose ``mimeType``
    contains neither "video" nor "audio" are left out of the manifest.

    Args:
        mpd_dict (dict): The MPD manifest data; its ``"profiles"`` list is iterated.
        request (Request): The incoming HTTP request. Its query parameters are
            copied onto every generated playlist URL.
        key_id (str, optional): The DRM key ID. Defaults to None.
        key (str, optional): The DRM key. Defaults to None.

    Returns:
        str: The HLS manifest as a string.
    """
    hls = ["#EXTM3U", "#EXT-X-VERSION:6"]
    query_params = dict(request.query_params)
    # "has_encrypted" is a control flag for this proxy, not something to forward.
    has_encrypted = query_params.pop("has_encrypted", False)
    url_encryption = encryption_handler if has_encrypted else None

    video_profiles = {}
    audio_profiles = {}

    # Get the base URL for the playlist_endpoint endpoint
    proxy_url = request.url_for("playlist_endpoint")
    proxy_url = str(proxy_url.replace(scheme=get_original_scheme(request)))

    for profile in mpd_dict["profiles"]:
        query_params.update({"profile_id": profile["id"], "key_id": key_id or "", "key": key or ""})
        playlist_url = encode_mediaflow_proxy_url(
            proxy_url,
            query_params=query_params,
            encryption_handler=url_encryption,
        )

        if "video" in profile["mimeType"]:
            video_profiles[profile["id"]] = (profile, playlist_url)
        elif "audio" in profile["mimeType"]:
            audio_profiles[profile["id"]] = (profile, playlist_url)

    # Add audio streams
    for i, (profile, playlist_url) in enumerate(audio_profiles.values()):
        is_default = "YES" if i == 0 else "NO"  # Set the first audio track as default
        hls.append(
            f'#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="audio",NAME="{profile["id"]}",DEFAULT={is_default},AUTOSELECT={is_default},LANGUAGE="{profile.get("lang", "und")}",URI="{playlist_url}"'
        )

    # A variant may only reference the "audio" rendition group if that group was
    # actually declared above; otherwise (video-only / muxed sources) the
    # attribute would point at a group that does not exist in the playlist.
    audio_attr = ',AUDIO="audio"' if audio_profiles else ""

    # Add video streams
    for profile, playlist_url in video_profiles.values():
        hls.append(
            f'#EXT-X-STREAM-INF:BANDWIDTH={profile["bandwidth"]},RESOLUTION={profile["width"]}x{profile["height"]},CODECS="{profile["codecs"]}",FRAME-RATE={profile["frameRate"]}{audio_attr}'
        )
        hls.append(playlist_url)

    return "\n".join(hls)
def build_hls_playlist(mpd_dict: dict, profiles: list[dict], request: Request) -> str:
    """
    Builds an HLS media playlist from the MPD manifest for specific profiles.

    The playlist header tags (target duration, media sequence, playlist type)
    are derived from the first profile that actually has segments. Profiles
    without segments are skipped with a warning.

    Args:
        mpd_dict (dict): The MPD manifest data; ``"isLive"`` selects EVENT vs. VOD
            and whether ``#EXT-X-ENDLIST`` is appended.
        profiles (list[dict]): The profiles to include in the playlist.
        request (Request): The incoming HTTP request. Its query parameters
            (minus ``profile_id``, ``d`` and ``has_encrypted``) are copied onto
            every generated segment URL.

    Returns:
        str: The HLS playlist as a string.
    """
    hls = ["#EXTM3U", "#EXT-X-VERSION:6"]
    added_segments = 0
    is_live = mpd_dict["isLive"]
    # Tracks whether the header tags were emitted. Gating on the profile's index
    # instead would drop the headers entirely when the first profile is empty.
    headers_written = False

    proxy_url = request.url_for("segment_endpoint")
    proxy_url = str(proxy_url.replace(scheme=get_original_scheme(request)))

    for profile in profiles:
        segments = profile["segments"]
        if not segments:
            logger.warning("No segments found for profile %s", profile["id"])
            continue

        # Add headers once, based on the first profile that has segments
        if not headers_written:
            sequence = segments[0]["number"]
            extinf_values = [f["extinf"] for f in segments if "extinf" in f]
            # Falls back to 3 when no segment carries a duration.
            target_duration = math.ceil(max(extinf_values)) if extinf_values else 3
            hls.extend(
                [
                    f"#EXT-X-TARGETDURATION:{target_duration}",
                    f"#EXT-X-MEDIA-SEQUENCE:{sequence}",
                ]
            )
            hls.append("#EXT-X-PLAYLIST-TYPE:EVENT" if is_live else "#EXT-X-PLAYLIST-TYPE:VOD")
            headers_written = True

        init_url = profile["initUrl"]
        mime_type = profile["mimeType"]

        query_params = dict(request.query_params)
        query_params.pop("profile_id", None)
        query_params.pop("d", None)
        # "has_encrypted" is a control flag for this proxy, not something to forward.
        has_encrypted = query_params.pop("has_encrypted", False)
        url_encryption = encryption_handler if has_encrypted else None

        for segment in segments:
            hls.append(f'#EXTINF:{segment["extinf"]:.3f},')
            query_params.update({"init_url": init_url, "segment_url": segment["media"], "mime_type": mime_type})
            hls.append(
                encode_mediaflow_proxy_url(
                    proxy_url,
                    query_params=query_params,
                    encryption_handler=url_encryption,
                )
            )
            added_segments += 1

    if not is_live:
        hls.append("#EXT-X-ENDLIST")

    logger.info("Added %s segments to HLS playlist", added_segments)
    return "\n".join(hls)