Spaces:
Running
Running
File size: 10,297 Bytes
dd1cbb4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
#import tensorflow as tf # Tensorflow use deprecated
import torch
import pandas as pd
import numpy as np
from typing import Type, Dict, List, Tuple
from datetime import datetime
# Loose type aliases for annotations.
# NOTE(review): Type[X] denotes the class object X itself rather than an
# instance of it; kept unchanged because other modules may import these names.
PandasDataFrame = Type[pd.DataFrame]
PandasSeries = Type[pd.Series]
MatchedResults = Dict[str, Tuple[str, int]]
array = List[str]

# Date stamps fixed at import time. Both are formatted from a single clock
# reading so they can never describe different days (two separate
# datetime.now() calls could straddle midnight).
_import_time = datetime.now()
today = _import_time.strftime("%d%m%Y")
today_rev = _import_time.strftime("%Y%m%d")
# # Neural net functions
def vocab_lookup(characters: str, vocab) -> Tuple[int, np.ndarray]:
    """
    Convert a string into an array of 1-based vocabulary indices.

    Adapted from the addressnet package by Jason Rigby.

    :param characters: the string to convert; it is lower-cased before lookup
    :param vocab: ordered sequence of single characters (list, tuple or str);
        a character's index is its first position in ``vocab`` plus one
    :return: the length of ``characters`` and an int64 array holding one index
        per lower-cased character, with 0 for characters absent from ``vocab``
    """
    # Build the lookup table once so each character is resolved in O(1)
    # instead of a linear ``vocab.index`` scan. ``setdefault`` keeps the first
    # occurrence, matching what ``vocab.index`` would have returned.
    positions = {}
    for position, symbol in enumerate(vocab, start=1):
        positions.setdefault(symbol, position)

    encoded = [positions.get(c, 0) for c in characters.lower()]
    return len(characters), np.array(encoded, dtype=np.int64)
# ## Neural net predictor functions
def text_to_model_input_local(in_text, vocab, model_type="estimator"):
    """
    Encode each input string as vocabulary indices.

    :param in_text: iterable of strings to encode; may be empty
    :param vocab: vocabulary handed through to ``vocab_lookup``
    :param model_type: "estimator" or "keras". Retained so existing callers
        keep working; it no longer changes the result
    :return: ``(addresses_out, model_input_out)`` where ``addresses_out`` holds
        one ``vocab_lookup`` result per input string and ``model_input_out``
        is always an empty list
    """
    # The TensorFlow-specific steps (serialised tf.train.Example records for
    # estimators, pad_sequences for keras models) were retired together with
    # the TensorFlow dependency, so no model-ready input is built here.
    # The longest-string scan that only fed that padding step was removed as
    # well: it raised IndexError whenever ``in_text`` was empty.
    addresses_out = [vocab_lookup(text, vocab) for text in in_text]
    model_input_out = []
    return addresses_out, model_input_out
def reformat_predictions_local(predict_out):
    """
    Turn column-wise prediction output into one dict per input row.

    :param predict_out: mapping with 'pred_output_classes' and 'probabilities'
        entries, both indexable by row position
    :return: list of ``{'class_ids': ..., 'probabilities': ...}`` dicts, one
        for each row of 'pred_output_classes'
    """
    class_rows = predict_out['pred_output_classes']
    return [
        {
            'class_ids': class_rows[row],
            'probabilities': predict_out['probabilities'][row],
        }
        for row in range(len(class_rows))
    ]
def predict_serve_conv_local(in_text:List[str], labels_list, predictions) -> List[Dict[str, str]]:
    """
    Lazily group the characters of each address by their predicted class.

    This is a generator: it yields one dict per (address, prediction) pair.

    :param in_text: the address strings; each is upper-cased before grouping
    :param labels_list: class labels, where class id ``k`` maps to
        ``labels_list[k - 1]`` once any "_code" / "_abbreviation" text is removed
    :param predictions: one mapping per address with a 'class_ids' entry giving
        a class id per character; id 0 marks a character to skip
    :return: yields dicts mapping class name to the concatenated characters
        assigned to that class
    """
    cleaned_labels = [
        label.replace("_code", "").replace("_abbreviation", "")
        for label in labels_list
    ]

    for address, prediction in zip(in_text, predictions):
        grouped = {}
        for letter, class_id in zip(address.upper(), prediction['class_ids']):
            if class_id != 0:
                # Class ids are 1-based; 0 is reserved for "no class".
                label = cleaned_labels[class_id - 1]
                grouped[label] = grouped.get(label, "") + letter
        yield grouped
def prep_predict_export(prediction_outputs, in_text):
    """
    Materialise prediction dicts and tabulate them next to their source text.

    :param prediction_outputs: iterable (possibly a generator) of per-address
        dicts
    :param in_text: the address strings, stored in an "address" column
    :return: ``(records, frame)`` - the predictions as a list, and a DataFrame
        built from that list with the extra "address" column
    """
    records = [*prediction_outputs]
    frame = pd.DataFrame(records).assign(address=in_text)
    return records, frame
def full_predict_func(list_to_predict, model, vocab, labels_list):
    """
    Run the keras address model end to end and tabulate its predictions.

    :param list_to_predict: iterable of address strings; upper-cased before use
    :param model: a keras model, recognised by having a ``summary`` attribute
    :param vocab: vocabulary handed to ``text_to_model_input_local``
    :param labels_list: class labels handed to ``predict_serve_conv_local``
    :return: ``(list_out, predict_df)`` - the per-address prediction dicts and
        a DataFrame of them, which always has an 'Organisation' column
    :raises NotImplementedError: if ``model`` has no ``summary`` attribute,
        i.e. it is a TensorFlow estimator, which is no longer supported
    """
    # Fail fast for estimators. Previously this path printed a message and
    # then crashed with UnboundLocalError because ``predictions`` was never
    # assigned.
    if not hasattr(model, "summary"):
        raise NotImplementedError(
            "Tensorflow use deprecated: estimator models are no longer supported"
        )

    list_to_predict = [x.upper() for x in list_to_predict]

    _, model_input = text_to_model_input_local(list_to_predict, vocab, "keras")

    # ``use_multiprocessing`` is deliberately not passed: it only applied to
    # generator / Sequence inputs and Keras 3 removed it from Model.predict.
    # NOTE(review): text_to_model_input_local currently returns an empty
    # model input (its TensorFlow padding step is commented out) - confirm
    # this keras path is still expected to work.
    probs = model.predict(model_input)
    classes = probs.argmax(axis=-1)
    predictions = {'pred_output_classes': classes, 'probabilities': probs}

    predictions_list_reformat = reformat_predictions_local(predictions)

    #### Final output as list or dataframe
    output = predict_serve_conv_local(list_to_predict, labels_list, predictions_list_reformat)
    list_out, predict_df = prep_predict_export(output, list_to_predict)

    # Add organisation as a column if it doesn't already exist
    if 'Organisation' not in predict_df.columns:
        predict_df['Organisation'] = ""

    return list_out, predict_df
# -
def predict_torch(model, model_type, input_text, word_to_index, device):
    """
    Predict a category index for every character of every input string.

    :param model: callable torch model. For "lstm"/"gru" it is called as
        ``model(texts, text_lengths)``; for "transformer" as
        ``model(texts, pad_idx)``. Category scores are expected on dim 2 of
        its output
    :param model_type: one of "lstm", "gru" or "transformer"
    :param input_text: iterable of strings
    :param word_to_index: mapping from character to index; must contain the
        '<UNK>' and '<pad>' entries
    :param device: torch device the padded batch is moved to
    :return: tensor of predicted category indices, one row per input string,
        padded to the longest string; an empty (0, 0) tensor for empty input
    :raises ValueError: if ``model_type`` is not a supported value
    """
    recurrent_types = ("lstm", "gru")
    # Reject unknown types up front; previously they fell through both
    # branches and failed with UnboundLocalError on ``predictions``.
    if model_type not in recurrent_types and model_type != "transformer":
        raise ValueError(
            f"Unsupported model_type {model_type!r}; "
            "expected 'lstm', 'gru' or 'transformer'"
        )

    # Look the special indices up once instead of once per character.
    pad_idx = word_to_index['<pad>']
    unk_idx = word_to_index['<UNK>']

    # Convert each string to character indices, using <UNK> for unseen chars
    indexed_texts = [[word_to_index.get(char, unk_idx) for char in text] for text in input_text]

    # Nothing to predict: max() below would raise on an empty sequence.
    if not indexed_texts:
        return torch.empty((0, 0), dtype=torch.long, device=device)

    max_len = max(len(indices) for indices in indexed_texts)

    # Right-pad every sequence to the longest one so they stack into a tensor
    padded_texts = torch.tensor(
        [indices + [pad_idx] * (max_len - len(indices)) for indices in indexed_texts],
        dtype=torch.long,
    )

    with torch.no_grad():
        texts = padded_texts.to(device)

        if model_type in recurrent_types:
            # True length of each sequence = number of non-pad positions
            text_lengths = texts.ne(pad_idx).sum(dim=1)
            predictions = model(texts, text_lengths)
        else:
            predictions = model(texts, pad_idx)

        # Keep the index of the highest-scoring category for each character
        _, predicted_indices = predictions.max(2)

    return predicted_indices
def torch_predictions_to_dicts(input_text, predicted_indices, index_to_category):
    """
    Group the characters of each input string by predicted category name.

    :param input_text: sequence of strings; each character is one token
    :param predicted_indices: per-string rows of category indices, indexable
        by string position; rows must support ``.tolist()`` (assumed to be a
        torch tensor as produced by ``predict_torch`` - TODO confirm no other
        callers pass a different type)
    :param index_to_category: mapping from category index to category name
    :return: one dict per input string mapping category name to the
        concatenated characters predicted as that category
    """
    results = []

    for i, text in enumerate(input_text):
        # Convert the whole row to Python ints in one call rather than calling
        # ``.item()`` once per character.
        row_categories = predicted_indices[i].tolist()

        curr_dict = {}
        # zip stops at the shorter input, so padding positions beyond the end
        # of ``text`` are ignored.
        for category_index, token in zip(row_categories, text):
            category_name = index_to_category[category_index]
            # Characters of the same category are joined with no separator
            curr_dict[category_name] = curr_dict.get(category_name, "") + token

        results.append(curr_dict)

    return results
def torch_prep_predict_export(prediction_outputs, in_text):
    """
    Tabulate per-address prediction dicts and attach the source text.

    :param prediction_outputs: list of dicts mapping category name to text
    :param in_text: the address strings, stored in an "address" column
    :return: DataFrame with one column per predicted category except
        "IGNORE", plus the "address" column
    """
    # errors="ignore": the "IGNORE" column only exists when at least one
    # character was given that category, so its absence must not raise.
    df_out = pd.DataFrame(prediction_outputs).drop(columns="IGNORE", errors="ignore")

    df_out["address"] = in_text

    return df_out
def full_predict_torch(model, model_type, input_text, word_to_index, cat_to_idx, device):
    """
    Run the torch address model end to end on a batch of strings.

    :param model: torch model handed to ``predict_torch``
    :param model_type: model family name handed to ``predict_torch``
    :param input_text: iterable of strings; upper-cased before prediction
    :param word_to_index: character-to-index mapping handed to ``predict_torch``
    :param cat_to_idx: mapping from category name to category index
    :param device: torch device handed to ``predict_torch``
    :return: ``(results_dict, df_out)`` - the per-string category dicts and
        the DataFrame built from them by ``torch_prep_predict_export``
    """
    upper_texts = [text.upper() for text in input_text]

    # Arguments are evaluated left to right, so the model runs before the
    # category mapping is inverted (index -> name).
    per_text_categories = torch_predictions_to_dicts(
        upper_texts,
        predict_torch(model, model_type, upper_texts, word_to_index, device),
        {idx: name for name, idx in cat_to_idx.items()},
    )

    return per_text_categories, torch_prep_predict_export(per_text_categories, upper_texts)
def post_predict_clean(predict_df, orig_search_df, ref_address_cols, search_df_key_field):
    """
    Normalise model predictions so they can be matched against reference data.

    The caller's ``predict_df`` is left unmodified.

    :param predict_df: DataFrame of predictions; must have an "address" column
    :param orig_search_df: the original search data; must have the
        ``search_df_key_field`` column and a "postcode" column. Rows are
        paired with ``predict_df`` by position: its index is reset and the
        merge is on index
    :param ref_address_cols: list of reference address column names to keep;
        any that are missing from ``predict_df`` are created
    :param search_df_key_field: name of the key column in ``orig_search_df``
    :return: DataFrame of string columns ``ref_address_cols`` + "address",
        with the predicted "Postcode" renamed to "Postcode_predict", plus the
        key column (as str) and the original "Postcode"
    """
    wanted_cols = [*ref_address_cols, "address"]

    # Work on a copy so the missing columns are not added to the caller's frame.
    cleaned = predict_df.copy()

    # Reference columns the model did not predict are created so they can be
    # blanked out below.
    for col in ref_address_cols:
        if col not in cleaned.columns:
            cleaned[col] = np.nan

    cleaned = cleaned[wanted_cols]

    cleaned = cleaned.fillna("").infer_objects(copy=False)

    # Convert all columns to string and drop the ".0" left by float -> str.
    # NOTE(review): the pattern is unanchored, so ".0" is removed anywhere in
    # a value (e.g. "1.05" -> "15"), including the address column. Behaviour
    # kept as-is; confirm whether r"\.0$" was intended.
    cleaned = cleaned.astype(str)
    cleaned = cleaned.replace(r"\.0", "", regex=True)

    # When comparing with ref, the postcode already present in the data is
    # used rather than the one predicted by the model, to minimise matching
    # errors.
    cleaned = cleaned.rename(columns={"Postcode": "Postcode_predict"})

    orig_search_df_pc = (
        orig_search_df[[search_df_key_field, "postcode"]]
        .rename(columns={"postcode": "Postcode"})
        .reset_index(drop=True)
    )
    cleaned = cleaned.merge(orig_search_df_pc, left_index=True, right_index=True, how="left")

    cleaned[search_df_key_field] = cleaned[search_df_key_field].astype(str)

    return cleaned
|