"""Streamlit page that ingests an uploaded PDF and extracts text and figures.

Flow: the user uploads a PDF, picks how many leading pages to process, and
presses "Extract Pages". Each selected page is rasterised with pdf2image,
converted to grayscale and handed to ``utils.gray_pdf_image_np_to_text``.
The resulting text and figure images are shown on the page and offered as
downloads (a .txt file and a .zip of figure PNGs).
"""
import io
import os
import re
import shutil
import time
import zipfile
from io import BytesIO

import cv2
import numpy as np
import pdf2image
import streamlit as st
from PIL import Image

import utils

# External tools that must be installed separately:
#   Poppler
#     Windows: https://github.com/oschwartz10612/poppler-windows/releases/tag/v22.01.0-0
#     Ubuntu:  poppler-utils (installed: 22.02.0-2ubuntu0.4)
#   Tesseract
#     https://github.com/UB-Mannheim/tesseract/wiki
# page extraction disabled

# Scratch directory that receives the figure PNGs before they are zipped.
temp_figure_dir = "pdf_figures/"

# Device names that Windows refuses to use as a file's base name.
_WINDOWS_RESERVED_NAMES = frozenset(
    {"CON", "PRN", "AUX", "NUL"}
    | {f"COM{n}" for n in range(1, 10)}
    | {f"LPT{n}" for n in range(1, 10)}
)


def clean_filename(filename, replace_char=' '):
    """Return *filename* with characters the current OS forbids replaced.

    Args:
        filename: Candidate file name (not a path).
        replace_char: Replacement for every invalid character.

    Returns:
        The cleaned name with runs of spaces collapsed and surrounding
        whitespace stripped, or ``None`` when *filename* is empty, ``None``
        or whitespace only.
    """
    if not filename or filename.isspace():
        return None

    cleaned_name = filename.strip()

    if os.name == 'nt':  # Windows
        # Must NOT be a raw string: in a raw string "\0" is a backslash
        # followed by the digit zero, which would strip every "0" from names.
        invalid_chars = '<>:"/\\|?*\0'
        base_name, _, ext = cleaned_name.partition('.')
        if base_name.upper() in _WINDOWS_RESERVED_NAMES:
            # Blank out the reserved device name but keep the extension.
            cleaned_name = replace_char * len(base_name) + '.' + ext
    else:  # POSIX (Linux, macOS, etc.)
        invalid_chars = '/\0'

    for char in invalid_chars:
        cleaned_name = cleaned_name.replace(char, replace_char)

    # Collapse repeated spaces introduced by the replacements above.
    return re.sub(' +', ' ', cleaned_name).strip()


def manage_temp_to_be_zipped_directory(directory_path):
    """Ensure *directory_path* exists and is empty.

    An existing directory is deleted with all of its contents and then
    recreated; a missing one is simply created.
    """
    already_existed = os.path.exists(directory_path)
    if already_existed:
        shutil.rmtree(directory_path)
        print(f"Directory '{directory_path}' was removed.")

    os.makedirs(directory_path)
    if already_existed:
        print(f"Directory '{directory_path}' was recreated.")
    else:
        print(f"Directory '{directory_path}' was created.")


def zip_directory(directory_path):
    """Zip every file under *directory_path* into an in-memory archive.

    Entries are stored under their path relative to *directory_path*, so
    files with the same name in different sub-directories do not collide.
    For a flat directory the entry names are just the file names.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the ZIP data.
    """
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for root, _dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                zip_file.write(
                    file_path,
                    arcname=os.path.relpath(file_path, directory_path),
                )
    zip_buffer.seek(0)
    return zip_buffer


def is_new_pdf_upload(uploaded_file):
    """Report whether *uploaded_file* differs from the last one seen.

    Files are compared by name and size. The details of *uploaded_file* are
    remembered in ``st.session_state.last_pdf_uploaded_file`` whenever it is
    considered new (including the very first upload).

    Returns:
        ``True`` for a first or changed upload, ``False`` for a repeat.
    """
    current = {'name': uploaded_file.name, 'size': uploaded_file.size}
    if st.session_state.get('last_pdf_uploaded_file') == current:
        return False
    st.session_state.last_pdf_uploaded_file = current
    return True


def _progress_fraction(done, total):
    """Return done/total clamped to [0.0, 1.0]; 1.0 when *total* is not positive."""
    if total <= 0:
        return 1.0
    return max(0.0, min(1.0, done / total))


def _extract_pages(pdf_path, num_pages, progress_bar):
    """Rasterise, grayscale and OCR the first *num_pages* pages of *pdf_path*.

    Results are written to ``st.session_state`` (``color_image_list``,
    ``gray_image_np_list``, ``pdf_figures_image_list``, ``pdf_text_list``,
    ``extracted_text`` and ``figure_zip_bytes``) only after every page has
    been processed, so a failure part-way through leaves any previous
    results untouched.
    """
    # Pass 1: render one page at a time so the progress bar can advance.
    color_images = []
    for page_number in range(1, num_pages + 1):
        rendered = pdf2image.convert_from_path(
            pdf_path, first_page=page_number, last_page=page_number)
        color_images.append(rendered[0])
        progress_bar.progress(_progress_fraction(page_number, num_pages))

    # Pass 2: grayscale conversion. PIL images are RGB, hence RGB2GRAY
    # (BGR2GRAY would swap the red and blue channel weights).
    progress_bar.progress(0)
    gray_images = []
    for done, image in enumerate(color_images, start=1):
        gray_images.append(cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY))
        progress_bar.progress(_progress_fraction(done, num_pages))

    # Pass 3: text/figure extraction; figures are also written to disk so
    # they can be bundled into a ZIP afterwards.
    manage_temp_to_be_zipped_directory(temp_figure_dir)
    figures_per_page = []
    page_texts = []
    for index, gray_image in enumerate(gray_images):
        print("index=" + str(index))
        figures, _tables, text = utils.gray_pdf_image_np_to_text(
            index, gray_image, debug=True)
        figures_per_page.append(figures)
        # Each figure is indexed as [0] = label, [1] = image array.
        for figure in figures or []:
            raw_image_file_name = f"page_{index}_{figure[0]}.png"
            cleaned_image_file_name = clean_filename(raw_image_file_name)
            Image.fromarray(figure[1]).save(
                os.path.join(temp_figure_dir, cleaned_image_file_name))
        page_texts.append(text)
        progress_bar.progress(_progress_fraction(index + 1, num_pages))

    st.session_state.color_image_list = color_images
    st.session_state.gray_image_np_list = gray_images
    st.session_state.pdf_figures_image_list = figures_per_page
    st.session_state.pdf_text_list = page_texts
    st.session_state.extracted_text = "".join(
        "\n" + text + "\n\n>" for text in page_texts)
    st.session_state.figure_zip_bytes = zip_directory(temp_figure_dir)


big_text = """

Locked PDF Ingestion

"""
# Display the styled text
st.markdown(big_text, unsafe_allow_html=True)

if 'is_initialized' not in st.session_state:
    st.session_state['is_initialized'] = True

uploaded_locked_pdf_file = st.file_uploader("Upload a locked pdf", type=['pdf'])
st.markdown('Sample 1 download and then upload to above', unsafe_allow_html=True)

if uploaded_locked_pdf_file is not None and is_new_pdf_upload(uploaded_locked_pdf_file):
    save_path = './uploaded_videos'
    os.makedirs(save_path, exist_ok=True)

    # The file name comes from the client: drop any directory components and
    # invalid characters so the write cannot escape save_path.
    safe_name = clean_filename(os.path.basename(uploaded_locked_pdf_file.name))
    if not safe_name or safe_name in {'.', '..'}:
        safe_name = 'uploaded.pdf'
    saved_pdf_path = os.path.join(save_path, safe_name)

    with open(saved_pdf_path, "wb") as f:
        f.write(uploaded_locked_pdf_file.getbuffer())
    st.success(f'Saved file {safe_name} in {save_path}')

    st.session_state.uploaded_pdf_path = saved_pdf_path
    st.session_state.page_count = utils.get_pdf_page_count(saved_pdf_path)
    print("page_count=", st.session_state.page_count)

    # Results from a previously uploaded PDF no longer apply.
    if 'extracted_text' in st.session_state:
        del st.session_state.extracted_text
    st.rerun()

if 'page_count' in st.session_state:
    page_count = st.session_state.page_count
    st.write(f"total page count = {page_count}")

    if page_count > 1:
        # Clamp the default: 5 (or a value kept from a longer PDF) may exceed
        # this PDF's page count, which the slider would reject.
        default_pages = min(
            max(st.session_state.get('num_pages_to_extract', 5), 1),
            page_count,
        )
        st.session_state.num_pages_to_extract = st.slider(
            'Number of pages to extract:',
            min_value=1,
            max_value=page_count,
            value=default_pages,
            key='num_pages_to_extract_slider',
        )
    else:
        # Nothing to choose from; avoid a slider whose min equals its max.
        st.session_state.num_pages_to_extract = page_count
    st.write(f"num of pages to extract {st.session_state.num_pages_to_extract}")

    # The button's own state from the triggering run disables it while the
    # extraction it started is still in progress.
    st.session_state.running = bool(st.session_state.get('run_button'))

    read_pdf_progress_bar = st.progress(0)
    if st.button('Extract Pages', disabled=st.session_state.running, key='run_button'):
        _extract_pages(
            st.session_state.uploaded_pdf_path,
            st.session_state.num_pages_to_extract,
            read_pdf_progress_bar,
        )
        st.rerun()

if 'extracted_text' in st.session_state:
    # Derive download names from the saved path rather than the uploader
    # widget, which is None once the user clears it.
    pdf_stem = os.path.splitext(
        os.path.basename(st.session_state.uploaded_pdf_path))[0]

    st.download_button(
        label="Download Extraction txt File",
        data=st.session_state.extracted_text,
        file_name=f"{pdf_stem}.txt",
        mime="text/plain",
    )
    st.download_button(
        label="Download Figures ZIP",
        data=st.session_state.figure_zip_bytes,
        file_name=f"{pdf_stem}_figures.zip",
        mime="application/zip",
    )

    pages = zip(st.session_state.pdf_text_list,
                st.session_state.pdf_figures_image_list)
    for index, (page_text, page_figures) in enumerate(pages):
        st.write(f"Page {index+1} \n\n {page_text}\n")
        if not page_figures:
            st.write("no figures")
            continue
        for figure in page_figures:
            st.write(figure[0])
            st.image(Image.fromarray(figure[1]))