import streamlit as st
from annotated_text import annotated_text, annotation
import fitz  # PyMuPDF
import os
import chromadb
import uuid
from pathlib import Path

# The Space stores the key under OPEN_API_KEY; expose it under the name the openai client expects.
os.environ['OPENAI_API_KEY'] = os.environ['OPEN_API_KEY']

st.title("Contracts Multiple File Search")

import pandas as pd
import time
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.schema import Document
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

# Dense embedding model used for the Chroma vector store
embedding = HuggingFaceEmbeddings(model_name='BAAI/bge-base-en-v1.5')

# Cross-encoder used to rerank the retrieved passages
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-base')

import spacy
# Load the English model from spaCy (spacy_download fetches it if it is not installed)
from spacy_download import load_spacy
nlp = load_spacy("en_core_web_sm")
def util_upload_file_and_return_list_docs(uploaded_files):
    #util_del_cwd()
    list_docs = []
    list_save_path = []
    for uploaded_file in uploaded_files:
        save_path = Path(os.getcwd(), uploaded_file.name)
        with open(save_path, mode='wb') as w:
            w.write(uploaded_file.getvalue())
        #print('save_path:', save_path)
        docs = fitz.open(save_path)
        list_docs.append(docs)
        list_save_path.append(save_path)
    return (list_docs, list_save_path)
#### Helper function to split text with a rolling character window (recommended: use a smaller rolling window)
def split_txt_file_synthetic_sentence_rolling(ctxt, sentence_size_in_chars, sliding_size_in_chars, debug=False):
    # The third argument is the step between window starts; after this line the
    # variable holds the overlap between consecutive windows (window size - step).
    sliding_size_in_chars = sentence_size_in_chars - sliding_size_in_chars
    pos_start = 0
    pos_end = len(ctxt)
    final_return = []
    if(debug):
        print('pos_start : ', pos_start)
        print('pos_end : ', pos_end)
    if(pos_end < sentence_size_in_chars):
        return([{'section_org_text': ctxt[pos_start:pos_end], 'section_char_start': pos_start, 'section_char_end': pos_end}])
    if(sentence_size_in_chars < sliding_size_in_chars):
        return(None)
    stop_condition = False
    start = pos_start
    end = start + sentence_size_in_chars
    mydict = {}
    mydict['section_org_text'] = ctxt[start:end]
    mydict['section_char_start'] = start
    mydict['section_char_end'] = end
    final_return.append(mydict)
    #### First window ends here
    while(stop_condition == False):
        start = end - sliding_size_in_chars
        end = start + sentence_size_in_chars
        if(end > pos_end):
            if(start < pos_end):
                end = pos_end
                mydict = {}
                mydict['section_org_text'] = ctxt[start:end]
                mydict['section_char_start'] = start
                mydict['section_char_end'] = end
                final_return.append(mydict)
                stop_condition = True
            else:
                stop_condition = True
        else:
            mydict = {}
            mydict['section_org_text'] = ctxt[start:end]
            mydict['section_char_start'] = start
            mydict['section_char_end'] = end
            final_return.append(mydict)
        if(debug):
            print('start : ', start)
            print('end : ', end)
    return(final_return)
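# Illustrative usage of the rolling-window splitter (comment-only sketch; the live
# indexing path below uses spaCy sentence splitting instead). With a 700-character
# window and a step of 200, consecutive windows overlap by 500 characters:
#
#   sections = split_txt_file_synthetic_sentence_rolling(page_text, 700, 200)
#   for sec in sections:
#       print(sec['section_char_start'], sec['section_char_end'], len(sec['section_org_text']))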
### Commented-out variant of util_get_list_page_and_passage that used the rolling character window
# def util_get_list_page_and_passage(docs):
#     page_documents = []
#     passage_documents = []
#     for txt_index, txt_page in enumerate(docs):
#         page_document = txt_page.get_text()  ##.encode("utf8") # get plain text (is in UTF-8)
#         page_documents.append(page_document)
#         sections = split_txt_file_synthetic_sentence_rolling(page_document, 700, 200)
#         for sub_sub_index, sub_sub_item in enumerate(sections):
#             sub_text = sub_sub_item['section_org_text']
#             passage_document = Document(page_content=sub_text, metadata={"page_index": txt_index})
#             passage_documents.append(passage_document)
#     return (page_documents, passage_documents)
def split_into_sentences_with_offsets(text):
    """
    Splits a paragraph into sentences and returns them along with their start and end offsets.
    :param text: The input text to be split into sentences.
    :return: A list of tuples, each containing a sentence and its start and end offsets.
    """
    doc = nlp(text)
    return [(sent.text, sent.start_char, sent.end_char) for sent in doc.sents]
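# Illustrative usage (comment-only sketch, not executed by the app):
#
#   for sent, start, end in split_into_sentences_with_offsets("First sentence. Second one."):
#       print(start, end, sent)
#
# The indexer below keeps only the sentence text (element 0 of each tuple) when
# building passage Documents; the offsets are available if a caller needs to map a
# passage back to its position within the page.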
def util_get_list_page_and_passage(list_docs, list_save_path):
    #page_documents = []
    passage_documents = []
    for ind_doc, docs in enumerate(list_docs):
        for txt_index, txt_page in enumerate(docs):
            page_document = txt_page.get_text()  ##.encode("utf8") # get plain text (is in UTF-8)
            #page_documents.append(page_document)
            sections = split_into_sentences_with_offsets(page_document)
            for sub_sub_index, sub_sub_item in enumerate(sections):
                sub_text = sub_sub_item[0]
                passage_document = Document(page_content=sub_text, metadata={"page_content": page_document, "page_index": txt_index, "file_name": str(list_save_path[ind_doc])})
                passage_documents.append(passage_document)
    return (passage_documents)
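# Shape of each passage Document produced above (illustrative values only):
# page_content holds a single sentence, while the metadata carries the full page
# text, the 0-based page index, and the path the upload was saved to, e.g.
#
#   Document(page_content="This Agreement shall be governed by ...",
#            metadata={"page_content": "<full page text>",
#                      "page_index": 0,
#                      "file_name": str(Path(os.getcwd(), "GCC-LPS.pdf"))})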
# def util_index_chromadb_passages():
#     ##### PROCESSING
#     # create client and a new collection
#     collection_name = str(uuid.uuid4().hex)
#     chroma_client = chromadb.EphemeralClient()
#     chroma_collection = chroma_client.get_or_create_collection(collection_name)
#     # define embedding function
#     embed_model = LangchainEmbedding(HuggingFaceEmbeddings(model_name="BAAI/bge-small-en"))
#     vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
#     return (chroma_client, chroma_collection, collection_name, vector_store, embed_model)
def util_get_only_content_inside_loop(page_no, page_documents):
    # Unused by the current flow; looks up a page-level document by its page index.
    for index, item in enumerate(page_documents):
        if(page_documents[index].metadata['txt_page_index'] == page_no):
            return(page_documents[index].get_content())
    return(None)
# def util_get_list_pageno_and_contents(page_documents, passage_documents, passage_nodes):
#     ''' page no starts with index 1 '''
#     return_value = []
#     for index, item in enumerate(passage_nodes):
#         page_no = passage_nodes[index].metadata['txt_page_index']
#         page_content = util_get_only_content_inside_loop(page_no, page_documents)
#         return_value.append((page_no + 1, page_content))
#     return (return_value)
def util_get_list_pageno_and_contents(some_query_passage, passage_documents, passage_nodes):
    ''' page no starts with index 1 '''
    # Rerank the retrieved passages against the query with the cross-encoder,
    # then keep only the highest-scoring passage per file.
    rescore = reranker.compute_score([[some_query_passage, x.page_content] for x in passage_nodes])
    print('rescore :: ', rescore)
    tmp_array = []
    for i, x in enumerate(passage_nodes):
        tmp_dict = {"passage_content": x.page_content,
                    "page_no": int(x.metadata['page_index']) + 1,
                    "page_content": str(x.metadata['page_content']),
                    "file_name": str(x.metadata['file_name']),
                    "score": float(rescore[i])}
        tmp_array.append(tmp_dict)
    df = pd.DataFrame(tmp_array)
    df = df.sort_values(by='score', ascending=False)
    df = df.drop_duplicates(subset=['file_name'], keep='first')
    df = df[["passage_content", "file_name", "page_no", "page_content"]]
    return (df)
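# Illustrative result (hypothetical values): the DataFrame returned above has one
# row per uploaded file, holding that file's best-scoring passage, for example
#
#   passage_content                             file_name              page_no
#   "This Agreement shall be governed by ..."   /path/to/GCC-LPS.pdf   12
#
# plus a page_content column with the full text of the matching page.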
# # def util_openai_extract_entity(example_passage, example_entity, page_content):
# #     import openai
# #     openai.api_key = os.environ['OPENAI_API_KEY']
# #     content = """Find the Entity based on Text . Return empty string if Entity does not exists. Here is one example below
# #     Text: """ + example_passage + """
# #     Entity: """ + example_entity + """
# #     Text: """ + page_content + """
# #     Entity: """
# #     return_value = openai.ChatCompletion.create(model="gpt-4", temperature=0.0001, messages=[{"role": "user", "content": content},])
# #     return (str(return_value['choices'][0]['message']['content']))
def util_openai_extract_clause(example_prompt, page_content):
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    content = example_prompt
    content = content + "\n Answer precisely; do not add anything extra, and try to locate the answer in the below context \n context: "
    return_value = openai.ChatCompletion.create(model="gpt-4o-mini", temperature=0.0001, messages=[{"role": "user", "content": content + "\n" + page_content},])
    return (str(return_value['choices'][0]['message']['content']))
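# Illustrative call (comment-only sketch so the app's flow is unchanged; page_text
# is a placeholder for one retrieved page's text):
#
#   clause = util_openai_extract_clause("What is the Governing Law?", page_text)
#
# The prompt asks the model to answer only from the supplied context, so a page
# that does not contain the clause should yield little or no extracted text.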
def util_openai_hyde(example_prompt):
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    return_value = openai.ChatCompletion.create(model="gpt-4o-mini", temperature=0.0001, messages=[
        {"role": "system", "content": "You are a legal contract lawyer. Generate a summary of the text below. " + "\n"},
        {"role": "user", "content": example_prompt + "\n"},
        ]
    )
    return (str(return_value['choices'][0]['message']['content']))
def util_openai_format(example_passage, page_content):
    '''
    Highlights the extracted clause inside the page text, e.g.
    annotated_text(" ", annotation("ENTITY : ", str(page_no)),)
    '''
    if(True):
        found_value = util_openai_extract_clause(example_passage, page_content)
        if(len(found_value) > 0):
            found_value = found_value.strip()
            first_index = page_content.find(found_value)
            if(first_index != -1):
                print('first_index : ', first_index)
                print('found_value : ', found_value)
                return(annotated_text(page_content[0:first_index], annotation(found_value, " FOUND ENTITY "), page_content[first_index + len(found_value):]))
    return(annotated_text(page_content))
def util_openai_modify_prompt(example_prompt, page_content):
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    my_prompt = """Expand the original Query to show exact results for extraction\n
    Query: """ + example_prompt  # + """\nDocument: """ + page_content + """ """
    return_value = openai.ChatCompletion.create(model="gpt-4o-mini", temperature=0.0001, messages=[{"role": "user", "content": my_prompt},])
    return (str(return_value['choices'][0]['message']['content']))
# def create_bm25_page_rank(page_list_retrieve, page_query):
#     """ page_corpus : array of page text , page_query is user query """
#     from operator import itemgetter
#     from rank_bm25 import BM25Okapi
#     tokenized_corpus = [doc.split(" ") for x, doc in page_list_retrieve]
#     tokenized_query = page_query.split(" ")
#     bm25 = BM25Okapi(tokenized_corpus)
#     doc_scores = bm25.get_scores(tokenized_query).tolist()
#     tmp_list = []
#     for index, item in enumerate(page_list_retrieve):
#         tmp_list.append((item[0], item[1], doc_scores[index]))
#     tmp_list = sorted(tmp_list, key=itemgetter(2), reverse=True)
#     return (tmp_list)
passage_documents = []

with st.form("my_form"):
    multi = '''1. Download and upload multiple contracts
e.g. https://www.barc.gov.in/tenders/GCC-LPS.pdf
e.g. https://www.montrosecounty.net/DocumentCenter/View/823/Sample-Construction-Contract
'''
    st.markdown(multi)
    multi = '''2. Enter a query to search or to find similar language.'''
    st.markdown(multi)
    multi = '''3. Press Index and Calculate.'''
    st.markdown(multi)
    multi = '''
**An attempt is made to retrieve the most appropriate page and passage.**
'''
    st.markdown(multi)
    #uploaded_file = st.file_uploader("Choose a file")
    list_docs = []
    list_save_path = []
    uploaded_files = st.file_uploader("Choose file(s)", accept_multiple_files=True)
    print('uploaded_files ', uploaded_files)
    single_example_passage = st.text_area('Enter Query Here', "What is Governing Law")
    submitted = st.form_submit_button("Index and Calculate")
if submitted and (uploaded_files is not None):
    list_docs, list_save_path = util_upload_file_and_return_list_docs(uploaded_files)
    # print('list_docs ', list_docs)
    # print('list_save_path ', list_save_path)
    passage_documents = util_get_list_page_and_passage(list_docs, list_save_path)
    # Hybrid retrieval: sparse BM25 (weight 0.25) combined with dense Chroma retrieval (weight 0.75)
    bm25_retriever = BM25Retriever.from_documents(passage_documents)
    bm25_retriever.k = 2
    chroma_vectorstore = Chroma.from_documents(passage_documents, embedding)
    chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 2})
    ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, chroma_retriever], weights=[0.25, 0.75])
    passage_nodes = ensemble_retriever.get_relevant_documents(single_example_passage)
    print('len(passage_nodes):', len(passage_nodes))
    time.sleep(5)
    df = util_get_list_pageno_and_contents(single_example_passage, passage_documents, passage_nodes)
    st.write(df)
    # print('len(page_list_retrieve):', len(page_list_retrieve))
    # if(len(page_list_retrieve)>0):
    #     page_list_retrieve = list(set(page_list_retrieve))
    #     for iindex in page_list_retrieve:
    #         page_no = iindex[0]
    #         page_content = iindex[1]
    #         annotated_text(" ", annotation("RELEVANT PAGENO : ", str(page_no), font_family="Comic Sans MS", border="2px dashed red"),)
    #         util_openai_format(single_example_passage, page_content)
    #         annotated_text(" ", annotation("RELEVANT PASSAGE : ", "", font_family="Comic Sans MS", border="2px dashed red"),)
    #         st.write(found_passage)

# pchroma_client = chromadb.Client()
# for citem in pchroma_client.list_collections():
#     print(citem.name)