Spaces:
Running
Running
File size: 2,866 Bytes
1ebbb74 fb6e6a7 1ebbb74 fb6e6a7 1ebbb74 ad23c2a 1ebbb74 fb6e6a7 1ebbb74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
import re
from urllib import parse
from pydantic import HttpUrl
from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme
class M3U8Processor:
    """Rewrites an HLS (m3u8) playlist so that every URL it references is routed through the proxy.

    Plain URL lines (segments, variant playlists) and ``URI="..."`` tag attributes
    are resolved against the playlist's base URL and then encoded with
    ``encode_mediaflow_proxy_url``. When a ``key_url`` is configured, the URIs of
    key tags are first redirected to that key server.
    """

    # Matches the quoted URI attribute of a playlist tag, e.g. URI="key.bin".
    _URI_PATTERN = re.compile(r'URI="([^"]+)"')
    # Tags whose URI points at a decryption key; only these honour ``key_url``.
    # Other tags carrying URI= (EXT-X-MAP, EXT-X-MEDIA, ...) reference media, not keys.
    _KEY_TAGS = ("#EXT-X-KEY", "#EXT-X-SESSION-KEY")
    # Ports that are implied by the scheme and therefore omitted from the netloc.
    _DEFAULT_PORTS = {"http": 80, "https": 443}

    def __init__(self, request, key_url: "HttpUrl | None" = None):
        """
        Initializes the M3U8Processor with the request and the optional key server URL.

        Args:
            request (Request): The incoming HTTP request.
            key_url (HttpUrl, optional): The URL of the key server. Defaults to None.
        """
        self.request = request
        self.key_url = key_url
        # Absolute URL of the "hls_stream_proxy" route, with its scheme replaced by
        # whatever get_original_scheme() reports for this request
        # (presumably the client-facing scheme behind a reverse proxy -- confirm).
        self.mediaflow_proxy_url = str(request.url_for("hls_stream_proxy").replace(scheme=get_original_scheme(request)))

    async def process_m3u8(self, content: str, base_url: str) -> str:
        """
        Processes the m3u8 content, proxying URLs and handling tag URI attributes.

        Args:
            content (str): The m3u8 content to process.
            base_url (str): The base URL to resolve relative URLs.

        Returns:
            str: The processed m3u8 content, with lines joined by "\\n".
        """
        processed_lines = []
        for line in content.splitlines():
            stripped = line.strip()
            if not stripped:
                # Blank lines are passed through untouched.
                processed_lines.append(line)
            elif stripped.startswith("#"):
                # Only tag lines can carry a URI attribute. Checking the "#" first
                # keeps URL lines whose query string happens to contain "URI="
                # from being mistaken for tags and emitted unproxied.
                if "URI=" in stripped:
                    line = await self.process_key_line(line, base_url)
                processed_lines.append(line)
            else:
                # Any other non-empty line is a URL (segment or nested playlist).
                processed_lines.append(await self.proxy_url(stripped, base_url))
        return "\n".join(processed_lines)

    async def process_key_line(self, line: str, base_url: str) -> str:
        """
        Processes a tag line carrying a URI attribute, proxying the URI.

        If ``key_url`` is set and the line is a key tag (``#EXT-X-KEY`` or
        ``#EXT-X-SESSION-KEY``), the URI is pointed at the key server before
        being proxied. Lines without a ``URI="..."`` attribute are returned unchanged.

        Args:
            line (str): The tag line to process.
            base_url (str): The base URL to resolve relative URLs.

        Returns:
            str: The processed line.
        """
        uri_match = self._URI_PATTERN.search(line)
        if uri_match is None:
            return line

        original_uri = uri_match.group(1)
        target_uri = original_uri
        if self.key_url and line.lstrip().startswith(self._KEY_TAGS):
            target_uri = self._redirect_to_key_server(original_uri, base_url)

        new_uri = await self.proxy_url(target_uri, base_url)
        return line.replace(f'URI="{original_uri}"', f'URI="{new_uri}"')

    def _redirect_to_key_server(self, uri: str, base_url: str) -> str:
        """Return ``uri`` as an absolute URL whose scheme and host are those of ``key_url``."""
        # Resolve first so a relative key URI keeps the playlist's directory;
        # swapping the host on an unresolved relative path would discard it.
        absolute = parse.urlparse(parse.urljoin(base_url, uri))
        return absolute._replace(scheme=self.key_url.scheme, netloc=self._key_server_netloc()).geturl()

    def _key_server_netloc(self) -> str:
        """Return ``host[:port]`` for ``key_url``, omitting a port implied by the scheme."""
        host = self.key_url.host
        port = getattr(self.key_url, "port", None)
        # int() tolerates the port being exposed as either a number or a numeric string.
        if port is None or int(port) == self._DEFAULT_PORTS.get(self.key_url.scheme):
            return host
        return f"{host}:{port}"

    async def proxy_url(self, url: str, base_url: str) -> str:
        """
        Proxies a URL, encoding it with the MediaFlow proxy URL.

        Args:
            url (str): The URL to proxy; may be relative to ``base_url``.
            base_url (str): The base URL to resolve relative URLs.

        Returns:
            str: The proxied URL.
        """
        full_url = parse.urljoin(base_url, url)
        # The incoming request's query parameters are forwarded on every proxied URL.
        # NOTE(review): the empty second argument is passed through as-is; its
        # meaning is defined by encode_mediaflow_proxy_url -- confirm there.
        return encode_mediaflow_proxy_url(
            self.mediaflow_proxy_url,
            "",
            full_url,
            query_params=dict(self.request.query_params),
        )
|