File size: 10,214 Bytes
24ff08a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import re
import numpy as np
from typing import List, Dict, Tuple, Optional
from collections import Counter, defaultdict
import pandas as pd
from sklearn.utils import resample
import logging
from functools import lru_cache

# Module-level logging setup.
# NOTE(review): basicConfig() configures the *root* logger as an import-time
# side effect, which can surprise applications that import this module as a
# library — confirm this is intended before wider reuse.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ClimateTextPreprocessor:
    """Clean climate-related text and extract lexical/numerical features.

    The preprocessor keeps running statistics (texts processed, average
    length, per-category term frequencies, error tallies) in ``self.stats``;
    call :meth:`reset_stats` to start a fresh run.  All extractors are
    best-effort: on internal failure they log, bump an error counter, and
    return safe defaults instead of raising.
    """

    # URL regex source, shared by the pattern table and the cached cleaner.
    _URL_PATTERN = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'

    def __init__(self):
        # Scientific vocabulary grouped by category.  Matching is plain
        # substring search on lowercased text (see
        # extract_scientific_features), which is what allows the
        # multi-word entries ('sea level', 'met office', ...) to hit.
        self.scientific_terms = {
            'measurement_terms': {
                'temperature', 'degree', 'celsius', 'fahrenheit', 'kelvin',
                'ppm', 'concentration', 'level', 'rate', 'trend', 'average',
                'increase', 'decrease', 'change', 'variation'
            },
            'climate_terms': {
                'climate', 'weather', 'warming', 'cooling', 'atmosphere',
                'greenhouse', 'carbon', 'dioxide', 'co2', 'methane', 'emission',
                'pollution', 'temperature', 'environment', 'environmental'
            },
            'scientific_bodies': {
                'ipcc', 'nasa', 'noaa', 'wmo', 'epa', 'met office',
                'national academy', 'research', 'university', 'laboratory',
                'institute', 'scientist', 'researcher', 'expert', 'study'
            },
            'earth_systems': {
                'ocean', 'sea level', 'glacier', 'ice sheet', 'permafrost',
                'arctic', 'antarctic', 'atmosphere', 'sea ice', 'temperature',
                'ecosystem', 'biodiversity', 'forest', 'precipitation', 'drought'
            }
        }

        # Pre-compiled regexes, also exposed for callers wanting raw access.
        self.patterns = self._compile_patterns()

        # Running statistics for this instance.
        self.reset_stats()

    def _compile_patterns(self) -> Dict[str, re.Pattern]:
        """Compile and return the named regex patterns used by the extractors."""
        return {
            'numbers': re.compile(r'\d+(?:\.\d+)?'),
            'temperature': re.compile(r'\d+(?:\.\d+)?\s*(?:°[CF]|degrees?(?:\s+[CF])?|celsius|fahrenheit)'),
            'year': re.compile(r'\b(?:19|20)\d{2}\b'),
            'urls': re.compile(self._URL_PATTERN),
            # Compiled for external use; clean_text deliberately keeps punctuation.
            'special_chars': re.compile(r'[^a-zA-Z0-9\s]')
        }

    def reset_stats(self) -> None:
        """Reset the statistics tracked across extract/process calls."""
        self.stats = {
            'total_processed': 0,          # number of texts fully featurized
            'avg_length': 0,               # running mean of raw text length
            'scientific_terms_frequency': defaultdict(Counter),
            'errors': Counter()            # error tallies keyed by stage name
        }

    @staticmethod
    @lru_cache(maxsize=1000)
    def _clean_text_cached(text: str) -> str:
        """Pure, cacheable core of :meth:`clean_text` (no instance state).

        Kept as a cached *staticmethod*: an ``lru_cache`` placed directly on
        the instance method would key the cache on ``self`` and keep every
        instance alive for the lifetime of the cache (ruff B019).
        """
        text = text.lower()

        # Standardize scientific notation.  \b on both sides of 'co2' so
        # tokens embedded in longer words (e.g. 'xco2') are left untouched.
        text = re.sub(r'\bco2\b', 'carbon dioxide', text)
        text = re.sub(r'(\d+)f\b', r'\1 fahrenheit', text)
        text = re.sub(r'(\d+)c\b', r'\1 celsius', text)

        # Remove URLs, then collapse all whitespace runs to single spaces.
        text = re.sub(ClimateTextPreprocessor._URL_PATTERN, '', text)
        return ' '.join(text.split()).strip()

    def clean_text(self, text: str) -> str:
        """Lowercase, standardize notation, and strip URLs/extra whitespace.

        Returns the input unchanged if cleaning fails, tallying the failure
        under ``stats['errors']['cleaning']``.
        """
        try:
            return self._clean_text_cached(text)
        except Exception as e:
            logger.error(f"Error cleaning text: {str(e)}")
            self.stats['errors']['cleaning'] += 1
            return text

    def extract_numerical_features(self, text: str) -> Dict:
        """Extract features describing numerical claims in *text*.

        Returns a dict with ``has_numbers``, ``number_count``,
        ``has_temperature`` and ``has_year`` keys (defaults on error).
        """
        try:
            numbers = self.patterns['numbers'].findall(text)
            return {
                'has_numbers': bool(numbers),
                'number_count': len(numbers),
                'has_temperature': bool(self.patterns['temperature'].search(text)),
                'has_year': bool(self.patterns['year'].search(text)),
            }
        except Exception as e:
            logger.error(f"Error extracting numerical features: {str(e)}")
            self.stats['errors']['numerical_extraction'] += 1
            return {'has_numbers': False, 'number_count': 0, 'has_temperature': False, 'has_year': False}

    def extract_scientific_features(self, text: str) -> Dict:
        """Count scientific vocabulary per category found in *text*.

        For each category C, the result holds ``C_count`` (int) and
        ``C_terms`` (list of matched terms).  Matching is substring-based,
        so short terms can fire inside longer words (e.g. 'rate' inside
        'moderate') — acceptable for these features but worth knowing.
        Matches are also folded into the instance-level frequency stats.
        """
        try:
            features = {}
            text_lower = text.lower()

            for category, terms in self.scientific_terms.items():
                found_terms = []
                for term in terms:
                    if term in text_lower:
                        found_terms.append(term)
                        self.stats['scientific_terms_frequency'][category][term] += 1

                features[f'{category}_count'] = len(found_terms)
                features[f'{category}_terms'] = found_terms

            return features
        except Exception as e:
            logger.error(f"Error extracting scientific features: {str(e)}")
            self.stats['errors']['scientific_extraction'] += 1
            return {
                **{f'{cat}_count': 0 for cat in self.scientific_terms},
                **{f'{cat}_terms': [] for cat in self.scientific_terms}
            }

    def extract_features(self, text: str) -> Dict:
        """Extract the full feature dict for a single text.

        Numerical features are computed on the *raw* text (so e.g. digits
        inside 'CO2' count before normalization), scientific features on
        the cleaned text.  Densities are per cleaned-text word; a floor of
        1 word avoids division by zero on empty input.  Updates the
        running count and average-length statistics on success.
        """
        try:
            features = {
                'original_length': len(text),
                'cleaned_text': self.clean_text(text)
            }

            features.update(self.extract_numerical_features(text))
            features.update(self.extract_scientific_features(features['cleaned_text']))

            words = features['cleaned_text'].split()
            word_count = len(words) if words else 1

            total_scientific_terms = sum(
                features.get(f'{cat}_count', 0)
                for cat in self.scientific_terms.keys()
            )

            features['scientific_density'] = total_scientific_terms / word_count
            features['numerical_density'] = features['number_count'] / word_count

            # Incremental update of the running mean of raw lengths.
            self.stats['total_processed'] += 1
            self.stats['avg_length'] = (
                (self.stats['avg_length'] * (self.stats['total_processed'] - 1) + len(text))
                / self.stats['total_processed']
            )

            return features

        except Exception as e:
            logger.error(f"Error in feature extraction: {str(e)}")
            self.stats['errors']['feature_extraction'] += 1
            return self._get_default_features()

    def _get_default_features(self) -> Dict:
        """Return the safe default feature dict used on extraction failure.

        Includes the per-category count/terms keys so error rows have the
        same schema as successful rows (downstream code can index them
        uniformly).
        """
        defaults = {
            'original_length': 0,
            'cleaned_text': '',
            'has_numbers': False,
            'number_count': 0,
            'has_temperature': False,
            'has_year': False,
            'scientific_density': 0.0,
            'numerical_density': 0.0
        }
        for cat in self.scientific_terms:
            defaults[f'{cat}_count'] = 0
            defaults[f'{cat}_terms'] = []
        return defaults

    def process_batch(self, texts: List[str], labels: Optional[List[str]] = None,
                      balance: bool = False) -> Tuple[List[Dict], Optional[List[str]]]:
        """Featurize a batch of texts, optionally balancing by label.

        Per-text failures are replaced with default features rather than
        aborting the batch.  Balancing only happens when requested, labels
        are provided, and more than one distinct label is present.
        """
        try:
            features_list = []
            for text in texts:
                try:
                    features_list.append(self.extract_features(text))
                except Exception as e:
                    logger.error(f"Error processing text: {str(e)}")
                    features_list.append(self._get_default_features())

            if balance and labels and len(set(labels)) > 1:
                return self.balance_dataset(features_list, labels)

            return features_list, labels

        except Exception as e:
            logger.error(f"Error in batch processing: {str(e)}")
            self.stats['errors']['batch_processing'] += 1
            return [self._get_default_features() for _ in texts], labels

    def balance_dataset(self, features: List[Dict], labels: List[str]) -> Tuple[List[Dict], List[str]]:
        """Oversample minority classes (with replacement) up to majority size.

        Uses a fixed random_state for reproducibility; returns the inputs
        unchanged if balancing fails.
        """
        try:
            df = pd.DataFrame({
                'features': features,
                'label': labels
            })

            max_size = df['label'].value_counts().max()

            balanced_dfs = []
            for label in df['label'].unique():
                label_df = df[df['label'] == label]
                if len(label_df) < max_size:
                    resampled_df = resample(
                        label_df,
                        replace=True,
                        n_samples=max_size,
                        random_state=42
                    )
                    balanced_dfs.append(resampled_df)
                else:
                    balanced_dfs.append(label_df)

            balanced_df = pd.concat(balanced_dfs)
            return balanced_df['features'].tolist(), balanced_df['label'].tolist()

        except Exception as e:
            logger.error(f"Error balancing dataset: {str(e)}")
            self.stats['errors']['balancing'] += 1
            return features, labels

    def get_stats(self) -> Dict:
        """Return the live preprocessing statistics dict (not a copy)."""
        return self.stats