import logging import random from concurrent.futures import ThreadPoolExecutor import time import os from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from xvfbwrapper import Xvfb from threading import Lock, Event class FormFiller: def __init__(self): self.total_iterations = 0 self.responses_sent = 0 self.errors = 0 self.iterations_left = 0 self.stop_flag = Event() self.lock = Lock() self.environment_status = [] def fill_form(self, url, num_iterations, update_callback=None, environment_id=1): self.stop_flag.clear() display = None driver = None try: # Start Xvfb display display = Xvfb(width=1280, height=720) display.start() # Modify the chrome options to specify the cache directory chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") # Create a cache directory if it doesn't exist cache_dir = os.path.join(os.getcwd(), '.cache') if not os.path.exists(cache_dir): os.makedirs(cache_dir) # Specify the cache directory chrome_options.add_argument(f"--disk-cache-dir={cache_dir}") chrome_options.add_argument(f"--cache-dir={cache_dir}") driver = webdriver.Chrome(options=chrome_options) driver.get(url) for _ in range(num_iterations): if self.stop_flag.is_set(): break try: # Handling radio buttons radio_buttons = driver.find_elements(By.CSS_SELECTOR, 'div[role="radiogroup"]') for radio_group in radio_buttons: options = radio_group.find_elements(By.CSS_SELECTOR, 'div[role="radio"]') # Use weighted random choice with varying weights weights = [random.random() for _ in options] chosen_option = random.choices(options, weights=weights, k=1)[0] chosen_option.click() # Handling checkboxes checkboxes = driver.find_elements(By.CSS_SELECTOR, 'div[role="checkbox"]') for checkbox in checkboxes: if random.random() < random.uniform(0.3, 0.7): # Vary checkbox selection probability checkbox.click() # Handling multiple choice grids grids = driver.find_elements(By.CSS_SELECTOR, 'div[role="grid"]') for grid in grids: rows = grid.find_elements(By.CSS_SELECTOR, 'div[role="row"]') for row in rows: options = row.find_elements(By.CSS_SELECTOR, 'div[role="radio"]') # Use weighted random choice for each row weights = [random.random() for _ in options] chosen_option = random.choices(options, weights=weights, k=1)[0] chosen_option.click() # Submit the form submit = driver.find_element(By.CSS_SELECTOR, 'div[role="button"][aria-label="Submit"]') submit.click() logging.info(f"Form submitted successfully by Environment {environment_id}") with self.lock: self.responses_sent += 1 self.iterations_left = self.total_iterations - self.responses_sent self.environment_status.append( f"Environment {environment_id}: Total Responses Sent: {self.responses_sent}, Errors: {self.errors}, Iterations Left: {self.iterations_left}" ) # Wait and reload the form time.sleep(random.uniform(2, 8)) # Increase variation in wait time driver.get(url) except Exception as e: with self.lock: self.errors += 1 self.iterations_left = self.total_iterations - self.responses_sent self.environment_status.append( f"Environment {environment_id}: Error occurred: {e}" ) logging.error(f"Error occurred in Environment {environment_id}: {e}") finally: if driver: driver.quit() if display: if 'DISPLAY' in os.environ: display.stop() def stop(self): self.stop_flag.set() def fill_form_in_parallel(self, url, total_iterations, update_callback=None): self.total_iterations = total_iterations self.iterations_left = total_iterations self.responses_sent = 0 self.errors = 0 self.environment_status = [] num_envs = 10 # Total number of environments to run concurrently iterations_per_env = total_iterations // num_envs # Each environment should process its share of iterations with ThreadPoolExecutor(max_workers=num_envs) as executor: futures = [] for i in range(num_envs): futures.append(executor.submit(self.fill_form, url, iterations_per_env, update_callback, i + 1)) for future in futures: future.result() # Handle remaining iterations remaining_iterations = total_iterations % num_envs if remaining_iterations > 0: with ThreadPoolExecutor(max_workers=remaining_iterations) as executor: futures = [] for i in range(remaining_iterations): futures.append(executor.submit(self.fill_form, url, 1, update_callback, i + 1)) for future in futures: future.result()