import re
import sys

import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer
|
|
def read_data(path):
    """Load a CSV file, returning a DataFrame or None on failure."""
    try:
        df = pd.read_csv(path)
        if df.empty:
            print("The file is empty.")
            return None
        return df
    except FileNotFoundError:
        print(f"File not found at: {path}")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
|
|
# Absolute path to the dataset; adjust for your environment.
data_path = r"E:\transactify\transactify\Dataset\transaction_data.csv"
|
data = read_data(data_path)
if data is not None:
    print("Data loaded successfully:")
    print(data.head(15))
else:
    print("Data loading failed. Exiting...")
    sys.exit(1)
|
|
def clean_text(text):
    """Lowercase, strip digits and punctuation, and collapse whitespace."""
    text = str(text).lower()              # str() guards against non-string rows (e.g. NaN)
    text = re.sub(r"\d+", " ", text)      # digits -> space
    text = re.sub(r"[^\w\s]", " ", text)  # punctuation -> space
    text = re.sub(r"\s+", " ", text)      # collapse runs of whitespace
    return text.strip()
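# Example of the cleaning behaviour (hypothetical input):
#   clean_text("Payment #123 to ACME!")  ->  "payment to acme"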
|
|
def preprocessing_data(df, max_length=20):
    """Tokenize transaction descriptions with BERT and encode category labels."""
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    if "Transaction Description" not in df.columns or "Category" not in df.columns:
        raise ValueError(
            "The required columns 'Transaction Description' and 'Category' "
            "are missing from the dataset."
        )

    input_ids = []
    attention_masks = []
    kept_categories = []  # categories for the rows that survive cleaning

    for description, category in zip(df["Transaction Description"], df["Category"]):
        cleaned_text = clean_text(description)
        if not cleaned_text:
            # Skip rows whose description is empty after cleaning. The category
            # is recorded only for kept rows so labels stay aligned with inputs.
            print("Cleaned text is empty, skipping...")
            continue

        encoded_dict = tokenizer.encode_plus(
            cleaned_text,
            add_special_tokens=True,
            max_length=max_length,
            padding="max_length",  # replaces the deprecated pad_to_max_length=True
            return_attention_mask=True,
            return_tensors="pt",
            truncation=True,
        )
        input_ids.append(encoded_dict["input_ids"])
        attention_masks.append(encoded_dict["attention_mask"])
        kept_categories.append(category)

    print(f"Total input_ids collected: {len(input_ids)}")
    print(f"Total attention_masks collected: {len(attention_masks)}")

    if not input_ids:
        raise ValueError("No input_ids were collected. Check the cleaning process.")

    input_ids = torch.cat(input_ids, dim=0)
    attention_masks = torch.cat(attention_masks, dim=0)

    # Encode the string categories of the kept rows as integer class ids.
    labelencoder = LabelEncoder()
    labels = labelencoder.fit_transform(kept_categories)
    labels = torch.tensor(labels, dtype=torch.long)

    return input_ids, attention_masks, labels, labelencoder
|
|
def split_data(input_ids, attention_masks, labels, test_size=0.2, random_state=42):
    """Split ids, masks, and labels with a single consistent shuffle."""
    (X_train_ids, X_test_ids,
     X_train_masks, X_test_masks,
     y_train, y_test) = train_test_split(
        input_ids, attention_masks, labels,
        test_size=test_size, random_state=random_state,
    )
    return X_train_ids, X_test_ids, X_train_masks, X_test_masks, y_train, y_test
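# Note: if the categories are imbalanced, passing stratify=labels to
# train_test_split would preserve class proportions in both splits
# (an optional tweak, not part of the original script).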
|
|
input_ids, attention_masks, labels, labelencoder = preprocessing_data(data)
X_train_ids, X_test_ids, X_train_masks, X_test_masks, y_train, y_test = split_data(
    input_ids, attention_masks, labels
)
|
print(f"Training set size: {X_train_ids.shape[0]}") |
|
print(f"Test set size: {X_test_ids.shape[0]}") |
|
|