import os import re import io import zipfile import tempfile import requests from fastapi import FastAPI, HTTPException, Form from fastapi.responses import StreamingResponse from playwright.async_api import async_playwright, TimeoutError from bs4 import BeautifulSoup app = FastAPI() @app.get("/") async def read_root(): return {"message": "Hello, World!"} class AppState: product_name = None class LoginException(HTTPException): """Custom exception for login failures.""" def __init__(self, message: str, status_code: int = 420): super().__init__(status_code=status_code, detail=message) class URLException(HTTPException): """Custom exception for URL failure.""" def __init__(self, message: str, status_code: int = 419): super().__init__(status_code=status_code, detail=message) class FabricNoFound(HTTPException): """Custom exception for fabric not found errors.""" def __init__(self, fabric_name: str, status_code: int = 432): super().__init__(status_code=status_code, detail=f"Fabric '{fabric_name}' not found.") async def login(page, username, password): try: await page.goto("") await page.fill('input[name="userName"]', username) await page.fill('input[name="password"]', password) await'button[type=submit]') await page.wait_for_timeout(2000) # Wait for navigation after login # Catch if loging wasn't successful success_element_selector = '.active' try: await page.wait_for_selector(success_element_selector, timeout=2000) print('Login successful') except Exception: raise LoginException("Login failed or the page did not load correctly.") except Exception as e: raise LoginException(f"An error occurred during login: {str(e)}") async def siloshot_making(page, fabric_name): await page.wait_for_timeout(20000) # Wait for the page to fully load html = await page.content() soup = BeautifulSoup(html, 'html.parser') await page.wait_for_timeout(1000) # Zbieranie nazwy produktu z nagłówka h1 o klasie ng-star-inserted #product_name_element = soup.find('h1', class_='ng-star-inserted') #if product_name_element: #AppState.product_name = product_name_element.text.strip() #print(f'Product name collected: {AppState.product_name}') #else: #raise Exception('Product name not found in the page.') # Click the search button await'intiaro-two-states-button') await page.wait_for_timeout(2000) # Input the fabric name into the search field await page.fill('input.searchInput', fabric_name) await page.wait_for_timeout(2000) # Click the search button await'button.searchButton') await page.wait_for_timeout(2000) # Click the choice button try: await page.locator(f'button.choice-button-wrapper:has(div:has-text("{fabric_name}"))').click(timeout=5000) except TimeoutError: raise FabricNoFound(fabric_name) # Click siloshot button await page.locator('div.silo-shoot-widget').click(timeout=10000) # Timeout 10 sekund # Open siloshot in new tab await'button.intiaro-popup-button:has-text("Open")') await page.wait_for_timeout(3000) # Save all open tabss pages = page.context.pages print(f'Open pages: {[p.url for p in pages]}') # Searching tab with match tab URLs new_page = None for p in pages: if '' in p.url: new_page = p break elif '' in p.url: new_page = p break if new_page is None: raise ValueError("No page found with the specified URL pattern.") # Go to public api card await new_page.bring_to_front() # Download slider image html = await new_page.content() soup = BeautifulSoup(html, 'html.parser') slider = soup.find('img') # If slider exist get src of siloshot if slider and slider.get('src'): img = slider.get('src') print(f'Image URL: {img}') return img else: raise Exception('Image not found in the new page.') def generate_urls(img, start_angle=0, end_angle=350, step=10): # Finds “angle/” pattern and angle number angle_pattern = re.compile(r'(angle/)(\d+)') match = if match: base_url = img[:match.start(2)] # URL to the beginning of the angle number suffix_url = img[match.end(2):] # URL by angle number # Create list of URLs with all angles urls = [] for angle in range(start_angle, end_angle + 1, step): new_url = f"{base_url}{angle}{suffix_url}" urls.append(new_url) return urls async def download_images(img_urls, fabric_name, product_url): # Compile a regex pattern to extract the angle from the URL angle_pattern = re.compile(r'angle/(\d+)') count_img_downloaded = 0 # Counter for downloaded images image_paths = [] # List to store paths of downloaded images # Create a temporary directory to store the images before zipping with tempfile.TemporaryDirectory() as tmpdirname: for img_url in img_urls: try: # Send a GET request to download the image img_response = requests.get(img_url) img_response.raise_for_status() # Raise an error if the download failed # Extract the angle from the image URL angle_match = if angle_match: # Create a file name using the product name and the angle angle_value = img_name = f"{product_url}_{angle_value}_{fabric_name}.png" else: # Default file name if the angle is not found img_name = "product_unknown.jpg" # Define the full path where the image will be saved img_path = os.path.join(tmpdirname, img_name) image_paths.append(img_path) # Add the image path to the list # Write the image content to a file with open(img_path, 'wb') as file: file.write(img_response.content) count_img_downloaded += 1 # Increment the download counter except Exception as e: # Print an error message if the download fails print(f'Failed to download {img_url}: {e}') # Create a buffer to hold the zip file in memory zip_buffer = io.BytesIO() # Create a zip file and add all downloaded images to it with zipfile.ZipFile(zip_buffer, 'w') as zipf: for img_path in image_paths: zipf.write(img_path, os.path.basename(img_path)) # Rewind the buffer to the beginning before returning it return zip_buffer # Return the zip file as a byte stream async def process_images(product_url, fabric_name): async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=True) context = await browser.new_context() page = await context.new_page() #await login(page, username, password) # Catching if URL is not valid by searching slider in product success_selector = '.slider-image' try: await page.goto(f"{product_url}&build-url=") await page.wait_for_selector(success_selector, timeout=3000) except Exception: raise URLException(f"Your URL is not valid: {product_url}") img = await siloshot_making(page, fabric_name) img_urls = generate_urls(img, start_angle=0, end_angle=360, step=10) zip_buffer = await download_images(img_urls, fabric_name, product_url) await browser.close() return zip_buffer"/generate-images/") async def generate_images(product_url: str = Form(...), fabric_name: str = Form(...)): # Validate that all required form fields are provided if not (product_url and fabric_name): # If any field is missing, raise an HTTP 400 error raise HTTPException(status_code=400, detail="All input fields are required.") # Call the process_images function to generate the images and zip them zip_buffer = await process_images(product_url, fabric_name) # Return the zip file as a streaming response with appropriate headers # The Content-Disposition header is set to suggest a download with a specific filename return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={fabric_name}"}) if __name__ == "__main__": import uvicorn, host="", port=7860)