# AJA / form_filler.py
# prabinpanta0's picture
# Update form_filler.py
# e539434 unverified
# raw
# history blame
# 6.26 kB
import logging
import random
from concurrent.futures import ThreadPoolExecutor
import time
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from xvfbwrapper import Xvfb
from threading import Lock, Event
class FormFiller:
    """Repeatedly fill in and submit a web form with randomised answers.

    Each worker ("environment") starts its own Xvfb display and Chrome
    session through Selenium, answers every radio group, checkbox and grid
    row it finds via ARIA-role CSS selectors, and clicks the element
    labelled "Submit".  The selectors look like Google Forms markup --
    NOTE(review): confirm against the forms this is used with.

    Progress is kept on the instance and updated while holding ``lock``:

    * ``total_iterations`` -- target set by ``fill_form_in_parallel``.
    * ``responses_sent``   -- number of successful submissions.
    * ``errors``           -- number of failed attempts or reloads.
    * ``iterations_left``  -- ``total_iterations - responses_sent``.
    * ``environment_status`` -- human-readable log lines, one per event.
    """

    def __init__(self):
        self.total_iterations = 0
        self.responses_sent = 0
        self.errors = 0
        self.iterations_left = 0
        self.stop_flag = Event()
        self.lock = Lock()
        self.environment_status = []

    def fill_form(self, url, num_iterations, update_callback=None, environment_id=1):
        """Run one environment: open a browser and attempt the form repeatedly.

        Args:
            url: Address of the form; loaded once up front and again after
                every attempt.
            num_iterations: Maximum number of submission attempts.
            update_callback: Accepted for interface compatibility; it is not
                invoked by this method.
            environment_id: Label used in log and status messages.

        Raises:
            Exception: Anything raised while starting Xvfb or Chrome, or by
                the initial page load, propagates to the caller.  Failures
                inside an individual attempt are counted in ``errors`` and
                logged instead of being raised.
        """
        # Entering a run discards any earlier stop() request.
        self.stop_flag.clear()
        display = None
        driver = None
        try:
            display = Xvfb(width=1280, height=720)
            display.start()
            driver = webdriver.Chrome(options=self._build_chrome_options())
            driver.get(url)

            for _ in range(num_iterations):
                if self.stop_flag.is_set():
                    break
                try:
                    self._answer_questions(driver)
                    submit = driver.find_element(
                        By.CSS_SELECTOR, 'div[role="button"][aria-label="Submit"]'
                    )
                    submit.click()
                    self._record_success(environment_id)
                    # Randomised pause between submissions.  Waiting on the
                    # event (rather than sleeping) lets stop() cut it short.
                    self.stop_flag.wait(random.uniform(2, 8))
                except Exception as e:
                    self._record_error(environment_id, e)

                # Reload after every attempt, including failed ones, so the
                # next attempt never runs on a partially filled form (a second
                # click on a checkbox would otherwise toggle it back off).
                try:
                    driver.get(url)
                except Exception as e:
                    self._record_error(environment_id, e)
        finally:
            try:
                if driver:
                    driver.quit()
            finally:
                # Nested so the display is released even if quit() raises.
                # The DISPLAY check is kept from the original code; presumably
                # it avoids an error in Xvfb.stop() when another thread has
                # already removed the variable -- TODO confirm.
                if display and 'DISPLAY' in os.environ:
                    display.stop()

    def stop(self):
        """Ask running environments to finish after their current attempt."""
        self.stop_flag.set()

    def fill_form_in_parallel(self, url, total_iterations, update_callback=None, num_envs=10):
        """Spread ``total_iterations`` attempts over concurrent environments.

        The work is divided as evenly as possible: every environment gets
        ``total_iterations // num_envs`` attempts and the first
        ``total_iterations % num_envs`` environments get one more.
        Environments that would receive no work are not started, so no
        browser is launched needlessly.

        Args:
            url: Address of the form to submit.
            total_iterations: Total number of submission attempts wanted.
            update_callback: Forwarded unchanged to ``fill_form``.
            num_envs: Maximum number of environments to run at once.

        Raises:
            ValueError: If ``num_envs`` is smaller than 1.
            Exception: The first exception raised by a worker's
                ``fill_form`` call is re-raised here.
        """
        if num_envs < 1:
            raise ValueError("num_envs must be at least 1")

        self.total_iterations = total_iterations
        self.iterations_left = total_iterations
        self.responses_sent = 0
        self.errors = 0
        self.environment_status = []

        workloads = self._split_iterations(total_iterations, num_envs)
        if not workloads:
            # Nothing to do; ThreadPoolExecutor rejects max_workers=0.
            return

        with ThreadPoolExecutor(max_workers=len(workloads)) as executor:
            futures = [
                executor.submit(self.fill_form, url, count, update_callback, env_id)
                for env_id, count in enumerate(workloads, start=1)
            ]
            for future in futures:
                future.result()

    @staticmethod
    def _split_iterations(total_iterations, num_envs):
        """Return the non-zero per-environment attempt counts, largest first."""
        if total_iterations <= 0:
            return []
        base, extra = divmod(total_iterations, num_envs)
        counts = [base + 1 if index < extra else base for index in range(num_envs)]
        return [count for count in counts if count > 0]

    @staticmethod
    def _build_chrome_options():
        """Create the Chrome options used by every environment."""
        chrome_options = Options()
        for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"):
            chrome_options.add_argument(flag)

        cache_dir = os.path.join(os.getcwd(), '.cache')
        # exist_ok avoids a FileExistsError when several environments reach
        # this point at the same time.
        os.makedirs(cache_dir, exist_ok=True)
        chrome_options.add_argument(f"--disk-cache-dir={cache_dir}")
        chrome_options.add_argument(f"--cache-dir={cache_dir}")
        return chrome_options

    @classmethod
    def _answer_questions(cls, driver):
        """Pick random answers for every supported question on the page."""
        for radio_group in driver.find_elements(By.CSS_SELECTOR, 'div[role="radiogroup"]'):
            cls._click_random_radio(radio_group)

        for checkbox in driver.find_elements(By.CSS_SELECTOR, 'div[role="checkbox"]'):
            # The threshold itself is random so the tick rate varies.
            if random.random() < random.uniform(0.3, 0.7):
                checkbox.click()

        for grid in driver.find_elements(By.CSS_SELECTOR, 'div[role="grid"]'):
            for row in grid.find_elements(By.CSS_SELECTOR, 'div[role="row"]'):
                cls._click_random_radio(row)

    @staticmethod
    def _click_random_radio(container):
        """Click one randomly weighted radio option inside ``container``."""
        options = container.find_elements(By.CSS_SELECTOR, 'div[role="radio"]')
        if not options:
            # random.choices raises on an empty population; a container
            # without radio options simply has nothing to answer.
            return
        weights = [random.random() for _ in options]
        random.choices(options, weights=weights, k=1)[0].click()

    def _record_success(self, environment_id):
        """Log a successful submission and update the shared counters."""
        logging.info("Form submitted successfully by Environment %s", environment_id)
        with self.lock:
            self.responses_sent += 1
            self.iterations_left = self.total_iterations - self.responses_sent
            self.environment_status.append(
                f"Environment {environment_id}: Total Responses Sent: {self.responses_sent}, Errors: {self.errors}, Iterations Left: {self.iterations_left}"
            )

    def _record_error(self, environment_id, error):
        """Count a failed attempt and log what went wrong."""
        with self.lock:
            self.errors += 1
            self.iterations_left = self.total_iterations - self.responses_sent
            self.environment_status.append(
                f"Environment {environment_id}: Error occurred: {error}"
            )
        logging.error("Error occurred in Environment %s: %s", environment_id, error)