Spaces:
Paused
Paused
File size: 6,264 Bytes
85c5f42 e539434 85c5f42 e539434 85c5f42 f9cf047 85c5f42 f9cf047 85c5f42 f9cf047 85c5f42 f9cf047 85c5f42 f9cf047 85c5f42 e539434 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import logging
import random
from concurrent.futures import ThreadPoolExecutor
import time
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from xvfbwrapper import Xvfb
from threading import Lock, Event
class FormFiller:
    """Submit randomized answers to a web form through headless Chrome.

    Form widgets are located by their ARIA roles (``radiogroup``, ``radio``,
    ``checkbox``, ``grid``/``row``) and the form is sent by clicking the
    element labelled "Submit".

    Progress is exposed through ``responses_sent``, ``errors``,
    ``iterations_left`` and ``environment_status`` (a list of human-readable
    status lines). Updates to them are made while holding ``lock`` because
    several worker threads may run at the same time.
    """

    def __init__(self):
        self.total_iterations = 0
        self.responses_sent = 0
        self.errors = 0
        self.iterations_left = 0
        self.stop_flag = Event()
        self.lock = Lock()
        self.environment_status = []

    def fill_form(self, url, num_iterations, update_callback=None, environment_id=1):
        """Fill and submit the form at ``url`` up to ``num_iterations`` times.

        Clears any earlier stop request, then runs a single browser session
        in the calling thread.

        Args:
            url: Address of the form to load.
            num_iterations: Maximum number of submission attempts.
            update_callback: Accepted for interface compatibility; it is not
                invoked by this implementation.
            environment_id: Label used in log and status messages.

        Raises:
            Exception: Whatever the display or browser start-up raises.
                Failures inside a single iteration are counted in ``errors``
                and recorded in ``environment_status`` instead of raised.
        """
        self.stop_flag.clear()
        self._run_environment(url, num_iterations, environment_id)

    def _run_environment(self, url, num_iterations, environment_id):
        """Run one browser session without touching the stop flag."""
        display = None
        driver = None
        try:
            # Start Xvfb display
            display = Xvfb(width=1280, height=720)
            display.start()

            chrome_options = Options()
            for flag in (
                "--headless",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
            ):
                chrome_options.add_argument(flag)

            # exist_ok avoids FileExistsError when several workers reach
            # this point at the same time.
            cache_dir = os.path.join(os.getcwd(), '.cache')
            os.makedirs(cache_dir, exist_ok=True)
            chrome_options.add_argument(f"--disk-cache-dir={cache_dir}")
            chrome_options.add_argument(f"--cache-dir={cache_dir}")

            driver = webdriver.Chrome(options=chrome_options)
            driver.get(url)

            for _ in range(num_iterations):
                if self.stop_flag.is_set():
                    break
                try:
                    self._answer_questions(driver)

                    # Submit the form
                    submit = driver.find_element(
                        By.CSS_SELECTOR,
                        'div[role="button"][aria-label="Submit"]',
                    )
                    submit.click()
                    logging.info(
                        "Form submitted successfully by Environment %s",
                        environment_id,
                    )
                    with self.lock:
                        self.responses_sent += 1
                        self.iterations_left = self._remaining()
                        self.environment_status.append(
                            f"Environment {environment_id}: Total Responses Sent: {self.responses_sent}, Errors: {self.errors}, Iterations Left: {self.iterations_left}"
                        )

                    # Random pause between submissions. Waiting on the event
                    # (rather than sleeping) lets stop() end the pause early.
                    if self.stop_flag.wait(random.uniform(2, 8)):
                        break
                    driver.get(url)
                except Exception as e:
                    # Best effort: one failed attempt must not end the
                    # session, so it is recorded and the loop continues.
                    with self.lock:
                        self.errors += 1
                        self.iterations_left = self._remaining()
                        self.environment_status.append(
                            f"Environment {environment_id}: Error occurred: {e}"
                        )
                    logging.error(
                        "Error occurred in Environment %s: %s",
                        environment_id,
                        e,
                    )
        finally:
            # Nested so the display is still released if quit() raises.
            try:
                if driver:
                    driver.quit()
            finally:
                # NOTE(review): the DISPLAY guard is kept from the original;
                # presumably it avoids an error in Xvfb.stop() when the
                # variable has already been removed -- confirm.
                if display and 'DISPLAY' in os.environ:
                    display.stop()

    def _remaining(self):
        """Return attempts not yet consumed by a success or a failure."""
        return self.total_iterations - self.responses_sent - self.errors

    def _answer_questions(self, driver):
        """Click random answers for every supported widget on the page."""
        # Handling radio buttons
        for radio_group in driver.find_elements(
            By.CSS_SELECTOR, 'div[role="radiogroup"]'
        ):
            self._click_random(
                radio_group.find_elements(By.CSS_SELECTOR, 'div[role="radio"]')
            )

        # Handling checkboxes; the selection probability itself varies
        # between 0.3 and 0.7 for each checkbox.
        for checkbox in driver.find_elements(
            By.CSS_SELECTOR, 'div[role="checkbox"]'
        ):
            if random.random() < random.uniform(0.3, 0.7):
                checkbox.click()

        # Handling multiple choice grids: one pick per row
        for grid in driver.find_elements(By.CSS_SELECTOR, 'div[role="grid"]'):
            for row in grid.find_elements(By.CSS_SELECTOR, 'div[role="row"]'):
                self._click_random(
                    row.find_elements(By.CSS_SELECTOR, 'div[role="radio"]')
                )

    @classmethod
    def _click_random(cls, options):
        """Click one randomly chosen element; do nothing for an empty list."""
        chosen = cls._pick_random(options)
        if chosen is not None:
            chosen.click()

    @staticmethod
    def _pick_random(options):
        """Return a randomly weighted pick from ``options``, or ``None``.

        ``random.choices`` raises on an empty population, so rows or groups
        without selectable options are skipped rather than failing the
        whole submission.
        """
        if not options:
            return None
        weights = [random.random() for _ in options]
        return random.choices(options, weights=weights, k=1)[0]

    @staticmethod
    def _split_iterations(total_iterations, num_envs):
        """Split ``total_iterations`` into per-worker shares.

        The remainder is spread one-by-one over the first workers, and
        workers that would receive nothing are omitted.

        Raises:
            ValueError: If ``num_envs`` is smaller than 1.
        """
        if num_envs < 1:
            raise ValueError("num_envs must be at least 1")
        if total_iterations <= 0:
            return []
        base, extra = divmod(total_iterations, num_envs)
        shares = [base + 1 if i < extra else base for i in range(num_envs)]
        return [share for share in shares if share > 0]

    def stop(self):
        """Ask running sessions to finish after their current attempt."""
        self.stop_flag.set()

    def fill_form_in_parallel(self, url, total_iterations, update_callback=None, num_envs=10):
        """Spread ``total_iterations`` submissions over parallel sessions.

        Resets all counters and the stop flag, then runs up to ``num_envs``
        browser sessions in a thread pool and waits for them to finish.

        Args:
            url: Address of the form to load.
            total_iterations: Total number of submission attempts.
            update_callback: Accepted for interface compatibility; it is not
                invoked by this implementation.
            num_envs: Maximum number of concurrent sessions.

        Raises:
            ValueError: If ``num_envs`` is smaller than 1.
            Exception: The first start-up failure raised by a worker.
        """
        self.total_iterations = total_iterations
        self.iterations_left = total_iterations
        self.responses_sent = 0
        self.errors = 0
        self.environment_status = []
        # Cleared once per run, not per worker, so a stop() issued while
        # workers are still starting is not lost.
        self.stop_flag.clear()

        shares = self._split_iterations(total_iterations, num_envs)
        if not shares:
            return

        with ThreadPoolExecutor(max_workers=len(shares)) as executor:
            futures = [
                executor.submit(self._run_environment, url, share, env_id)
                for env_id, share in enumerate(shares, start=1)
            ]
            for future in futures:
                future.result()
|