reab5555 committed on
Commit 9fa4f5e
1 Parent(s): ffaac50

Upload clean.py

Files changed (1)
clean.py +192 -262
clean.py CHANGED
@@ -1,262 +1,192 @@
-import re
-
-import time
-from time import perf_counter
-
-# Constants
-EMPTY_THRESHOLD = 0.5
-LOW_COUNT_THRESHOLD = 2
-VALID_DATA_THRESHOLD = 0.5
-
-def print_dataframe_info(df, step=""):
-    num_columns = len(df.columns)
-    num_rows = df.count()
-    num_cells = num_columns * num_rows
-    print(f"{step}Dataframe info:")
-    print(f" Number of columns: {num_columns}")
-    print(f" Number of rows: {num_rows}")
-    print(f" Total number of cells: {num_cells}")
-
-
-def check_and_normalize_column_headers(df):
-    print("Checking and normalizing column headers...")
-
-    for old_name in df.columns:
-        # Create the new name using string manipulation
-        new_name = old_name.lower().replace(' ', '_')
-
-        # Remove any non-alphanumeric characters (excluding underscores)
-        new_name = re.sub(r'[^0-9a-zA-Z_]', '', new_name)
-
-        # Rename the column
-        df = df.withColumnRenamed(old_name, new_name)
-
-    print("Column names have been normalized.")
-    return df
-
-
-def remove_empty_columns(df, threshold=EMPTY_THRESHOLD):
-    print(f"Removing columns with less than {threshold * 100}% valid data...")
-
-    # Calculate the percentage of non-null values for each column
-    df_stats = df.select(
-        [((count(when(col(c).isNotNull(), c)) / count('*')) >= threshold).alias(c) for c in df.columns])
-    valid_columns = [c for c in df_stats.columns if df_stats.select(c).first()[0]]
-
-    return df.select(valid_columns)
-
-
-def remove_empty_rows(df, threshold=EMPTY_THRESHOLD):
-    print(f"Removing rows with less than {threshold * 100}% valid data...")
-
-    # Count the number of non-null values for each row
-    expr = sum([when(col(c).isNotNull(), lit(1)).otherwise(lit(0)) for c in df.columns])
-    df_valid_count = df.withColumn('valid_count', expr)
-
-    # Filter rows based on the threshold
-    total_columns = len(df.columns)
-    df_filtered = df_valid_count.filter(col('valid_count') >= threshold * total_columns)
-
-    print('count of valid rows:', df_filtered.count())
-
-    return df_filtered.drop('valid_count')
-
-
-def drop_rows_with_nas(df, threshold=VALID_DATA_THRESHOLD):
-    print(f"Dropping rows with NAs for columns with more than {threshold * 100}% valid data...")
-
-    # Calculate the percentage of non-null values for each column
-    df_stats = df.select([((count(when(col(c).isNotNull(), c)) / count('*'))).alias(c) for c in df.columns])
-
-    # Get columns with more than threshold valid data
-    valid_columns = [c for c in df_stats.columns if df_stats.select(c).first()[0] > threshold]
-
-    # Drop rows with NAs only for the valid columns
-    for column in valid_columns:
-        df = df.filter(col(column).isNotNull())
-
-    return df
-
-def check_typos(df, column_name, threshold=2, top_n=100):
-    # Check if the column is of StringType
-    if not isinstance(df.schema[column_name].dataType, StringType):
-        print(f"Skipping typo check for column {column_name} as it is not a string type.")
-        return None
-
-    print(f"Checking for typos in column: {column_name}")
-
-    try:
-        # Get value counts for the specific column
-        value_counts = df.groupBy(column_name).count().orderBy("count", ascending=False)
-
-        # Take top N most frequent values
-        top_values = [row[column_name] for row in value_counts.limit(top_n).collect()]
-
-        # Broadcast the top values to all nodes
-        broadcast_top_values = df.sparkSession.sparkContext.broadcast(top_values)
-
-        # Define UDF to find similar strings
-        @udf(returnType=ArrayType(StringType()))
-        def find_similar_strings(value):
-            if value is None:
-                return []
-            similar = []
-            for top_value in broadcast_top_values.value:
-                if value != top_value and levenshtein(value, top_value) <= threshold:
-                    similar.append(top_value)
-            return similar
-
-        # Apply the UDF to the column
-        df_with_typos = df.withColumn("possible_typos", find_similar_strings(col(column_name)))
-
-        # Filter rows with possible typos and select only the relevant columns
-        typos_df = df_with_typos.filter(size("possible_typos") > 0).select(column_name, "possible_typos")
-
-        # Check if there are any potential typos
-        typo_count = typos_df.count()
-        if typo_count > 0:
-            print(f"Potential typos found in column {column_name}: {typo_count}")
-            typos_df.show(10, truncate=False)
-            return typos_df
-        else:
-            print(f"No potential typos found in column {column_name}")
-            return None
-
-    except AnalysisException as e:
-        print(f"Error analyzing column {column_name}: {str(e)}")
-        return None
-    except Exception as e:
-        print(f"Unexpected error in check_typos for column {column_name}: {str(e)}")
-        return None
-
-
-def transform_string_column(df, column_name):
-    print(f"Transforming string column: {column_name}")
-    # Lower case transformation (if applicable)
-    df = df.withColumn(column_name, lower(col(column_name)))
-    # Remove leading and trailing spaces
-    df = df.withColumn(column_name, trim(col(column_name)))
-    # Replace multiple spaces with a single space
-    df = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
-    # Remove special characters except those used in dates and times
-    df = df.withColumn(column_name, regexp_replace(col(column_name), "[^a-zA-Z0-9\\s/:.-]", ""))
-    return df
-
-
-def clean_column(df, column_name):
-    print(f"Cleaning column: {column_name}")
-    start_time = perf_counter()
-    # Get the data type of the current column
-    column_type = df.schema[column_name].dataType
-
-    if isinstance(column_type, StringType):
-        typos_df = check_typos(df, column_name)
-        if typos_df is not None and typos_df.count() > 0:
-            print(f"Detailed typos for column {column_name}:")
-            typos_df.show(truncate=False)
-        df = transform_string_column(df, column_name)
-
-    elif isinstance(column_type, (DoubleType, IntegerType)):
-        # For numeric columns, we'll do a simple null check
-        df = df.withColumn(column_name, when(col(column_name).isNull(), lit(None)).otherwise(col(column_name)))
-
-    end_time = perf_counter()
-    print(f"Time taken to clean {column_name}: {end_time - start_time:.6f} seconds")
-    return df
-
-# Update the remove_outliers function to work on a single column
-def remove_outliers(df, column):
-    print(f"Removing outliers from column: {column}")
-
-    stats = df.select(column).summary("25%", "75%").collect()
-    q1 = float(stats[0][1])
-    q3 = float(stats[1][1])
-    iqr = q3 - q1
-    lower_bound = q1 - 1.5 * iqr
-    upper_bound = q3 + 1.5 * iqr
-    df = df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))
-
-    return df
-
-def calculate_nonconforming_cells(df):
-    nonconforming_cells = {}
-    for column in df.columns:
-        nonconforming_count = df.filter(col(column).isNull() | isnan(column)).count()
-        nonconforming_cells[column] = nonconforming_count
-    return nonconforming_cells
-
-def get_numeric_columns(df):
-    return [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType))]
-
-def remove_duplicates_from_primary_key(df, primary_key_column):
-    print(f"Removing duplicates based on primary key column: {primary_key_column}")
-    return df.dropDuplicates([primary_key_column])
-
-def clean_data(spark, df, primary_key_column, progress):
-    start_time = time.time()
-    process_times = {}
-
-    print("Starting data validation and cleaning...")
-    print_dataframe_info(df, "Initial - ")
-
-    # Calculate nonconforming cells before cleaning
-    nonconforming_cells_before = calculate_nonconforming_cells(df)
-
-    # Step 1: Normalize column headers
-    progress(0.1, desc="Normalizing column headers")
-    step_start_time = time.time()
-    df = check_and_normalize_column_headers(df)
-    process_times['Normalize headers'] = time.time() - step_start_time
-
-    # Step 2: Remove empty columns
-    progress(0.2, desc="Removing empty columns")
-    step_start_time = time.time()
-    df = remove_empty_columns(df)
-    print('2) count of valid rows:', df.count())
-    process_times['Remove empty columns'] = time.time() - step_start_time
-
-    # Step 3: Remove empty rows
-    progress(0.3, desc="Removing empty rows")
-    step_start_time = time.time()
-    df = remove_empty_rows(df)
-    print('3) count of valid rows:', df.count())
-    process_times['Remove empty rows'] = time.time() - step_start_time
-
-    # Step 4: Drop rows with NAs for columns with more than 50% valid data
-    progress(0.4, desc="Dropping rows with NAs")
-    step_start_time = time.time()
-    df = drop_rows_with_nas(df)
-    print('4) count of valid rows:', df.count())
-    process_times['Drop rows with NAs'] = time.time() - step_start_time
-
-    # Step 5: Clean columns (including typo checking and string transformation)
-    column_cleaning_times = {}
-    total_columns = len(df.columns)
-    for index, column in enumerate(df.columns):
-        progress(0.5 + (0.2 * (index / total_columns)), desc=f"Cleaning column: {column}")
-        column_start_time = time.time()
-        df = clean_column(df, column)
-        print('5) count of valid rows:', df.count())
-        column_cleaning_times[f"Clean column: {column}"] = time.time() - column_start_time
-    process_times.update(column_cleaning_times)
-
-    # Step 6: Remove outliers from numeric columns (excluding primary key)
-    progress(0.7, desc="Removing outliers")
-    step_start_time = time.time()
-    numeric_columns = get_numeric_columns(df)
-    numeric_columns = [col for col in numeric_columns if col != primary_key_column]
-    for column in numeric_columns:
-        df = remove_outliers(df, column)
-    print('6) count of valid rows:', df.count())
-    process_times['Remove outliers'] = time.time() - step_start_time
-
-    # Step 7: Remove duplicates from primary key column
-    progress(0.8, desc="Removing duplicates from primary key")
-    step_start_time = time.time()
-    df = remove_duplicates_from_primary_key(df, primary_key_column)
-    print('7) count of valid rows:', df.count())
-
-    print("Cleaning process completed.")
-    print_dataframe_info(df, "Final - ")
-
-    return df, nonconforming_cells_before, process_times
+import re
+import pandas as pd
+import numpy as np
+from time import perf_counter
+import time
+
+# Constants
+EMPTY_THRESHOLD = 0.5
+LOW_COUNT_THRESHOLD = 2
+VALID_DATA_THRESHOLD = 0.5
+
+def print_dataframe_info(df, step=""):
+    num_columns = len(df.columns)
+    num_rows = len(df)
+    num_cells = num_columns * num_rows
+    print(f"{step}Dataframe info:")
+    print(f" Number of columns: {num_columns}")
+    print(f" Number of rows: {num_rows}")
+    print(f" Total number of cells: {num_cells}")
+
+def check_and_normalize_column_headers(df):
+    print("Checking and normalizing column headers...")
+    df.columns = df.columns.str.lower().str.replace(' ', '_')
+    df.columns = [re.sub(r'[^0-9a-zA-Z_]', '', col) for col in df.columns]
+    print("Column names have been normalized.")
+    return df
+
+def remove_empty_columns(df, threshold=EMPTY_THRESHOLD):
+    print(f"Removing columns with less than {threshold * 100}% valid data...")
+    return df.dropna(axis=1, thresh=int(threshold * len(df)))
+
+def remove_empty_rows(df, threshold=EMPTY_THRESHOLD):
+    print(f"Removing rows with less than {threshold * 100}% valid data...")
+    return df.dropna(thresh=int(threshold * len(df.columns)))
+
+def drop_rows_with_nas(df, threshold=VALID_DATA_THRESHOLD):
+    print(f"Dropping rows with NAs for columns with more than {threshold * 100}% valid data...")
+    valid_columns = df.columns[df.notna().mean() > threshold]
+    return df.dropna(subset=valid_columns)
+
+def check_typos(df, column_name, threshold=2, top_n=100):
+    if df[column_name].dtype != 'object':
+        print(f"Skipping typo check for column {column_name} as it is not a string type.")
+        return None
+
+    print(f"Checking for typos in column: {column_name}")
+
+    try:
+        value_counts = df[column_name].value_counts()
+        top_values = value_counts.head(top_n).index.tolist()
+
+        def find_similar_strings(value):
+            if pd.isna(value):
+                return []
+            return [tv for tv in top_values if value != tv and levenshtein_distance(value, tv) <= threshold]
+
+        df['possible_typos'] = df[column_name].apply(find_similar_strings)
+        typos_df = df[df['possible_typos'].apply(len) > 0][[column_name, 'possible_typos']]
+
+        typo_count = len(typos_df)
+        if typo_count > 0:
+            print(f"Potential typos found in column {column_name}: {typo_count}")
+            print(typos_df.head(10))
+            return typos_df
+        else:
+            print(f"No potential typos found in column {column_name}")
+            return None
+
+    except Exception as e:
+        print(f"Unexpected error in check_typos for column {column_name}: {str(e)}")
+        return None
+
+def levenshtein_distance(s1, s2):
+    if len(s1) < len(s2):
+        return levenshtein_distance(s2, s1)
+    if len(s2) == 0:
+        return len(s1)
+    previous_row = range(len(s2) + 1)
+    for i, c1 in enumerate(s1):
+        current_row = [i + 1]
+        for j, c2 in enumerate(s2):
+            insertions = previous_row[j + 1] + 1
+            deletions = current_row[j] + 1
+            substitutions = previous_row[j] + (c1 != c2)
+            current_row.append(min(insertions, deletions, substitutions))
+        previous_row = current_row
+    return previous_row[-1]
+
+def transform_string_column(df, column_name):
+    print(f"Transforming string column: {column_name}")
+    df[column_name] = df[column_name].str.lower()
+    df[column_name] = df[column_name].str.strip()
+    df[column_name] = df[column_name].str.replace(r'\s+', ' ', regex=True)
+    df[column_name] = df[column_name].str.replace(r'[^a-zA-Z0-9\s/:.-]', '', regex=True)
+    return df
+
+def clean_column(df, column_name):
+    print(f"Cleaning column: {column_name}")
+    start_time = perf_counter()
+
+    if df[column_name].dtype == 'object':
+        typos_df = check_typos(df, column_name)
+        if typos_df is not None and len(typos_df) > 0:
+            print(f"Detailed typos for column {column_name}:")
+            print(typos_df)
+        df = transform_string_column(df, column_name)
+    elif pd.api.types.is_numeric_dtype(df[column_name]):
+        df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
+
+    end_time = perf_counter()
+    print(f"Time taken to clean {column_name}: {end_time - start_time:.6f} seconds")
+    return df
+
+def remove_outliers(df, column):
+    print(f"Removing outliers from column: {column}")
+    q1 = df[column].quantile(0.25)
+    q3 = df[column].quantile(0.75)
+    iqr = q3 - q1
+    lower_bound = q1 - 1.5 * iqr
+    upper_bound = q3 + 1.5 * iqr
+    return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
+
+def calculate_nonconforming_cells(df):
+    return df.isna().sum().to_dict()
+
+def get_numeric_columns(df):
+    return df.select_dtypes(include=[np.number]).columns.tolist()
+
+def remove_duplicates_from_primary_key(df, primary_key_column):
+    print(f"Removing duplicates based on primary key column: {primary_key_column}")
+    return df.drop_duplicates(subset=[primary_key_column])
+
+def clean_data(df, primary_key_column, progress):
+    start_time = time.time()
+    process_times = {}
+
+    print("Starting data validation and cleaning...")
+    print_dataframe_info(df, "Initial - ")
+
+    nonconforming_cells_before = calculate_nonconforming_cells(df)
+
+    progress(0.1, desc="Normalizing column headers")
+    step_start_time = time.time()
+    df = check_and_normalize_column_headers(df)
+    process_times['Normalize headers'] = time.time() - step_start_time
+
+    progress(0.2, desc="Removing empty columns")
+    step_start_time = time.time()
+    df = remove_empty_columns(df)
+    print('2) count of valid rows:', len(df))
+    process_times['Remove empty columns'] = time.time() - step_start_time
+
+    progress(0.3, desc="Removing empty rows")
+    step_start_time = time.time()
+    df = remove_empty_rows(df)
+    print('3) count of valid rows:', len(df))
+    process_times['Remove empty rows'] = time.time() - step_start_time
+
+    progress(0.4, desc="Dropping rows with NAs")
+    step_start_time = time.time()
+    df = drop_rows_with_nas(df)
+    print('4) count of valid rows:', len(df))
+    process_times['Drop rows with NAs'] = time.time() - step_start_time
+
+    column_cleaning_times = {}
+    total_columns = len(df.columns)
+    for index, column in enumerate(df.columns):
+        progress(0.5 + (0.2 * (index / total_columns)), desc=f"Cleaning column: {column}")
+        column_start_time = time.time()
+        df = clean_column(df, column)
+        print('5) count of valid rows:', len(df))
+        column_cleaning_times[f"Clean column: {column}"] = time.time() - column_start_time
+    process_times.update(column_cleaning_times)
+
+    progress(0.7, desc="Removing outliers")
+    step_start_time = time.time()
+    numeric_columns = get_numeric_columns(df)
+    numeric_columns = [col for col in numeric_columns if col != primary_key_column]
+    for column in numeric_columns:
+        df = remove_outliers(df, column)
+    print('6) count of valid rows:', len(df))
+    process_times['Remove outliers'] = time.time() - step_start_time
+
+    progress(0.8, desc="Removing duplicates from primary key")
+    step_start_time = time.time()
+    df = remove_duplicates_from_primary_key(df, primary_key_column)
+    print('7) count of valid rows:', len(df))
+
+    print("Cleaning process completed.")
+    print_dataframe_info(df, "Final - ")
+
+    return df, nonconforming_cells_before, process_times
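
For context, here is a minimal sketch of how the new pandas-based clean_data entry point might be driven after this commit. The sample DataFrame, the id primary-key column, the module name clean, and the no-op progress callback are illustrative assumptions, not part of the upload; the progress(fraction, desc=...) call pattern matches a Gradio-style progress callable, but any callable with that signature should work.

import pandas as pd

from clean import clean_data  # module name assumed from the uploaded clean.py

# Stand-in for a Gradio-style progress callable (assumption, not part of the commit).
def progress(fraction, desc=""):
    print(f"[{fraction:.0%}] {desc}")

# Small illustrative dataset (assumption).
df = pd.DataFrame({
    "id": [1, 2, 2, 3, 4],
    "First Name": ["Anna", "anna ", "Bob", None, "Eve"],
    "Score": [10.0, 11.0, 9.5, 1000.0, 10.5],
})

cleaned, nonconforming_before, timings = clean_data(df, primary_key_column="id", progress=progress)
print(cleaned)
print(nonconforming_before)
print(timings)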