|
import os |
|
import re |
|
import json |
|
import time |
|
import streamlit as st |
|
import base64 |
|
from io import BytesIO |
|
from dotenv import load_dotenv |
|
from google import genai |
|
import asyncio |
|
import pyppeteer |
|
import platform |
|
import sys |
|
|
|
|
|
load_dotenv() |
|
GEMINI_API_KEY = 'AIzaSyAOK9vRTSRQzd22B2gmbiuIePbZTDyaGYs' |
|
|
|
|
|
async def setup_browser(): |
|
"""Get a pyppeteer browser instance that works in Hugging Face Spaces""" |
|
try: |
|
st.info("Initializing headless browser...") |
|
browser = await pyppeteer.launch( |
|
headless=True, |
|
args=[ |
|
'--no-sandbox', |
|
'--disable-setuid-sandbox', |
|
'--disable-dev-shm-usage', |
|
'--disable-gpu', |
|
'--disable-software-rasterizer' |
|
] |
|
) |
|
page = await browser.newPage() |
|
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36') |
|
|
|
|
|
await page.setViewport({'width': 1280, 'height': 800}) |
|
|
|
return {"browser": browser, "page": page} |
|
except Exception as e: |
|
st.error(f"Failed to initialize browser: {e}") |
|
raise |
|
|
|
|
|
def run_async(async_func, *args, **kwargs): |
|
try: |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
result = loop.run_until_complete(async_func(*args, **kwargs)) |
|
loop.close() |
|
return result |
|
except Exception as e: |
|
st.error(f"Async execution error: {e}") |
|
raise |
|
|
|
|
|
async def async_take_screenshot(browser_data): |
|
"""Takes a screenshot using pyppeteer""" |
|
try: |
|
page = browser_data["page"] |
|
screenshot_bytes = await page.screenshot() |
|
return screenshot_bytes |
|
except Exception as e: |
|
st.error(f"Screenshot error: {e}") |
|
return None |
|
|
|
def take_screenshot(browser_data): |
|
"""Wrapper for async screenshot function""" |
|
return run_async(async_take_screenshot, browser_data) |
|
|
|
def extract_questions_from_fb_data(html): |
|
""" |
|
Parses the rendered HTML to extract questions and options from the |
|
FB_PUBLIC_LOAD_DATA_ JavaScript variable. |
|
""" |
|
match = re.search(r'var\s+FB_PUBLIC_LOAD_DATA_\s*=\s*(\[.*?\]);</script>', html, re.DOTALL) |
|
if not match: |
|
st.error("FB_PUBLIC_LOAD_DATA_ not found in HTML.") |
|
return [] |
|
raw_json = match.group(1) |
|
|
|
replacements = { |
|
r'\\n': '\n', |
|
r'\\u003c': '<', |
|
r'\\u003e': '>', |
|
r'\\u0026': '&', |
|
r'\\"': '"' |
|
} |
|
for old, new in replacements.items(): |
|
raw_json = raw_json.replace(old, new) |
|
raw_json = re.sub(r'[\x00-\x08\x0B-\x1F\x7F]', '', raw_json) |
|
try: |
|
data = json.loads(raw_json) |
|
except json.JSONDecodeError as e: |
|
st.error(f"Error decoding FB_PUBLIC_LOAD_DATA_ JSON: {e}") |
|
return [] |
|
|
|
|
|
questions = [] |
|
try: |
|
questions_data = data[1][1] |
|
except (IndexError, TypeError): |
|
return questions |
|
|
|
for item in questions_data: |
|
if not isinstance(item, list) or len(item) < 2: |
|
continue |
|
q_text = item[1] if isinstance(item[1], str) else None |
|
if not q_text: |
|
continue |
|
q_text = q_text.strip() |
|
|
|
choices = [] |
|
if len(item) > 4 and isinstance(item[4], list): |
|
for block in item[4]: |
|
if isinstance(block, list) and len(block) > 1 and isinstance(block[1], list): |
|
for opt in block[1]: |
|
if isinstance(opt, list) and len(opt) > 0 and isinstance(opt[0], str): |
|
choices.append(opt[0]) |
|
questions.append({ |
|
"question_text": q_text, |
|
"options": choices |
|
}) |
|
return questions |
|
|
|
def generate_answers(questions, api_key): |
|
""" |
|
For each question, call Google Gemini to generate an answer that matches available options. |
|
""" |
|
client = genai.Client(api_key=api_key) |
|
for q in questions: |
|
question_text = q["question_text"] |
|
options = q["options"] |
|
|
|
if options: |
|
|
|
prompt = f""" |
|
Question: {question_text} |
|
|
|
These are the EXACT options (choose only one): |
|
{', '.join([f'"{opt}"' for opt in options])} |
|
|
|
Instructions: |
|
1. Choose exactly ONE option from the list above |
|
2. Return ONLY the exact text of the chosen option, nothing else |
|
3. Do not add any explanation, just the option text |
|
4. Do not add quotation marks around the option |
|
5. Do not answer questions like "What is your name?","Rollno","PRN/GRN","Email","Mobile No","Address","DOB etc |
|
|
|
Answer: |
|
""" |
|
else: |
|
|
|
prompt = f""" |
|
Question: {question_text} |
|
|
|
Please provide a brief and direct answer to this question. |
|
Keep your answer concise (1-2 sentences maximum). |
|
|
|
Answer: |
|
""" |
|
|
|
try: |
|
response = client.models.generate_content( |
|
model="gemini-1.5-flash", |
|
contents=prompt |
|
) |
|
|
|
answer = response.text.strip() |
|
|
|
|
|
if options: |
|
exact_match = False |
|
for opt in options: |
|
if opt.lower() == answer.lower(): |
|
answer = opt |
|
exact_match = True |
|
break |
|
|
|
|
|
if not exact_match: |
|
from difflib import SequenceMatcher |
|
best_match = max(options, key=lambda opt: SequenceMatcher(None, opt.lower(), answer.lower()).ratio()) |
|
answer = best_match |
|
|
|
q["gemini_answer"] = answer |
|
|
|
except Exception as e: |
|
q["gemini_answer"] = f"Error: {str(e)}" |
|
|
|
return questions |
|
|
|
async def async_fill_form(browser_data, questions): |
|
"""Fills the Google Form with generated answers using pyppeteer""" |
|
page = browser_data["page"] |
|
|
|
|
|
question_containers = await page.querySelectorAll('div.freebirdFormviewerViewItemsItemItem, div[role="listitem"]') |
|
|
|
if not question_containers: |
|
st.error("Could not locate question containers in the form.") |
|
return False |
|
|
|
|
|
print(f"Found {len(question_containers)} question containers in the form") |
|
print(f"We have {len(questions)} questions with answers to fill") |
|
|
|
|
|
await asyncio.sleep(2) |
|
|
|
for idx, q in enumerate(questions): |
|
if idx >= len(question_containers): |
|
break |
|
|
|
print(f"\n--------- Processing Question {idx+1} ---------") |
|
container = question_containers[idx] |
|
answer = q.get("gemini_answer", "").strip() |
|
options = q.get("options", []) |
|
|
|
|
|
print(f"Question: {q['question_text']}") |
|
print(f"Generated Answer: {answer}") |
|
|
|
if options: |
|
try: |
|
print(f"This is a multiple-choice question with {len(options)} options") |
|
|
|
|
|
option_elements = await container.querySelectorAll('div[role="radio"], label, div.appsMaterialWizToggleRadiogroupRadioButtonContainer, .docssharedWizToggleLabeledLabelWrapper') |
|
|
|
if not option_elements: |
|
st.warning(f"Could not find option elements for question {idx+1}") |
|
print("No option elements found with any selector strategy") |
|
continue |
|
|
|
|
|
import re |
|
normalized_answer = re.sub(r'[^\w\s]', '', answer.lower()).strip() |
|
|
|
|
|
clicked = False |
|
print("\nTrying exact matches...") |
|
|
|
|
|
for i, opt_elem in enumerate(option_elements): |
|
|
|
opt_text = await page.evaluate('(element) => element.textContent || element.innerText || ""', opt_elem) |
|
opt_text = opt_text.strip() |
|
|
|
|
|
if not opt_text: |
|
opt_text = await page.evaluate('(element) => element.getAttribute("aria-label") || ""', opt_elem) |
|
opt_text = opt_text.strip() |
|
|
|
|
|
normalized_opt = re.sub(r'[^\w\s]', '', opt_text.lower()).strip() |
|
|
|
|
|
if normalized_opt and normalized_opt == normalized_answer: |
|
await opt_elem.click() |
|
clicked = True |
|
print(f"Clicked option: '{opt_text}' (exact match)") |
|
break |
|
|
|
|
|
if not clicked: |
|
for i, opt_elem in enumerate(option_elements): |
|
opt_text = await page.evaluate('(element) => element.textContent || element.innerText || element.getAttribute("aria-label") || ""', opt_elem) |
|
opt_text = opt_text.strip() |
|
normalized_opt = re.sub(r'[^\w\s]', '', opt_text.lower()).strip() |
|
|
|
if normalized_opt and (normalized_opt in normalized_answer or normalized_answer in normalized_opt): |
|
await opt_elem.click() |
|
clicked = True |
|
print(f"Clicked option: '{opt_text}' (substring match)") |
|
break |
|
|
|
|
|
if not clicked and option_elements: |
|
await option_elements[0].click() |
|
print("No match found. Clicked first option as fallback") |
|
|
|
except Exception as e: |
|
st.error(f"Error filling multiple-choice question {idx+1}: {e}") |
|
print(f"Exception: {str(e)}") |
|
else: |
|
try: |
|
print("This is a text question") |
|
|
|
input_elem = await container.querySelector('input[type="text"], textarea, input') |
|
|
|
if input_elem: |
|
await input_elem.type(answer) |
|
print(f"Filled text answer: {answer}") |
|
else: |
|
st.error(f"Could not locate input element for question {idx+1}") |
|
print("Failed to find any input element for this question") |
|
|
|
except Exception as e: |
|
st.error(f"Error filling text question {idx+1}: {e}") |
|
print(f"Exception: {str(e)}") |
|
|
|
print("\n---------- Form filling completed ----------") |
|
return True |
|
|
|
def fill_form(browser_data, questions): |
|
"""Wrapper for async fill form function""" |
|
return run_async(async_fill_form, browser_data, questions) |
|
|
|
async def async_login_to_google(browser_data, email, password): |
|
"""Logs into Google account using pyppeteer""" |
|
try: |
|
page = browser_data["page"] |
|
|
|
|
|
await page.goto("https://accounts.google.com/signin") |
|
await asyncio.sleep(2) |
|
|
|
|
|
screenshot = await async_take_screenshot(browser_data) |
|
st.image(screenshot, caption="Login Page", use_column_width=True) |
|
|
|
|
|
email_input = await page.waitForSelector('input[type="email"]') |
|
await email_input.type(email) |
|
await page.keyboard.press('Enter') |
|
await asyncio.sleep(2) |
|
|
|
|
|
screenshot = await async_take_screenshot(browser_data) |
|
st.image(screenshot, caption="Email Entered", use_column_width=True) |
|
|
|
|
|
password_input = await page.waitForSelector('input[type="password"]') |
|
await password_input.type(password) |
|
await page.keyboard.press('Enter') |
|
await asyncio.sleep(5) |
|
|
|
|
|
screenshot = await async_take_screenshot(browser_data) |
|
st.image(screenshot, caption="Login Attempt Result", use_column_width=True) |
|
|
|
|
|
try: |
|
|
|
if "accounts.google.com/signin" not in page.url: |
|
return True |
|
|
|
|
|
page_content = await page.content() |
|
if "2-Step Verification" in page_content or "verification" in page_content.lower(): |
|
st.warning("Two-factor authentication detected. Please complete it in the browser window.") |
|
return "2FA" |
|
|
|
return False |
|
|
|
except Exception: |
|
|
|
if "accounts.google.com/signin" not in page.url: |
|
return True |
|
return False |
|
|
|
except Exception as e: |
|
st.error(f"Error during login: {str(e)}") |
|
return False |
|
|
|
def login_to_google(browser_data, email, password): |
|
"""Wrapper for async login function""" |
|
return run_async(async_login_to_google, browser_data, email, password) |
|
|
|
|
|
st.title("Google Form Auto Filler with Gemini") |
|
st.write(""" |
|
This app uses a headless browser to help you fill Google Forms automatically with AI-generated answers. |
|
You'll be able to see screenshots of what's happening in the browser as it progresses. |
|
""") |
|
|
|
|
|
if "browser_data" not in st.session_state: |
|
st.session_state.browser_data = None |
|
if "login_status" not in st.session_state: |
|
st.session_state.login_status = None |
|
if "form_filled" not in st.session_state: |
|
st.session_state.form_filled = False |
|
if "screenshot" not in st.session_state: |
|
st.session_state.screenshot = None |
|
|
|
|
|
st.header("Step 1: Login to Google Account") |
|
|
|
|
|
with st.form("google_login"): |
|
email = st.text_input("Google Email") |
|
password = st.text_input("Google Password", type="password") |
|
submit_button = st.form_submit_button("Login to Google") |
|
|
|
if submit_button and email and password: |
|
|
|
try: |
|
browser_data = run_async(setup_browser) |
|
st.session_state.browser_data = browser_data |
|
|
|
|
|
screenshot = take_screenshot(browser_data) |
|
st.session_state.screenshot = screenshot |
|
st.image(screenshot, caption="Browser Started", use_column_width=True) |
|
|
|
|
|
login_result = login_to_google(browser_data, email, password) |
|
st.session_state.login_status = login_result |
|
|
|
if login_result == True: |
|
st.success("Login successful!") |
|
elif login_result == "2FA": |
|
st.warning("Two-factor authentication may be required. Check the screenshot for verification prompts.") |
|
st.info("You might need to complete 2FA in the browser window. Screenshots will update as you proceed.") |
|
else: |
|
st.error("Login failed. Please check your credentials and try again.") |
|
|
|
except Exception as e: |
|
st.error(f"Error initializing browser: {str(e)}") |
|
st.info("If you're seeing this error, please check your Hugging Face Space logs for details.") |
|
|
|
|
|
if st.session_state.login_status == False: |
|
st.info("If you can see that you're actually logged in from the screenshot above, click the button below:") |
|
if st.button("I'm actually logged in successfully"): |
|
st.session_state.login_status = True |
|
st.success("Login status manually confirmed! You can proceed to the form filling step.") |
|
|
|
|
|
if st.session_state.login_status == "2FA" and st.session_state.browser_data: |
|
if st.button("Take New Screenshot (for 2FA completion check)"): |
|
screenshot = take_screenshot(st.session_state.browser_data) |
|
st.session_state.screenshot = screenshot |
|
st.image(screenshot, caption="Current Browser State", use_column_width=True) |
|
|
|
|
|
page = st.session_state.browser_data["page"] |
|
current_url = run_async(lambda: page.url) |
|
if "accounts.google.com/signin" not in current_url: |
|
st.success("Looks like you completed 2FA! You can proceed to the form filling step.") |
|
st.session_state.login_status = True |
|
|
|
if st.session_state.browser_data and (st.session_state.login_status == True or st.session_state.login_status == "2FA"): |
|
st.header("Step 2: Fill Google Form") |
|
|
|
|
|
st.markdown("### Enter your Google Form URL below:") |
|
form_url = st.text_input("Form URL:", key="form_url_input") |
|
|
|
if form_url: |
|
|
|
if "form_url" not in st.session_state: |
|
st.session_state.form_url = form_url |
|
|
|
|
|
if st.button("Process Form", key="process_form_button") or "questions" in st.session_state: |
|
browser_data = st.session_state.browser_data |
|
|
|
|
|
if "questions" not in st.session_state: |
|
page = browser_data["page"] |
|
run_async(lambda: page.goto(form_url)) |
|
time.sleep(5) |
|
|
|
|
|
screenshot = take_screenshot(browser_data) |
|
st.image(screenshot, caption="Google Form Loaded", use_column_width=True) |
|
|
|
html = run_async(lambda: page.content()) |
|
|
|
|
|
questions = extract_questions_from_fb_data(html) |
|
if not questions: |
|
st.error("No questions extracted from the form.") |
|
else: |
|
st.success(f"Successfully extracted {len(questions)} questions from the form.") |
|
|
|
|
|
with st.spinner("Generating answers with Gemini..."): |
|
questions = generate_answers(questions, GEMINI_API_KEY) |
|
|
|
|
|
st.session_state.questions = questions |
|
else: |
|
|
|
questions = st.session_state.questions |
|
|
|
|
|
st.write("--- Generated Answers ---") |
|
for idx, q in enumerate(questions, start=1): |
|
st.write(f"**Question {idx}:** {q['question_text']}") |
|
if q["options"]: |
|
st.write("Options:", ", ".join(q["options"])) |
|
else: |
|
st.write("(No multiple-choice options)") |
|
st.write("**Generated Answer:**", q["gemini_answer"]) |
|
st.write("---") |
|
|
|
|
|
st.markdown("### Form Actions") |
|
|
|
|
|
if not st.session_state.get("form_filled", False): |
|
if st.button("Fill Form with Generated Answers", key="fill_form_button"): |
|
with st.spinner("Filling form..."): |
|
|
|
page = browser_data["page"] |
|
run_async(lambda: page.goto(st.session_state.form_url)) |
|
time.sleep(3) |
|
|
|
if fill_form(browser_data, questions): |
|
time.sleep(2) |
|
|
|
|
|
filled_screenshot = take_screenshot(browser_data) |
|
st.session_state.filled_screenshot = filled_screenshot |
|
st.session_state.form_filled = True |
|
|
|
st.success("Form successfully filled with generated answers!") |
|
st.image(filled_screenshot, caption="Form Filled with Answers", use_column_width=True) |
|
|
|
|
|
if st.session_state.get("form_filled", False) and "filled_screenshot" in st.session_state: |
|
if not st.session_state.get("showing_filled_form", False): |
|
st.image(st.session_state.filled_screenshot, caption="Form Filled with Generated Answers", use_column_width=True) |
|
st.session_state.showing_filled_form = True |
|
|
|
|
|
st.success("β
Form has been filled with AI-generated answers! Just go and change your name and stuff") |
|
st.info("π‘ You can check the answers generated by opening the form link on your browser.") |
|
st.markdown(f"π **Form Link:** [Open in Browser]({form_url})") |
|
|
|
|
|
if st.session_state.browser_data: |
|
st.markdown("---") |
|
if st.button("Close Browser"): |
|
try: |
|
|
|
browser_data = st.session_state.browser_data |
|
run_async(lambda: browser_data["browser"].close()) |
|
|
|
|
|
st.session_state.browser_data = None |
|
st.session_state.login_status = None |
|
st.session_state.form_filled = False |
|
st.session_state.questions = None |
|
st.session_state.form_url = None |
|
st.session_state.filled_screenshot = None |
|
st.session_state.showing_filled_form = False |
|
st.success("Browser closed. All session data cleared.") |
|
except Exception as e: |
|
st.error(f"Error closing browser: {e}") |