# transactify/data_preprocessing.py
# Import Required Libraries:
import numpy as np
import pandas as pd
import torch
from transformers import BertTokenizer
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import re
# Read the data
def read_data(path):
    try:
        df = pd.read_csv(path)
        if df.empty:
            print("The file is empty.")
            return None
        return df
    except FileNotFoundError:
        print(f"File not found at: {path}")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
# Path to your data file
data_path = r"E:\transactify\transactify\Dataset\transaction_data.csv"
# Read the data and check if it was loaded successfully
data = read_data(data_path)
if data is not None:
print("Data loaded successfully:")
print(data.head(15))
else:
print("Data loading failed. Exiting...")
exit()
# Cleaning the text
def clean_text(text):
    text = str(text).lower()              # Ensure string input and lowercase
    text = re.sub(r"\d+", " ", text)      # Remove digits
    text = re.sub(r"[^\w\s]", " ", text)  # Remove punctuation
    text = re.sub(r"\s+", " ", text)      # Collapse repeated whitespace
    text = text.strip()                   # Remove leading/trailing spaces
    return text
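
# Illustrative example (not in the original script): clean_text("Paid $45.00 at UBER *TRIP")
# returns "paid at uber trip" -- digits and punctuation are replaced with spaces,
# and the remaining whitespace is collapsed.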
# Preprocessing the data
def preprocessing_data(df, max_length=20):
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    input_ids = []
    attention_masks = []
    kept_categories = []  # Labels for rows that survive cleaning, kept aligned with input_ids

    # Ensure the dataframe has the required columns
    if "Transaction Description" not in df.columns or "Category" not in df.columns:
        raise ValueError("The required columns 'Transaction Description' and 'Category' are missing from the dataset.")

    for description, category in zip(df["Transaction Description"], df["Category"]):
        cleaned_text = clean_text(description)

        # Only tokenize if the cleaned text is not empty
        if cleaned_text:
            encoded_dict = tokenizer.encode_plus(
                cleaned_text,
                add_special_tokens=True,   # Add the [CLS] and [SEP] tokens BERT expects
                max_length=max_length,
                padding="max_length",      # Pad every sequence to max_length
                return_attention_mask=True,
                return_tensors="pt",
                truncation=True
            )
            input_ids.append(encoded_dict['input_ids'])             # Shape: (1, max_length)
            attention_masks.append(encoded_dict['attention_mask'])  # Shape: (1, max_length)
            kept_categories.append(category)
        else:
            print("Cleaned text is empty, skipping...")

    # Debugging output to check sizes
    print(f"Total input_ids collected: {len(input_ids)}")
    print(f"Total attention_masks collected: {len(attention_masks)}")

    if not input_ids:
        raise ValueError("No input_ids were collected. Check the cleaning process.")

    # Concatenate the list of (1, max_length) tensors into single (N, max_length) tensors
    input_ids = torch.cat(input_ids, dim=0)
    attention_masks = torch.cat(attention_masks, dim=0)

    # Encode the string categories as integer labels (only for the rows that were tokenized)
    labelencoder = LabelEncoder()
    labels = labelencoder.fit_transform(kept_categories)
    labels = torch.tensor(labels, dtype=torch.long)  # Convert labels to LongTensor

    return input_ids, attention_masks, labels, labelencoder
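
# Illustration (assumption, not output from this script): for N usable rows and
# max_length=20, preprocessing_data returns tensors shaped
#   input_ids:       torch.Size([N, 20])
#   attention_masks: torch.Size([N, 20])
#   labels:          torch.Size([N])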
# Split the data into train and test sets
def split_data(input_ids, attention_masks, labels, test_size=0.2, random_state=42):
    # Splitting all three tensors in a single call keeps input IDs, attention masks,
    # and labels aligned row-for-row across the train and test sets.
    (X_train_ids, X_test_ids,
     X_train_masks, X_test_masks,
     y_train, y_test) = train_test_split(
        input_ids, attention_masks, labels,
        test_size=test_size, random_state=random_state
    )
    return X_train_ids, X_test_ids, X_train_masks, X_test_masks, y_train, y_test
# Preprocess the data and split into train and test sets
input_ids, attention_masks, labels, labelencoder = preprocessing_data(data)
X_train_ids, X_test_ids, X_train_masks, X_test_masks, y_train, y_test = split_data(input_ids, attention_masks, labels)
# Output the sizes of the splits for confirmation
print(f"Training set size: {X_train_ids.shape[0]}")
print(f"Test set size: {X_test_ids.shape[0]}")