reab5555 committed
Commit de7ec56 · verified · 1 Parent(s): 4f6d176

Upload clean.py

Files changed (1): clean.py +132 -58
clean.py CHANGED
@@ -1,8 +1,12 @@
import re
- import pandas as pd
- import numpy as np
- from time import perf_counter
import time

# Constants
EMPTY_THRESHOLD = 0.5
@@ -11,183 +15,253 @@ VALID_DATA_THRESHOLD = 0.5

def print_dataframe_info(df, step=""):
    num_columns = len(df.columns)
-   num_rows = len(df)
    num_cells = num_columns * num_rows
    print(f"{step}Dataframe info:")
    print(f" Number of columns: {num_columns}")
    print(f" Number of rows: {num_rows}")
    print(f" Total number of cells: {num_cells}")

def check_and_normalize_column_headers(df):
    print("Checking and normalizing column headers...")
-   df.columns = df.columns.str.lower().str.replace(' ', '_')
-   df.columns = [re.sub(r'[^0-9a-zA-Z_]', '', col) for col in df.columns]
    print("Column names have been normalized.")
    return df

def remove_empty_columns(df, threshold=EMPTY_THRESHOLD):
    print(f"Removing columns with less than {threshold * 100}% valid data...")
-   return df.dropna(axis=1, thresh=int(threshold * len(df)))

def remove_empty_rows(df, threshold=EMPTY_THRESHOLD):
    print(f"Removing rows with less than {threshold * 100}% valid data...")
-   return df.dropna(thresh=int(threshold * len(df.columns)))

def drop_rows_with_nas(df, threshold=VALID_DATA_THRESHOLD):
    print(f"Dropping rows with NAs for columns with more than {threshold * 100}% valid data...")
-   valid_columns = df.columns[df.notna().mean() > threshold]
-   return df.dropna(subset=valid_columns)

def check_typos(df, column_name, threshold=2, top_n=100):
-   if df[column_name].dtype != 'object':
        print(f"Skipping typo check for column {column_name} as it is not a string type.")
        return None

    print(f"Checking for typos in column: {column_name}")

    try:
-       value_counts = df[column_name].value_counts()
-       top_values = value_counts.head(top_n).index.tolist()

        def find_similar_strings(value):
-           if pd.isna(value):
                return []
-           return [tv for tv in top_values if value != tv and levenshtein_distance(value, tv) <= threshold]

-       df['possible_typos'] = df[column_name].apply(find_similar_strings)
-       typos_df = df[df['possible_typos'].apply(len) > 0][[column_name, 'possible_typos']]

-       typo_count = len(typos_df)
        if typo_count > 0:
            print(f"Potential typos found in column {column_name}: {typo_count}")
-           print(typos_df.head(10))
            return typos_df
        else:
            print(f"No potential typos found in column {column_name}")
            return None

    except Exception as e:
        print(f"Unexpected error in check_typos for column {column_name}: {str(e)}")
        return None

- def levenshtein_distance(s1, s2):
-     if len(s1) < len(s2):
-         return levenshtein_distance(s2, s1)
-     if len(s2) == 0:
-         return len(s1)
-     previous_row = range(len(s2) + 1)
-     for i, c1 in enumerate(s1):
-         current_row = [i + 1]
-         for j, c2 in enumerate(s2):
-             insertions = previous_row[j + 1] + 1
-             deletions = current_row[j] + 1
-             substitutions = previous_row[j] + (c1 != c2)
-             current_row.append(min(insertions, deletions, substitutions))
-         previous_row = current_row
-     return previous_row[-1]

def transform_string_column(df, column_name):
    print(f"Transforming string column: {column_name}")
-   df[column_name] = df[column_name].str.lower()
-   df[column_name] = df[column_name].str.strip()
-   df[column_name] = df[column_name].str.replace(r'\s+', ' ', regex=True)
-   df[column_name] = df[column_name].str.replace(r'[^a-zA-Z0-9\s/:.-]', '', regex=True)
    return df

def clean_column(df, column_name):
    print(f"Cleaning column: {column_name}")
    start_time = perf_counter()

-   if df[column_name].dtype == 'object':
        typos_df = check_typos(df, column_name)
-       if typos_df is not None and len(typos_df) > 0:
            print(f"Detailed typos for column {column_name}:")
-           print(typos_df)
        df = transform_string_column(df, column_name)
-   elif pd.api.types.is_numeric_dtype(df[column_name]):
-       df[column_name] = pd.to_numeric(df[column_name], errors='coerce')

    end_time = perf_counter()
    print(f"Time taken to clean {column_name}: {end_time - start_time:.6f} seconds")
    return df

def remove_outliers(df, column):
    print(f"Removing outliers from column: {column}")
-   q1 = df[column].quantile(0.25)
-   q3 = df[column].quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
-   return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

def calculate_nonconforming_cells(df):
-   return df.isna().sum().to_dict()

def get_numeric_columns(df):
-   return df.select_dtypes(include=[np.number]).columns.tolist()

def remove_duplicates_from_primary_key(df, primary_key_column):
    print(f"Removing duplicates based on primary key column: {primary_key_column}")
-   return df.drop_duplicates(subset=[primary_key_column])

- def clean_data(df, primary_key_column, progress):
    start_time = time.time()
    process_times = {}

    print("Starting data validation and cleaning...")
    print_dataframe_info(df, "Initial - ")

    nonconforming_cells_before = calculate_nonconforming_cells(df)

    progress(0.1, desc="Normalizing column headers")
    step_start_time = time.time()
    df = check_and_normalize_column_headers(df)
    process_times['Normalize headers'] = time.time() - step_start_time

    progress(0.2, desc="Removing empty columns")
    step_start_time = time.time()
    df = remove_empty_columns(df)
-   print('2) count of valid rows:', len(df))
    process_times['Remove empty columns'] = time.time() - step_start_time

    progress(0.3, desc="Removing empty rows")
    step_start_time = time.time()
    df = remove_empty_rows(df)
-   print('3) count of valid rows:', len(df))
    process_times['Remove empty rows'] = time.time() - step_start_time

    progress(0.4, desc="Dropping rows with NAs")
    step_start_time = time.time()
    df = drop_rows_with_nas(df)
-   print('4) count of valid rows:', len(df))
    process_times['Drop rows with NAs'] = time.time() - step_start_time

    column_cleaning_times = {}
    total_columns = len(df.columns)
    for index, column in enumerate(df.columns):
        progress(0.5 + (0.2 * (index / total_columns)), desc=f"Cleaning column: {column}")
        column_start_time = time.time()
        df = clean_column(df, column)
-       print('5) count of valid rows:', len(df))
        column_cleaning_times[f"Clean column: {column}"] = time.time() - column_start_time
    process_times.update(column_cleaning_times)

    progress(0.7, desc="Removing outliers")
    step_start_time = time.time()
    numeric_columns = get_numeric_columns(df)
    numeric_columns = [col for col in numeric_columns if col != primary_key_column]
    for column in numeric_columns:
        df = remove_outliers(df, column)
-   print('6) count of valid rows:', len(df))
    process_times['Remove outliers'] = time.time() - step_start_time

    progress(0.8, desc="Removing duplicates from primary key")
    step_start_time = time.time()
    df = remove_duplicates_from_primary_key(df, primary_key_column)
-   print('7) count of valid rows:', len(df))
-   process_times['Remove duplicates from primary key'] = time.time() - step_start_time

    print("Cleaning process completed.")
    print_dataframe_info(df, "Final - ")

-   return df, nonconforming_cells_before, process_times
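
The removed pandas implementation ends here; the full PySpark replacement follows below. For contrast, a minimal driver for the old version would have looked roughly like this sketch (the file name, the "id" key column, and the no-op progress stub are illustrative assumptions, not part of the commit):

import pandas as pd

df = pd.read_csv("data.csv")  # any tabular source
cleaned, nonconforming_before, process_times = clean_data(df, "id", lambda frac, desc="": None)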
 
import re
+
+ from pyspark.sql import SparkSession
+ from pyspark.sql.functions import col, isnan, when, count, lower, regexp_replace, to_date, to_timestamp, udf, \
+     levenshtein, array, lit, trim, size, coalesce
+ from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, TimestampType, ArrayType
+ from pyspark.sql.utils import AnalysisException
import time
+ from time import perf_counter

# Constants
EMPTY_THRESHOLD = 0.5

def print_dataframe_info(df, step=""):
    num_columns = len(df.columns)
+   num_rows = df.count()
    num_cells = num_columns * num_rows
    print(f"{step}Dataframe info:")
    print(f" Number of columns: {num_columns}")
    print(f" Number of rows: {num_rows}")
    print(f" Total number of cells: {num_cells}")

+
def check_and_normalize_column_headers(df):
    print("Checking and normalizing column headers...")
+
+   for old_name in df.columns:
+       # Create the new name using string manipulation
+       new_name = old_name.lower().replace(' ', '_')
+
+       # Remove any non-alphanumeric characters (excluding underscores)
+       new_name = re.sub(r'[^0-9a-zA-Z_]', '', new_name)
+
+       # Rename the column
+       df = df.withColumnRenamed(old_name, new_name)
+
    print("Column names have been normalized.")
    return df

+
def remove_empty_columns(df, threshold=EMPTY_THRESHOLD):
    print(f"Removing columns with less than {threshold * 100}% valid data...")
+
+   # Calculate the percentage of non-null values for each column
+   df_stats = df.select(
+       [((count(when(col(c).isNotNull(), c)) / count('*')) >= threshold).alias(c) for c in df.columns])
+   valid_columns = [c for c in df_stats.columns if df_stats.select(c).first()[0]]
+
+   return df.select(valid_columns)
+

def remove_empty_rows(df, threshold=EMPTY_THRESHOLD):
    print(f"Removing rows with less than {threshold * 100}% valid data...")
+
+   # Count the number of non-null values for each row
+   expr = sum([when(col(c).isNotNull(), lit(1)).otherwise(lit(0)) for c in df.columns])
+   df_valid_count = df.withColumn('valid_count', expr)
+
+   # Filter rows based on the threshold
+   total_columns = len(df.columns)
+   df_filtered = df_valid_count.filter(col('valid_count') >= threshold * total_columns)
+
+   print('count of valid rows:', df_filtered.count())
+
+   return df_filtered.drop('valid_count')
+

def drop_rows_with_nas(df, threshold=VALID_DATA_THRESHOLD):
    print(f"Dropping rows with NAs for columns with more than {threshold * 100}% valid data...")
+
+   # Calculate the percentage of non-null values for each column
+   df_stats = df.select([(count(when(col(c).isNotNull(), c)) / count('*')).alias(c) for c in df.columns])
+
+   # Get columns with more than threshold valid data
+   valid_columns = [c for c in df_stats.columns if df_stats.select(c).first()[0] > threshold]
+
+   # Drop rows with NAs only for the valid columns
+   for column in valid_columns:
+       df = df.filter(col(column).isNotNull())
+
+   return df

+ # Plain-Python Levenshtein distance, kept from the previous version for use inside
+ # the UDF below (pyspark.sql.functions.levenshtein operates on Columns, not Python strings)
+ def levenshtein_distance(s1, s2):
+     if len(s1) < len(s2):
+         return levenshtein_distance(s2, s1)
+     if len(s2) == 0:
+         return len(s1)
+     previous_row = range(len(s2) + 1)
+     for i, c1 in enumerate(s1):
+         current_row = [i + 1]
+         for j, c2 in enumerate(s2):
+             insertions = previous_row[j + 1] + 1
+             deletions = current_row[j] + 1
+             substitutions = previous_row[j] + (c1 != c2)
+             current_row.append(min(insertions, deletions, substitutions))
+         previous_row = current_row
+     return previous_row[-1]
+
def check_typos(df, column_name, threshold=2, top_n=100):
+   # Check if the column is of StringType
+   if not isinstance(df.schema[column_name].dataType, StringType):
        print(f"Skipping typo check for column {column_name} as it is not a string type.")
        return None

    print(f"Checking for typos in column: {column_name}")

    try:
+       # Get value counts for the specific column
+       value_counts = df.groupBy(column_name).count().orderBy("count", ascending=False)
+
+       # Take top N most frequent values
+       top_values = [row[column_name] for row in value_counts.limit(top_n).collect()]

+       # Broadcast the top values to all nodes
+       broadcast_top_values = df.sparkSession.sparkContext.broadcast(top_values)
+
+       # Define UDF to find similar strings
+       @udf(returnType=ArrayType(StringType()))
        def find_similar_strings(value):
+           if value is None:
                return []
+           similar = []
+           for top_value in broadcast_top_values.value:
+               if value != top_value and levenshtein_distance(value, top_value) <= threshold:
+                   similar.append(top_value)
+           return similar
+
+       # Apply the UDF to the column
+       df_with_typos = df.withColumn("possible_typos", find_similar_strings(col(column_name)))

+       # Filter rows with possible typos and select only the relevant columns
+       typos_df = df_with_typos.filter(size("possible_typos") > 0).select(column_name, "possible_typos")

+       # Check if there are any potential typos
+       typo_count = typos_df.count()
        if typo_count > 0:
            print(f"Potential typos found in column {column_name}: {typo_count}")
+           typos_df.show(10, truncate=False)
            return typos_df
        else:
            print(f"No potential typos found in column {column_name}")
            return None

+   except AnalysisException as e:
+       print(f"Error analyzing column {column_name}: {str(e)}")
+       return None
    except Exception as e:
        print(f"Unexpected error in check_typos for column {column_name}: {str(e)}")
        return None

def transform_string_column(df, column_name):
    print(f"Transforming string column: {column_name}")
+   # Lower case transformation (if applicable)
+   df = df.withColumn(column_name, lower(col(column_name)))
+   # Remove leading and trailing spaces
+   df = df.withColumn(column_name, trim(col(column_name)))
+   # Replace multiple spaces with a single space
+   df = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
+   # Remove special characters except those used in dates and times
+   df = df.withColumn(column_name, regexp_replace(col(column_name), "[^a-zA-Z0-9\\s/:.-]", ""))
    return df

+
def clean_column(df, column_name):
    print(f"Cleaning column: {column_name}")
    start_time = perf_counter()
+   # Get the data type of the current column
+   column_type = df.schema[column_name].dataType

+   if isinstance(column_type, StringType):
        typos_df = check_typos(df, column_name)
+       if typos_df is not None and typos_df.count() > 0:
            print(f"Detailed typos for column {column_name}:")
+           typos_df.show(truncate=False)
        df = transform_string_column(df, column_name)
+
+   elif isinstance(column_type, (DoubleType, IntegerType)):
+       # For numeric columns, we'll do a simple null check
+       df = df.withColumn(column_name, when(col(column_name).isNull(), lit(None)).otherwise(col(column_name)))

    end_time = perf_counter()
    print(f"Time taken to clean {column_name}: {end_time - start_time:.6f} seconds")
    return df

+ # Update the remove_outliers function to work on a single column
def remove_outliers(df, column):
    print(f"Removing outliers from column: {column}")
+
+   stats = df.select(column).summary("25%", "75%").collect()
+   q1 = float(stats[0][1])
+   q3 = float(stats[1][1])
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
+   df = df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))
+
+   return df

def calculate_nonconforming_cells(df):
+   nonconforming_cells = {}
+   for column in df.columns:
+       # isnan() is only valid on floating-point columns; use a plain null check otherwise
+       condition = col(column).isNull()
+       if isinstance(df.schema[column].dataType, DoubleType):
+           condition = condition | isnan(col(column))
+       nonconforming_cells[column] = df.filter(condition).count()
+   return nonconforming_cells

def get_numeric_columns(df):
+   return [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType))]

def remove_duplicates_from_primary_key(df, primary_key_column):
    print(f"Removing duplicates based on primary key column: {primary_key_column}")
+   return df.dropDuplicates([primary_key_column])

+ def clean_data(spark, df, primary_key_column, progress):
    start_time = time.time()
    process_times = {}

    print("Starting data validation and cleaning...")
    print_dataframe_info(df, "Initial - ")

+   # Calculate nonconforming cells before cleaning
    nonconforming_cells_before = calculate_nonconforming_cells(df)

+   # Step 1: Normalize column headers
    progress(0.1, desc="Normalizing column headers")
    step_start_time = time.time()
    df = check_and_normalize_column_headers(df)
    process_times['Normalize headers'] = time.time() - step_start_time

+   # Step 2: Remove empty columns
    progress(0.2, desc="Removing empty columns")
    step_start_time = time.time()
    df = remove_empty_columns(df)
+   print('2) count of valid rows:', df.count())
    process_times['Remove empty columns'] = time.time() - step_start_time

+   # Step 3: Remove empty rows
    progress(0.3, desc="Removing empty rows")
    step_start_time = time.time()
    df = remove_empty_rows(df)
+   print('3) count of valid rows:', df.count())
    process_times['Remove empty rows'] = time.time() - step_start_time

+   # Step 4: Drop rows with NAs for columns with more than 50% valid data
    progress(0.4, desc="Dropping rows with NAs")
    step_start_time = time.time()
    df = drop_rows_with_nas(df)
+   print('4) count of valid rows:', df.count())
    process_times['Drop rows with NAs'] = time.time() - step_start_time

+   # Step 5: Clean columns (including typo checking and string transformation)
    column_cleaning_times = {}
    total_columns = len(df.columns)
    for index, column in enumerate(df.columns):
        progress(0.5 + (0.2 * (index / total_columns)), desc=f"Cleaning column: {column}")
        column_start_time = time.time()
        df = clean_column(df, column)
+       print('5) count of valid rows:', df.count())
        column_cleaning_times[f"Clean column: {column}"] = time.time() - column_start_time
    process_times.update(column_cleaning_times)

+   # Step 6: Remove outliers from numeric columns (excluding primary key)
    progress(0.7, desc="Removing outliers")
    step_start_time = time.time()
    numeric_columns = get_numeric_columns(df)
    numeric_columns = [col for col in numeric_columns if col != primary_key_column]
    for column in numeric_columns:
        df = remove_outliers(df, column)
+   print('6) count of valid rows:', df.count())
    process_times['Remove outliers'] = time.time() - step_start_time

+   # Step 7: Remove duplicates from primary key column
    progress(0.8, desc="Removing duplicates from primary key")
    step_start_time = time.time()
    df = remove_duplicates_from_primary_key(df, primary_key_column)
+   print('7) count of valid rows:', df.count())

    print("Cleaning process completed.")
    print_dataframe_info(df, "Final - ")

+   return df, nonconforming_cells_before, process_times
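
A minimal driver for the new Spark version might look like the following sketch. The SparkSession setup, input path, "id" key column, and print-based progress stub are assumptions for illustration; the progress argument's call shape, progress(fraction, desc=...), suggests a Gradio-style progress callable, but any function with that signature works:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("clean").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)  # illustrative input

def progress(fraction, desc=""):
    # Stand-in for a Gradio-style progress callable
    print(f"[{fraction:.0%}] {desc}")

cleaned, nonconforming_before, process_times = clean_data(spark, df, "id", progress)
cleaned.write.mode("overwrite").parquet("cleaned_data.parquet")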