import argparse
import glob
import os
import random
import re
import string
from typing import List

import numpy as np
import pandas as pd
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from tools.helper_functions import output_folder

nltk.download('punkt')
nltk.download('stopwords')
nltk.download('punkt_tab')

# Pages whose TF-IDF cosine similarity exceeds this threshold are treated as duplicates
similarity_threshold = 0.9

stop_words = set(stopwords.words('english'))
stemmer = PorterStemmer()
vectorizer = TfidfVectorizer()

def combine_ocr_output_text(input_files):
    """
    Combines text from multiple CSV files containing page and text columns.
    Groups text by file and page number, concatenating text within these groups.

    Args:
        input_files (list): List of paths to CSV files

    Returns:
        tuple: (combined dataframe with columns [file, page, text], list of output CSV paths)
    """
    all_data = []
    output_files = []

    if isinstance(input_files, str):
        file_paths_list = [input_files]
    else:
        file_paths_list = input_files

    for file in file_paths_list:

        # Accept either plain path strings or file-like objects with a .name attribute
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        df = pd.read_csv(file_path)

        # Skip files that do not have the expected structure
        if 'page' not in df.columns or 'text' not in df.columns:
            print(f"Warning: Skipping {file_path} - missing required columns 'page' and 'text'")
            continue

        # Concatenate text within each page, coercing values to string so empty (NaN) cells do not break the join
        grouped = df.groupby('page')['text'].apply(lambda x: ' '.join(x.astype(str))).reset_index()

        # Record which file each page came from
        grouped['file'] = os.path.basename(file_path)

        all_data.append(grouped)

    if not all_data:
        raise ValueError("No valid CSV files were processed")

    combined_df = pd.concat(all_data, ignore_index=True)

    combined_df = combined_df[['file', 'page', 'text']]

    output_combined_file_path = output_folder + "combined_ocr_output_files.csv"
    combined_df.to_csv(output_combined_file_path, index=None)

    output_files.append(output_combined_file_path)

    return combined_df, output_files

def process_data(df, column: str):
    '''
    Clean and stem a text column in a dataframe, writing the result to a 'text_clean' column.
    '''

    def _clean_text(raw_text):
        # Strip HTML tags and entity-escaped markup left over in the OCR/extraction output
        clean = re.sub(r'<.*?>', '', raw_text)
        clean = re.sub(r'&nbsp;', ' ', clean)
        clean = re.sub(r'\r\n', ' ', clean)
        clean = re.sub(r'&lt;', ' ', clean)
        clean = re.sub(r'&gt;', ' ', clean)
        clean = re.sub(r'&lt;strong&gt;', ' ', clean)
        clean = re.sub(r'&lt;/strong&gt;', ' ', clean)

        # Replace non-breaking spaces and collapse runs of whitespace
        clean = clean.replace(u'\xa0', u' ')
        clean = ' '.join(clean.split())

        # Tokenise, then keep only alphabetic tokens that are not stop words
        words = word_tokenize(clean.lower())
        words = [word for word in words if word.isalpha()]
        words = [word for word in words if word not in stop_words]

        return ' '.join(words)

    def _apply_stemming(text):
        words = word_tokenize(text.lower())
        stemmed_words = [stemmer.stem(word) for word in words]
        return ' '.join(stemmed_words)

    df['text_clean'] = df[column].apply(_clean_text)
    df['text_clean'] = df['text_clean'].apply(_apply_stemming)

    return df

def identify_similar_pages(input_files: List[str]):
    '''
    Find pairs of pages whose cleaned text is near-identical, based on TF-IDF cosine similarity.
    Returns the similarity results dataframe and a list of output file paths.
    '''

    output_paths = []

    df, output_files = combine_ocr_output_text(input_files)

    output_paths.extend(output_files)

    # Clean and stem the text ready for vectorisation
    df = process_data(df, 'text')

    # Vectorise the cleaned text and compute pairwise cosine similarity between pages
    tfidf_matrix = vectorizer.fit_transform(df['text_clean'])

    similarity_matrix = cosine_similarity(tfidf_matrix)

    # Ignore self-similarity and keep only pairs above the threshold
    np.fill_diagonal(similarity_matrix, 0)
    similar_pages = np.argwhere(similarity_matrix > similarity_threshold)

    # Page1_File/Page2_File start out as row indices and are mapped to file metadata below
    similarity_df = pd.DataFrame({
        'Page1_Index': similar_pages[:, 0],
        'Page2_Index': similar_pages[:, 1],
        'Page1_File': similar_pages[:, 0],
        'Page2_File': similar_pages[:, 1],
        'Similarity_Score': similarity_matrix[similar_pages[:, 0], similar_pages[:, 1]]
    })

    # Keep each pair only once (the similarity matrix is symmetric)
    similarity_df = similarity_df[similarity_df['Page1_Index'] < similarity_df['Page2_Index']]

    # Map row indices back to file, page number and original text (df has a default RangeIndex)
    similarity_df['Page1_File'] = similarity_df['Page1_File'].map(df['file'])
    similarity_df['Page2_File'] = similarity_df['Page2_File'].map(df['file'])

    similarity_df['Page1_Page'] = similarity_df['Page1_Index'].map(df['page'])
    similarity_df['Page2_Page'] = similarity_df['Page2_Index'].map(df['page'])

    similarity_df['Page1_Text'] = similarity_df['Page1_Index'].map(df['text'])
    similarity_df['Page2_Text'] = similarity_df['Page2_Index'].map(df['text'])

    similarity_df_out = similarity_df[['Page1_File', 'Page1_Page', 'Page2_File', 'Page2_Page', 'Similarity_Score', 'Page1_Text', 'Page2_Text']]
    similarity_df_out = similarity_df_out.sort_values(["Page1_File", "Page1_Page", "Page2_File", "Page2_Page", "Similarity_Score"], ascending=[True, True, True, True, False])

    similarity_file_output_path = output_folder + 'page_similarity_results.csv'
    similarity_df_out.to_csv(similarity_file_output_path, index=False)

    output_paths.append(similarity_file_output_path)

    # For each file containing duplicate pages, write out the page numbers to redact as whole pages
    if not similarity_df_out.empty:
        unique_files = similarity_df_out['Page2_File'].unique()
        for redact_file in unique_files:
            output_file_name = output_folder + redact_file + "_whole_page.csv"
            whole_pages_to_redact_df = similarity_df_out.loc[similarity_df_out['Page2_File'] == redact_file, :][['Page2_Page']]
            whole_pages_to_redact_df.to_csv(output_file_name, header=None, index=None)

            output_paths.append(output_file_name)

    return similarity_df_out, output_paths

def perturb_text_with_errors(series):
    '''
    Randomly introduce OCR-style errors (inserted characters, extra spaces, stray punctuation)
    into a pandas Series of text strings.
    '''

    def _perturb_text(text, error_probability=0.1):
        words = text.split()
        perturbed_words = []

        for word in words:
            if random.random() < error_probability:
                perturbation_type = random.choice(['char_error', 'extra_space', 'extra_punctuation'])

                if perturbation_type == 'char_error':
                    # Insert a random lowercase character at a random position
                    idx = random.randint(0, len(word) - 1)
                    char = random.choice(string.ascii_lowercase)
                    word = word[:idx] + char + word[idx:]

                elif perturbation_type == 'extra_space':
                    word = ' ' + word + ' '

                elif perturbation_type == 'extra_punctuation':
                    # Insert a random punctuation character at a random position
                    punctuation = random.choice(string.punctuation)
                    idx = random.randint(0, len(word))
                    word = word[:idx] + punctuation + word[idx:]

            perturbed_words.append(word)

        return ' '.join(perturbed_words)

    series = series.apply(lambda x: _perturb_text(x, error_probability=0.1))

    return series
