Hamza1702 committed on
Commit
de6262a
1 Parent(s): 8cb066e

Create utils.py

Files changed (1)
  1. utils.py +433 -0
utils.py ADDED
"""
General utility functions for loading, saving, and manipulating data.
"""

import logging
import os
import pprint as pp
import re
import shutil  # zipfile formats
import warnings
from datetime import datetime
from os.path import basename, getsize, join
from pathlib import Path

import pandas as pd
import requests
from natsort import natsorted
from symspellpy import SymSpell
from tqdm.auto import tqdm

warnings.filterwarnings(
    action="ignore", message=".*the GPL-licensed package `unidecode` is not installed*"
)  # the cleantext GPL-licensed package reminder is noisy


class DisableLogger:
    """Context manager that silences all logging while active."""

    def __enter__(self):
        logging.disable(logging.CRITICAL)

    def __exit__(self, exit_type, exit_value, exit_traceback):
        logging.disable(logging.NOTSET)


with DisableLogger():
    from cleantext import clean  # suppress cleantext's import-time log messages


def clear_loggers():
    """clear_loggers - remove all handlers attached to the root logger"""
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)


def get_timestamp():
    """get_timestamp - return the current date and hour as a filename-safe string"""
    return datetime.now().strftime("%b-%d-%Y_t-%H")


def print_spacer(n=1):
    """print_spacer - print a spacer line"""
    print("\n -------- " * n)


def remove_trailing_punctuation(text: str):
    """
    remove_trailing_punctuation - remove trailing punctuation from a string

    Args:
        text (str): string to be cleaned

    Returns:
        str: cleaned string
    """
    return text.strip("?!.,;:")


def correct_phrase_load(my_string: str):
    """
    correct_phrase_load - basic (unoptimized) SymSpell spelling correction for a string

    Args:
        my_string (str): text to be corrected

    Returns:
        str: the corrected string
    """
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)

    dictionary_path = (
        r"symspell_rsc/frequency_dictionary_en_82_765.txt"  # from repo root
    )
    bigram_path = (
        r"symspell_rsc/frequency_bigramdictionary_en_243_342.txt"  # from repo root
    )
    # term_index is the column of the term and count_index is the
    # column of the term frequency
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)
    sym_spell.load_bigram_dictionary(bigram_path, term_index=0, count_index=2)

    # max edit distance per lookup (per single word, not per whole input string)
    suggestions = sym_spell.lookup_compound(
        clean(my_string), max_edit_distance=2, ignore_non_words=True
    )
    if len(suggestions) < 1:
        return my_string
    else:
        first_result = suggestions[0]
        return first_result.term  # .term is SuggestItem's public attribute for the text

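# Usage sketch (hypothetical input; assumes the symspell_rsc dictionary files
# above exist relative to the working directory):
#   fixed = correct_phrase_load("ths sentnce has typoos")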

def fast_scandir(dirname: str):
    """
    fast_scandir - recursively return all subfolders of a given filepath via os.scandir

    Returns:
        list: paths of all subfolders
    """
    subfolders = [f.path for f in os.scandir(dirname) if f.is_dir()]
    for subdir in list(subfolders):
        subfolders.extend(fast_scandir(subdir))
    return subfolders


def create_folder(directory: str):
    """create_folder - create a directory (and parents) if it does not already exist"""
    os.makedirs(directory, exist_ok=True)


def chunks(lst: list, n: int):
    """
    chunks - yield successive n-sized chunks from lst

    Args:
        lst (list): list to be chunked
        n (int): size of chunks
    """
    for i in range(0, len(lst), n):
        yield lst[i : i + n]

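# Usage sketch:
#   list(chunks(list(range(7)), 3))  # -> [[0, 1, 2], [3, 4, 5], [6]]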

def shorten_list(
    list_of_strings: list, max_chars: int = 512, no_blanks=True, verbose=False
):
    """a helper function that iterates through a list backwards, adding entries to a new list.

    When <max_chars> is reached, the entry that would exceed it is not added.
    Args:
        list_of_strings (list): list of strings to be shortened
        max_chars (int, optional): maximum number of characters in the list in total. Defaults to 512.
        no_blanks (bool, optional): if True, blank strings are not added to the new list. Defaults to True.
        verbose (bool, optional): if True, print the total length after shortening. Defaults to False.
    """
    list_of_strings = [
        str(x) for x in list_of_strings
    ]  # convert to strings if not already
    shortened_list = []
    total_len = 0
    for i, string in enumerate(list_of_strings[::-1], start=1):
        if len(string.strip()) == 0 and no_blanks:
            continue
        if len(string) + total_len >= max_chars:
            logging.info(f"string # {i} puts total over limit, breaking")
            break
        total_len += len(string)
        shortened_list.insert(0, string)
    if len(shortened_list) == 0:
        logging.info(f"shortened list with max_chars={max_chars} has no entries")
    if verbose:
        print(f"total length of list is {total_len} chars")
    return shortened_list

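# Usage sketch - keeps the most recent (last) entries that fit the budget:
#   shorten_list(["a long preamble", "recent reply", "newest reply"], max_chars=30)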

def chunky_pandas(my_df, num_chunks: int = 4):
    """
    chunky_pandas - split a dataframe into `num_chunks` roughly equal chunks, returned in a list

    Args:
        my_df (pd.DataFrame): dataframe to split
        num_chunks (int, optional): Defaults to 4.

    Returns:
        list: a list of dataframes
    """
    n = max(1, len(my_df) // num_chunks)  # guard against a zero-length slice step
    list_df = [my_df[i : i + n] for i in range(0, my_df.shape[0], n)]

    return list_df

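# Usage sketch:
#   parts = chunky_pandas(pd.DataFrame({"a": range(10)}), num_chunks=2)
#   [len(p) for p in parts]  # -> [5, 5]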

def load_dir_files(
    directory: str, req_extension=".txt", return_type="list", verbose=False
):
    """
    load_dir_files - an os.walk-based method of returning all files with extension `req_extension` in a given directory and its subdirectories

    Args:
        directory (str): root directory to search in
        req_extension (str, optional): file extension to match. Defaults to ".txt".
        return_type (str, optional): "list" for a list of paths, anything else for a dict. Defaults to "list".
        verbose (bool, optional): if True, print the files found. Defaults to False.

    Returns:
        list or dict: an iterable of filepaths, or a dict mapping filenames to filepaths
    """
    appr_files = []
    # r=root, d=directories, f=files
    for r, d, f in os.walk(directory):
        for prefile in f:
            if prefile.endswith(req_extension):
                fullpath = os.path.join(r, prefile)
                appr_files.append(fullpath)

    appr_files = natsorted(appr_files)

    if verbose:
        print("Files found in the {} directory:\n".format(directory))
        if len(appr_files) < 10:
            pp.pprint(appr_files)
        else:
            pp.pprint(appr_files[:10])
            print("\n and more. There are a total of {} files".format(len(appr_files)))

    if return_type.lower() == "list":
        return appr_files
    else:
        if verbose:
            print("returning dictionary")

        appr_file_dict = {}
        for this_file in appr_files:
            appr_file_dict[basename(this_file)] = this_file

        return appr_file_dict

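# Usage sketch (hypothetical directory):
#   txt_files = load_dir_files("data/", req_extension=".txt", verbose=True)
#   by_name = load_dir_files("data/", req_extension=".txt", return_type="dict")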

def URL_string_filter(text):
    """
    URL_string_filter - keep only alphanumerics, periods, and underscores
    """
    custom_printable = (
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._"
    )

    filtered = "".join(filter(lambda i: i in custom_printable, text))

    return filtered


def getFilename_fromCd(cd):
    """getFilename_fromCd - extract a filename from a Content-Disposition header string"""
    if not cd:
        return None
    fname = re.findall("filename=(.+)", cd)
    if len(fname) > 0:
        output = fname[0]
    elif "/" in cd:  # cd.find() returns -1 (truthy) when absent, so test membership instead
        possible_fname = cd.rsplit("/", 1)[1]
        output = URL_string_filter(possible_fname)
    else:
        output = None
    return output

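# Usage sketch:
#   getFilename_fromCd("attachment; filename=report.zip")  # -> "report.zip"
#   getFilename_fromCd(None)  # -> None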

def get_zip_URL(
    URLtoget: str,
    extract_loc: str = None,
    file_header: str = "dropboxexport_",
    verbose: bool = False,
):
    """get_zip_URL - download a zip file from a given URL and extract it to a given location"""

    r = requests.get(URLtoget, allow_redirects=True)
    names = getFilename_fromCd(r.headers.get("content-disposition"))
    if names is None:
        names = basename(URLtoget)  # no Content-Disposition header; fall back to the URL
    fixed_fnames = names.split(";")  # split the multiple results
    this_filename = file_header + URL_string_filter(fixed_fnames[0])

    # define paths and save the zip file
    if extract_loc is None:
        extract_loc = "dropbox_dl"
    dl_place = join(os.getcwd(), extract_loc)
    create_folder(dl_place)
    save_loc = join(os.getcwd(), this_filename)
    with open(save_loc, "wb") as f:
        f.write(r.content)
    if verbose:
        print("downloaded file size was {} MB".format(getsize(save_loc) / 1000000))

    # unpack the archive
    shutil.unpack_archive(save_loc, extract_dir=dl_place)
    if verbose:
        print("extracted zip file - ", datetime.now())
    load_dir_files(dl_place, req_extension="", verbose=verbose)  # lists files if verbose

    # remove the original archive
    try:
        os.remove(save_loc)
        del save_loc
    except Exception:
        print("unable to delete original zipfile - check if exists", datetime.now())

    print("finished extracting zip - ", datetime.now())

    return dl_place

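# Usage sketch (hypothetical Dropbox export URL):
#   out_dir = get_zip_URL("https://www.dropbox.com/sh/abc123/files.zip?dl=1", verbose=True)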

def merge_dataframes(data_dir: str, ext=".xlsx", verbose=False):
    """
    merge_dataframes - given a filepath, loads and attempts to merge all files as dataframes

    Args:
        data_dir (str): root directory to search in
        ext (str, optional): anticipated file extension for the dataframes. Defaults to '.xlsx'.

    Returns:
        pd.DataFrame: merged dataframe of all files
    """

    src = Path(data_dir)
    src_str = str(src.resolve())
    mrg_df = pd.DataFrame()

    all_reports = load_dir_files(directory=src_str, req_extension=ext, verbose=verbose)

    failed = []

    for df_path in tqdm(all_reports, total=len(all_reports), desc="joining data..."):
        try:
            this_df = pd.read_excel(df_path).convert_dtypes()
            mrg_df = pd.concat([mrg_df, this_df], axis=0)
        except Exception:
            short_p = os.path.basename(df_path)
            print(
                f"WARNING - file with extension {ext} and name {short_p} could not be read."
            )
            failed.append(short_p)

    if len(failed) > 0:
        print(f"failed to merge {len(failed)} files, investigate as needed")

    if verbose:
        mrg_df.info(verbose=True)

    return mrg_df

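# Usage sketch (hypothetical folder of spreadsheets):
#   combined = merge_dataframes("reports/", ext=".xlsx", verbose=True)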

def download_URL(url: str, file=None, dlpath=None, verbose=False):
    """
    download_URL - download a file from a URL and show a progress bar

    Parameters
    ----------
    url : str, URL to download
    file : str, optional, default None, name of file to save to. If None, will use the filename from the URL
    dlpath : str, optional, default None, path to save the file to. If None, will save to the current working directory
    verbose : bool, optional, default False, print the save location when done

    Returns
    -------
    str - path to the downloaded file
    """

    if file is None:
        if "?dl=" in url:
            # is a dropbox link
            prefile = url.split("/")[-1]
            filename = str(prefile).split("?dl=")[0]
        else:
            filename = url.split("/")[-1]

        file = clean(filename)
    if dlpath is None:
        dlpath = Path.cwd()  # save to current working directory
    else:
        dlpath = Path(dlpath)  # make a path object

    r = requests.get(url, stream=True, allow_redirects=True)
    total_size = int(r.headers.get("content-length", 0))  # 0 if the header is absent
    initial_pos = 0
    dl_loc = dlpath / file
    with open(str(dl_loc.resolve()), "wb") as f:
        with tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=file,
            initial=initial_pos,
            ascii=True,
        ) as pbar:
            for ch in r.iter_content(chunk_size=1024):
                if ch:
                    f.write(ch)
                    pbar.update(len(ch))

    if verbose:
        print(f"\ndownloaded {file} to {dlpath}\n")

    return str(dl_loc.resolve())

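# Usage sketch (hypothetical URL):
#   local_path = download_URL("https://example.com/data/archive.zip", verbose=True)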

def dl_extract_zip(
    URLtoget: str,
    extract_loc: str = None,
    file_header: str = "TEMP_archive_dl_",
    verbose: bool = False,
):
    """
    dl_extract_zip - generic function to download a zip file and extract it

    Parameters
    ----------
    URLtoget : str, zip file URL to download
    extract_loc : str, optional, default None, path to extract the zip file to. If None, will extract to the current working directory
    file_header : str, optional, default 'TEMP_archive_dl_', prefix for the zip file name
    verbose : bool, optional, default False, print progress details

    Returns
    -------
    str - path to the downloaded and extracted folder
    """

    # fall back to the current working directory, as the docstring promises
    extract_loc = Path(extract_loc) if extract_loc is not None else Path.cwd()
    extract_loc.mkdir(parents=True, exist_ok=True)

    save_loc = download_URL(
        url=URLtoget, file=f"{file_header}.zip", dlpath=None, verbose=verbose
    )

    shutil.unpack_archive(save_loc, extract_dir=extract_loc)

    if verbose:
        print("extracted zip file - ", datetime.now())
        load_dir_files(extract_loc, req_extension="", verbose=verbose)

    # remove the original archive
    try:
        os.remove(save_loc)
        del save_loc
    except Exception as e:
        warnings.warn(message=f"unable to delete original zipfile due to {e}")
    if verbose:
        print("finished extracting zip - ", datetime.now())

    return str(extract_loc.resolve())
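

# Usage sketch (hypothetical URL and target folder):
#   out = dl_extract_zip(
#       "https://example.com/archive.zip", extract_loc="unzipped", verbose=True
#   )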