import logging
from urllib import parse
import httpx
import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from mediaflow_proxy.configs import settings

logger = logging.getLogger(__name__)


class DownloadError(Exception):
    """Raised when an upstream request fails; carries an HTTP status code and a message."""

    def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
        super().__init__(message)


@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(DownloadError),
)
async def fetch_with_retry(client, method, url, headers, follow_redirects=True, **kwargs):
"""
Fetches a URL with retry logic.
Args:
client (httpx.AsyncClient): The HTTP client to use for the request.
method (str): The HTTP method to use (e.g., GET, POST).
url (str): The URL to fetch.
headers (dict): The headers to include in the request.
follow_redirects (bool, optional): Whether to follow redirects. Defaults to True.
**kwargs: Additional arguments to pass to the request.
Returns:
httpx.Response: The HTTP response.
Raises:
DownloadError: If the request fails after retries.
"""
try:
response = await client.request(method, url, headers=headers, follow_redirects=follow_redirects, **kwargs)
response.raise_for_status()
return response
except httpx.TimeoutException:
logger.warning(f"Timeout while downloading {url}")
raise DownloadError(409, f"Timeout while downloading {url}")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error {e.response.status_code} while downloading {url}")
# if e.response.status_code == 404:
# logger.error(f"Segment Resource not found: {url}")
# raise e
raise DownloadError(e.response.status_code, f"HTTP error {e.response.status_code} while downloading {url}")
except Exception as e:
logger.error(f"Error downloading {url}: {e}")
raise
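

# Illustrative usage sketch (not part of the original module): calling
# fetch_with_retry with a shared AsyncClient. The URL is a placeholder, and the
# RetryError unwrapping mirrors what the Streamer methods below do once the
# three retry attempts are exhausted.
async def _example_fetch_with_retry():
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await fetch_with_retry(client, "GET", "https://example.org/data.json", headers={})
        except tenacity.RetryError as e:
            # tenacity wraps the final DownloadError; re-raise the underlying error.
            raise e.last_attempt.result()
        return response.json()
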
class Streamer:
def __init__(self, client):
"""
Initializes the Streamer with an HTTP client.
Args:
client (httpx.AsyncClient): The HTTP client to use for streaming.
"""
self.client = client
        self.response = None

async def stream_content(self, url: str, headers: dict):
"""
Streams content from a URL.
Args:
url (str): The URL to stream content from.
headers (dict): The headers to include in the request.
Yields:
bytes: Chunks of the streamed content.
"""
async with self.client.stream("GET", url, headers=headers, follow_redirects=True) as self.response:
self.response.raise_for_status()
async for chunk in self.response.aiter_bytes():
                yield chunk

async def head(self, url: str, headers: dict):
"""
Sends a HEAD request to a URL.
Args:
url (str): The URL to send the HEAD request to.
headers (dict): The headers to include in the request.
Returns:
httpx.Response: The HTTP response.
"""
try:
self.response = await fetch_with_retry(self.client, "HEAD", url, headers)
except tenacity.RetryError as e:
raise e.last_attempt.result()
        return self.response

async def get_text(self, url: str, headers: dict):
"""
Sends a GET request to a URL and returns the response text.
Args:
url (str): The URL to send the GET request to.
headers (dict): The headers to include in the request.
Returns:
str: The response text.
"""
try:
self.response = await fetch_with_retry(self.client, "GET", url, headers)
except tenacity.RetryError as e:
raise e.last_attempt.result()
        return self.response.text

async def close(self):
"""
Closes the HTTP client and response.
"""
if self.response:
await self.response.aclose()
await self.client.aclose()
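

# Illustrative usage sketch (not part of the original module): streaming a
# remote file through Streamer and releasing the client afterwards. The URL
# and header values are placeholders.
async def _example_stream_content():
    url = "https://example.org/video.mp4"
    request_headers = {"user-agent": "Mozilla/5.0"}
    streamer = Streamer(httpx.AsyncClient(follow_redirects=True, timeout=30.0))
    try:
        head_response = await streamer.head(url, request_headers)
        logger.info("Content length: %s", head_response.headers.get("content-length"))
        downloaded = 0
        async for chunk in streamer.stream_content(url, request_headers):
            downloaded += len(chunk)
        logger.info("Streamed %d bytes", downloaded)
    finally:
        await streamer.close()
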
async def download_file_with_retry(url: str, headers: dict, timeout: float = 10.0):
"""
Downloads a file with retry logic.
Args:
url (str): The URL of the file to download.
headers (dict): The headers to include in the request.
timeout (float, optional): The request timeout. Defaults to 10.0.
Returns:
bytes: The downloaded file content.
Raises:
DownloadError: If the download fails after retries.
"""
async with httpx.AsyncClient(follow_redirects=True, timeout=timeout, proxy=settings.proxy_url) as client:
try:
response = await fetch_with_retry(client, "GET", url, headers)
return response.content
except DownloadError as e:
logger.error(f"Failed to download file: {e}")
raise e
        except tenacity.RetryError as e:
            # Use .exception() rather than .result(): .result() would re-raise the
            # stored error inside the f-string instead of wrapping it in a 502.
            raise DownloadError(502, f"Failed to download file: {e.last_attempt.exception()}")
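

# Illustrative usage sketch (not part of the original module): fetching a small
# auxiliary file (e.g. a subtitle) as bytes with download_file_with_retry, which
# opens its own short-lived AsyncClient per call. URL and path are placeholders.
async def _example_download_file():
    content = await download_file_with_retry(
        "https://example.org/subtitles.vtt",
        headers={"user-agent": "Mozilla/5.0"},
        timeout=15.0,
    )
    with open("/tmp/subtitles.vtt", "wb") as f:
        f.write(content)
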
async def request_with_retry(method: str, url: str, headers: dict, timeout: float = 10.0, **kwargs):
"""
Sends an HTTP request with retry logic.
Args:
method (str): The HTTP method to use (e.g., GET, POST).
url (str): The URL to send the request to.
headers (dict): The headers to include in the request.
timeout (float, optional): The request timeout. Defaults to 10.0.
**kwargs: Additional arguments to pass to the request.
Returns:
httpx.Response: The HTTP response.
Raises:
DownloadError: If the request fails after retries.
"""
async with httpx.AsyncClient(follow_redirects=True, timeout=timeout, proxy=settings.proxy_url) as client:
try:
response = await fetch_with_retry(client, method, url, headers, **kwargs)
return response
        except DownloadError as e:
            logger.error(f"Failed to request {url}: {e}")
            raise
        except tenacity.RetryError as e:
            # Keep the documented contract: surface retry exhaustion as a DownloadError.
            raise DownloadError(502, f"Failed to request {url}: {e.last_attempt.exception()}")
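

# Illustrative usage sketch (not part of the original module): request_with_retry
# forwards extra keyword arguments straight to httpx, so a JSON POST can be
# expressed as below. Endpoint and payload are placeholders.
async def _example_post_with_retry():
    response = await request_with_retry(
        "POST",
        "https://example.org/api/session",
        headers={"accept": "application/json"},
        json={"token": "placeholder"},
    )
    return response.json()
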
def encode_mediaflow_proxy_url(
mediaflow_proxy_url: str,
endpoint: str | None = None,
destination_url: str | None = None,
query_params: dict | None = None,
request_headers: dict | None = None,
) -> str:
"""
Encodes a MediaFlow proxy URL with query parameters and headers.
Args:
mediaflow_proxy_url (str): The base MediaFlow proxy URL.
endpoint (str, optional): The endpoint to append to the base URL. Defaults to None.
destination_url (str, optional): The destination URL to include in the query parameters. Defaults to None.
query_params (dict, optional): Additional query parameters to include. Defaults to None.
request_headers (dict, optional): Headers to include as query parameters. Defaults to None.
Returns:
str: The encoded MediaFlow proxy URL.
"""
query_params = query_params or {}
if destination_url is not None:
query_params["d"] = destination_url
# Add headers if provided
if request_headers:
query_params.update(
{key if key.startswith("h_") else f"h_{key}": value for key, value in request_headers.items()}
)
# Encode the query parameters
encoded_params = parse.urlencode(query_params, quote_via=parse.quote)
# Construct the full URL
if endpoint is None:
return f"{mediaflow_proxy_url}?{encoded_params}"
base_url = parse.urljoin(mediaflow_proxy_url, endpoint)
return f"{base_url}?{encoded_params}"
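

# Illustrative usage sketch (not part of the original module): building a proxied
# HLS manifest URL. The host, endpoint, api_password and upstream URL are all
# placeholders. The result looks like:
#   https://mediaflow.example.org/proxy/hls/manifest.m3u8?api_password=secret
#   &d=https%3A%2F%2Fcdn.example.org%2Fstream.m3u8&h_referer=https%3A%2F%2Fcdn.example.org%2F
def _example_encode_proxy_url():
    return encode_mediaflow_proxy_url(
        "https://mediaflow.example.org",
        endpoint="/proxy/hls/manifest.m3u8",
        destination_url="https://cdn.example.org/stream.m3u8",
        query_params={"api_password": "secret"},
        request_headers={"referer": "https://cdn.example.org/"},
    )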