mhdzumair commited on
Commit
33e2f36
1 Parent(s): af6d93e

Fix HLS & HTTPS proxy stream with streaming response and proper headers & Yield raw bytes without handling the decoding

Browse files
mediaflow_proxy/const.py CHANGED
@@ -7,8 +7,8 @@ SUPPORTED_RESPONSE_HEADERS = [
7
  "transfer-encoding",
8
  "last-modified",
9
  "etag",
10
- "server",
11
- "date",
12
  ]
13
 
14
  SUPPORTED_REQUEST_HEADERS = [
@@ -16,8 +16,9 @@ SUPPORTED_REQUEST_HEADERS = [
16
  "accept-encoding",
17
  "accept-language",
18
  "connection",
19
- "transfer-encoding",
20
  "range",
21
  "if-range",
22
  "user-agent",
 
 
23
  ]
 
7
  "transfer-encoding",
8
  "last-modified",
9
  "etag",
10
+ "cache-control",
11
+ "expires",
12
  ]
13
 
14
  SUPPORTED_REQUEST_HEADERS = [
 
16
  "accept-encoding",
17
  "accept-language",
18
  "connection",
 
19
  "range",
20
  "if-range",
21
  "user-agent",
22
+ "referer",
23
+ "origin",
24
  ]
mediaflow_proxy/handlers.py CHANGED
@@ -1,18 +1,23 @@
1
  import base64
2
  import logging
3
- from ipaddress import ip_address
4
 
5
  import httpx
6
  from fastapi import Request, Response, HTTPException
7
- from fastapi.responses import StreamingResponse
8
  from pydantic import HttpUrl
9
  from starlette.background import BackgroundTask
 
10
 
11
  from .configs import settings
12
  from .const import SUPPORTED_RESPONSE_HEADERS
13
  from .mpd_processor import process_manifest, process_playlist, process_segment
14
  from .utils.cache_utils import get_cached_mpd, get_cached_init_segment
15
- from .utils.http_utils import Streamer, DownloadError, download_file_with_retry, request_with_retry
 
 
 
 
 
 
16
  from .utils.m3u8_processor import M3U8Processor
17
  from .utils.mpd_utils import pad_base64
18
 
@@ -49,13 +54,18 @@ async def handle_hls_stream_proxy(request: Request, destination: str, headers: d
49
 
50
  headers.update({"range": headers.get("range", "bytes=0-")})
51
  # clean up the headers to only include the necessary headers and remove acl headers
52
- response_headers = {
53
- k.title(): v for k, v in response.headers.items() if k.lower() in SUPPORTED_RESPONSE_HEADERS
54
- }
55
 
56
- return StreamingResponse(
 
 
 
 
 
 
 
57
  streamer.stream_content(destination, headers),
58
- status_code=206,
59
  headers=response_headers,
60
  background=BackgroundTask(streamer.close),
61
  )
@@ -110,18 +120,22 @@ async def handle_stream_request(method: str, video_url: str, headers: dict):
110
  try:
111
  response = await streamer.head(video_url, headers)
112
  # clean up the headers to only include the necessary headers and remove acl headers
113
- response_headers = {
114
- k.title(): v for k, v in response.headers.items() if k.lower() in SUPPORTED_RESPONSE_HEADERS
115
- }
 
 
 
 
116
 
117
  if method == "HEAD":
118
  await streamer.close()
119
- return Response(headers=response_headers, status_code=response.status_code)
120
  else:
121
- return StreamingResponse(
122
  streamer.stream_content(video_url, headers),
123
  headers=response_headers,
124
- status_code=206,
125
  background=BackgroundTask(streamer.close),
126
  )
127
  except httpx.HTTPStatusError as e:
@@ -162,8 +176,9 @@ async def fetch_and_process_m3u8(
162
  content=processed_content,
163
  media_type="application/vnd.apple.mpegurl",
164
  headers={
165
- "Content-Disposition": "inline",
166
- "Accept-Ranges": "none",
 
167
  },
168
  )
169
  except httpx.HTTPStatusError as e:
 
1
  import base64
2
  import logging
 
3
 
4
  import httpx
5
  from fastapi import Request, Response, HTTPException
 
6
  from pydantic import HttpUrl
7
  from starlette.background import BackgroundTask
8
+ from starlette.status import HTTP_206_PARTIAL_CONTENT
9
 
10
  from .configs import settings
11
  from .const import SUPPORTED_RESPONSE_HEADERS
12
  from .mpd_processor import process_manifest, process_playlist, process_segment
13
  from .utils.cache_utils import get_cached_mpd, get_cached_init_segment
14
+ from .utils.http_utils import (
15
+ Streamer,
16
+ DownloadError,
17
+ download_file_with_retry,
18
+ request_with_retry,
19
+ EnhancedStreamingResponse,
20
+ )
21
  from .utils.m3u8_processor import M3U8Processor
22
  from .utils.mpd_utils import pad_base64
23
 
 
54
 
55
  headers.update({"range": headers.get("range", "bytes=0-")})
56
  # clean up the headers to only include the necessary headers and remove acl headers
57
+ response_headers = {k: v for k, v in response.headers.multi_items() if k in SUPPORTED_RESPONSE_HEADERS}
 
 
58
 
59
+ if transfer_encoding := response_headers.get("transfer-encoding"):
60
+ if "chunked" not in transfer_encoding:
61
+ transfer_encoding += ", chunked"
62
+ else:
63
+ transfer_encoding = "chunked"
64
+ response_headers["transfer-encoding"] = transfer_encoding
65
+
66
+ return EnhancedStreamingResponse(
67
  streamer.stream_content(destination, headers),
68
+ status_code=HTTP_206_PARTIAL_CONTENT,
69
  headers=response_headers,
70
  background=BackgroundTask(streamer.close),
71
  )
 
120
  try:
121
  response = await streamer.head(video_url, headers)
122
  # clean up the headers to only include the necessary headers and remove acl headers
123
+ response_headers = {k: v for k, v in response.headers.multi_items() if k in SUPPORTED_RESPONSE_HEADERS}
124
+ if transfer_encoding := response_headers.get("transfer-encoding"):
125
+ if "chunked" not in transfer_encoding:
126
+ transfer_encoding += ", chunked"
127
+ else:
128
+ transfer_encoding = "chunked"
129
+ response_headers["transfer-encoding"] = transfer_encoding
130
 
131
  if method == "HEAD":
132
  await streamer.close()
133
+ return Response(headers=response_headers, status_code=HTTP_206_PARTIAL_CONTENT)
134
  else:
135
+ return EnhancedStreamingResponse(
136
  streamer.stream_content(video_url, headers),
137
  headers=response_headers,
138
+ status_code=HTTP_206_PARTIAL_CONTENT,
139
  background=BackgroundTask(streamer.close),
140
  )
141
  except httpx.HTTPStatusError as e:
 
176
  content=processed_content,
177
  media_type="application/vnd.apple.mpegurl",
178
  headers={
179
+ "Cache-Control": "no-cache, no-store, must-revalidate",
180
+ "Pragma": "no-cache",
181
+ "Expires": "0",
182
  },
183
  )
184
  except httpx.HTTPStatusError as e:
mediaflow_proxy/utils/http_utils.py CHANGED
@@ -1,9 +1,16 @@
1
  import logging
 
 
2
  from urllib import parse
3
 
 
4
  import httpx
5
  import tenacity
 
 
 
6
  from starlette.requests import Request
 
7
  from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
8
 
9
  from mediaflow_proxy.configs import settings
@@ -84,7 +91,7 @@ class Streamer:
84
  """
85
  async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
86
  self.response.raise_for_status()
87
- async for chunk in self.response.aiter_bytes():
88
  yield chunk
89
 
90
  async def head(self, url: str, headers: dict):
@@ -266,3 +273,80 @@ def get_proxy_headers(request: Request) -> dict:
266
  request_headers = {k: v for k, v in request.headers.items() if k in SUPPORTED_REQUEST_HEADERS}
267
  request_headers.update({k[2:].lower(): v for k, v in request.query_params.items() if k.startswith("h_")})
268
  return request_headers
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import logging
2
+ import typing
3
+ from functools import partial
4
  from urllib import parse
5
 
6
+ import anyio
7
  import httpx
8
  import tenacity
9
+ from fastapi import Response
10
+ from starlette.background import BackgroundTask
11
+ from starlette.concurrency import iterate_in_threadpool
12
  from starlette.requests import Request
13
+ from starlette.types import Receive, Send, Scope
14
  from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
15
 
16
  from mediaflow_proxy.configs import settings
 
91
  """
92
  async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
93
  self.response.raise_for_status()
94
+ async for chunk in self.response.aiter_raw():
95
  yield chunk
96
 
97
  async def head(self, url: str, headers: dict):
 
273
  request_headers = {k: v for k, v in request.headers.items() if k in SUPPORTED_REQUEST_HEADERS}
274
  request_headers.update({k[2:].lower(): v for k, v in request.query_params.items() if k.startswith("h_")})
275
  return request_headers
276
+
277
+
278
+ class EnhancedStreamingResponse(Response):
279
+ body_iterator: typing.AsyncIterable[typing.Any]
280
+
281
+ def __init__(
282
+ self,
283
+ content: typing.Union[typing.AsyncIterable[typing.Any], typing.Iterable[typing.Any]],
284
+ status_code: int = 200,
285
+ headers: typing.Optional[typing.Mapping[str, str]] = None,
286
+ media_type: typing.Optional[str] = None,
287
+ background: typing.Optional[BackgroundTask] = None,
288
+ ) -> None:
289
+ if isinstance(content, typing.AsyncIterable):
290
+ self.body_iterator = content
291
+ else:
292
+ self.body_iterator = iterate_in_threadpool(content)
293
+ self.status_code = status_code
294
+ self.media_type = self.media_type if media_type is None else media_type
295
+ self.background = background
296
+ self.init_headers(headers)
297
+
298
+ @staticmethod
299
+ async def listen_for_disconnect(receive: Receive) -> None:
300
+ try:
301
+ while True:
302
+ message = await receive()
303
+ if message["type"] == "http.disconnect":
304
+ logger.debug("Client disconnected")
305
+ break
306
+ except Exception as e:
307
+ logger.error(f"Error in listen_for_disconnect: {str(e)}")
308
+
309
+ async def stream_response(self, send: Send) -> None:
310
+ try:
311
+ await send(
312
+ {
313
+ "type": "http.response.start",
314
+ "status": self.status_code,
315
+ "headers": self.raw_headers,
316
+ }
317
+ )
318
+ async for chunk in self.body_iterator:
319
+ if not isinstance(chunk, (bytes, memoryview)):
320
+ chunk = chunk.encode(self.charset)
321
+ try:
322
+ await send({"type": "http.response.body", "body": chunk, "more_body": True})
323
+ except (ConnectionResetError, anyio.BrokenResourceError):
324
+ logger.info("Client disconnected during streaming")
325
+ return
326
+
327
+ await send({"type": "http.response.body", "body": b"", "more_body": False})
328
+ except Exception as e:
329
+ logger.error(f"Error in stream_response: {str(e)}")
330
+
331
+ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
332
+ async with anyio.create_task_group() as task_group:
333
+
334
+ async def wrap(func: typing.Callable[[], typing.Awaitable[None]]) -> None:
335
+ try:
336
+ await func()
337
+ except ExceptionGroup as e:
338
+ if not any(isinstance(exc, anyio.get_cancelled_exc_class()) for exc in e.exceptions):
339
+ logger.exception("Error in streaming task")
340
+ raise
341
+ except Exception as e:
342
+ if not isinstance(e, anyio.get_cancelled_exc_class()):
343
+ logger.exception("Error in streaming task")
344
+ raise
345
+ finally:
346
+ task_group.cancel_scope.cancel()
347
+
348
+ task_group.start_soon(wrap, partial(self.stream_response, send))
349
+ await wrap(partial(self.listen_for_disconnect, receive))
350
+
351
+ if self.background is not None:
352
+ await self.background()