# noaa_incidents.py import os import hashlib import json import pickle import threading import time from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple import logging import pandas as pd import requests from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm import chromadb from chromadb.utils import embedding_functions from chromadb.config import Settings # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Constants BASE_URL = "https://incidentnews.noaa.gov" BROWSE_URL = f"{BASE_URL}/browse/date" OUTPUT_DIR = Path("output") CACHE_DIR = Path("cache") MAX_RETRIES = 3 BATCH_SIZE = 500 class NOAAIncidentScraper: """ Scrapes NOAA Incident News data with caching and multi-threading support. Optimized for Hugging Face Spaces environment. """ def __init__(self, max_workers: int = 5, cache_dir: str = 'cache'): """ Initialize the scraper with custom configuration. Args: max_workers (int): Maximum number of concurrent threads cache_dir (str): Directory for caching web responses """ self.base_url = BASE_URL self.browse_url = BROWSE_URL self.headers = { 'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36') } self.incidents_data = [] self.data_lock = threading.Lock() self.max_workers = max_workers self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) self.session = requests.Session() self.session.headers.update(self.headers) def get_cache_path(self, url: str) -> Path: """Generate a cache file path for a given URL.""" url_hash = hashlib.md5(url.encode()).hexdigest() return self.cache_dir / f"{url_hash}.cache" def get_cached_response(self, url: str) -> Optional[str]: """Retrieve cached response for a URL.""" cache_path = self.get_cache_path(url) if cache_path.exists(): try: with cache_path.open('rb') as f: return pickle.load(f) except Exception as e: logger.warning(f"Error loading cache for {url}: {e}") return None return None def cache_response(self, url: str, content: str): """Cache response content for a URL.""" cache_path = self.get_cache_path(url) try: with cache_path.open('wb') as f: pickle.dump(content, f) except Exception as e: logger.warning(f"Error caching response for {url}: {e}") def fetch_url(self, url: str, use_cache: bool = True) -> Optional[str]: """ Fetch URL content with caching and retry mechanism. Args: url (str): URL to fetch use_cache (bool): Whether to use cached response Returns: Optional[str]: Response content or None if failed """ if use_cache: cached = self.get_cached_response(url) if cached: return cached for attempt in range(MAX_RETRIES): try: response = self.session.get(url, timeout=10) response.raise_for_status() content = response.text if use_cache: self.cache_response(url, content) return content except requests.RequestException as e: wait_time = min(2 ** attempt, 10) # Exponential backoff with max 10s logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed for {url}: {e}") if attempt < MAX_RETRIES - 1: logger.info(f"Retrying in {wait_time} seconds...") time.sleep(wait_time) else: logger.error(f"Failed to fetch {url} after {MAX_RETRIES} attempts") return None def validate_incident_data(self, incident_data: Dict) -> bool: """ Validate scraped incident data. Args: incident_data (Dict): Incident data to validate Returns: bool: True if data is valid """ required_fields = ['title', 'date', 'location'] return all(incident_data.get(field) for field in required_fields) def get_total_pages(self) -> int: """Get the total number of pages to scrape.""" content = self.fetch_url(self.browse_url) if not content: return 0 try: soup = BeautifulSoup(content, 'html.parser') pagination = soup.find('ul', class_='pagination') if pagination: last_page = int(pagination.find_all('li')[-2].text) return last_page except Exception as e: logger.error(f"Error determining total pages: {e}") return 1 def scrape_incident_page(self, incident_url: str, use_cache: bool = True) -> Optional[Dict]: """ Scrape detailed information from an individual incident page. Args: incident_url (str): URL of the incident page use_cache (bool): Whether to use cached response Returns: Optional[Dict]: Incident data or None if failed """ try: full_url = f"{self.base_url}{incident_url}" content = self.fetch_url(full_url, use_cache) if not content: return None soup = BeautifulSoup(content, 'html.parser') incident_details = {} # Extract Title title = soup.find('h1') incident_details['title'] = title.text.strip() if title else None # Extract Initial Notification initial_notification = soup.find('p', class_='sub') incident_details['initial_notification'] = ( initial_notification.text.strip() if initial_notification else None ) # Extract Location and Date location_date = soup.find('p', class_='') if location_date: location = location_date.find('span', class_='glyphicon-map-marker') if location and location.next_sibling: incident_details['location'] = location.next_sibling.strip() date = location_date.find('span', class_='incident-date') if date: incident_details['date'] = date.text.strip() # Extract Additional Details from Table details_table = soup.find('table', class_='table') if details_table: for row in details_table.find_all('tr'): cols = row.find_all('td') if len(cols) == 2: key = cols[0].text.strip().rstrip(':').lower().replace(' ', '_') value = cols[1].text.strip() incident_details[key] = value if not self.validate_incident_data(incident_details): logger.warning(f"Invalid incident data for {incident_url}") return None return incident_details except Exception as e: logger.error(f"Error scraping incident page {incident_url}: {e}") return None def scrape_listing_page(self, page_number: int, use_cache: bool = True) -> List[str]: """ Scrape a single listing page of incidents. Args: page_number (int): Page number to scrape use_cache (bool): Whether to use cached response Returns: List[str]: List of incident URLs """ url = f"{self.browse_url}?page={page_number}" try: content = self.fetch_url(url, use_cache) if not content: return [] soup = BeautifulSoup(content, 'html.parser') incidents_table = soup.find('table', class_='incident-links') if not incidents_table: logger.warning(f"No incidents table found on page {page_number}") return [] incident_urls = [] for row in incidents_table.find_all('tr')[1:]: # Skip header row cols = row.find_all('td') if len(cols) >= 3: incident_link = cols[0].find('a') if incident_link and 'href' in incident_link.attrs: incident_urls.append(incident_link['href']) return incident_urls except Exception as e: logger.error(f"Error scraping listing page {page_number}: {e}") return [] def process_incident(self, incident_url: str, use_cache: bool = True) -> Optional[str]: """Process a single incident URL with thread safety.""" incident_data = self.scrape_incident_page(incident_url, use_cache) if incident_data: with self.data_lock: self.incidents_data.append(incident_data) return incident_url return None def save_data(self) -> Tuple[str, str]: """ Save the scraped data to CSV and JSON files. Returns: Tuple[str, str]: Paths to saved CSV and JSON files """ OUTPUT_DIR.mkdir(exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') try: # Save to CSV csv_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.csv' df = pd.DataFrame(self.incidents_data) df.to_csv(csv_filename, index=False) # Save to JSON json_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.json' with open(json_filename, 'w', encoding='utf-8') as f: json.dump(self.incidents_data, f, indent=4) logger.info(f"Data saved to {csv_filename} and {json_filename}") return str(csv_filename), str(json_filename) except Exception as e: logger.error(f"Error saving data: {e}") return None, None def run(self, validate_first: bool = True) -> Tuple[Optional[str], Optional[str]]: """ Run the complete scraping process. Args: validate_first (bool): Whether to validate first page before full scrape Returns: Tuple[Optional[str], Optional[str]]: Paths to saved CSV and JSON files """ logger.info("Starting NOAA IncidentNews scraper...") if validate_first: logger.info("Performing initial validation run...") test_page = self.scrape_listing_page(1, use_cache=False) if not test_page: logger.error("Unable to scrape the first page. Aborting.") return None, None test_incident = self.scrape_incident_page(test_page[0], use_cache=False) if not test_incident: logger.error("Unable to scrape test incident. Aborting.") return None, None logger.info("Validation successful, proceeding with full scrape...") total_pages = self.get_total_pages() logger.info(f"Found {total_pages} pages to scrape") # Collect incident URLs all_incident_urls = [] logger.info("Collecting incident URLs...") with tqdm(total=total_pages, desc="Collecting URLs") as pbar: for page in range(1, total_pages + 1): urls = self.scrape_listing_page(page) all_incident_urls.extend(urls) pbar.update(1) total_incidents = len(all_incident_urls) logger.info(f"Found {total_incidents} incidents to process") if total_incidents == 0: logger.error("No incidents found to process") return None, None # Process incidents logger.info("Processing incidents...") with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [ executor.submit(self.process_incident, url) for url in all_incident_urls ] with tqdm(total=total_incidents, desc="Scraping incidents") as pbar: for future in as_completed(futures): try: future.result() pbar.update(1) except Exception as e: logger.error(f"Error processing incident: {e}") logger.info(f"Scraped {len(self.incidents_data)} incidents") if self.incidents_data: return self.save_data() else: logger.error("No incident data scraped") return None, None class NOAAIncidentDB: """ Manages NOAA incident data using ChromaDB for vector storage and search. """ def __init__(self, persist_directory: str = "noaa_db", embedding_model: str = "all-MiniLM-L6-v2"): """ Initialize the database with ChromaDB settings. Args: persist_directory (str): Directory for ChromaDB storage embedding_model (str): Model name for embeddings """ self.persist_directory = persist_directory os.makedirs(self.persist_directory, exist_ok=True) # Initialize ChromaDB client self.client = chromadb.Client(Settings( persist_directory=self.persist_directory, anonymized_telemetry=False )) # Setup embedding function self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction( model_name=embedding_model ) # Get or create collection self.collection = self.client.get_or_create_collection( name="noaa_incidents", embedding_function=self.embedding_function, metadata={"description": "NOAA incident reports database"} ) def load_incidents(self, csv_path: str) -> int: """ Load incidents from CSV into ChromaDB. Args: csv_path (str): Path to CSV file Returns: int: Number of incidents loaded """ logger.info(f"Loading incidents from {csv_path}") try: df = pd.read_csv(csv_path, dtype=str) df = df.where(pd.notna(df), None) documents = [] metadatas = [] ids = [] for idx, row in df.iterrows(): # Generate unique ID # Continue from the previous code... # Generate unique ID using title, date, location and index unique_string = str(row.get('title', '')) + '_' + str(row.get('date', '')) + '_' + str(row.get('location', '')) + '_' + str(idx) incident_id = "incident_" + hashlib.md5(unique_string.encode()).hexdigest()[:8] # Create searchable document content doc_content = "\n".join([ "Incident: " + str(row.get('title', 'N/A')), "Location: " + str(row.get('location', 'N/A')), "Date: " + str(row.get('date', 'N/A')), "Details: " + str(row.get('initial_notification', '')) ]) # Create metadata metadata = { 'title': str(row.get('title', 'N/A')), 'date': str(row.get('date', 'N/A')), 'location': str(row.get('location', 'N/A')) } # Add any additional fields present for col in df.columns: if col not in ['title', 'date', 'location'] and pd.notna(row[col]): metadata[col.lower().replace(' ', '_')] = str(row[col]) documents.append(doc_content.strip()) metadatas.append(metadata) ids.append(incident_id) # Add to database in batches total_documents = len(documents) for i in range(0, total_documents, BATCH_SIZE): batch_end = min(i + BATCH_SIZE, total_documents) self.collection.add( documents=documents[i:batch_end], metadatas=metadatas[i:batch_end], ids=ids[i:batch_end] ) logger.info(f"Added batch {i // BATCH_SIZE + 1} with {batch_end - i} incidents") logger.info(f"Successfully loaded {total_documents} incidents into ChromaDB") return total_documents except Exception as e: logger.error(f"Error loading incidents from CSV: {e}") return 0 def search(self, query: str, n_results: int = 5) -> List[Dict]: """ Search for incidents matching the query. Args: query (str): Search query n_results (int): Number of results to return Returns: List[Dict]: List of matching incidents """ try: results = self.collection.query( query_texts=[query], n_results=n_results, include=['metadatas', 'documents', 'ids'] ) formatted_results = [] for doc, metadata, incident_id in zip( results['documents'][0], results['metadatas'][0], results['ids'][0] ): result = { 'id': incident_id, 'title': metadata.get('title', 'N/A'), 'date': metadata.get('date', 'N/A'), 'location': metadata.get('location', 'N/A'), 'details': doc, 'metadata': metadata } formatted_results.append(result) return formatted_results except Exception as e: logger.error(f"Error during search: {e}") return [] def delete_collection(self): """Delete the current collection.""" try: self.client.delete_collection("noaa_incidents") logger.info("Collection deleted successfully") except Exception as e: logger.error(f"Error deleting collection: {e}") def get_collection_stats(self) -> Dict: """ Get statistics about the current collection. Returns: Dict: Collection statistics """ try: count = self.collection.count() return { "total_documents": count, "collection_name": "noaa_incidents", "embedding_model": self.embedding_function.model_name } except Exception as e: logger.error(f"Error getting collection stats: {e}") return {} if __name__ == "__main__": # Example usage scraper = NOAAIncidentScraper(max_workers=5) csv_file, json_file = scraper.run(validate_first=True) if csv_file: db = NOAAIncidentDB() num_loaded = db.load_incidents(csv_file) logger.info(f"Loaded {num_loaded} incidents into database") # Example search results = db.search("oil spill near coral reefs", n_results=5) for i, result in enumerate(results, 1): print(f"\nResult {i}:") print(f"Title: {result['title']}") print(f"Date: {result['date']}") print(f"Location: {result['location']}") print(f"Details: {result['details']}\n")