# db/noaa_incidents.py
# Uploaded by latterworks -- commit "Update noaa_incidents.py" (3098121, verified)
# noaa_incidents.py
import os
import hashlib
import json
import pickle
import threading
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import logging
import pandas as pd
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import chromadb
from chromadb.utils import embedding_functions
from chromadb.config import Settings
# Logging: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Site endpoints used by the scraper.
BASE_URL = "https://incidentnews.noaa.gov"
BROWSE_URL = BASE_URL + "/browse/date"

# Filesystem locations (relative to the working directory).
OUTPUT_DIR = Path("output")  # CSV/JSON exports written by save_data()
CACHE_DIR = Path("cache")    # not referenced in this file's visible code; kept for external users

# Tuning knobs.
MAX_RETRIES = 3    # HTTP attempts per URL in fetch_url()
BATCH_SIZE = 500   # documents per collection.add() call in load_incidents()
class NOAAIncidentScraper:
    """
    Scrapes NOAA Incident News data with caching and multi-threading support.

    Listing pages are fetched sequentially to collect incident URLs; the
    individual incident pages are then scraped concurrently in a thread pool.
    Raw HTML responses are cached on disk (one file per URL) so repeated runs
    do not re-download pages.
    """

    def __init__(self, max_workers: int = 5, cache_dir: str = 'cache'):
        """
        Initialize the scraper with custom configuration.

        Args:
            max_workers (int): Maximum number of concurrent threads
            cache_dir (str): Directory for caching web responses
        """
        self.base_url = BASE_URL
        self.browse_url = BROWSE_URL
        self.headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                           '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
        }
        self.incidents_data = []
        # Guards incidents_data, which worker threads append to concurrently.
        self.data_lock = threading.Lock()
        self.max_workers = max_workers
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested location such as "data/cache" also works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def get_cache_path(self, url: str) -> Path:
        """Generate a cache file path for a given URL (MD5 of the URL + '.cache')."""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return self.cache_dir / f"{url_hash}.cache"

    def get_cached_response(self, url: str) -> Optional[str]:
        """
        Retrieve cached response for a URL.

        Returns:
            Optional[str]: Cached content, or None if there is no cache entry
            or it could not be read.
        """
        cache_path = self.get_cache_path(url)
        if not cache_path.exists():
            return None
        try:
            # NOTE(security): pickle.load can execute arbitrary code if a cache
            # file has been tampered with. Only plain text is stored here, so a
            # text format would be safer; pickle is kept so existing cache
            # files remain readable.
            with cache_path.open('rb') as f:
                return pickle.load(f)
        except Exception as e:
            logger.warning(f"Error loading cache for {url}: {e}")
            return None

    def cache_response(self, url: str, content: str):
        """
        Cache response content for a URL (best effort; failures are logged).

        Args:
            url (str): URL the content was fetched from
            content (str): Response body to store
        """
        cache_path = self.get_cache_path(url)
        try:
            # Re-create the directory if it is missing so caching keeps working
            # even when the directory was removed after construction.
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            with cache_path.open('wb') as f:
                pickle.dump(content, f)
        except Exception as e:
            logger.warning(f"Error caching response for {url}: {e}")

    def fetch_url(self, url: str, use_cache: bool = True) -> Optional[str]:
        """
        Fetch URL content with caching and retry mechanism.

        Args:
            url (str): URL to fetch
            use_cache (bool): Whether to read from and write to the cache

        Returns:
            Optional[str]: Response content or None if failed
        """
        if use_cache:
            cached = self.get_cached_response(url)
            # An empty cached body counts as a miss and triggers a re-fetch.
            if cached:
                return cached

        for attempt in range(MAX_RETRIES):
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                content = response.text
                if use_cache:
                    self.cache_response(url, content)
                return content
            except requests.RequestException as e:
                wait_time = min(2 ** attempt, 10)  # Exponential backoff with max 10s
                logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed for {url}: {e}")
                if attempt < MAX_RETRIES - 1:
                    logger.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Failed to fetch {url} after {MAX_RETRIES} attempts")
        return None

    def validate_incident_data(self, incident_data: Dict) -> bool:
        """
        Validate scraped incident data.

        Args:
            incident_data (Dict): Incident data to validate

        Returns:
            bool: True if title, date and location are all present and non-empty
        """
        required_fields = ['title', 'date', 'location']
        return all(incident_data.get(field) for field in required_fields)

    def get_total_pages(self) -> int:
        """
        Get the total number of pages to scrape.

        Returns:
            int: 0 if the browse page could not be fetched; the page number
            read from the pagination list when present; otherwise 1.
        """
        content = self.fetch_url(self.browse_url)
        if not content:
            return 0
        try:
            soup = BeautifulSoup(content, 'html.parser')
            pagination = soup.find('ul', class_='pagination')
            if pagination:
                # Second-to-last <li> is read as the last page number
                # (presumably the final <li> is a "next" control -- verify
                # against the live markup).
                last_page = int(pagination.find_all('li')[-2].text)
                return last_page
        except Exception as e:
            logger.error(f"Error determining total pages: {e}")
        return 1

    def scrape_incident_page(self, incident_url: str, use_cache: bool = True) -> Optional[Dict]:
        """
        Scrape detailed information from an individual incident page.

        Args:
            incident_url (str): Site-relative URL of the incident page
            use_cache (bool): Whether to use cached response

        Returns:
            Optional[Dict]: Incident data, or None if the page could not be
            fetched, parsed, or lacks a required field
        """
        try:
            full_url = f"{self.base_url}{incident_url}"
            content = self.fetch_url(full_url, use_cache)
            if not content:
                return None

            soup = BeautifulSoup(content, 'html.parser')
            incident_details = {}

            # Extract Title
            title = soup.find('h1')
            incident_details['title'] = title.text.strip() if title else None

            # Extract Initial Notification
            initial_notification = soup.find('p', class_='sub')
            incident_details['initial_notification'] = (
                initial_notification.text.strip() if initial_notification else None
            )

            # Extract Location and Date
            location_date = soup.find('p', class_='')
            if location_date:
                location = location_date.find('span', class_='glyphicon-map-marker')
                sibling = location.next_sibling if location else None
                # Only a text node carries the location; a tag sibling has no
                # usable .strip(), so skip it instead of relying on the
                # resulting exception.
                if isinstance(sibling, str) and sibling:
                    incident_details['location'] = sibling.strip()
                date = location_date.find('span', class_='incident-date')
                if date:
                    incident_details['date'] = date.text.strip()

            # Extract Additional Details from Table: each two-cell row becomes
            # a snake_case key -> value entry.
            details_table = soup.find('table', class_='table')
            if details_table:
                for row in details_table.find_all('tr'):
                    cols = row.find_all('td')
                    if len(cols) == 2:
                        key = cols[0].text.strip().rstrip(':').lower().replace(' ', '_')
                        value = cols[1].text.strip()
                        incident_details[key] = value

            if not self.validate_incident_data(incident_details):
                logger.warning(f"Invalid incident data for {incident_url}")
                return None
            return incident_details
        except Exception as e:
            logger.error(f"Error scraping incident page {incident_url}: {e}")
            return None

    def scrape_listing_page(self, page_number: int, use_cache: bool = True) -> List[str]:
        """
        Scrape a single listing page of incidents.

        Args:
            page_number (int): Page number to scrape
            use_cache (bool): Whether to use cached response

        Returns:
            List[str]: List of incident URLs (empty on any failure)
        """
        url = f"{self.browse_url}?page={page_number}"
        try:
            content = self.fetch_url(url, use_cache)
            if not content:
                return []

            soup = BeautifulSoup(content, 'html.parser')
            incidents_table = soup.find('table', class_='incident-links')
            if not incidents_table:
                logger.warning(f"No incidents table found on page {page_number}")
                return []

            incident_urls = []
            for row in incidents_table.find_all('tr')[1:]:  # Skip header row
                cols = row.find_all('td')
                if len(cols) >= 3:
                    incident_link = cols[0].find('a')
                    if incident_link and 'href' in incident_link.attrs:
                        incident_urls.append(incident_link['href'])
            return incident_urls
        except Exception as e:
            logger.error(f"Error scraping listing page {page_number}: {e}")
            return []

    def process_incident(self, incident_url: str, use_cache: bool = True) -> Optional[str]:
        """
        Process a single incident URL with thread safety.

        Returns:
            Optional[str]: The URL if its data was stored, otherwise None
        """
        incident_data = self.scrape_incident_page(incident_url, use_cache)
        if incident_data:
            with self.data_lock:
                self.incidents_data.append(incident_data)
            return incident_url
        return None

    def save_data(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Save the scraped data to timestamped CSV and JSON files.

        Returns:
            Tuple[Optional[str], Optional[str]]: Paths to saved CSV and JSON
            files, or (None, None) if saving failed
        """
        try:
            OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

            # Save to CSV
            csv_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.csv'
            df = pd.DataFrame(self.incidents_data)
            df.to_csv(csv_filename, index=False)

            # Save to JSON
            json_filename = OUTPUT_DIR / f'noaa_incidents_{timestamp}.json'
            with open(json_filename, 'w', encoding='utf-8') as f:
                json.dump(self.incidents_data, f, indent=4)

            logger.info(f"Data saved to {csv_filename} and {json_filename}")
            return str(csv_filename), str(json_filename)
        except Exception as e:
            logger.error(f"Error saving data: {e}")
            return None, None

    def run(self, validate_first: bool = True) -> Tuple[Optional[str], Optional[str]]:
        """
        Run the complete scraping process.

        Args:
            validate_first (bool): Whether to validate first page before full scrape

        Returns:
            Tuple[Optional[str], Optional[str]]: Paths to saved CSV and JSON
            files, or (None, None) if nothing could be scraped or saved
        """
        logger.info("Starting NOAA IncidentNews scraper...")

        if validate_first:
            # Smoke test against the live site (cache bypassed) before
            # committing to the full crawl.
            logger.info("Performing initial validation run...")
            test_page = self.scrape_listing_page(1, use_cache=False)
            if not test_page:
                logger.error("Unable to scrape the first page. Aborting.")
                return None, None
            test_incident = self.scrape_incident_page(test_page[0], use_cache=False)
            if not test_incident:
                logger.error("Unable to scrape test incident. Aborting.")
                return None, None
            logger.info("Validation successful, proceeding with full scrape...")

        total_pages = self.get_total_pages()
        logger.info(f"Found {total_pages} pages to scrape")

        # Collect incident URLs
        all_incident_urls = []
        logger.info("Collecting incident URLs...")
        with tqdm(total=total_pages, desc="Collecting URLs") as pbar:
            for page in range(1, total_pages + 1):
                urls = self.scrape_listing_page(page)
                all_incident_urls.extend(urls)
                pbar.update(1)

        total_incidents = len(all_incident_urls)
        logger.info(f"Found {total_incidents} incidents to process")
        if total_incidents == 0:
            logger.error("No incidents found to process")
            return None, None

        # Process incidents
        logger.info("Processing incidents...")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.process_incident, url)
                for url in all_incident_urls
            ]
            with tqdm(total=total_incidents, desc="Scraping incidents") as pbar:
                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        logger.error(f"Error processing incident: {e}")
                    finally:
                        # Count failed tasks too so the bar reaches its total.
                        pbar.update(1)

        logger.info(f"Scraped {len(self.incidents_data)} incidents")
        if self.incidents_data:
            return self.save_data()
        else:
            logger.error("No incident data scraped")
            return None, None
class NOAAIncidentDB:
    """
    Manages NOAA incident data using ChromaDB for vector storage and search.

    Incidents are loaded from a CSV export, turned into short text documents
    plus string metadata, and stored in a single collection named
    "noaa_incidents" that can then be queried by free text.
    """

    def __init__(self,
                 persist_directory: str = "noaa_db",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        """
        Initialize the database with ChromaDB settings.

        Args:
            persist_directory (str): Directory for ChromaDB storage
            embedding_model (str): Model name for embeddings
        """
        self.persist_directory = persist_directory
        # Kept on the instance so stats do not depend on the embedding
        # function exposing its model name as an attribute.
        self.embedding_model = embedding_model
        os.makedirs(self.persist_directory, exist_ok=True)

        # Initialize ChromaDB client
        # NOTE(review): with only persist_directory set, chromadb.Client may
        # create an in-memory client that never writes to this directory;
        # chromadb.PersistentClient(path=...) may be what is intended --
        # confirm against the installed chromadb version before changing.
        self.client = chromadb.Client(Settings(
            persist_directory=self.persist_directory,
            anonymized_telemetry=False
        ))

        # Setup embedding function
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        # Get or create collection
        self.collection = self.client.get_or_create_collection(
            name="noaa_incidents",
            embedding_function=self.embedding_function,
            metadata={"description": "NOAA incident reports database"}
        )

    @staticmethod
    def _field(row, column: str, default: str) -> str:
        """Return row[column] as text, or default when the value is missing or null."""
        value = row.get(column, default)
        if value is None or pd.isna(value):
            return default
        return str(value)

    def load_incidents(self, csv_path: str) -> int:
        """
        Load incidents from CSV into ChromaDB.

        Args:
            csv_path (str): Path to CSV file

        Returns:
            int: Number of incidents loaded (0 if loading failed)
        """
        logger.info(f"Loading incidents from {csv_path}")
        try:
            df = pd.read_csv(csv_path, dtype=str)
            df = df.where(pd.notna(df), None)

            core_columns = ('title', 'date', 'location')
            documents = []
            metadatas = []
            ids = []

            for idx, row in df.iterrows():
                # Generate unique ID using title, date, location and index.
                # The raw str() of each cell is used on purpose so IDs stay
                # identical to those produced by earlier versions.
                unique_string = "_".join([
                    str(row.get('title', '')),
                    str(row.get('date', '')),
                    str(row.get('location', '')),
                    str(idx),
                ])
                incident_id = "incident_" + hashlib.md5(unique_string.encode()).hexdigest()[:8]

                title = self._field(row, 'title', 'N/A')
                date = self._field(row, 'date', 'N/A')
                location = self._field(row, 'location', 'N/A')

                # Create searchable document content; null cells fall back to
                # the defaults instead of appearing as the text "None".
                doc_content = "\n".join([
                    "Incident: " + title,
                    "Location: " + location,
                    "Date: " + date,
                    "Details: " + self._field(row, 'initial_notification', ''),
                ])

                # Create metadata
                metadata = {'title': title, 'date': date, 'location': location}

                # Add any additional fields present
                for col in df.columns:
                    if col not in core_columns and pd.notna(row[col]):
                        metadata[col.lower().replace(' ', '_')] = str(row[col])

                documents.append(doc_content.strip())
                metadatas.append(metadata)
                ids.append(incident_id)

            # Add to database in batches
            total_documents = len(documents)
            for i in range(0, total_documents, BATCH_SIZE):
                batch_end = min(i + BATCH_SIZE, total_documents)
                self.collection.add(
                    documents=documents[i:batch_end],
                    metadatas=metadatas[i:batch_end],
                    ids=ids[i:batch_end]
                )
                logger.info(f"Added batch {i // BATCH_SIZE + 1} with {batch_end - i} incidents")

            logger.info(f"Successfully loaded {total_documents} incidents into ChromaDB")
            return total_documents
        except Exception as e:
            logger.error(f"Error loading incidents from CSV: {e}")
            return 0

    def search(self, query: str, n_results: int = 5) -> List[Dict]:
        """
        Search for incidents matching the query.

        Args:
            query (str): Search query
            n_results (int): Number of results to return

        Returns:
            List[Dict]: List of matching incidents (empty if the query failed)
        """
        try:
            # 'ids' is not among the include options in the Chroma docs (ids
            # come back regardless); requesting it made every query fail and
            # the failure was swallowed below.
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=['metadatas', 'documents']
            )

            formatted_results = []
            # Index [0] selects the result lists for the single query text.
            for doc, metadata, incident_id in zip(
                results['documents'][0],
                results['metadatas'][0],
                results['ids'][0]
            ):
                result = {
                    'id': incident_id,
                    'title': metadata.get('title', 'N/A'),
                    'date': metadata.get('date', 'N/A'),
                    'location': metadata.get('location', 'N/A'),
                    'details': doc,
                    'metadata': metadata
                }
                formatted_results.append(result)
            return formatted_results
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return []

    def delete_collection(self):
        """Delete the current collection (errors are logged, not raised)."""
        try:
            self.client.delete_collection("noaa_incidents")
            logger.info("Collection deleted successfully")
        except Exception as e:
            logger.error(f"Error deleting collection: {e}")

    def get_collection_stats(self) -> Dict:
        """
        Get statistics about the current collection.

        Returns:
            Dict: Document count, collection name and embedding model name,
            or an empty dict if the count could not be read
        """
        try:
            count = self.collection.count()
            return {
                "total_documents": count,
                "collection_name": "noaa_incidents",
                "embedding_model": self.embedding_model
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            return {}
if __name__ == "__main__":
    # Demo pipeline: scrape the site, index the CSV export, run one sample query.
    incident_scraper = NOAAIncidentScraper(max_workers=5)
    csv_path, json_path = incident_scraper.run(validate_first=True)

    # run() yields (None, None) on failure, in which case there is nothing to index.
    if csv_path:
        incident_db = NOAAIncidentDB()
        loaded_count = incident_db.load_incidents(csv_path)
        logger.info(f"Loaded {loaded_count} incidents into database")

        # Sample semantic search over the freshly loaded incidents.
        hits = incident_db.search("oil spill near coral reefs", n_results=5)
        for rank, hit in enumerate(hits, start=1):
            print(f"\nResult {rank}:")
            print(f"Title: {hit['title']}")
            print(f"Date: {hit['date']}")
            print(f"Location: {hit['location']}")
            print(f"Details: {hit['details']}\n")