|
import sys |
|
import os |
|
from contextlib import contextmanager |
|
|
|
from ..reranker import rerank_docs |
|
from ..retriever import ClimateQARetriever |
|
|
|
|
|
|
|
def divide_into_parts(target, parts):
    """Split ``target`` into ``parts`` integers that are as equal as possible.

    Every entry gets ``target // parts``; the first ``target % parts``
    entries get one extra unit, so the entries add up to ``target``.

    Args:
        target: Integer total to distribute.
        parts: Number of entries to produce.

    Returns:
        A list of ``parts`` integers summing to ``target``. An empty list
        is returned when ``parts`` is zero or negative.
    """
    # Nothing to distribute over; also avoids dividing by zero below.
    if parts <= 0:
        return []

    base, remainder = divmod(target, parts)
    # The leading `remainder` slots absorb the leftover units.
    return [base + 1 if i < remainder else base for i in range(parts)]
|
|
|
|
|
@contextmanager
def suppress_output():
    """Silence ``sys.stdout`` and ``sys.stderr`` for the duration of a ``with`` block.

    Both attributes are rebound to a single write handle opened on
    ``os.devnull``. The stream objects that were installed on entry are put
    back on exit, including when the body raises. Only the ``sys.stdout`` /
    ``sys.stderr`` attributes are rebound; nothing is done at the
    file-descriptor level.
    """
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    with open(os.devnull, 'w') as devnull:
        sys.stdout = sys.stderr = devnull
        try:
            yield
        finally:
            # Restore before the devnull handle is closed by the `with`.
            sys.stdout, sys.stderr = saved_stdout, saved_stderr
|
|
|
|
|
|
|
def make_retriever_node(vectorstore, reranker, rerank_by_question=True, k_final=15, k_before_reranking=100, k_summary=5):
    """Build the node function that retrieves documents for a set of questions.

    Args:
        vectorstore: Passed as ``vectorstore`` to every ``ClimateQARetriever``.
        reranker: Passed to ``rerank_docs``. When ``None``, reranking is
            skipped and each document's ``similarity_score`` metadata is
            copied into ``reranking_score``.
        rerank_by_question: When true, each question keeps only its share
            of ``k_final`` documents (see ``divide_into_parts``) before the
            results are merged.
        k_final: Maximum number of documents in the returned state.
        k_before_reranking: Passed as ``k_total`` to the retriever.
        k_summary: Passed as ``k_summary`` to the retriever.

    Returns:
        A ``retrieve_documents(state)`` callable.
    """
    # Sources kept when the caller lets the sources be chosen automatically.
    POSSIBLE_SOURCES = ["IPCC", "IPBES", "IPOS", "OpenAlex"]

    def retrieve_documents(state):
        """Retrieve, optionally rerank, and merge documents for ``state["questions"]``.

        Reads ``state["questions"]`` (each item provides ``"question"`` and
        ``"sources"``) and the optional ``state["sources_input"]``. A
        missing or ``None`` ``sources_input`` is treated as ``["auto"]``.

        Returns:
            ``{"documents": docs}`` with at most ``k_final`` documents,
            sorted by descending ``reranking_score`` metadata.
        """
        questions = state["questions"]

        if "sources_input" not in state or state["sources_input"] is None:
            sources_input = ["auto"]
        else:
            sources_input = state["sources_input"]
        auto_mode = "auto" in sources_input

        # Per-question quota of final documents. Skipped when there are no
        # questions: the quota is only read inside the loop, and splitting
        # over zero parts would otherwise fail before returning.
        k_by_question = None
        if rerank_by_question and len(questions) > 0:
            k_by_question = divide_into_parts(k_final, len(questions))

        docs = []
        for i, q in enumerate(questions):
            sources = q["sources"]
            question = q["question"]

            if auto_mode:
                # Keep only the per-question sources that are recognised.
                sources = [x for x in sources if x in POSSIBLE_SOURCES]
            else:
                # Explicit selection overrides the per-question sources.
                sources = sources_input

            retriever = ClimateQARetriever(
                vectorstore=vectorstore,
                sources=sources,
                min_size=200,
                k_summary=k_summary,
                k_total=k_before_reranking,
                threshold=0.5,
            )
            docs_question = retriever.get_relevant_documents(question)

            if reranker is not None:
                # Reranking runs with stdout/stderr silenced.
                with suppress_output():
                    docs_question = rerank_docs(reranker, docs_question, question)
            else:
                # No reranker: reuse the similarity score as the ranking key.
                for doc in docs_question:
                    doc.metadata["reranking_score"] = doc.metadata["similarity_score"]

            if rerank_by_question:
                docs_question = docs_question[:k_by_question[i]]

            # Record which sources were queried for each kept document.
            for doc in docs_question:
                doc.metadata["sources_used"] = sources

            docs.extend(docs_question)

        # Merge across questions: best score first, capped at k_final.
        docs = sorted(docs, key=lambda x: x.metadata["reranking_score"], reverse=True)
        docs = docs[:k_final]

        new_state = {"documents": docs}
        return new_state

    return retrieve_documents
|
|
|
|