import os import re import requests import pysrt from langchain_community.document_loaders import ( PyMuPDFLoader, Docx2txtLoader, YoutubeLoader, WebBaseLoader, TextLoader, ) from langchain_community.document_loaders import UnstructuredMarkdownLoader from llama_parse import LlamaParse from langchain.schema import Document import logging from langchain.text_splitter import RecursiveCharacterTextSplitter from ragatouille import RAGPretrainedModel from langchain.chains import LLMChain from langchain_community.llms import OpenAI from langchain import PromptTemplate import json from concurrent.futures import ThreadPoolExecutor from urllib.parse import urljoin import html2text import bs4 import tempfile import PyPDF2 try: from modules.dataloader.helpers import get_metadata from modules.config.constants import OPENAI_API_KEY, LLAMA_CLOUD_API_KEY except: from dataloader.helpers import get_metadata from config.constants import OPENAI_API_KEY, LLAMA_CLOUD_API_KEY logger = logging.getLogger(__name__) BASE_DIR = os.getcwd() class PDFReader: def __init__(self): pass def get_loader(self, pdf_path): loader = PyMuPDFLoader(pdf_path) return loader def get_documents(self, loader): return loader.load() class LlamaParser: def __init__(self): self.GPT_API_KEY = OPENAI_API_KEY self.LLAMA_CLOUD_API_KEY = LLAMA_CLOUD_API_KEY print(f"LLAMA_CLOUD_API_KEY: {LLAMA_CLOUD_API_KEY}") self.parse_url = "" self.headers = { 'Accept': 'application/json', 'Authorization': 'Bearer llx-vap5Bk2zbYLfqTq2aZDvNHwscvsBPQiSjvLOGkgUa9SS8CWB' } self.parser = LlamaParse( api_key=LLAMA_CLOUD_API_KEY, result_type="markdown", verbose=True, language="en", gpt4o_mode=False, # gpt4o_api_key=OPENAI_API_KEY, parsing_instruction="The provided documents are PDFs of lecture slides of deep learning material. They contain LaTeX equations, images, and text. The goal is to extract the text, images and equations from the slides and convert them to markdown format. The markdown should be clean and easy to read, and any math equation should be converted to LaTeX, between $$. For images, give a description and if you can, a source." ) def parse(self, pdf_path): pdf_name = os.path.basename(pdf_path) documents = self.parser.load_data(pdf_path) documents = [document.to_langchain_format() for document in documents] os.remove(pdf_path) # cleanup, just in case return documents def make_request(self, pdf_url): payload = { "gpt4o_mode": "false", "parsing_instruction": "The provided document is a PDF of lecture slides of deep learning material. They contain LaTeX equations, images, and text. The goal is to extract the text, images and equations from the slides and convert them to markdown format. The markdown should be clean and easy to read, and any math equation should be converted to LaTeX, between $$. For images, give a description and if you can, a source.", } files = [ ('file', ('file', requests.get(pdf_url).content, 'application/octet-stream')) ] response = requests.request( "POST", self.parse_url, headers=self.headers, data=payload, files=files) return response.json()['id'], response.json()['status'] async def get_result(self, job_id): url = f"{job_id}/result/markdown" response = requests.request("GET", url, headers=self.headers, data={}) return response.json()['markdown'] async def _parse(self, pdf_path): job_id, status = self.make_request(pdf_path) while status != "SUCCESS": url = f"{job_id}" response = requests.request("GET", url, headers=self.headers, data={}) status = response.json()["status"] result = await self.get_result(job_id) documents = [ Document( page_content=result, metadata={"source": pdf_path} ) ] return documents async def _parse(self, pdf_path): return await self._parse(pdf_path) class HTMLReader: def __init__(self): pass def read_url(self, url): response = requests.get(url) if response.status_code == 200: return response.text else: logger.warning(f"Failed to download HTML from URL: {url}") return None def check_links(self, base_url, html_content): soup = bs4.BeautifulSoup(html_content, "html.parser") for link in soup.find_all("a"): href = link.get("href") if not href or href.startswith("#"): continue elif not href.startswith("https"): href = href.replace("http", "https") absolute_url = urljoin(base_url, href) link['href'] = absolute_url resp = requests.head(absolute_url) if resp.status_code != 200: logger.warning(f"Link {absolute_url} is broken") logger.warning(f"Status code: {resp.status_code}") return str(soup) def html_to_md(self, url, html_content): html_processed = self.check_links(url, html_content) markdown_content = html2text.html2text(html_processed) return markdown_content def read_html(self, url): html_content = self.read_url(url) if html_content: return self.html_to_md(url, html_content) else: return None class FileReader: def __init__(self, logger, kind): self.logger = logger self.kind = kind if kind == "llama": self.pdf_reader = LlamaParser() else: self.pdf_reader = PDFReader() self.web_reader = HTMLReader() def extract_text_from_pdf(self, pdf_path): text = "" with open(pdf_path, "rb") as file: reader = PyPDF2.PdfReader(file) num_pages = len(reader.pages) for page_num in range(num_pages): page = reader.pages[page_num] text += page.extract_text() return text @staticmethod def download_pdf_from_url(pdf_url): print("Downloading PDF from URL: ", pdf_url) response = requests.get(pdf_url) if response.status_code == 200: with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: temp_file.write(response.content) temp_file_path = return temp_file_path else: self.logger.error(f"Failed to download PDF from URL: {pdf_url}") return None def read_pdf(self, temp_file_path: str): if self.kind == "llama": documents = self.pdf_reader.parse(temp_file_path) # if using async else: loader = self.pdf_reader.get_loader(temp_file_path) documents = self.pdf_reader.get_documents(loader) return documents def read_txt(self, temp_file_path: str): loader = TextLoader(temp_file_path, autodetect_encoding=True) return loader.load() def read_docx(self, temp_file_path: str): loader = Docx2txtLoader(temp_file_path) return loader.load() def read_srt(self, temp_file_path: str): subs = text = "" for sub in subs: text += sub.text return [Document(page_content=text)] def read_youtube_transcript(self, url: str): loader = YoutubeLoader.from_youtube_url( url, add_video_info=True, language=["en"], translation="en" ) return loader.load() def read_html(self, url: str): loader = WebBaseLoader(url) return loader.load() def read_tex_from_url(self, tex_url): response = requests.get(tex_url) if response.status_code == 200: return [Document(page_content=response.text)] else: self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}") return None class ChunkProcessor: def __init__(self, config, logger): self.config = config self.logger = logger self.document_data = {} self.document_metadata = {} self.document_chunks_full = [] if config["splitter_options"]["use_splitter"]: if config["splitter_options"]["split_by_token"]: self.splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=config["splitter_options"]["chunk_size"], chunk_overlap=config["splitter_options"]["chunk_overlap"], separators=config["splitter_options"]["chunk_separators"], disallowed_special=(), ) else: self.splitter = RecursiveCharacterTextSplitter( chunk_size=config["splitter_options"]["chunk_size"], chunk_overlap=config["splitter_options"]["chunk_overlap"], separators=config["splitter_options"]["chunk_separators"], disallowed_special=(), ) else: self.splitter = None"ChunkProcessor instance created") def remove_delimiters(self, document_chunks: list): for chunk in document_chunks: for delimiter in self.config["splitter_options"]["delimiters_to_remove"]: chunk.page_content = re.sub(delimiter, " ", chunk.page_content) return document_chunks def remove_chunks(self, document_chunks: list): front = self.config["splitter_options"]["front_chunk_to_remove"] end = self.config["splitter_options"]["last_chunks_to_remove"] for _ in range(front): del document_chunks[0] for _ in range(end): document_chunks.pop() return document_chunks def process_chunks( self, documents, file_type="txt", source="", page=0, metadata={} ): documents = [Document(page_content=documents, source=source, page=page)] if ( file_type == "txt" or file_type == "docx" or file_type == "srt" or file_type == "tex" ): document_chunks = self.splitter.split_documents(documents) elif file_type == "pdf": document_chunks = documents # Full page for now # add the source and page number back to the metadata for chunk in document_chunks: chunk.metadata["source"] = source chunk.metadata["page"] = page # add the metadata extracted from the document for key, value in metadata.items(): chunk.metadata[key] = value if self.config["splitter_options"]["remove_leftover_delimiters"]: document_chunks = self.remove_delimiters(document_chunks) if self.config["splitter_options"]["remove_chunks"]: document_chunks = self.remove_chunks(document_chunks) return document_chunks def chunk_docs(self, file_reader, uploaded_files, weblinks): addl_metadata = get_metadata( "", "", ) # For any additional metadata with ThreadPoolExecutor() as executor: self.process_file, uploaded_files, range(len(uploaded_files)), [file_reader] * len(uploaded_files), [addl_metadata] * len(uploaded_files), ) self.process_weblink, weblinks, range(len(weblinks)), [file_reader] * len(weblinks), [addl_metadata] * len(weblinks), ) document_names = [ f"{file_name}_{page_num}" for file_name, pages in self.document_data.items() for page_num in pages.keys() ] documents = [ page for doc in self.document_data.values() for page in doc.values() ] document_metadata = [ page for doc in self.document_metadata.values() for page in doc.values() ] self.save_document_data() f"Total document chunks extracted: {len(self.document_chunks_full)}" ) return self.document_chunks_full, document_names, documents, document_metadata def process_documents( self, documents, file_path, file_type, metadata_source, addl_metadata ): file_data = {} file_metadata = {} for doc in documents: # if len(doc.page_content) <= 400: # better approach to filter out non-informative documents # continue page_num = doc.metadata.get("page", 0) file_data[page_num] = doc.page_content metadata = ( addl_metadata.get(file_path, {}) if metadata_source == "file" else {"source": file_path, "page": page_num} ) file_metadata[page_num] = metadata if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]: document_chunks = self.process_chunks( doc.page_content, file_type, source=file_path, page=page_num, metadata=metadata, ) self.document_chunks_full.extend(document_chunks) self.document_data[file_path] = file_data self.document_metadata[file_path] = file_metadata def process_file(self, file_path, file_index, file_reader, addl_metadata): file_name = os.path.basename(file_path) storage_dir = os.path.join(os.getcwd(), self.config["vectorstore"]["data_path"]) local_path = os.path.join(storage_dir, file_name) if not os.path.exists(local_path): local_path = FileReader.download_pdf_from_url(pdf_url=file_path) if file_name in self.document_data: return file_type = file_name.split(".")[-1].lower()"Reading file {file_index + 1}: {local_path}") read_methods = { "pdf": file_reader.read_pdf, "txt": file_reader.read_txt, "docx": file_reader.read_docx, "srt": file_reader.read_srt, "tex": file_reader.read_tex_from_url, } if file_type not in read_methods: self.logger.warning(f"Unsupported file type: {file_type}") return try: documents = read_methods[file_type](local_path) self.process_documents( documents, local_path, file_type, "file", addl_metadata ) except Exception as e: self.logger.error(f"Error processing file {file_name}: {str(e)}") def process_weblink(self, link, link_index, file_reader, addl_metadata): if link in self.document_data: return"Reading link {link_index + 1} : {link}") try: if "youtube" in link: documents = file_reader.read_youtube_transcript(link) else: documents = file_reader.read_html(link) self.process_documents(documents, link, "txt", "link", addl_metadata) except Exception as e: self.logger.error(f"Error Reading link {link_index + 1} : {link}: {str(e)}") def save_document_data(self): if not os.path.exists(f"{self.config['log_chunk_dir']}/docs"): os.makedirs(f"{self.config['log_chunk_dir']}/docs") f"Creating directory {self.config['log_chunk_dir']}/docs for document data" ) f"Saving document content to {self.config['log_chunk_dir']}/docs/doc_content.json" ) if not os.path.exists(f"{self.config['log_chunk_dir']}/metadata"): os.makedirs(f"{self.config['log_chunk_dir']}/metadata") f"Creating directory {self.config['log_chunk_dir']}/metadata for document metadata" ) f"Saving document metadata to {self.config['log_chunk_dir']}/metadata/doc_metadata.json" ) with open( f"{self.config['log_chunk_dir']}/docs/doc_content.json", "w" ) as json_file: json.dump(self.document_data, json_file, indent=4) with open( f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "w" ) as json_file: json.dump(self.document_metadata, json_file, indent=4) def load_document_data(self): with open( f"{self.config['log_chunk_dir']}/docs/doc_content.json", "r" ) as json_file: self.document_data = json.load(json_file) with open( f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "r" ) as json_file: self.document_metadata = json.load(json_file) class DataLoader: def __init__(self, config, logger=None): self.file_reader = FileReader(logger=logger, kind=config["llm_params"]["pdf_reader"]) self.chunk_processor = ChunkProcessor(config, logger=logger) def get_chunks(self, uploaded_files, weblinks): return self.chunk_processor.chunk_docs( self.file_reader, uploaded_files, weblinks ) if __name__ == "__main__": import yaml logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) with open("../code/modules/config/config.yml", "r") as f: config = yaml.safe_load(f) STORAGE_DIR = os.path.join(BASE_DIR, config['vectorstore']["data_path"]) uploaded_files = [ os.path.join(STORAGE_DIR, file) for file in os.listdir(STORAGE_DIR) if file != "urls.txt" ] data_loader = DataLoader(config, logger=logger) document_chunks, document_names, documents, document_metadata = ( data_loader.get_chunks( uploaded_files, [""], ) ) print(document_names) print(len(document_chunks))