mhdzumair commited on
Commit
1470245
1 Parent(s): 1f95615

Fix HLS proxy when content is chunked & removed upstream content-encoding header & parse m3u8 playlist based on response content-type header in-case of destination doesn't end with .m3u8

Browse files
Files changed (1) hide show
  1. mediaflow_proxy/handlers.py +65 -40
mediaflow_proxy/handlers.py CHANGED
@@ -31,17 +31,43 @@ async def handle_hls_stream_proxy(request: Request, destination: str, headers: d
31
  Returns:
32
  Response: The HTTP response with the processed m3u8 playlist or streamed content.
33
  """
 
 
 
 
 
 
 
34
  try:
35
- if destination.endswith((".m3u", ".m3u8")) or "mpegurl" in headers.get("accept", "").lower():
36
- return await fetch_and_process_m3u8(destination, headers, request, key_url)
37
-
38
- return await handle_stream_request(request.method, destination, headers)
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  except httpx.HTTPStatusError as e:
40
- logger.error(f"HTTP error while fetching m3u8: {e}")
41
- return Response(status_code=e.response.status_code, content=str(e))
 
 
 
 
 
42
  except Exception as e:
43
- logger.exception(f"Error in live_stream_proxy: {str(e)}")
44
- return Response(status_code=500, content=f"Internal server error: {str(e)}")
 
45
 
46
 
47
  async def proxy_stream(method: str, video_url: str, headers: dict):
@@ -90,23 +116,27 @@ async def handle_stream_request(method: str, video_url: str, headers: dict):
90
  background=BackgroundTask(streamer.close),
91
  )
92
  except httpx.HTTPStatusError as e:
93
- logger.error(f"Upstream service error while handling {method} request: {e}")
94
  await client.aclose()
 
95
  return Response(status_code=e.response.status_code, content=f"Upstream service error: {e}")
96
  except DownloadError as e:
 
97
  logger.error(f"Error downloading {video_url}: {e}")
98
- return Response(status_code=502, content=str(e))
99
  except Exception as e:
100
- logger.error(f"Internal server error while handling {method} request: {e}")
101
  await client.aclose()
 
102
  return Response(status_code=502, content=f"Internal server error: {e}")
103
 
104
 
105
- async def fetch_and_process_m3u8(url: str, headers: dict, request: Request, key_url: HttpUrl = None):
 
 
106
  """
107
  Fetches and processes the m3u8 playlist, converting it to an HLS playlist.
108
 
109
  Args:
 
110
  url (str): The URL of the m3u8 playlist.
111
  headers (dict): The headers to include in the request.
112
  request (Request): The incoming HTTP request.
@@ -115,34 +145,29 @@ async def fetch_and_process_m3u8(url: str, headers: dict, request: Request, key_
115
  Returns:
116
  Response: The HTTP response with the processed m3u8 playlist.
117
  """
118
- async with httpx.AsyncClient(
119
- follow_redirects=True,
120
- timeout=httpx.Timeout(30.0),
121
- limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
122
- proxy=settings.proxy_url,
123
- ) as client:
124
- try:
125
- streamer = Streamer(client)
126
- content = await streamer.get_text(url, headers)
127
- processor = M3U8Processor(request, key_url)
128
- processed_content = await processor.process_m3u8(content, str(streamer.response.url))
129
- return Response(
130
- content=processed_content,
131
- media_type="application/vnd.apple.mpegurl",
132
- headers={
133
- "Content-Disposition": "inline",
134
- "Accept-Ranges": "none",
135
- },
136
- )
137
- except httpx.HTTPStatusError as e:
138
- logger.error(f"HTTP error while fetching m3u8: {e}")
139
- return Response(status_code=e.response.status_code, content=str(e))
140
- except DownloadError as e:
141
- logger.error(f"Error downloading m3u8: {url}")
142
- return Response(status_code=502, content=str(e))
143
- except Exception as e:
144
- logger.exception(f"Unexpected error while processing m3u8: {e}")
145
- return Response(status_code=502, content=str(e))
146
 
147
 
148
  async def handle_drm_key_data(key_id, key, drm_info):
 
31
  Returns:
32
  Response: The HTTP response with the processed m3u8 playlist or streamed content.
33
  """
34
+ client = httpx.AsyncClient(
35
+ follow_redirects=True,
36
+ timeout=httpx.Timeout(30.0),
37
+ limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
38
+ proxy=settings.proxy_url,
39
+ )
40
+ streamer = Streamer(client)
41
  try:
42
+ if destination.endswith((".m3u", ".m3u8")):
43
+ return await fetch_and_process_m3u8(streamer, destination, headers, request, key_url)
44
+
45
+ response = await streamer.head(destination, headers)
46
+ if "mpegurl" in response.headers.get("content-type", "").lower():
47
+ return await fetch_and_process_m3u8(streamer, destination, headers, request, key_url)
48
+
49
+ headers.update({"accept-ranges": headers.get("range", "bytes=0-")})
50
+ # handle the encoding response header, since decompression is handled by the httpx
51
+ if "content-encoding" in response.headers:
52
+ del response.headers["content-encoding"]
53
+
54
+ return StreamingResponse(
55
+ streamer.stream_content(destination, headers),
56
+ headers=response.headers,
57
+ background=BackgroundTask(streamer.close),
58
+ )
59
  except httpx.HTTPStatusError as e:
60
+ await client.aclose()
61
+ logger.error(f"Upstream service error while handling request: {e}")
62
+ return Response(status_code=e.response.status_code, content=f"Upstream service error: {e}")
63
+ except DownloadError as e:
64
+ await client.aclose()
65
+ logger.error(f"Error downloading {destination}: {e}")
66
+ return Response(status_code=e.status_code, content=str(e))
67
  except Exception as e:
68
+ await client.aclose()
69
+ logger.error(f"Internal server error while handling request: {e}")
70
+ return Response(status_code=502, content=f"Internal server error: {e}")
71
 
72
 
73
  async def proxy_stream(method: str, video_url: str, headers: dict):
 
116
  background=BackgroundTask(streamer.close),
117
  )
118
  except httpx.HTTPStatusError as e:
 
119
  await client.aclose()
120
+ logger.error(f"Upstream service error while handling {method} request: {e}")
121
  return Response(status_code=e.response.status_code, content=f"Upstream service error: {e}")
122
  except DownloadError as e:
123
+ await client.aclose()
124
  logger.error(f"Error downloading {video_url}: {e}")
125
+ return Response(status_code=e.status_code, content=str(e))
126
  except Exception as e:
 
127
  await client.aclose()
128
+ logger.error(f"Internal server error while handling {method} request: {e}")
129
  return Response(status_code=502, content=f"Internal server error: {e}")
130
 
131
 
132
+ async def fetch_and_process_m3u8(
133
+ streamer: Streamer, url: str, headers: dict, request: Request, key_url: HttpUrl = None
134
+ ):
135
  """
136
  Fetches and processes the m3u8 playlist, converting it to an HLS playlist.
137
 
138
  Args:
139
+ streamer (Streamer): The HTTP client to use for streaming.
140
  url (str): The URL of the m3u8 playlist.
141
  headers (dict): The headers to include in the request.
142
  request (Request): The incoming HTTP request.
 
145
  Returns:
146
  Response: The HTTP response with the processed m3u8 playlist.
147
  """
148
+ try:
149
+ content = await streamer.get_text(url, headers)
150
+ processor = M3U8Processor(request, key_url)
151
+ processed_content = await processor.process_m3u8(content, str(streamer.response.url))
152
+ return Response(
153
+ content=processed_content,
154
+ media_type="application/vnd.apple.mpegurl",
155
+ headers={
156
+ "Content-Disposition": "inline",
157
+ "Accept-Ranges": "none",
158
+ },
159
+ )
160
+ except httpx.HTTPStatusError as e:
161
+ logger.error(f"HTTP error while fetching m3u8: {e}")
162
+ return Response(status_code=e.response.status_code, content=str(e))
163
+ except DownloadError as e:
164
+ logger.error(f"Error downloading m3u8: {url}")
165
+ return Response(status_code=502, content=str(e))
166
+ except Exception as e:
167
+ logger.exception(f"Unexpected error while processing m3u8: {e}")
168
+ return Response(status_code=502, content=str(e))
169
+ finally:
170
+ await streamer.close()
 
 
 
 
 
171
 
172
 
173
  async def handle_drm_key_data(key_id, key, drm_info):