# import os
# import tempfile
# import streamlit as st
# from langchain_openai import ChatOpenAI
# from langchain.document_loaders import UnstructuredFileLoader
# from langchain_community.vectorstores import FAISS
# from langchain.embeddings import HuggingFaceEmbeddings
# from langchain.text_splitter import CharacterTextSplitter
# from langchain.chains import RetrievalQA
# from langchain_openai import OpenAIEmbeddings
# from langchain import PromptTemplate
# from langchain_text_splitters import (
#     Language,
#     RecursiveCharacterTextSplitter,
# )
# import io
# import PyPDF2
# import requests
# from bs4 import BeautifulSoup
# from selenium import webdriver
# from selenium.webdriver.chrome.service import Service
# from selenium.webdriver.chrome.options import Options
# from selenium.webdriver.common.by import By
# from selenium.webdriver.support.ui import WebDriverWait
# from selenium.webdriver.support import expected_conditions as EC
# from urllib.parse import urlparse, urljoin
# from firecrawl import FirecrawlApp
# import pymupdf4llm
# import pathlib
# import time
# # Models and parameters
# embedding_model = "text-embedding-3-small"
# llm_model = "gpt-4o-mini-2024-07-18"
# llm = ChatOpenAI(model=llm_model, temperature=0)
# # Hyperparameters
# PDF_CHUNK_SIZE = 1024
# PDF_CHUNK_OVERLAP = 256
# WEB_CHUNK_SIZE = 1024
# WEB_CHUNK_OVERLAP = 256
# TEXT_BLOB_CHUNK_SIZE = 1024
# TEXT_BLOB_CHUNK_OVERLAP = 256
# QA_CHUNK_SIZE = 1024
# QA_CHUNK_OVERLAP = 256
# k = 3
# system_prompt = """You are a helpful assistant designed to answer questions based on the provided context.
# Using the context provided, please answer the user question as accurately and informatively as possible. If no relevant context is available, inform the user that you do not have the information needed to answer their question.
# """
# def setup_driver():
#     options = Options()
#     options.add_argument("--verbose")
#     options.add_argument('--no-sandbox')
#     options.add_argument('--headless')
#     options.add_argument('--disable-gpu')
#     options.add_argument('--window-size=1920,1200')
#     options.add_argument('--disable-dev-shm-usage')
#     return webdriver.Chrome(options=options)
# def normalize_url(url):
#     parsed = urlparse(url)
#     normalized = f"https://{parsed.netloc.replace('www.', '')}{parsed.path.rstrip('/')}"
#     if parsed.query:
#         normalized += f"?{parsed.query}"
#     return normalized
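# # Note: normalize_url forces the https scheme and strips "www." so that
# # http/https and www/non-www variants of the same page count as one visit.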
# def parse_web_page(url, base_url, visited_urls, driver, max_urls=None):
#     normalized_url = normalize_url(url)
#     if normalized_url in visited_urls:
#         return
#     visited_urls.add(normalized_url)
#     driver.get(url)
#     page_source = driver.page_source
#     soup = BeautifulSoup(page_source, 'html.parser')
#     for link in soup.find_all('a', href=True):
#         next_url = urljoin(url, link['href'])
#         parsed_next_url = urlparse(next_url)
#         normalized_next_url = normalize_url(next_url)
#         if (parsed_next_url.netloc == urlparse(base_url).netloc and
#                 normalized_next_url not in visited_urls):
#             if max_urls is None or len(visited_urls) < max_urls:
#                 parse_web_page(normalized_next_url, base_url, visited_urls, driver, max_urls)
#         if max_urls is not None and len(visited_urls) >= max_urls:
#             break
# def crawl_url(url, max_urls):
#     driver = setup_driver()
#     visited_urls = set()
#     parse_web_page(url, url, visited_urls, driver, max_urls)
#     driver.quit()
#     return visited_urls
# def process_webpage(web_content):
#     md_splitter = RecursiveCharacterTextSplitter.from_language(
#         language=Language.MARKDOWN, chunk_size=WEB_CHUNK_SIZE, chunk_overlap=WEB_CHUNK_OVERLAP
#     )
#     return md_splitter.create_documents([web_content])
# def process_pdf(pdf_file):
#     # Create a temporary file
#     with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
#         # Write the contents of the uploaded file to the temporary file
#         temp_file.write(pdf_file.getvalue())
#         temp_file_path = temp_file.name
#     try:
#         # Use the temporary file path with pymupdf4llm
#         md_text = pymupdf4llm.to_markdown(temp_file_path)
#         md_splitter = RecursiveCharacterTextSplitter.from_language(
#             language=Language.MARKDOWN, chunk_size=PDF_CHUNK_SIZE, chunk_overlap=PDF_CHUNK_OVERLAP
#         )
#         return md_splitter.create_documents([md_text])
#     finally:
#         # Remove the temporary file
#         os.unlink(temp_file_path)
# def process_text_blob(text_blob):
#     text_splitter = RecursiveCharacterTextSplitter(
#         chunk_size=TEXT_BLOB_CHUNK_SIZE, chunk_overlap=TEXT_BLOB_CHUNK_OVERLAP
#     )
#     return text_splitter.create_documents([text_blob])
# def process_qa_document(qa_data):
#     qa_pairs = qa_data.splitlines()
#     formatted_qa_text = ""
#     for pair in qa_pairs:
#         if ';' in pair:
#             question, answer = pair.split(';', 1)
#             formatted_qa_text += f"Q: {question.strip()}\nA: {answer.strip()}\n\n"
#     qa_splitter = RecursiveCharacterTextSplitter(
#         chunk_size=QA_CHUNK_SIZE, chunk_overlap=QA_CHUNK_OVERLAP
#     )
#     return qa_splitter.create_documents([formatted_qa_text])
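# # Expected input: one "question;answer" pair per line, e.g.:
# #   What is Bitcoin?;A peer-to-peer electronic cash system.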
# def create_combined_knowledge_base(all_text_chunks):
#     embeddings = OpenAIEmbeddings(model=embedding_model)
#     return FAISS.from_documents(all_text_chunks, embeddings)
# def main():
#     st.title("Interactive Chatbot with Custom Knowledge Base")
#     all_text_chunks = []
#     st.sidebar.header("Select Knowledge Sources")
#     pdf_files = st.sidebar.file_uploader("Upload PDF files", type="pdf", accept_multiple_files=True)
#     if pdf_files:
#         for pdf_file in pdf_files:
#             pdf_text_chunks = process_pdf(pdf_file)
#             for chunk in pdf_text_chunks:
#                 chunk.metadata['source'] = pdf_file.name
#             all_text_chunks.extend(pdf_text_chunks)
#     text_blob = st.sidebar.text_area("Enter a large text blob")
#     if text_blob:
#         text_blob_chunks = process_text_blob(text_blob.strip())
#         for chunk in text_blob_chunks:
#             chunk.metadata['source'] = "Text Blob"
#         all_text_chunks.extend(text_blob_chunks)
#     qa_data = st.sidebar.text_area("Enter QA data (Q;A per line)")
#     if qa_data:
#         qa_doc_chunks = process_qa_document(qa_data.strip())
#         for chunk in qa_doc_chunks:
#             chunk.metadata['source'] = "QA Document"
#         all_text_chunks.extend(qa_doc_chunks)
#     url = st.sidebar.text_input("Enter webpage URL")
#     if url:
#         max_urls = st.sidebar.number_input("Max URLs to crawl", min_value=1, value=10)
#         sub_urls = crawl_url(url, max_urls)
#         sub_urls_list = list(sub_urls)
#         selected_urls = st.sidebar.multiselect("Select sub-URLs to scrape", sub_urls_list)
#         for selected_sub_url in selected_urls:
#             app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
#             web_content = app.scrape_url(selected_sub_url)['markdown']
#             webpage_chunks = process_webpage(web_content)
#             for chunk in webpage_chunks:
#                 chunk.metadata['source'] = selected_sub_url
#             all_text_chunks.extend(webpage_chunks)
#     if st.sidebar.button("Create Knowledge Base"):
#         combined_knowledge_base = create_combined_knowledge_base(all_text_chunks)
#         st.session_state['knowledge_base'] = combined_knowledge_base
#         st.success("Knowledge base created successfully!")
#     if 'knowledge_base' in st.session_state:
#         st.header("Ask a Question")
#         question = st.text_input("Enter your question")
#         if st.button("Get Answer"):
#             knowledge_base = st.session_state['knowledge_base']
#             template = '''
#             %s
#             -------------------------------
#             Context: {context}
#             Question: {question}
#             Answer:
#             ''' % (system_prompt)
#             prompt = PromptTemplate(
#                 template=template,
#                 input_variables=['context', 'question']
#             )
#             qa_chain = RetrievalQA.from_chain_type(
#                 llm,
#                 retriever=knowledge_base.as_retriever(search_kwargs={"k": k}),
#                 return_source_documents=True,
#                 chain_type_kwargs={"prompt": prompt}
#             )
#             response = qa_chain.invoke({"query": question})
#             st.write(f"**Answer:** {response['result']}")
#             with st.expander("Sources used"):
#                 for doc in response['source_documents']:
#                     st.write(f"**Source:** {doc.metadata['source']}")
#                     st.write(f"**Content:**\n{doc.page_content}\n{'-'*40}")
# if __name__ == "__main__":
#     main()
# # Unused alternative crawler: fetch several URLs concurrently with crawl4ai.
# # import asyncio
# # from crawl4ai import AsyncWebCrawler
# async def crawl_urls(urls):
#     async with AsyncWebCrawler() as crawler:
#         tasks = [crawler.arun(url=url) for url in urls]
#         results = await asyncio.gather(*tasks)
#         return {url: result.markdown for url, result in zip(urls, results)}
import os
import tempfile
import streamlit as st
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_text_splitters import (
    Language,
    RecursiveCharacterTextSplitter,
)
import pymupdf4llm
from firecrawl import FirecrawlApp
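# Required environment variables (assumed by this script): OPENAI_API_KEY for
# the LLM and embeddings, FIRECRAWL_API_KEY for web scraping.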
# Models and parameters
embedding_model = "text-embedding-3-small"
llm_model = "gpt-4o-mini-2024-07-18"
llm = ChatOpenAI(model=llm_model, temperature=0)
# Hyperparameters
PDF_CHUNK_SIZE = 2048
PDF_CHUNK_OVERLAP = 512
WEB_CHUNK_SIZE = 2048
WEB_CHUNK_OVERLAP = 512
k = 5
system_prompt = """You are a helpful assistant designed to answer questions based on the provided context.
Using the context provided, please answer the user question as accurately and informatively as possible. If no relevant context is available, inform the user that you do not have the information needed to answer their question.
"""
def process_pdf(pdf_file):
    # Create a temporary file to store the uploaded PDF
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
        temp_file.write(pdf_file.read())  # Read the content of the uploaded PDF file
        temp_file_path = temp_file.name
    try:
        md_text = pymupdf4llm.to_markdown(temp_file_path)  # Process the PDF content
        md_splitter = RecursiveCharacterTextSplitter.from_language(
            language=Language.MARKDOWN, chunk_size=PDF_CHUNK_SIZE, chunk_overlap=PDF_CHUNK_OVERLAP
        )
        return md_splitter.create_documents([md_text])
    finally:
        # Clean up the temporary file
        os.unlink(temp_file_path)
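# Usage sketch (hypothetical file name):
#   with open("example.pdf", "rb") as f:
#       chunks = process_pdf(f)  # list of Documents split on Markdown structure
# The temp-file round-trip exists because pymupdf4llm.to_markdown expects a
# filesystem path (or a PyMuPDF Document), not a file-like object.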
def create_combined_knowledge_base(all_text_chunks):
    embeddings = OpenAIEmbeddings(model=embedding_model)
    return FAISS.from_documents(all_text_chunks, embeddings)
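# Quick sanity check, assuming OPENAI_API_KEY is set (FAISS.from_documents
# embeds every chunk through the OpenAI API, so cost scales with corpus size):
#   kb = create_combined_knowledge_base(chunks)
#   hits = kb.similarity_search("What is a DEX?", k=3)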
def main():
    st.title("Dex Agent with Dynamic Knowledge Base")
    all_text_chunks = []
    st.sidebar.header("Select Knowledge Sources")
    # Specify your PDF files and URLs here
    pdf_files = ["bitcoin.pdf", "whitepaper-v3.pdf"]  # Replace with actual paths
    pdf_file_paths = [os.path.join(os.getcwd(), pdf_file) for pdf_file in pdf_files]
    urls = ["https://www.coinbase.com/it/learn/crypto-basics/what-is-a-dex", "https://ethereum.org/en/whitepaper/"]  # Replace with actual URLs
    # Process PDF files
    for pdf_file_path in pdf_file_paths:
        with open(pdf_file_path, "rb") as pdf_file:
            pdf_text_chunks = process_pdf(pdf_file)
            for chunk in pdf_text_chunks:
                chunk.metadata['source'] = pdf_file_path
            all_text_chunks.extend(pdf_text_chunks)
    # Scrape content from URLs
    app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
    web_contents = {}
    for url in urls:
        try:
            result = app.scrape_url(url)
            web_contents[url] = result['markdown']
        except Exception as e:
            st.error(f"Error scraping URL {url}: {e}")
    # Process the content of the scraped URLs
    for url, content in web_contents.items():
        webpage_chunks = RecursiveCharacterTextSplitter.from_language(
            language=Language.MARKDOWN, chunk_size=WEB_CHUNK_SIZE, chunk_overlap=WEB_CHUNK_OVERLAP
        ).create_documents([content])
        for chunk in webpage_chunks:
            chunk.metadata['source'] = url
        all_text_chunks.extend(webpage_chunks)
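    # The 'source' metadata set above (file path or URL) is what the
    # "Sources used" expander displays alongside each retrieved chunk.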
    # Check if the FAISS index exists locally
    current_dir = os.getcwd()
    # Define the path to save the FAISS index within the current directory
    faiss_index_path = os.path.join(current_dir, "faiss_index")
    if os.path.exists(faiss_index_path):
        # Load the existing FAISS index
        embeddings = OpenAIEmbeddings(model=embedding_model)
        knowledge_base = FAISS.load_local(faiss_index_path, embeddings, allow_dangerous_deserialization=True)
        st.success("Knowledge base loaded successfully!")
    else:
        # Create and save the new FAISS index
        knowledge_base = create_combined_knowledge_base(all_text_chunks)
        knowledge_base.save_local(faiss_index_path)
        st.success("Knowledge base created and saved successfully!")
    # Store the knowledge base in session state to persist between interactions
    st.session_state['knowledge_base'] = knowledge_base
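    # Caveat: once "faiss_index" exists on disk it is always loaded, so changes
    # to the source PDFs or URLs are not picked up until that directory is
    # deleted and the index rebuilt.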
    # Allow users to ask questions
    st.header("Ask a Question")
    # Always show the "Get Answer" button, and input the question
    question = st.text_input("Enter your question")
    # Always display the button
    if st.button("Get Answer"):
        if question:  # Proceed only if a question is entered
            template = '''
            %s
            -------------------------------
            Context: {context}
            Question: {question}
            Answer:
            ''' % (system_prompt)
            prompt = PromptTemplate(
                template=template,
                input_variables=['context', 'question']
            )
            # Check if the knowledge base is loaded in session state
            knowledge_base = st.session_state.get('knowledge_base')
            if knowledge_base:
                try:
                    qa_chain = RetrievalQA.from_chain_type(
                        llm,
                        retriever=knowledge_base.as_retriever(search_kwargs={"k": k}),
                        return_source_documents=True,
                        chain_type_kwargs={"prompt": prompt}
                    )
                    # Get the response from the QA chain
                    response = qa_chain.invoke({"query": question})
                    # Display the answer
                    st.write(f"**Answer:** {response['result']}")
                    with st.expander("Sources used"):
                        for doc in response['source_documents']:
                            st.write(f"**Source:** {doc.metadata['source']}")
                            st.write(f"**Content:** {doc.page_content}\n{'-'*40}")
                except Exception as e:
                    st.error(f"An error occurred while fetching the answer: {e}")
            else:
                st.error("Knowledge base is not loaded. Please create or load the knowledge base.")
        else:
            st.warning("Please enter a question to get an answer.")
if __name__ == "__main__":
    main()