# noaa_incidents.py
import hashlib
import json
import logging
import os
import pickle
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import chromadb
import pandas as pd
import requests
from bs4 import BeautifulSoup
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from tqdm import tqdm

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constants
BASE_URL = "https://incidentnews.noaa.gov"
BROWSE_URL = f"{BASE_URL}/browse/date"
OUTPUT_DIR = Path("output")
CACHE_DIR = Path("cache")
MAX_RETRIES = 3
BATCH_SIZE = 500
class NOAAIncidentScraper:
    """
    Scrapes NOAA Incident News data with caching and multi-threading support.
    Optimized for the Hugging Face Spaces environment.
    """

    def __init__(self, max_workers: int = 5, cache_dir: str = 'cache'):
        """
        Initialize the scraper with custom configuration.

        Args:
            max_workers (int): Maximum number of concurrent threads
            cache_dir (str): Directory for caching web responses
        """
        self.base_url = BASE_URL
        self.browse_url = BROWSE_URL
        self.headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                           '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
        }
        self.incidents_data = []
        self.data_lock = threading.Lock()
        self.max_workers = max_workers
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def get_cache_path(self, url: str) -> Path:
        """Generate a cache file path for a given URL."""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return self.cache_dir / f"{url_hash}.cache"
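
    # Illustrative note (not part of the scraper's API): cache files are keyed only
    # by the MD5 of the full URL, so
    #     get_cache_path("https://incidentnews.noaa.gov/browse/date?page=1")
    # maps to "<cache_dir>/<md5 hex digest>.cache". Deleting the cache directory is
    # therefore enough to force a completely fresh crawl.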
    def get_cached_response(self, url: str) -> Optional[str]:
        """Retrieve cached response for a URL."""
        cache_path = self.get_cache_path(url)
        if cache_path.exists():
            try:
                with cache_path.open('rb') as f:
                    return pickle.load(f)
            except Exception as e:
                logger.warning(f"Error loading cache for {url}: {e}")
                return None
        return None

    def cache_response(self, url: str, content: str):
        """Cache response content for a URL."""
        cache_path = self.get_cache_path(url)
        try:
            with cache_path.open('wb') as f:
                pickle.dump(content, f)
        except Exception as e:
            logger.warning(f"Error caching response for {url}: {e}")

    def fetch_url(self, url: str, use_cache: bool = True) -> Optional[str]:
        """
        Fetch URL content with caching and retry mechanism.

        Args:
            url (str): URL to fetch
            use_cache (bool): Whether to use cached response

        Returns:
            Optional[str]: Response content or None if failed
        """
        if use_cache:
            cached = self.get_cached_response(url)
            if cached:
                return cached
        for attempt in range(MAX_RETRIES):
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                content = response.text
                if use_cache:
                    self.cache_response(url, content)
                return content
            except requests.RequestException as e:
                wait_time = min(2 ** attempt, 10)  # Exponential backoff with max 10s
                logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed for {url}: {e}")
                if attempt < MAX_RETRIES - 1:
                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Failed to fetch {url} after {MAX_RETRIES} attempts")
        return None
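
    # Illustrative note: with MAX_RETRIES = 3, the backoff above waits
    # min(2**0, 10) = 1s after the first failure and min(2**1, 10) = 2s after the
    # second; the third failure gives up. The 10-second cap only comes into play
    # if MAX_RETRIES is raised above 5.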
    def validate_incident_data(self, incident_data: Dict) -> bool:
        """
        Validate scraped incident data.

        Args:
            incident_data (Dict): Incident data to validate

        Returns:
            bool: True if data is valid
        """
        required_fields = ['title', 'date', 'location']
        return all(incident_data.get(field) for field in required_fields)

    def get_total_pages(self) -> int:
        """Get the total number of pages to scrape."""
        content = self.fetch_url(self.browse_url)
        if not content:
            return 0
        try:
            soup = BeautifulSoup(content, 'html.parser')
            pagination = soup.find('ul', class_='pagination')
            if pagination:
                last_page = int(pagination.find_all('li')[-2].text)
                return last_page
        except Exception as e:
            logger.error(f"Error determining total pages: {e}")
        return 1
    def scrape_incident_page(self, incident_url: str, use_cache: bool = True) -> Optional[Dict]:
        """
        Scrape detailed information from an individual incident page.

        Args:
            incident_url (str): URL of the incident page
            use_cache (bool): Whether to use cached response

        Returns:
            Optional[Dict]: Incident data or None if failed
        """
        try:
            full_url = f"{self.base_url}{incident_url}"
            content = self.fetch_url(full_url, use_cache)
            if not content:
                return None
            soup = BeautifulSoup(content, 'html.parser')
            incident_details = {}

            # Extract title
            title = soup.find('h1')
            incident_details['title'] = title.text.strip() if title else None

            # Extract initial notification
            initial_notification = soup.find('p', class_='sub')
            incident_details['initial_notification'] = (
                initial_notification.text.strip() if initial_notification else None
            )

            # Extract location and date
            location_date = soup.find('p', class_='')
            if location_date:
                location = location_date.find('span', class_='glyphicon-map-marker')
                if location and location.next_sibling:
                    incident_details['location'] = location.next_sibling.strip()
                date = location_date.find('span', class_='incident-date')
                if date:
                    incident_details['date'] = date.text.strip()

            # Extract additional details from the summary table
            details_table = soup.find('table', class_='table')
            if details_table:
                for row in details_table.find_all('tr'):
                    cols = row.find_all('td')
                    if len(cols) == 2:
                        key = cols[0].text.strip().rstrip(':').lower().replace(' ', '_')
                        value = cols[1].text.strip()
                        incident_details[key] = value

            if not self.validate_incident_data(incident_details):
                logger.warning(f"Invalid incident data for {incident_url}")
                return None
            return incident_details
        except Exception as e:
            logger.error(f"Error scraping incident page {incident_url}: {e}")
            return None
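
    # Illustrative note: the parser above assumes incident pages shaped roughly like
    # the following hypothetical, simplified fragment of the IncidentNews markup:
    #
    #     <h1>Tank Barge Example</h1>
    #     <p class="sub">Initial notification text...</p>
    #     <p class=""><span class="glyphicon-map-marker"></span> Gulf of Mexico
    #        <span class="incident-date">2021-05-01</span></p>
    #     <table class="table">...</table>
    #
    # If the site changes these class names, the corresponding fields come back
    # empty and validate_incident_data() rejects the record.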
    def scrape_listing_page(self, page_number: int, use_cache: bool = True) -> List[str]:
        """
        Scrape a single listing page of incidents.

        Args:
            page_number (int): Page number to scrape
            use_cache (bool): Whether to use cached response

        Returns:
            List[str]: List of incident URL paths
        """
        url = f"{self.browse_url}?page={page_number}"
        try:
            content = self.fetch_url(url, use_cache)
            if not content:
                return []
            soup = BeautifulSoup(content, 'html.parser')
            incidents_table = soup.find('table', class_='incident-links')
            if not incidents_table:
                logger.warning(f"No incidents table found on page {page_number}")
                return []
            incident_urls = []
            for row in incidents_table.find_all('tr')[1:]:  # Skip header row
                cols = row.find_all('td')
                if len(cols) >= 3:
                    incident_link = cols[0].find('a')
                    if incident_link and 'href' in incident_link.attrs:
                        incident_urls.append(incident_link['href'])
            return incident_urls
        except Exception as e:
            logger.error(f"Error scraping listing page {page_number}: {e}")
            return []

    def process_incident(self, incident_url: str, use_cache: bool = True) -> Optional[str]:
        """Process a single incident URL with thread safety."""
        incident_data = self.scrape_incident_page(incident_url, use_cache)
        if incident_data:
            with self.data_lock:
                self.incidents_data.append(incident_data)
            return incident_url
        return None

    def save_data(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Save the scraped data to CSV and JSON files.

        Returns:
            Tuple[Optional[str], Optional[str]]: Paths to saved CSV and JSON files,
            or (None, None) if saving failed
        """
        OUTPUT_DIR.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        try:
            # Save to CSV
            csv_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.csv'
            df = pd.DataFrame(self.incidents_data)
            df.to_csv(csv_filename, index=False)
            # Save to JSON
            json_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.json'
            with open(json_filename, 'w', encoding='utf-8') as f:
                json.dump(self.incidents_data, f, indent=4)
            logger.info(f"Data saved to {csv_filename} and {json_filename}")
            return str(csv_filename), str(json_filename)
        except Exception as e:
            logger.error(f"Error saving data: {e}")
            return None, None
    def run(self, validate_first: bool = True) -> Tuple[Optional[str], Optional[str]]:
        """
        Run the complete scraping process.

        Args:
            validate_first (bool): Whether to validate first page before full scrape

        Returns:
            Tuple[Optional[str], Optional[str]]: Paths to saved CSV and JSON files
        """
        logger.info("Starting NOAA IncidentNews scraper...")
        if validate_first:
            logger.info("Performing initial validation run...")
            test_page = self.scrape_listing_page(1, use_cache=False)
            if not test_page:
                logger.error("Unable to scrape the first page. Aborting.")
                return None, None
            test_incident = self.scrape_incident_page(test_page[0], use_cache=False)
            if not test_incident:
                logger.error("Unable to scrape test incident. Aborting.")
                return None, None
            logger.info("Validation successful, proceeding with full scrape...")

        total_pages = self.get_total_pages()
        logger.info(f"Found {total_pages} pages to scrape")

        # Collect incident URLs
        all_incident_urls = []
        logger.info("Collecting incident URLs...")
        with tqdm(total=total_pages, desc="Collecting URLs") as pbar:
            for page in range(1, total_pages + 1):
                urls = self.scrape_listing_page(page)
                all_incident_urls.extend(urls)
                pbar.update(1)

        total_incidents = len(all_incident_urls)
        logger.info(f"Found {total_incidents} incidents to process")
        if total_incidents == 0:
            logger.error("No incidents found to process")
            return None, None

        # Process incidents
        logger.info("Processing incidents...")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.process_incident, url)
                for url in all_incident_urls
            ]
            with tqdm(total=total_incidents, desc="Scraping incidents") as pbar:
                for future in as_completed(futures):
                    try:
                        future.result()
                        pbar.update(1)
                    except Exception as e:
                        logger.error(f"Error processing incident: {e}")

        logger.info(f"Scraped {len(self.incidents_data)} incidents")
        if self.incidents_data:
            return self.save_data()
        else:
            logger.error("No incident data scraped")
            return None, None
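

# A minimal sketch (not part of the original pipeline) showing how the scraper's
# building blocks can be exercised on a single listing page, e.g. to debug a
# Space "Runtime error" without launching the full multi-page crawl. The page
# number and limit are arbitrary example values.
def preview_first_incidents(limit: int = 3) -> List[Dict]:
    """Scrape a handful of incidents from listing page 1 and return them as dicts."""
    scraper = NOAAIncidentScraper(max_workers=1)
    urls = scraper.scrape_listing_page(1, use_cache=False)
    previews = []
    for url in urls[:limit]:
        incident = scraper.scrape_incident_page(url, use_cache=False)
        if incident:
            previews.append(incident)
    return previews
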
class NOAAIncidentDB:
    """
    Manages NOAA incident data using ChromaDB for vector storage and search.
    """

    def __init__(self,
                 persist_directory: str = "noaa_db",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        """
        Initialize the database with ChromaDB settings.

        Args:
            persist_directory (str): Directory for ChromaDB storage
            embedding_model (str): Sentence-transformers model name for embeddings
        """
        self.persist_directory = persist_directory
        self.embedding_model_name = embedding_model
        os.makedirs(self.persist_directory, exist_ok=True)
        # Initialize a persistent ChromaDB client (assumes chromadb >= 0.4, where
        # PersistentClient replaced the Settings(persist_directory=...) pattern)
        self.client = chromadb.PersistentClient(
            path=self.persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        # Set up the embedding function
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        # Get or create the collection
        self.collection = self.client.get_or_create_collection(
            name="noaa_incidents",
            embedding_function=self.embedding_function,
            metadata={"description": "NOAA incident reports database"}
        )
    def load_incidents(self, csv_path: str) -> int:
        """
        Load incidents from CSV into ChromaDB.

        Args:
            csv_path (str): Path to CSV file

        Returns:
            int: Number of incidents loaded
        """
        logger.info(f"Loading incidents from {csv_path}")
        try:
            df = pd.read_csv(csv_path, dtype=str)
            df = df.where(pd.notna(df), None)
            documents = []
            metadatas = []
            ids = []
            for idx, row in df.iterrows():
                # Generate a unique ID from title, date, location and row index
                unique_string = (
                    f"{row.get('title', '')}_{row.get('date', '')}_"
                    f"{row.get('location', '')}_{idx}"
                )
                incident_id = "incident_" + hashlib.md5(unique_string.encode()).hexdigest()[:8]
                # Create searchable document content
                doc_content = "\n".join([
                    f"Incident: {row.get('title', 'N/A')}",
                    f"Location: {row.get('location', 'N/A')}",
                    f"Date: {row.get('date', 'N/A')}",
                    f"Details: {row.get('initial_notification', '')}"
                ])
                # Core metadata
                metadata = {
                    'title': str(row.get('title', 'N/A')),
                    'date': str(row.get('date', 'N/A')),
                    'location': str(row.get('location', 'N/A'))
                }
                # Add any additional fields present
                for col in df.columns:
                    if col not in ['title', 'date', 'location'] and pd.notna(row[col]):
                        metadata[col.lower().replace(' ', '_')] = str(row[col])
                documents.append(doc_content.strip())
                metadatas.append(metadata)
                ids.append(incident_id)

            # Add to the database in batches
            total_documents = len(documents)
            for i in range(0, total_documents, BATCH_SIZE):
                batch_end = min(i + BATCH_SIZE, total_documents)
                self.collection.add(
                    documents=documents[i:batch_end],
                    metadatas=metadatas[i:batch_end],
                    ids=ids[i:batch_end]
                )
                logger.info(f"Added batch {i // BATCH_SIZE + 1} with {batch_end - i} incidents")
            logger.info(f"Successfully loaded {total_documents} incidents into ChromaDB")
            return total_documents
        except Exception as e:
            logger.error(f"Error loading incidents from CSV: {e}")
            return 0
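
    # Illustrative note: each stored record ends up shaped roughly like the
    # following (all values hypothetical):
    #
    #     id:       "incident_1a2b3c4d"
    #     document: "Incident: Tank Barge Example\nLocation: Gulf of Mexico\n
    #                Date: 2021-05-01\nDetails: Initial notification text..."
    #     metadata: {"title": "Tank Barge Example", "date": "2021-05-01",
    #                "location": "Gulf of Mexico", ...}
    #
    # The document text is what the embedding model sees; the metadata is returned
    # alongside search hits for display and filtering.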
    def search(self, query: str, n_results: int = 5) -> List[Dict]:
        """
        Search for incidents matching the query.

        Args:
            query (str): Search query
            n_results (int): Number of results to return

        Returns:
            List[Dict]: List of matching incidents
        """
        try:
            # ChromaDB always returns ids, so they are not a valid `include` entry
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=['metadatas', 'documents']
            )
            formatted_results = []
            for doc, metadata, incident_id in zip(
                results['documents'][0],
                results['metadatas'][0],
                results['ids'][0]
            ):
                formatted_results.append({
                    'id': incident_id,
                    'title': metadata.get('title', 'N/A'),
                    'date': metadata.get('date', 'N/A'),
                    'location': metadata.get('location', 'N/A'),
                    'details': doc,
                    'metadata': metadata
                })
            return formatted_results
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return []
    def delete_collection(self):
        """Delete the current collection."""
        try:
            self.client.delete_collection("noaa_incidents")
            logger.info("Collection deleted successfully")
        except Exception as e:
            logger.error(f"Error deleting collection: {e}")

    def get_collection_stats(self) -> Dict:
        """
        Get statistics about the current collection.

        Returns:
            Dict: Collection statistics
        """
        try:
            return {
                "total_documents": self.collection.count(),
                "collection_name": "noaa_incidents",
                "embedding_model": self.embedding_model_name
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            return {}
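

# A minimal sketch (assumption: a CSV produced by NOAAIncidentScraper.save_data()
# already exists on disk) showing how the vector store can be rebuilt from that
# file without re-running the scraper. The persist directory default mirrors the
# one used by NOAAIncidentDB above.
def rebuild_database_from_csv(csv_path: str, persist_directory: str = "noaa_db") -> Dict:
    """Recreate the ChromaDB collection from a previously scraped CSV and return stats."""
    # Drop any existing collection so the reload starts from a clean slate
    NOAAIncidentDB(persist_directory=persist_directory).delete_collection()
    # A fresh instance re-runs get_or_create_collection, recreating it empty
    db = NOAAIncidentDB(persist_directory=persist_directory)
    db.load_incidents(csv_path)
    return db.get_collection_stats()
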
if __name__ == "__main__":
    # Example usage: scrape, load into ChromaDB, then run a sample search
    scraper = NOAAIncidentScraper(max_workers=5)
    csv_file, json_file = scraper.run(validate_first=True)
    if csv_file:
        db = NOAAIncidentDB()
        num_loaded = db.load_incidents(csv_file)
        logger.info(f"Loaded {num_loaded} incidents into database")
        # Example search
        results = db.search("oil spill near coral reefs", n_results=5)
        for i, result in enumerate(results, 1):
            print(f"\nResult {i}:")
            print(f"Title: {result['title']}")
            print(f"Date: {result['date']}")
            print(f"Location: {result['location']}")
            print(f"Details: {result['details']}\n")