# streamlit_app.py
import streamlit as st
from streamlit_tags import st_tags_sidebar
import pandas as pd
import json
from datetime import datetime
from scraper import (
    fetch_html_selenium,
    save_raw_data,
    format_data,
    save_formatted_data,
    calculate_price,
    html_to_markdown_with_readability,
    create_dynamic_listing_model,
    create_listings_container_model,
    scrape_url,
)
from pagination_detector import detect_pagination_elements, PaginationData
import re
from urllib.parse import urlparse
from assets import PRICING
import os
from pydantic import BaseModel
def serialize_pydantic(obj):
    """JSON `default` hook that converts Pydantic models to plain dicts."""
    if isinstance(obj, BaseModel):
        return obj.dict()  # Pydantic v1 API; on Pydantic v2 this would be obj.model_dump()
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')
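# Illustrative usage (a minimal sketch; `listing` is a hypothetical model instance):
#   json.dumps({"listing": listing}, default=serialize_pydantic)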
# Initialize Streamlit app
st.set_page_config(page_title="Universal Web Scraper", page_icon="🦑")
st.title("Universal Web Scraper 🦑")

# Initialize session state variables if they don't exist
if 'results' not in st.session_state:
    st.session_state['results'] = None
if 'perform_scrape' not in st.session_state:
    st.session_state['perform_scrape'] = False

# Sidebar components
st.sidebar.title("Web Scraper Settings")
model_selection = st.sidebar.selectbox("Select Model", options=list(PRICING.keys()), index=0)
url_input = st.sidebar.text_input("Enter URL(s) separated by whitespace")
# Toggle that shows/hides the field-extraction tags input
show_tags = st.sidebar.toggle("Enable Scraping")

# Conditionally show tags input based on the toggle
tags = []
if show_tags:
    tags = st_tags_sidebar(
        label='Enter Fields to Extract:',
        text='Press enter to add a tag',
        value=[],
        suggestions=[],
        maxtags=-1,  # -1 means no limit on the number of tags
        key='tags_input'
    )
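# After user input, `tags` might look like ['title', 'price', 'location']
# (illustrative field names only).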
st.sidebar.markdown("---")

# Add pagination toggle and input
use_pagination = st.sidebar.toggle("Enable Pagination")
pagination_details = None
if use_pagination:
    pagination_details = st.sidebar.text_input(
        "Enter Pagination Details (optional)",
        help="Describe how to navigate through pages (e.g., 'Next' button class, URL pattern)"
    )

st.sidebar.markdown("---")
def generate_unique_folder_name(url):
    """Build a filesystem-safe folder name from the URL's domain plus a timestamp."""
    timestamp = datetime.now().strftime('%Y_%m_%d__%H_%M_%S')
    # Parse the URL and extract the domain name
    parsed_url = urlparse(url)
    domain = parsed_url.netloc or parsed_url.path.split('/')[0]
    # Remove 'www.' if present
    domain = re.sub(r'^www\.', '', domain)
    # Replace any runs of non-alphanumeric characters with underscores
    clean_domain = re.sub(r'\W+', '_', domain)
    return f"{clean_domain}_{timestamp}"
def scrape_multiple_urls(urls, fields, selected_model):
    """Scrape each URL in turn, accumulating token counts and cost across all of them."""
    output_folder = os.path.join('output', generate_unique_folder_name(urls[0]))
    os.makedirs(output_folder, exist_ok=True)
    total_input_tokens = 0
    total_output_tokens = 0
    total_cost = 0
    all_data = []
    first_url_markdown = None
    for i, url in enumerate(urls, start=1):
        raw_html = fetch_html_selenium(url)
        markdown = html_to_markdown_with_readability(raw_html)
        if i == 1:
            first_url_markdown = markdown  # kept for pagination detection later
        input_tokens, output_tokens, cost, formatted_data = scrape_url(
            url, fields, selected_model, output_folder, i, markdown
        )
        total_input_tokens += input_tokens
        total_output_tokens += output_tokens
        total_cost += cost
        all_data.append(formatted_data)
    return output_folder, total_input_tokens, total_output_tokens, total_cost, all_data, first_url_markdown
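# The return value unpacks as, for example (values illustrative):
#   ('output/example_com_2024_01_31__12_00_00', 1500, 300, 0.0042, [...], '# page markdown ...')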
# Define the scraping function for a single URL.
# Note: this helper is not currently wired to the sidebar "Scrape" button below,
# which calls scrape_multiple_urls instead; it is kept as a single-URL variant.
def perform_scrape():
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    raw_html = fetch_html_selenium(url_input)
    markdown = html_to_markdown_with_readability(raw_html)
    save_raw_data(markdown, timestamp)

    # Detect pagination if enabled
    pagination_info = None
    if use_pagination:
        pagination_data, token_counts, pagination_price = detect_pagination_elements(
            url_input, pagination_details, model_selection, markdown
        )
        pagination_info = {
            "page_urls": pagination_data.page_urls,
            "token_counts": token_counts,
            "price": pagination_price
        }

    # Initialize token and cost variables with default values
    input_tokens = 0
    output_tokens = 0
    total_cost = 0

    if show_tags:
        DynamicListingModel = create_dynamic_listing_model(tags)
        DynamicListingsContainer = create_listings_container_model(DynamicListingModel)
        formatted_data, tokens_count = format_data(
            markdown, DynamicListingsContainer, DynamicListingModel, model_selection
        )
        input_tokens, output_tokens, total_cost = calculate_price(tokens_count, model=model_selection)
        df = save_formatted_data(formatted_data, timestamp)
    else:
        formatted_data = None
        df = None

    return df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp, pagination_info
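# If wired to a button, a minimal call would unpack as (a sketch, not current behavior):
#   df, formatted_data, markdown, in_tok, out_tok, cost, ts, pagination = perform_scrape()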
if st.sidebar.button("Scrape"): | |
with st.spinner('Please wait... Data is being scraped.'): | |
urls = url_input.split() | |
field_list = tags | |
output_folder, total_input_tokens, total_output_tokens, total_cost, all_data, first_url_markdown = scrape_multiple_urls(urls, field_list, model_selection) | |
# Perform pagination if enabled and only one URL is provided | |
pagination_info = None | |
if use_pagination and len(urls) == 1: | |
try: | |
pagination_result = detect_pagination_elements( | |
urls[0], pagination_details, model_selection, first_url_markdown | |
) | |
if pagination_result is not None: | |
pagination_data, token_counts, pagination_price = pagination_result | |
# Handle both PaginationData objects and dictionaries | |
if isinstance(pagination_data, PaginationData): | |
page_urls = pagination_data.page_urls | |
elif isinstance(pagination_data, dict): | |
page_urls = pagination_data.get("page_urls", []) | |
else: | |
page_urls = [] | |
pagination_info = { | |
"page_urls": page_urls, | |
"token_counts": token_counts, | |
"price": pagination_price | |
} | |
else: | |
st.warning("Pagination detection returned None. No pagination information available.") | |
except Exception as e: | |
st.error(f"An error occurred during pagination detection: {e}") | |
pagination_info = { | |
"page_urls": [], | |
"token_counts": {"input_tokens": 0, "output_tokens": 0}, | |
"price": 0.0 | |
} | |
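        # When set, pagination_info has the shape (per the assignments above):
        #   {"page_urls": [str, ...],
        #    "token_counts": {"input_tokens": int, "output_tokens": int},
        #    "price": float}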
        st.session_state['results'] = (
            all_data, None, first_url_markdown, total_input_tokens,
            total_output_tokens, total_cost, output_folder, pagination_info
        )
        st.session_state['perform_scrape'] = True

# Display results if they exist in session state
if st.session_state['results']:
    all_data, _, _, input_tokens, output_tokens, total_cost, output_folder, pagination_info = st.session_state['results']

    # Display scraping details in sidebar only if scraping was performed and the toggle is on
    if all_data and show_tags:
        st.sidebar.markdown("---")
        st.sidebar.markdown("### Scraping Details")
        st.sidebar.markdown("#### Token Usage")
        st.sidebar.markdown(f"*Input Tokens:* {input_tokens}")
        st.sidebar.markdown(f"*Output Tokens:* {output_tokens}")
        st.sidebar.markdown(f"**Total Cost:** :green-background[**${total_cost:.4f}**]")
        # Display scraped data in main area
        st.subheader("Scraped/Parsed Data")
        for i, data in enumerate(all_data, start=1):
            st.write(f"Data from URL {i}:")

            # Handle string data (convert to dict if it's JSON)
            if isinstance(data, str):
                try:
                    data = json.loads(data)
                except json.JSONDecodeError:
                    st.error(f"Failed to parse data as JSON for URL {i}")
                    continue

            if isinstance(data, dict):
                if 'listings' in data and isinstance(data['listings'], list):
                    df = pd.DataFrame(data['listings'])
                else:
                    # If 'listings' is not in the dict or not a list, use the entire dict
                    df = pd.DataFrame([data])
            elif hasattr(data, 'listings') and isinstance(data.listings, list):
                # Handle the case where data is a Pydantic model
                listings = [item.dict() for item in data.listings]
                df = pd.DataFrame(listings)
            else:
                st.error(f"Unexpected data format for URL {i}")
                continue

            # Display the dataframe
            st.dataframe(df, use_container_width=True)
        # Download options
        st.subheader("Download Options")
        col1, col2 = st.columns(2)
        with col1:
            json_data = json.dumps(all_data, default=lambda o: o.dict() if hasattr(o, 'dict') else str(o), indent=4)
            st.download_button(
                "Download JSON",
                data=json_data,
                file_name="scraped_data.json"
            )
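            # Note: serialize_pydantic (defined above) could serve as the `default`
            # hook here instead, but it raises TypeError on non-model objects,
            # whereas this lambda falls back to str().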
        with col2:
            # Convert all data to a single DataFrame
            all_listings = []
            for data in all_data:
                if isinstance(data, str):
                    try:
                        data = json.loads(data)
                    except json.JSONDecodeError:
                        continue
                if isinstance(data, dict) and 'listings' in data:
                    all_listings.extend(data['listings'])
                elif hasattr(data, 'listings'):
                    all_listings.extend([item.dict() for item in data.listings])
                else:
                    all_listings.append(data)

            combined_df = pd.DataFrame(all_listings)
            st.download_button(
                "Download CSV",
                data=combined_df.to_csv(index=False),
                file_name="scraped_data.csv"
            )
    st.success(f"Scraping completed. Results saved in {output_folder}")

    # Add pagination details to sidebar
    if pagination_info and use_pagination:
        st.sidebar.markdown("---")
        st.sidebar.markdown("### Pagination Details")
        st.sidebar.markdown(f"**Number of Page URLs:** {len(pagination_info['page_urls'])}")
        st.sidebar.markdown("#### Pagination Token Usage")
        st.sidebar.markdown(f"*Input Tokens:* {pagination_info['token_counts']['input_tokens']}")
        st.sidebar.markdown(f"*Output Tokens:* {pagination_info['token_counts']['output_tokens']}")
        st.sidebar.markdown(f"**Pagination Cost:** :red-background[**${pagination_info['price']:.4f}**]")

        st.markdown("---")
        st.subheader("Pagination Information")
        pagination_df = pd.DataFrame(pagination_info["page_urls"], columns=["Page URLs"])
        st.dataframe(
            pagination_df,
            column_config={
                "Page URLs": st.column_config.LinkColumn("Page URLs")
            },
            use_container_width=True
        )
        # Create columns for download buttons
        col1, col2 = st.columns(2)
        with col1:
            st.download_button(
                "Download Pagination JSON",
                data=json.dumps(pagination_info["page_urls"], indent=4),
                file_name="pagination_urls.json"
            )
        with col2:
            st.download_button(
                "Download Pagination CSV",
                data=pagination_df.to_csv(index=False),
                file_name="pagination_urls.csv"
            )
    # Display combined totals only if both scraping and pagination were performed and both toggles are on
    if all_data and pagination_info and show_tags and use_pagination:
        st.markdown("---")
        total_input_tokens = input_tokens + pagination_info['token_counts']['input_tokens']
        total_output_tokens = output_tokens + pagination_info['token_counts']['output_tokens']
        total_combined_cost = total_cost + pagination_info['price']
        st.markdown("### Total Counts and Cost (Including Pagination)")
        st.markdown(f"**Total Input Tokens:** {total_input_tokens}")
        st.markdown(f"**Total Output Tokens:** {total_output_tokens}")
        st.markdown(f"**Total Combined Cost:** :green[**${total_combined_cost:.4f}**]")

# Add a clear results button
if st.sidebar.button("Clear Results"):
    st.session_state['results'] = None
    st.session_state['perform_scrape'] = False
    st.rerun()