mhdzumair commited on
Commit
3ca55c5
1 Parent(s): 9d4007c

Enhance streaming functionality and progress tracking

Browse files
mediaflow_proxy/utils/http_utils.py CHANGED
@@ -82,8 +82,12 @@ class Streamer:
82
  self.client = client
83
  self.response = None
84
  self.progress_bar = None
 
 
 
 
85
 
86
- async def stream_content(self, url: str, headers: dict):
87
  """
88
  Streams content from a URL.
89
 
@@ -94,36 +98,59 @@ class Streamer:
94
  Yields:
95
  bytes: Chunks of the streamed content.
96
  """
97
- async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
98
- self.response.raise_for_status()
99
-
100
- if settings.enable_streaming_progress:
101
- with tqdm_asyncio(
102
- total=self.get_content_length(),
103
- initial=self.get_start_byte(headers),
104
- unit="B",
105
- unit_scale=True,
106
- unit_divisor=1024,
107
- desc="MediaFlow Proxy Streaming",
108
- ) as self.progress_bar:
 
 
 
 
 
 
 
 
 
 
 
 
 
109
  async for chunk in self.response.aiter_bytes():
110
  yield chunk
111
- self.progress_bar.update(len(chunk))
112
- else:
113
- async for chunk in self.response.aiter_bytes():
114
- yield chunk
115
-
116
- def get_content_length(self) -> int:
117
- content_range = self.response.headers.get("Content-Range")
118
- if content_range:
119
- return int(content_range.split("/")[-1])
120
- return int(self.response.headers.get("Content-Length", 0))
121
 
122
  @staticmethod
123
- def get_start_byte(headers: dict) -> int:
124
- range_header = headers.get("range")
125
- start_byte = int(range_header.split("=")[-1].split("-")[0])
126
- return start_byte
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
 
128
  async def head(self, url: str, headers: dict):
129
  """
@@ -167,7 +194,6 @@ class Streamer:
167
  await self.response.aclose()
168
  if self.progress_bar:
169
  self.progress_bar.close()
170
- logger.info("Stopped the current streaming session")
171
  await self.client.aclose()
172
 
173
 
 
82
  self.client = client
83
  self.response = None
84
  self.progress_bar = None
85
+ self.bytes_transferred = 0
86
+ self.start_byte = 0
87
+ self.end_byte = 0
88
+ self.total_size = 0
89
 
90
+ async def stream_content(self, url: str, headers: dict) -> typing.AsyncGenerator[bytes, None]:
91
  """
92
  Streams content from a URL.
93
 
 
98
  Yields:
99
  bytes: Chunks of the streamed content.
100
  """
101
+ try:
102
+ async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
103
+ self.response.raise_for_status()
104
+ self.parse_content_range()
105
+
106
+ if settings.enable_streaming_progress:
107
+ with tqdm_asyncio(
108
+ total=self.total_size,
109
+ initial=self.start_byte,
110
+ unit="B",
111
+ unit_scale=True,
112
+ unit_divisor=1024,
113
+ desc="Streaming",
114
+ ncols=100,
115
+ mininterval=1,
116
+ ) as self.progress_bar:
117
+ async for chunk in self.response.aiter_bytes():
118
+ yield chunk
119
+ chunk_size = len(chunk)
120
+ self.bytes_transferred += chunk_size
121
+ self.progress_bar.set_postfix_str(
122
+ f"📥 : {self.format_bytes(self.bytes_transferred)}", refresh=False
123
+ )
124
+ self.progress_bar.update(chunk_size)
125
+ else:
126
  async for chunk in self.response.aiter_bytes():
127
  yield chunk
128
+ except GeneratorExit:
129
+ logger.info("Streaming session stopped by the user")
130
+ except Exception as e:
131
+ logger.error(f"Error streaming content: {e}")
132
+ finally:
133
+ await self.close()
 
 
 
 
134
 
135
  @staticmethod
136
+ def format_bytes(size) -> str:
137
+ power = 2**10
138
+ n = 0
139
+ units = {0: "B", 1: "KB", 2: "MB", 3: "GB", 4: "TB"}
140
+ while size > power:
141
+ size /= power
142
+ n += 1
143
+ return f"{size:.2f} {units[n]}"
144
+
145
+ def parse_content_range(self):
146
+ content_range = self.response.headers.get("Content-Range", "")
147
+ if content_range:
148
+ range_info = content_range.split()[-1]
149
+ self.start_byte, self.end_byte, self.total_size = map(int, range_info.replace("/", "-").split("-"))
150
+ else:
151
+ self.start_byte = 0
152
+ self.total_size = int(self.response.headers.get("Content-Length", 0))
153
+ self.end_byte = self.total_size - 1 if self.total_size > 0 else 0
154
 
155
  async def head(self, url: str, headers: dict):
156
  """
 
194
  await self.response.aclose()
195
  if self.progress_bar:
196
  self.progress_bar.close()
 
197
  await self.client.aclose()
198
 
199
 
pyproject.toml CHANGED
@@ -1,6 +1,6 @@
1
  [tool.poetry]
2
  name = "mediaflow-proxy"
3
- version = "1.7.5"
4
  description = "A high-performance proxy server for streaming media, supporting HTTP(S), HLS, and MPEG-DASH with real-time DRM decryption."
5
  authors = ["mhdzumair <[email protected]>"]
6
  readme = "README.md"
 
1
  [tool.poetry]
2
  name = "mediaflow-proxy"
3
+ version = "1.7.6"
4
  description = "A high-performance proxy server for streaming media, supporting HTTP(S), HLS, and MPEG-DASH with real-time DRM decryption."
5
  authors = ["mhdzumair <[email protected]>"]
6
  readme = "README.md"