# Function support for the RAG pipeline: data loading, multi-vector indexing,
# and two fusion retrievers (cross-encoder rerank and Cohere rerank).
import os
import uuid
from typing import Any, List, Tuple

import cohere
import numpy as np
from sentence_transformers.cross_encoder import CrossEncoder

from langchain.callbacks.manager import CallbackManagerForRetrieverRun
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.load import dumps, loads
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.schema import BaseRetriever, Document
from langchain.storage import InMemoryStore
from langchain.vectorstores import Chroma
def load_data(data_path: str) -> List[Document]:
    """Load every text file found in the immediate sub-folders of data_path."""
    folders = os.listdir(data_path)
    dir_loaders = []
    loaded_documents = []
    for folder in folders:
        dir_loader = DirectoryLoader(os.path.join(data_path, folder), loader_cls=TextLoader)
        dir_loaders.append(dir_loader)
    for dir_loader in dir_loaders:
        loaded_documents.extend(dir_loader.load())
    return loaded_documents
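
# Example usage of load_data (a minimal sketch; the "data" directory and its
# sub-folder layout are assumptions, e.g. data/topic_a/*.txt, data/topic_b/*.txt):
#
#   docs = load_data("data")
#   print(len(docs), docs[0].metadata["source"])
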
def process_data(
    data: List[Document],
    child_text_splitter,
    embedding,
    vectorstore_name: str,
) -> Tuple[Chroma, MultiVectorRetriever]:
    """Index child chunks in Chroma while keeping full parent documents in a docstore."""
    # The vectorstore used to index the child chunks.
    vectorstore = Chroma(
        collection_name=vectorstore_name,
        embedding_function=embedding,
        # collection_metadata={"hnsw:space": "cosine"}
    )
    # The storage layer for the parent documents.
    store = InMemoryStore()
    id_key = "doc_id"
    # The retriever (empty to start).
    retriever = MultiVectorRetriever(
        vectorstore=vectorstore,
        docstore=store,
        id_key=id_key,
        search_kwargs={"k": 25},
    )
    doc_ids = [str(uuid.uuid4()) for _ in data]
    sub_docs = []
    for i, doc in enumerate(data):
        _id = doc_ids[i]
        _sub_docs = child_text_splitter.split_documents([doc])
        # Tag every child chunk with its parent's id so retrieval can map back.
        for _doc in _sub_docs:
            _doc.metadata[id_key] = _id
        sub_docs.extend(_sub_docs)
    retriever.vectorstore.add_documents(sub_docs)
    retriever.docstore.mset(list(zip(doc_ids, data)))
    return vectorstore, retriever
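
# Example usage of process_data (a sketch; the splitter and embedding choices
# below are assumptions, not requirements of this module):
#
#   from langchain.text_splitter import RecursiveCharacterTextSplitter
#   from langchain.embeddings import HuggingFaceEmbeddings
#
#   docs = load_data("data")
#   splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
#   embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#   vectorstore, retriever = process_data(docs, splitter, embedding, "rag_children")
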
class CustomRetriever(BaseRetriever):
    """Fuses results from multiple queries with RRF, then reranks with a cross-encoder."""

    retriever: Any

    def reciprocal_rank_fusion(self, results: List[List[Document]], k: int = 60) -> List[Document]:
        """Fuse multiple lists of ranked documents with Reciprocal Rank Fusion.

        k is the smoothing constant in the RRF formula 1 / (rank + k).
        """
        # Fused score for each unique document, keyed by its serialized form.
        fused_scores = {}
        for docs in results:
            for rank, doc in enumerate(docs):
                # Serialize the document to JSON so it can be used as a dict key.
                doc_str = dumps(doc)
                if doc_str not in fused_scores:
                    fused_scores[doc_str] = 0
                # RRF update: documents ranked high in many lists accumulate score.
                fused_scores[doc_str] += 1 / (rank + k)
        # Sort by fused score, descending, to get the final fused ranking.
        reranked_results = [
            (loads(doc), score)
            for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
        ]
        # Return only the documents, dropping the fused scores.
        return [doc for doc, _score in reranked_results]
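    # Worked example of the RRF formula above (illustrative): with k=60, a
    # document ranked 1st in one list (rank index 0) and 3rd in another
    # (rank index 2) fuses to 1/60 + 1/62 ≈ 0.0328, which outranks a document
    # that appears only once at rank 1 (1/60 ≈ 0.0167).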
    def _get_relevant_documents(
        self, queries: List[str], *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        # Run the underlying retriever once per query variant.
        documents = []
        for query in queries:
            documents.append(
                self.retriever.get_relevant_documents(query, callbacks=run_manager.get_child())
            )
        unique_documents = self.reciprocal_rank_fusion(documents)
        # Collect page content for cross-encoder scoring.
        docs_content = [doc.page_content for doc in unique_documents]
        # Earlier checkpoints that were tried:
        # model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        # model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-641M')
        # model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-642M-2') *
        # model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-644M-1')
        # model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-32-2M-2')
        # model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-32-5M-1')
        # NOTE: the model is reloaded on every call; caching it at module or
        # instance level would avoid the repeated load cost.
        model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-32-6M-1')
        # Score each (query, passage) pair; the first query is the canonical question.
        sentence_combinations = [[queries[0], document] for document in docs_content]
        similarity_scores = model.predict(sentence_combinations)
        # Sort indices by similarity score, descending.
        sim_scores_argsort = np.argsort(similarity_scores)[::-1]
        # Reorder the fused documents by cross-encoder score and keep the top 10.
        docs = [unique_documents[idx] for idx in sim_scores_argsort]
        return docs[:10]
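
# Example usage of CustomRetriever (a sketch; the queries are hypothetical and
# the first one is treated as the canonical question for cross-encoder scoring):
#
#   fusion_retriever = CustomRetriever(retriever=retriever)
#   top_docs = fusion_retriever.get_relevant_documents(
#       ["What is HNSW?", "How does HNSW indexing work?"])
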
# Read the Cohere key from the environment; never hard-code API keys in source.
COHERE_API_KEY = os.environ.get("COHERE_API_KEY")
class CustomRetriever1(BaseRetriever):
    """Fuses results from multiple queries with RRF, then reranks via the Cohere API."""

    retriever: Any

    def reciprocal_rank_fusion(self, results: List[List[Document]], k: int = 60) -> List[Document]:
        """Fuse multiple lists of ranked documents with Reciprocal Rank Fusion.

        k is the smoothing constant in the RRF formula 1 / (rank + k).
        """
        # Fused score for each unique document, keyed by its serialized form.
        fused_scores = {}
        for docs in results:
            for rank, doc in enumerate(docs):
                # Serialize the document to JSON so it can be used as a dict key.
                doc_str = dumps(doc)
                if doc_str not in fused_scores:
                    fused_scores[doc_str] = 0
                # RRF update: documents ranked high in many lists accumulate score.
                fused_scores[doc_str] += 1 / (rank + k)
        # Sort by fused score, descending.
        reranked_results = [
            (loads(doc), score)
            for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
        ]
        # Keep the top 30 fused documents as rerank candidates for Cohere.
        return [doc for doc, _score in reranked_results][:30]
    def _get_relevant_documents(
        self, queries: List[str], *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        # Run the underlying retriever once per query variant.
        documents = []
        for query in queries:
            documents.append(
                self.retriever.get_relevant_documents(query, callbacks=run_manager.get_child())
            )
        unique_documents = self.reciprocal_rank_fusion(documents)
        # Collect page content for the Cohere reranker.
        docs_content = [doc.page_content for doc in unique_documents]
        co = cohere.Client(COHERE_API_KEY)
        # Rerank against the first (canonical) query and keep the top 10.
        results = co.rerank(
            query=queries[0],
            documents=docs_content,
            top_n=10,
            model='rerank-multilingual-v3.0',
            return_documents=True,
        )
        reranked_indices = [result.index for result in results.results]
        return [unique_documents[idx] for idx in reranked_indices]
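
# Example usage of CustomRetriever1 (a sketch; requires COHERE_API_KEY to be
# set in the environment):
#
#   cohere_retriever = CustomRetriever1(retriever=retriever)
#   top_docs = cohere_retriever.get_relevant_documents(
#       ["What is HNSW?", "How does HNSW indexing work?"])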