"""Text preprocessing and feature extraction utilities for climate-related text."""

import re
import logging
from collections import Counter, defaultdict
from functools import lru_cache
from typing import List, Dict, Tuple, Optional

import pandas as pd
from sklearn.utils import resample

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ClimateTextPreprocessor:
    """Cleans climate-related text and extracts numerical and scientific features."""

    def __init__(self):
        # Term lexicons used by extract_scientific_features; matching is
        # case-insensitive substring matching against the cleaned text.
        self.scientific_terms = {
            'measurement_terms': {
                'temperature', 'degree', 'celsius', 'fahrenheit', 'kelvin',
                'ppm', 'concentration', 'level', 'rate', 'trend', 'average',
                'increase', 'decrease', 'change', 'variation'
            },
            'climate_terms': {
                'climate', 'weather', 'warming', 'cooling', 'atmosphere',
                'greenhouse', 'carbon', 'dioxide', 'co2', 'methane', 'emission',
                'pollution', 'temperature', 'environment', 'environmental'
            },
            'scientific_bodies': {
                'ipcc', 'nasa', 'noaa', 'wmo', 'epa', 'met office',
                'national academy', 'research', 'university', 'laboratory',
                'institute', 'scientist', 'researcher', 'expert', 'study'
            },
            'earth_systems': {
                'ocean', 'sea level', 'glacier', 'ice sheet', 'permafrost',
                'arctic', 'antarctic', 'atmosphere', 'sea ice', 'temperature',
                'ecosystem', 'biodiversity', 'forest', 'precipitation', 'drought'
            }
        }

        self.patterns = self._compile_patterns()
        self.reset_stats()

    def _compile_patterns(self) -> Dict[str, re.Pattern]:
        """Compile regex patterns once for efficient reuse"""
        return {
            'numbers': re.compile(r'\d+(?:\.\d+)?'),
            'temperature': re.compile(
                r'\d+(?:\.\d+)?\s*(?:°[CF]|degrees?(?:\s+[CF])?|celsius|fahrenheit)'
            ),
            # Four-digit years from 1900-2099.
            'year': re.compile(r'\b(?:19|20)\d{2}\b'),
            # Match the scheme plus everything up to the next whitespace;
            # simpler and more predictable than the original character-class mix.
            'urls': re.compile(r'https?://\S+'),
            # Compiled for completeness; not currently used by any method.
            'special_chars': re.compile(r'[^a-zA-Z0-9\s]')
        }

    def reset_stats(self):
        """Reset statistics tracking"""
        self.stats = {
            'total_processed': 0,
            'avg_length': 0,
            'scientific_terms_frequency': defaultdict(Counter),
            'errors': Counter()
        }

    # Note: lru_cache on an instance method caches on (self, text) and keeps a
    # reference to self alive for the lifetime of the cache; acceptable for a
    # single long-lived preprocessor instance, but worth knowing.
    @lru_cache(maxsize=1000)
    def clean_text(self, text: str) -> str:
        """Basic text cleaning with caching"""
        try:
            text = text.lower()

            # Normalize common climate shorthand so downstream matching is uniform.
            text = re.sub(r'co2\b', 'carbon dioxide', text)
            text = re.sub(r'(\d+)f\b', r'\1 fahrenheit', text)
            text = re.sub(r'(\d+)c\b', r'\1 celsius', text)

            # Strip URLs, then collapse runs of whitespace.
            text = self.patterns['urls'].sub('', text)
            text = ' '.join(text.split())

            return text.strip()
        except Exception as e:
            logger.error(f"Error cleaning text: {str(e)}")
            self.stats['errors']['cleaning'] += 1
            return text

    def extract_numerical_features(self, text: str) -> Dict:
        """Extract features related to numerical claims"""
        try:
            features = {}

            numbers = self.patterns['numbers'].findall(text)
            features['has_numbers'] = bool(numbers)
            features['number_count'] = len(numbers)

            features['has_temperature'] = bool(self.patterns['temperature'].search(text))
            features['has_year'] = bool(self.patterns['year'].search(text))

            return features
        except Exception as e:
            logger.error(f"Error extracting numerical features: {str(e)}")
            self.stats['errors']['numerical_extraction'] += 1
            return {'has_numbers': False, 'number_count': 0,
                    'has_temperature': False, 'has_year': False}

    def extract_scientific_features(self, text: str) -> Dict:
        """Extract features related to scientific terms"""
        try:
            features = {}
            text_lower = text.lower()

            for category, terms in self.scientific_terms.items():
                found_terms = []
                for term in terms:
                    # Substring matching: 'rate' also matches inside 'moderate'.
                    # Acceptable for coarse features; a word-boundary regex
                    # would be stricter.
                    if term in text_lower:
                        found_terms.append(term)
                        self.stats['scientific_terms_frequency'][category][term] += 1

                features[f'{category}_count'] = len(found_terms)
                features[f'{category}_terms'] = found_terms

            return features
        except Exception as e:
            logger.error(f"Error extracting scientific features: {str(e)}")
            self.stats['errors']['scientific_extraction'] += 1
            return {
                **{f'{cat}_count': 0 for cat in self.scientific_terms},
                **{f'{cat}_terms': [] for cat in self.scientific_terms}
            }

    def extract_features(self, text: str) -> Dict:
        """Extract all features from text"""
        try:
            features = {
                'original_length': len(text),
                'cleaned_text': self.clean_text(text)
            }

            # Numerical features are taken from the raw text so degree symbols
            # survive; scientific terms are matched against the cleaned text.
            features.update(self.extract_numerical_features(text))
            features.update(self.extract_scientific_features(features['cleaned_text']))

            words = features['cleaned_text'].split()
            word_count = len(words) if words else 1  # guard against empty text

            total_scientific_terms = sum(
                features.get(f'{cat}_count', 0)
                for cat in self.scientific_terms
            )

            features['scientific_density'] = total_scientific_terms / word_count
            features['numerical_density'] = features['number_count'] / word_count

            # Maintain a running average of input length.
            self.stats['total_processed'] += 1
            self.stats['avg_length'] = (
                (self.stats['avg_length'] * (self.stats['total_processed'] - 1) + len(text))
                / self.stats['total_processed']
            )

            return features

        except Exception as e:
            logger.error(f"Error in feature extraction: {str(e)}")
            self.stats['errors']['feature_extraction'] += 1
            return self._get_default_features()

    def _get_default_features(self) -> Dict:
        """Return default features for error cases"""
        defaults = {
            'original_length': 0,
            'cleaned_text': '',
            'has_numbers': False,
            'number_count': 0,
            'has_temperature': False,
            'has_year': False,
            'scientific_density': 0.0,
            'numerical_density': 0.0
        }
        # Include the per-category keys so error rows share the schema of
        # successful extractions.
        for cat in self.scientific_terms:
            defaults[f'{cat}_count'] = 0
            defaults[f'{cat}_terms'] = []
        return defaults

    def process_batch(self, texts: List[str], labels: Optional[List[str]] = None,
                      balance: bool = False) -> Tuple[List[Dict], Optional[List[str]]]:
        """Process a batch of texts and optionally their labels"""
        try:
            features_list = []
            for text in texts:
                try:
                    features_list.append(self.extract_features(text))
                except Exception as e:
                    logger.error(f"Error processing text: {str(e)}")
                    features_list.append(self._get_default_features())

            # Only rebalance when labels are present and more than one class exists.
            if balance and labels and len(set(labels)) > 1:
                return self.balance_dataset(features_list, labels)

            return features_list, labels

        except Exception as e:
            logger.error(f"Error in batch processing: {str(e)}")
            self.stats['errors']['batch_processing'] += 1
            return [self._get_default_features() for _ in texts], labels

    def balance_dataset(self, features: List[Dict], labels: List[str]) -> Tuple[List[Dict], List[str]]:
        """Balance dataset by oversampling minority classes to the majority size"""
        try:
            df = pd.DataFrame({'features': features, 'label': labels})

            max_size = df['label'].value_counts().max()

            balanced_dfs = []
            for label in df['label'].unique():
                label_df = df[df['label'] == label]
                if len(label_df) < max_size:
                    # Oversample with replacement up to the majority-class size.
                    label_df = resample(
                        label_df,
                        replace=True,
                        n_samples=max_size,
                        random_state=42
                    )
                balanced_dfs.append(label_df)

            balanced_df = pd.concat(balanced_dfs)
            return balanced_df['features'].tolist(), balanced_df['label'].tolist()

        except Exception as e:
            logger.error(f"Error balancing dataset: {str(e)}")
            self.stats['errors']['balancing'] += 1
            return features, labels

    def get_stats(self) -> Dict:
        """Return current preprocessing statistics"""
        return self.stats
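

# A minimal usage sketch (the sample texts and labels below are hypothetical,
# not part of the original module): it runs the preprocessor over a tiny batch
# and prints a couple of the extracted features plus the running stats.
if __name__ == '__main__':
    preprocessor = ClimateTextPreprocessor()

    sample_texts = [
        "NASA reports global temperatures rose 1.1C since 1980.",
        "CO2 levels reached 420 ppm according to NOAA.",
    ]
    sample_labels = ['claim', 'claim']

    batch_features, batch_labels = preprocessor.process_batch(sample_texts, sample_labels)
    for feats in batch_features:
        print(f"{feats['cleaned_text']} -> "
              f"scientific_density={feats['scientific_density']:.2f}, "
              f"has_year={feats['has_year']}")

    print(preprocessor.get_stats())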