import gradio as gr |
import os |
import shutil |
import requests |
import zipfile |
from PyPDF2 import PdfFileReader, PdfFileWriter |
import PyPDF2 |
from io import BytesIO |
from reportlab.lib.pagesizes import letter |
from reportlab.platypus import SimpleDocTemplate,Preformatted |
from reportlab.platypus import Image as RLImage |
from reportlab.platypus import Paragraph, Spacer |
from reportlab.lib.styles import getSampleStyleSheet |
from reportlab.lib.utils import ImageReader |
from PIL import Image |
import os |
from langchain.indexes.vectorstore import VectorstoreIndexCreator |
from langchain.chains import VectorDBQA,VectorDBQAWithSourcesChain |
from langchain import OpenAI |
from langchain.document_loaders import UnstructuredPDFLoader |
from langchain.vectorstores.faiss import FAISS |
from langchain.embeddings.openai import OpenAIEmbeddings |
from flask import send_file |
class REPOGPT: |
def __init__(self) -> None: |
self.repo_link = None |
self.api_key = None |
def init_agent(self, api_key, repo_link = None, load_vectorstore = None): |
try: |
os.remove('merged.pdf') |
except: |
pass |
self.repo_link = repo_link |
self.api_key = api_key |
self.load_vectorstore = load_vectorstore |
assert self.api_key != None, "You need to provide an API key" |
self.REPOGPT_Initialized() |
return gr.update(visible = True),'Initialize Finished' |
def REPOGPT_Initialized(self,image_included = False): |
os.environ["OPENAI_API_KEY"] = self.api_key |
if self.load_vectorstore == None: |
loader = UnstructuredPDFLoader( self.create_repo_pdf(self.repo_link,image_included = image_included)) |
self.index = VectorstoreIndexCreator(vectorstore_cls = FAISS).from_loaders([loader]) |
self.vectorstore = self.index.vectorstore |
print(' vectorstore created') |
else: |
embeddings = OpenAIEmbeddings() |
self.vectorstore = FAISS.load_local(self.load_vectorstore,embeddings =embeddings) |
print(' vectorstore loaded') |
self.qa = VectorDBQA.from_chain_type(llm =OpenAI(temperature=0, model_name="gpt-3.5-turbo"), chain_type = "stuff",vectorstore = self.vectorstore ) |
def download_repo_zip(self, link, output_folder = "main.zip"): |
username = link.split('/')[3] |
repo = link.split('/')[4] |
zip_url = f"https://github.com/{username}/{repo}/archive/refs/heads/master.zip" |
self.zip_url = zip_url |
response = requests.get(zip_url) |
response.raise_for_status() |
with open('main.zip', 'wb') as f: |
f.write(response.content) |
def extract_zip(self, zip_file, destination_folder): |
with zipfile.ZipFile(zip_file) as zf: |
zf.extractall(destination_folder) |
folder_name = zf.namelist()[0] |
return folder_name |
def convert_to_pdf(self, input_path, output_path): |
if input_path.endswith(".pdf"): |
buffer = BytesIO() |
doc = SimpleDocTemplate(buffer, pagesize=letter) |
styles = getSampleStyleSheet() |
elements = [] |
heading = Paragraph(f"File path: {input_path}", styles["Heading2"]) |
elements.append(heading) |
elements.append(Spacer(1, 12)) |
doc.build(elements) |
buffer.seek(0) |
new_pdf = PdfFileReader(buffer) |
with open(input_path, "rb") as f: |
input_pdf = PdfFileReader(f) |
pdf_writer = PdfFileWriter() |
for page_num in range(new_pdf.getNumPages()): |
pdf_writer.addPage(new_pdf.getPage(page_num)) |
for page_num in range(input_pdf.getNumPages()): |
pdf_writer.addPage(input_pdf.getPage(page_num)) |
with open(output_path, "wb") as f: |
pdf_writer.write(f) |
elif input_path.lower().endswith((".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff")): |
img = Image.open(input_path) |
img_reader = ImageReader(img) |
img_width, img_height = img.size |
aspect_ratio = img_height / img_width |
max_pdf_width = letter[0] - 2 * 72 |
max_pdf_height = letter[1] - 2 * 72 |
if img_width > max_pdf_width: |
img_width = max_pdf_width |
img_height = img_width * aspect_ratio |
if img_height > max_pdf_height: |
img_height = max_pdf_height |
img_width = img_height / aspect_ratio |
img_width = int(img_width) |
img_height = int(img_height) |
img = img.resize((int(img_width), int(img_height))) |
img = img.resize((int(img_width), int(img_height))) |
img.save(output_path, "PNG") |
doc = SimpleDocTemplate(output_path, pagesize=letter) |
styles = getSampleStyleSheet() |
elements = [] |
heading = Paragraph(f" {input_path}", styles["Heading2"]) |
elements.append(heading) |
elements.append(Spacer(1, 12)) |
img_rl = RLImage(input_path, width=img_width, height=img_height, kind='proportional') |
elements.append(img_rl) |
doc.build(elements) |
else: |
with open(input_path, "r") as f: |
content = f.read() |
doc = SimpleDocTemplate(output_path, pagesize=letter) |
styles = getSampleStyleSheet() |
elements = [] |
heading = Paragraph(f"{input_path}", styles["Heading2"]) |
elements.append(heading) |
elements.append(Spacer(1, 12)) |
text = Preformatted(content, style=styles["Code"]) |
elements.append(text) |
doc.build(elements) |
def merge_pdfs(self, pdf_files, output_path): |
pdf_writer = PyPDF2.PdfWriter() |
for pdf_file in pdf_files: |
with open(pdf_file, "rb") as f: |
try: |
pdf_reader = PyPDF2.PdfReader(f) |
if pdf_reader.is_encrypted: |
print(f"{pdf_file} is encrypted. Skipping.") |
continue |
except: |
print(f"{pdf_file} is not a valid PDF. Skipping.") |
continue |
for page_num in range(len(pdf_reader.pages)): |
pdf_writer.add_page(pdf_reader.pages[page_num]) |
with open(output_path, "wb") as f: |
pdf_writer.write(f) |
def get_pdf(self): |
return self.merged_pdf_path |
def save_indexDB(self,save_path = 'indexDB.json'): |
self.vectorstore.save_local(save_path) |
print("indexDB saved at: ", save_path) |
def create_repo_pdf(self, repo_link, image_included = False, merged_pdf = "temp_merged.pdf"): |
self.merged_pdf_path = merged_pdf |
self.download_repo_zip(repo_link) |
folder_name = self.extract_zip('./main.zip', './') |
ingnore_list = ['__pycache__',] |
if not image_included: |
ingnore_list.append('.jpg') |
ingnore_list.append('.png') |
ingnore_list.append('.jpeg') |
ingnore_list.append('.gif') |
ingnore_list.append('.bmp') |
ingnore_list.append('.tiff') |
print('folder_name: ', folder_name) |
pdf_files = [] |
for root, dirs, files in os.walk(folder_name): |
for file in files: |
input_file = os.path.join(root, file) |
if any(x in input_file for x in ingnore_list): |
continue |
os.makedirs("temp", exist_ok=True) |
output_file = os.path.join("temp", os.path.splitext(file)[0] + ".pdf") |
try: |
self.convert_to_pdf(input_file, output_file) |
except: |
print("Error converting file: ", input_file) |
continue |
pdf_files.append(output_file) |
self.merge_pdfs(pdf_files, self.merged_pdf_path) |
os.remove("main.zip") |
shutil.rmtree(folder_name) |
shutil.rmtree("temp") |
return self.merged_pdf_path |
def Answer_quetsion(self, question): |
return self.qa.run(question) |
def Answer_quetsion_with_source(self, question): |
return self.qa({"question": question}, return_only_outputs = True) |
def call_output(string = 'REPOGPT Initializing'): |
return string |
def download_file(filename = 'merged.pdf'): |
return send_file(filename, as_attachment=True) |
repogpt = REPOGPT() |
with gr.Blocks() as demo: |
with gr.Row(): |
gr.Markdown("<h3><center>REPOGPT</center></h3>") |
gr.Markdown( |
"""This is a demo to the work [REPOGPT](https://github.com/wuchangsheng951/RepoGPT).<br> |
This space connects ChatGPT and RepoGPT is a Python library that allows you to search and answer questions about a GitHub repository's content.<br> |
""" |
) |
with gr.Row(): |
apikey = gr.Textbox( |
placeholder="Paste your OpenAI API key here to start Visual ChatGPT(sk-...) and press Enter ↵️", |
show_label=True, |
label = 'OpenAI API key', |
lines=1, |
type="password", |
) |
with gr.Row(): |
repo_link = gr.Textbox( |
placeholder="Paste your repo_link and press Enter ↵️", |
label = 'repo_link like: https://github.com/wuchangsheng951/RepoGPT', |
show_label=True, |
lines=1, |
) |
with gr.Column(scale=0.7): |
Initialize = gr.Button("Initialize RepoGPT") |
output = gr.Textbox(label="Output Box") |
with gr.Row(visible=False) as input_raws: |
with gr.Column(scale=0.7): |
txt = gr.Textbox(show_label=False, placeholder="Enter your question").style(container=False) |
with gr.Column(scale=0.4): |
AQ = gr.Button("Ask a Question").style(container=False) |
gr.Examples( |
examples=["Whats the name of this repo?", |
"Whats this repo for?", |
"How can I use this. Example code ? Step by step", |
"how can I use this Experiment trackers ? Step by step", |
"how can I Performing gradient accumulation with Accelerate? Step by step?", |
"Make it like water-color painting", |
"What is the background color", |
"Describe this image", |
"please detect the depth of this image", |
"Can you use this depth image to generate a cute dog", |
], |
inputs=txt |
) |
apikey.submit(repogpt.init_agent, [apikey,repo_link], [input_raws, output]) |
Initialize.click(repogpt.init_agent, [apikey,repo_link], [input_raws, output]) |
apikey.submit(call_output, [],[output]) |
txt.submit(repogpt.Answer_quetsion, [txt], [output]) |
AQ.click(repogpt.Answer_quetsion, [txt], [output]) |
demo.launch() |