# HuggingFace Spaces page residue ("Spaces: ... Sleeping") — UI status text
# captured in the paste; it is not part of the application code.
import io
import os
import re
import tempfile
import zipfile

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Form, HTTPException
from fastapi.responses import StreamingResponse
from playwright.async_api import TimeoutError, async_playwright
app = FastAPI()


# NOTE(review): the route decorator appears to have been lost when this file
# was pasted — without it this handler is never registered and is dead code.
# "/" is the conventional path for a root health check; confirm against the
# original deployment.
@app.get("/")
async def read_root():
    """Health-check endpoint returning a static greeting."""
    return {"message": "Hello, World!"}
class LoginException(HTTPException):
    """Raised when authentication against the Intiaro portal fails.

    Defaults to the non-standard 420 status code so clients can tell a
    login failure apart from other HTTP errors.
    """

    def __init__(self, message: str, status_code: int = 420):
        super().__init__(detail=message, status_code=status_code)
class URLException(HTTPException):
    """Raised when a supplied product URL cannot be validated.

    Defaults to the non-standard 419 status code so clients can tell a
    bad-URL failure apart from other HTTP errors.
    """

    def __init__(self, message: str, status_code: int = 419):
        super().__init__(detail=message, status_code=status_code)
async def login(page, username, password):
    """Log in to the Intiaro portal on the given Playwright page.

    Args:
        page: Playwright async page object.
        username: Portal account user name.
        password: Portal account password.

    Raises:
        LoginException: when the form cannot be submitted or the post-login
            page never shows the success marker element.
    """
    try:
        await page.goto("https://portal.intiaro.com/login?configuratorVersion=2.5")
        await page.fill('input[name="userName"]', username)
        await page.fill('input[name="password"]', password)
        await page.click('button[type=submit]')
        await page.wait_for_timeout(2000)  # Wait for navigation after login
        # Detect whether login succeeded by waiting for a marker element.
        # NOTE(review): '.active' is a very generic selector — confirm it is
        # specific enough to only appear after a successful login.
        success_element_selector = '.active'
        try:
            await page.wait_for_selector(success_element_selector, timeout=2000)
            print('Login successful')
        except Exception:
            raise LoginException("Login failed or the page did not load correctly.")
    except LoginException:
        # BUG FIX: previously the generic handler below re-wrapped this
        # exception and replaced its specific message; re-raise unchanged.
        raise
    except Exception as e:
        raise LoginException(f"An error occurred during login: {str(e)}")
async def siloshot_making(page, fabric_name):
    """Drive the configurator UI to produce a silo-shot image URL.

    Applies the given fabric to the loaded product, triggers the silo-shot
    widget, then scrapes the image ``src`` out of the tab that opens with
    the rendered shot.

    Args:
        page: Playwright page already positioned on a product configurator.
        fabric_name: Exact fabric label to search for and select.

    Returns:
        str: URL of the rendered silo-shot image.
    """
    await page.wait_for_timeout(20000)  # Wait for the page to fully load
    markup = await page.content()
    document = BeautifulSoup(markup, 'html.parser')
    await page.wait_for_timeout(1000)
    # Collect the product name from the <h1> header with class ng-star-inserted
    heading = document.find('h1', class_='ng-star-inserted')
    if not heading:
        raise Exception('Product name not found in the page.')
    product_name = heading.text.strip()
    print(f'Product name collected: {product_name}')
    # Open the fabric search panel
    await page.click('intiaro-two-states-button')
    await page.wait_for_timeout(2000)
    # Type the fabric name into the search field
    await page.fill('input.searchInput', fabric_name)
    await page.wait_for_timeout(2000)
    # Run the search
    await page.click('button.searchButton')
    await page.wait_for_timeout(2000)
    # Pick the matching fabric out of the search results
    await page.locator('button.choice-button-wrapper', has=page.locator(f'div:has-text("{fabric_name}")')).click()
    # Trigger the silo-shot widget (10 second timeout)
    await page.locator('div.silo-shoot-widget').click(timeout=10000)
    # Open the silo shot in a new tab
    await page.click('button.intiaro-popup-button:has-text("Open")')
    await page.wait_for_timeout(3000)
    # Inspect every tab currently open in this browser context
    open_tabs = page.context.pages
    print(f'Open pages: {[p.url for p in open_tabs]}')
    # Find the first tab whose URL matches one of the known API hosts
    new_page = next(
        (tab for tab in open_tabs
         if 'https://public-api.intiaro.com' in tab.url
         or 'https://backend.intiaro.com' in tab.url),
        None,
    )
    if new_page is None:
        raise ValueError("No page found with the specified URL pattern.")
    # Switch to the API tab and scrape the silo-shot image from it
    await new_page.bring_to_front()
    tab_markup = await new_page.content()
    tab_document = BeautifulSoup(tab_markup, 'html.parser')
    slider = tab_document.find('img')
    # Return the src when the image exists
    if slider and slider.get('src'):
        img = slider.get('src')
        print(f'Image URL: {img}')
        return img
    raise Exception('Image not found in the new page.')
def generate_urls(img, start_angle=0, end_angle=350, step=10):
    """Build one image URL per camera angle from a template silo-shot URL.

    Args:
        img: Silo-shot URL containing an ``angle/<number>`` segment.
        start_angle: First angle in degrees, inclusive.
        end_angle: Last angle in degrees, inclusive.
        step: Increment between consecutive angles.

    Returns:
        list[str]: The URL repeated once per angle, with the angle number
        substituted in place.

    Raises:
        ValueError: if ``img`` has no ``angle/<number>`` segment.
            (BUG FIX: the original silently returned ``None`` here, which
            made the caller crash later when iterating the result.)
    """
    angle_pattern = re.compile(r'(angle/)(\d+)')
    match = angle_pattern.search(img)
    if not match:
        raise ValueError(f"No 'angle/<number>' segment found in URL: {img}")
    base_url = img[:match.start(2)]   # URL up to the angle digits
    suffix_url = img[match.end(2):]   # URL after the angle digits
    return [
        f"{base_url}{angle}{suffix_url}"
        for angle in range(start_angle, end_angle + 1, step)
    ]
async def download_images(img_urls, fabric_name="product"):
    """Download every image URL and bundle the results into an in-memory zip.

    NOTE(review): despite being declared ``async``, this function performs
    blocking ``requests`` calls and will stall the event loop while
    downloading; consider an async HTTP client. Kept as-is to preserve the
    interface.

    Args:
        img_urls: Iterable of image URLs, each ideally containing an
            ``angle/<number>`` segment used to name the file.
        fabric_name: Used in the per-angle file names. Now defaults to
            "product" (BUG FIX: the caller omitted this argument, which
            raised a TypeError when it was required).

    Returns:
        io.BytesIO: Zip archive of the downloaded images, rewound to 0.
        URLs that fail to download are logged and skipped.
    """
    # Extract the angle number from each URL to build the file name
    angle_pattern = re.compile(r'angle/(\d+)')
    count_img_downloaded = 0  # Counter for downloaded images
    image_paths = []  # Paths of images that were fully written to disk
    # Temporary directory holds the images until they are zipped
    with tempfile.TemporaryDirectory() as tmpdirname:
        for img_url in img_urls:
            try:
                # Download the image; abort this URL on HTTP errors
                img_response = requests.get(img_url)
                img_response.raise_for_status()
                angle_match = angle_pattern.search(img_url)
                if angle_match:
                    # Name the file after the angle and the fabric
                    img_name = f"{angle_match.group(1)}_{fabric_name}.png"
                else:
                    # Fallback name when no angle is present in the URL
                    img_name = "product_unknown.jpg"
                img_path = os.path.join(tmpdirname, img_name)
                # Write the image content to disk
                with open(img_path, 'wb') as file:
                    file.write(img_response.content)
                # BUG FIX: record the path only after a successful write —
                # previously a failed write left a phantom path that made
                # the zip step crash on a missing file.
                image_paths.append(img_path)
                count_img_downloaded += 1
            except Exception as e:
                # Best-effort: log and continue with the remaining URLs
                print(f'Failed to download {img_url}: {e}')
        # Build the zip in memory while the temp files still exist
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zipf:
            for img_path in image_paths:
                zipf.write(img_path, os.path.basename(img_path))
    zip_buffer.seek(0)  # Rewind so callers can stream from the start
    return zip_buffer
async def process_images(username, password, product_url, fabric_name):
    """Run the full pipeline: log in, validate the URL, render and zip images.

    Args:
        username: Portal account user name.
        password: Portal account password.
        product_url: Configurator URL of the product to render.
        fabric_name: Fabric to apply before rendering.

    Returns:
        io.BytesIO: Zip archive of the rendered angle images.

    Raises:
        LoginException: when login fails.
        URLException: when the product URL does not show a slider image.
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context()
            page = await context.new_page()
            await login(page, username, password)
            # Validate the URL by checking the product page shows a slider
            success_selector = '.slider-image'
            try:
                await page.goto(product_url)
                await page.wait_for_selector(success_selector, timeout=3000)
            except Exception:
                raise URLException(f"Your URL is not valid: {product_url}")
            img = await siloshot_making(page, fabric_name)
            # NOTE: 0..360 inclusive duplicates the 0-degree frame at 360
            img_urls = generate_urls(img, start_angle=0, end_angle=360, step=10)
            # BUG FIX: fabric_name was not passed here, which raised a
            # TypeError before any images were downloaded.
            zip_buffer = await download_images(img_urls, fabric_name)
        finally:
            # BUG FIX: always release the browser, even when a step raises
            await browser.close()
    return zip_buffer
# NOTE(review): the route decorator appears to have been lost when this file
# was pasted — the Form(...) parameters show this is a form-POST endpoint,
# but without a decorator it is never registered. Confirm the original path.
@app.post("/generate_images/")
async def generate_images(username: str = Form(...), password: str = Form(...), product_url: str = Form(...), fabric_name: str = Form(...)):
    """Form endpoint: render all angle images for a fabric and return a zip.

    Raises:
        HTTPException: 400 when any form field is empty.
    """
    # Validate that all required form fields carry non-empty values
    if not (username and password and product_url and fabric_name):
        raise HTTPException(status_code=400, detail="All input fields are required.")
    # Run the scraping pipeline and collect the zipped images
    zip_buffer = await process_images(username, password, product_url, fabric_name)
    # Stream the archive back; Content-Disposition suggests a download name
    return StreamingResponse(zip_buffer, media_type="application/zip",
                             headers={"Content-Disposition": f"attachment; filename={fabric_name}_images.zip"})
if __name__ == "__main__":
    # Start the ASGI server when this module is executed directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)