# import tensorflow as tf  # TensorFlow usage deprecated
import torch
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Iterator
from datetime import datetime

# Type aliases for annotations
PandasDataFrame = pd.DataFrame
PandasSeries = pd.Series
MatchedResults = Dict[str, Tuple[str, int]]
array = List[str]

today = datetime.now().strftime("%d%m%Y")
today_rev = datetime.now().strftime("%Y%m%d")
# # Neural net functions

def vocab_lookup(characters: str, vocab: List[str]) -> Tuple[int, np.ndarray]:
    """
    Adapted from the addressnet package by Jason Rigby.
    Converts a string into an array of vocab indices.
    :param characters: the string to convert
    :param vocab: the list of known characters
    :return: the string length and an array of 1-based vocab indices
             (0 for characters not in the vocab)
    """
    result = list()
    for c in characters.lower():
        try:
            # Indices are 1-based so that 0 can mean "unknown character"
            result.append(vocab.index(c) + 1)
        except ValueError:
            result.append(0)
    return len(characters), np.array(result, dtype=np.int64)
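# A minimal usage sketch for vocab_lookup. The vocab list below is a stand-in
# (hypothetical); the real vocab is loaded alongside the trained model.
_demo_vocab = list("abcdefghijklmnopqrstuvwxyz0123456789 ")
_demo_len, _demo_indices = vocab_lookup("12 High St", _demo_vocab)
# _demo_len == 10; _demo_indices holds 1-based vocab positions, 0 for unknowns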
# ## Neural net predictor functions

def text_to_model_input_local(in_text, vocab, model_type="estimator"):
    addresses_out = []
    model_input_out = []
    encoded_text = []

    # Length of the longest input string (used when padding for the Keras model)
    longest_string = max(len(text) for text in in_text)

    for text in in_text:
        out = vocab_lookup(text, vocab)
        addresses_out.append(out)

        # TensorFlow estimator input deprecated
        # if model_type == "estimator":
        #     model_input_add = tf.train.Example(features=tf.train.Features(feature={
        #         'lengths': tf.train.Feature(int64_list=tf.train.Int64List(value=[out[0]])),
        #         'encoded_text': tf.train.Feature(int64_list=tf.train.Int64List(value=out[1].tolist()))
        #     })).SerializeToString()
        #     model_input_out.append(model_input_add)

        if model_type == "keras":
            encoded_text.append(out[1])

    # TensorFlow padding deprecated: without it, model_input_out stays empty
    # for the Keras path
    # if model_type == "keras":
    #     # Pad out the strings so they're all the same length. 69 seems to be the value for spaces
    #     model_input_out = tf.keras.utils.pad_sequences(encoded_text, maxlen=longest_string, padding="post", truncating="post", value=0)

    return addresses_out, model_input_out
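# Usage sketch, reusing the stand-in vocab from above: each address comes back
# as a (length, indices) tuple. With the TensorFlow paths commented out,
# model_input_out is returned empty.
_demo_addresses, _demo_model_input = text_to_model_input_local(
    ["12 HIGH ST", "FLAT 3"], _demo_vocab, model_type="estimator")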
def reformat_predictions_local(predict_out):
    # Convert the batched output dict into one dict per address
    predictions_list_reformat = []
    for classes, probs in zip(predict_out['pred_output_classes'], predict_out['probabilities']):
        predictions_list_reformat.append({'class_ids': classes, 'probabilities': probs})
    return predictions_list_reformat
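# Shape sketch with fabricated values (purely illustrative): the batched dict
# becomes a list with one dict per address.
_demo_batch = {'pred_output_classes': [[1, 0]], 'probabilities': [[0.9, 0.1]]}
_demo_reformatted = reformat_predictions_local(_demo_batch)
# -> [{'class_ids': [1, 0], 'probabilities': [0.9, 0.1]}]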
def predict_serve_conv_local(in_text: List[str], labels_list, predictions) -> Iterator[Dict[str, str]]:
    # Strip the suffixes used in the training labels to get display class names
    class_names = [l.replace("_code", "") for l in labels_list]
    class_names = [l.replace("_abbreviation", "") for l in class_names]

    for addr, res in zip(in_text, predictions):
        mappings = dict()
        for char, class_id in zip(addr.upper(), res['class_ids']):
            # Class 0 is "no class"; skip those characters
            if class_id == 0:
                continue
            cls = class_names[class_id - 1]
            mappings[cls] = mappings.get(cls, "") + char
        yield mappings
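# Usage sketch with fabricated predictions (illustrative label names, not the
# real training labels): characters are folded into one string per class.
_demo_labels = ["building_name_code", "street_name_code"]
_demo_preds = [{'class_ids': [1, 0, 2], 'probabilities': None}]
_demo_mappings = list(predict_serve_conv_local(["a b"], _demo_labels, _demo_preds))
# -> [{'building_name': 'A', 'street_name': 'B'}]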
def prep_predict_export(prediction_outputs, in_text):
    out_list = list(prediction_outputs)
    df_out = pd.DataFrame(out_list)
    df_out["address"] = in_text
    return out_list, df_out
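# Continuing the sketch above: collect the generator output into a list and a
# DataFrame with the original addresses attached.
_demo_list, _demo_df = prep_predict_export(_demo_mappings, ["a b"])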
def full_predict_func(list_to_predict, model, vocab, labels_list):
    # A "summary" attribute indicates a Keras model rather than an estimator
    if hasattr(model, "summary"):
        model_type = "keras"
    else:
        model_type = "estimator"

    list_to_predict = [x.upper() for x in list_to_predict]

    addresses_out, model_input = text_to_model_input_local(list_to_predict, vocab, model_type)

    if model_type == "keras":
        probs = model.predict(model_input, use_multiprocessing=True)
        classes = probs.argmax(axis=-1)
        predictions = {'pred_output_classes': classes, 'probabilities': probs}
    else:
        # TensorFlow estimator usage deprecated
        # predictions = model.signatures["predict_output"](predictor_inputs=tf.constant(model_input))  # contrib module version
        # predictions = model.signatures["serving_default"](predictor_inputs=tf.constant(model_input))
        raise ValueError("TensorFlow estimator models are deprecated; use a Keras or Torch model")

    predictions_list_reformat = reformat_predictions_local(predictions)

    # Final output as list and dataframe
    output = predict_serve_conv_local(list(list_to_predict), labels_list, predictions_list_reformat)
    list_out, predict_df = prep_predict_export(output, list_to_predict)

    # Add Organisation as a column if it doesn't already exist
    if 'Organisation' not in predict_df.columns:
        predict_df['Organisation'] = ""

    return list_out, predict_df
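# End-to-end usage sketch for the deprecated TensorFlow path (commented out:
# it needs a trained Keras model plus its vocab and labels_list; the names
# here are illustrative only):
# list_out, predict_df = full_predict_func(
#     ["12 HIGH STREET TOWNVILLE AB1 2CD"], keras_model, vocab, labels_list)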
# -
def predict_torch(model, model_type, input_text, word_to_index, device):
    # Convert input_text to lists of character indices, mapping unknown
    # characters to the <UNK> index
    indexed_texts = [[word_to_index.get(char, word_to_index['<UNK>']) for char in text] for text in input_text]

    # Calculate max_len based on indexed_texts
    max_len = max(len(text) for text in indexed_texts)

    # Pad sequences and convert to tensor
    padded_texts = torch.tensor([text + [word_to_index['<pad>']] * (max_len - len(text)) for text in indexed_texts])

    with torch.no_grad():
        texts = padded_texts.to(device)

        if model_type in ("lstm", "gru"):
            # Recurrent models also need the unpadded sequence lengths
            text_lengths = texts.ne(word_to_index['<pad>']).sum(dim=1)
            predictions = model(texts, text_lengths)
        elif model_type == "transformer":
            # The transformer takes the padding index instead of lengths
            predictions = model(texts, word_to_index['<pad>'])
        else:
            raise ValueError(f"Unknown model_type: {model_type}")

    # Convert predictions to the most likely category index per character
    _, predicted_indices = predictions.max(2)

    return predicted_indices
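# Interface sketch with a throwaway stand-in model (hypothetical, not the real
# architecture): it emits random per-character logits of the right shape,
# which is enough to exercise predict_torch end to end.
class _DemoTagger(torch.nn.Module):
    def __init__(self, n_classes: int):
        super().__init__()
        self.n_classes = n_classes

    def forward(self, texts, text_lengths):
        return torch.randn(texts.size(0), texts.size(1), self.n_classes)

_demo_w2i = {'<pad>': 0, '<UNK>': 1, 'A': 2, 'B': 3}
_demo_indices_out = predict_torch(_DemoTagger(4), "lstm", ["AB", "BA"], _demo_w2i, torch.device("cpu"))
# _demo_indices_out: LongTensor of shape (2, 2) with a class index per character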
def torch_predictions_to_dicts(input_text, predicted_indices, index_to_category):
    results = []
    for i, text in enumerate(input_text):
        # Treat each character in the input text as a "token"
        tokens = list(text)  # Convert string to a list of characters

        # Create a dictionary for the current text
        curr_dict = {}

        # Iterate over the predicted categories and the tokens together
        for category_index, token in zip(predicted_indices[i], tokens):
            # Convert the category index to its name
            category_name = index_to_category[category_index.item()]

            # Append the token to the category in the dictionary
            # (or create the category if it doesn't exist)
            if category_name in curr_dict:
                curr_dict[category_name] += token  # No space needed between characters
            else:
                curr_dict[category_name] = token

        results.append(curr_dict)

    return results
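# Continuing the sketch: map the stand-in class indices back to (hypothetical)
# category names and fold the characters into per-category strings.
_demo_i2c = {0: "IGNORE", 1: "building_name", 2: "street_name", 3: "town"}
_demo_dicts = torch_predictions_to_dicts(["AB", "BA"], _demo_indices_out, _demo_i2c)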
def torch_prep_predict_export(prediction_outputs, in_text):
    # Drop the IGNORE category (non-address characters) if the model produced it
    df_out = pd.DataFrame(prediction_outputs).drop("IGNORE", axis=1, errors="ignore")
    df_out["address"] = in_text
    return df_out
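# Continuing the sketch: the per-address dicts become a DataFrame with the
# addresses attached; any IGNORE column is dropped.
_demo_torch_df = torch_prep_predict_export(_demo_dicts, ["AB", "BA"])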
def full_predict_torch(model, model_type, input_text, word_to_index, cat_to_idx, device):
    input_text = [x.upper() for x in input_text]
    predicted_indices = predict_torch(model, model_type, input_text, word_to_index, device)

    # Invert the category-to-index mapping for decoding
    index_to_category = {v: k for k, v in cat_to_idx.items()}

    results_dict = torch_predictions_to_dicts(input_text, predicted_indices, index_to_category)
    df_out = torch_prep_predict_export(results_dict, input_text)

    return results_dict, df_out
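# End-to-end usage (commented out: it needs the trained model and the
# word_to_index / cat_to_idx mappings saved with it; the names here are
# illustrative only):
# results_dict, df_out = full_predict_torch(
#     model, "transformer", ["12 HIGH STREET TOWNVILLE AB1 2CD"],
#     word_to_index, cat_to_idx, torch.device("cpu"))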
def post_predict_clean(predict_df, orig_search_df, ref_address_cols, search_df_key_field):
    # Add address to ref_address_cols
    ref_address_cols_add = ref_address_cols.copy()
    ref_address_cols_add.extend(['address'])

    # Create each reference column if it doesn't exist yet
    for x in ref_address_cols:
        predict_df[x] = predict_df.get(x, np.nan)

    predict_df = predict_df[ref_address_cols_add]

    # Columns that are in the ref and model, but are not matched in this
    # instance, need to be filled in with blanks
    predict_cols_match = list(predict_df.drop(["address"], axis=1).columns)
    predict_cols_match_uprn = predict_cols_match.copy()
    predict_cols_match_uprn.append("UPRN")

    pred_output_missing_cols = list(set(ref_address_cols) - set(predict_cols_match))
    predict_df[pred_output_missing_cols] = np.nan
    predict_df = predict_df.fillna("").infer_objects(copy=False)

    # Convert all columns to string and strip any trailing ".0" left over from
    # numeric columns
    all_columns = list(predict_df)  # List of all column headers
    predict_df[all_columns] = predict_df[all_columns].astype(str)
    predict_df = predict_df.replace(r"\.0$", "", regex=True)

    # When comparing with the reference, the postcode already in the data is
    # used rather than the postcode predicted by the model, to minimise
    # matching errors
    predict_df = predict_df.rename(columns={"Postcode": "Postcode_predict"})

    orig_search_df_pc = orig_search_df[[search_df_key_field, "postcode"]].rename(columns={"postcode": "Postcode"}).reset_index(drop=True)
    predict_df = predict_df.merge(orig_search_df_pc, left_index=True, right_index=True, how="left")

    predict_df[search_df_key_field] = predict_df[search_df_key_field].astype(str)

    return predict_df
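# Usage sketch (commented out: orig_search_df and ref_address_cols come from
# the wider matching pipeline; the column names here are illustrative):
# predict_df = post_predict_clean(
#     predict_df, orig_search_df=search_df,
#     ref_address_cols=["SaoText", "PaoText", "Street", "Postcode"],
#     search_df_key_field="search_id")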