mhdzumair commited on
Commit
d130bf4
1 Parent(s): ae3e790

Add streaming progress feature with tqdm

Browse files

Integrate streaming progress tracking using tqdm_asyncio in the `stream_content` method, controlled by the new `enable_streaming_progress` setting. Update configuration to include this new setting and ensure the progress bar closes properly after streaming.

README.md CHANGED
@@ -41,6 +41,7 @@ Set the following environment variables:
41
 
42
  - `API_PASSWORD`: Required. Protects against unauthorized access and API network abuses.
43
  - `PROXY_URL`: Optional. HTTP/HTTPS/SOCKS5 proxy URL for forwarding network requests.
 
44
 
45
  ## Installation
46
 
 
41
 
42
  - `API_PASSWORD`: Required. Protects against unauthorized access and API network abuses.
43
  - `PROXY_URL`: Optional. HTTP/HTTPS/SOCKS5 proxy URL for forwarding network requests.
44
+ - `ENABLE_STREAMING_PROGRESS`: Optional. Enable streaming progress logging. Default is `false`.
45
 
46
  ## Installation
47
 
mediaflow_proxy/configs.py CHANGED
@@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings
4
  class Settings(BaseSettings):
5
  api_password: str # The password for accessing the API endpoints.
6
  proxy_url: str | None = None # The URL of the proxy server to route requests through.
 
7
 
8
  class Config:
9
  env_file = ".env"
 
4
  class Settings(BaseSettings):
5
  api_password: str # The password for accessing the API endpoints.
6
  proxy_url: str | None = None # The URL of the proxy server to route requests through.
7
+ enable_streaming_progress: bool = False # Whether to enable streaming progress tracking.
8
 
9
  class Config:
10
  env_file = ".env"
mediaflow_proxy/utils/http_utils.py CHANGED
@@ -14,6 +14,7 @@ from starlette.concurrency import iterate_in_threadpool
14
  from starlette.requests import Request
15
  from starlette.types import Receive, Send, Scope
16
  from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
 
17
 
18
  from mediaflow_proxy.configs import settings
19
  from mediaflow_proxy.const import SUPPORTED_REQUEST_HEADERS
@@ -80,6 +81,7 @@ class Streamer:
80
  """
81
  self.client = client
82
  self.response = None
 
83
 
84
  async def stream_content(self, url: str, headers: dict):
85
  """
@@ -94,8 +96,34 @@ class Streamer:
94
  """
95
  async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
96
  self.response.raise_for_status()
97
- async for chunk in self.response.aiter_raw():
98
- yield chunk
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
  async def head(self, url: str, headers: dict):
101
  """
@@ -137,6 +165,9 @@ class Streamer:
137
  """
138
  if self.response:
139
  await self.response.aclose()
 
 
 
140
  await self.client.aclose()
141
 
142
 
 
14
  from starlette.requests import Request
15
  from starlette.types import Receive, Send, Scope
16
  from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
17
+ from tqdm.asyncio import tqdm as tqdm_asyncio
18
 
19
  from mediaflow_proxy.configs import settings
20
  from mediaflow_proxy.const import SUPPORTED_REQUEST_HEADERS
 
81
  """
82
  self.client = client
83
  self.response = None
84
+ self.progress_bar = None
85
 
86
  async def stream_content(self, url: str, headers: dict):
87
  """
 
96
  """
97
  async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
98
  self.response.raise_for_status()
99
+
100
+ if settings.enable_streaming_progress:
101
+ with tqdm_asyncio(
102
+ total=self.get_content_length(),
103
+ initial=self.get_start_byte(headers),
104
+ unit="B",
105
+ unit_scale=True,
106
+ unit_divisor=1024,
107
+ desc="MediaFlow Proxy Streaming",
108
+ ) as self.progress_bar:
109
+ async for chunk in self.response.aiter_bytes():
110
+ yield chunk
111
+ self.progress_bar.update(len(chunk))
112
+ else:
113
+ async for chunk in self.response.aiter_bytes():
114
+ yield chunk
115
+
116
+ def get_content_length(self) -> int:
117
+ content_range = self.response.headers.get("Content-Range")
118
+ if content_range:
119
+ return int(content_range.split("/")[-1])
120
+ return int(self.response.headers.get("Content-Length", 0))
121
+
122
+ @staticmethod
123
+ def get_start_byte(headers: dict) -> int:
124
+ range_header = headers.get("range")
125
+ start_byte = int(range_header.split("=")[-1].split("-")[0])
126
+ return start_byte
127
 
128
  async def head(self, url: str, headers: dict):
129
  """
 
165
  """
166
  if self.response:
167
  await self.response.aclose()
168
+ if self.progress_bar:
169
+ self.progress_bar.close()
170
+ logger.info("Stopped the current streaming session")
171
  await self.client.aclose()
172
 
173