import os
import re
import time

import pandas as pd
import streamlit as st

from openfactcheck.base import OpenFactCheck
from openfactcheck.app.utils import metric_card


def extract_text(claim):
    """
    Extracts text from a claim that might be a string formatted as a dictionary.
    """
    match = re.search(r"'text': '([^']+)'", claim)
    if match:
        return match.group(1)
    return claim


def evaluate_response(ofc: OpenFactCheck):
    """
    This function creates a Streamlit app to evaluate the factuality of an LLM response.
    """
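    # Initialize the response evaluator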
    response_evaluator = ofc.ResponseEvaluator

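    # List the solvers available for each stage of the pipeline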
    st.session_state.claimprocessors = ofc.list_claimprocessors()
    st.session_state.retrievers = ofc.list_retrievers()
    st.session_state.verifiers = ofc.list_verifiers()

st.write("This is where you can check factuality of a LLM response.") |
|
|
|
|
|
st.write("Customize FactChecker") |
|
|
|
|
|
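    # Solver selection dropdowns in three columns; keep any previously selected solver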
    col1, col2, col3 = st.columns(3)
    with col1:
        if "claimprocessor" not in st.session_state:
            st.session_state.claimprocessor = st.selectbox(
                "Select Claim Processor", list(st.session_state.claimprocessors)
            )
        else:
            st.session_state.claimprocessor = st.selectbox(
                "Select Claim Processor",
                list(st.session_state.claimprocessors),
                index=list(st.session_state.claimprocessors).index(st.session_state.claimprocessor),
            )
    with col2:
        if "retriever" not in st.session_state:
            st.session_state.retriever = st.selectbox("Select Retriever", list(st.session_state.retrievers))
        else:
            st.session_state.retriever = st.selectbox(
                "Select Retriever",
                list(st.session_state.retrievers),
                index=list(st.session_state.retrievers).index(st.session_state.retriever),
            )
    with col3:
        if "verifier" not in st.session_state:
            st.session_state.verifier = st.selectbox("Select Verifier", list(st.session_state.verifiers))
        else:
            st.session_state.verifier = st.selectbox(
                "Select Verifier",
                list(st.session_state.verifiers),
                index=list(st.session_state.verifiers).index(st.session_state.verifier),
            )

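    # Text area for the LLM response, pre-filled with the previous input if available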
if "input_text" not in st.session_state: |
|
st.session_state.input_text = { |
|
"text": st.text_area("Enter LLM response here", "This is a sample LLM response.") |
|
} |
|
else: |
|
st.session_state.input_text = { |
|
"text": st.text_area("Enter LLM response here", st.session_state.input_text["text"]) |
|
} |
|
|
|
|
|
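    # Run the fact-checking pipeline when the button is clicked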
    if st.button("Check Factuality"):
        with st.status("Checking factuality...", expanded=True) as status:
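            # Configure the pipeline from the selected claim processor, retriever, and verifier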
st.write("Configuring pipeline...") |
|
ofc.init_pipeline_manually( |
|
[st.session_state.claimprocessor, st.session_state.retriever, st.session_state.verifier] |
|
) |
|
st.write("Pipeline configured...") |
|
|
|
|
|
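            # Evaluate the response; evaluate_streaming yields one output per solver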
st.write("Evaluating response...") |
|
|
|
response = response_evaluator.evaluate_streaming(st.session_state.input_text) |
|
st.write("Response evaluated...") |
|
|
|
            status.update(label="Factuality checked...", state="complete", expanded=False)

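        # Display the configured pipeline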
        pipeline_str = " ┈➤ ".join(
            [st.session_state.claimprocessor, st.session_state.retriever, st.session_state.verifier]
        )
        st.info(f"""**Pipeline**: \n{pipeline_str}""")

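        # Reset the stored final response before streaming new results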
        st.session_state.final_response = None

        col1, col2 = st.columns([3, 1])
        with col1:

            def process_stream(responses):
                """
                Process each response from the stream as a simulated chat output.

                This function yields each word from the formatted text of the response,
                adding a slight delay to simulate typing in a chat.
                """
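                # Each streamed response corresponds to one solver in the pipeline;
                # branch on the solver name to format its output.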
                for response in responses:
                    if "claimprocessor" in response["solver_name"]:
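                        # Claim processor output: list the detected claims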
                        output_text = response["output"]
                        detected_claims = output_text.get("claims", [])

formatted_text = "### Detected Claims\n" |
|
formatted_text += "\n".join( |
|
f"{i}. {extract_text(claim)}" for i, claim in enumerate(detected_claims, start=1) |
|
) |
|
formatted_text += "\n" |
|
|
|
                        with col2:
                            metric_card(label="Detected Claims", value=len(detected_claims))

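                        # Stream the formatted text word by word to simulate typing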
                        for word in formatted_text.split(" "):
                            yield word + " "
                            time.sleep(0.01)

                        st.session_state.claimprocessor_flag = True

elif "retriever" in response["solver_name"]: |
|
|
|
output_text = response["output"] |
|
|
|
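                        # Collect the retrieved questions and evidences across all claims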
                        questions = []
                        evidences = []
                        for _, claim_with_evidences in output_text.get("claims_with_evidences", {}).items():
                            for claim_with_evidence in claim_with_evidences:
                                questions.append(claim_with_evidence[0])
                                evidences.append(claim_with_evidence[1])

                        with col2:
                            metric_card(label="Retrieved Evidences", value=len(evidences))

elif "verifier" in response["solver_name"]: |
|
|
|
output_text = response["output"] |
|
|
|
|
|
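                        # Per-claim verification details (may be absent for some verifiers)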
                        details = output_text.get("detail", None)
                        if details is None:
                            detail_text = "The verifier did not provide any detail. Please use other verifiers for more information."
                        else:
                            detail_text = ""

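                        # Tally the claims by factuality label while building the detail text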
                        claims = 0
                        false_claims = 0
                        true_claims = 0
                        controversial_claims = 0
                        unverified_claims = 0
                        for i, detail in enumerate(details or []):  # details may be None
                            # Read the factuality label before stringifying it, since
                            # str(None) == "None" would wrongly pass the check below
                            factuality = detail.get("factuality", None)
                            if factuality is not None:
                                factuality = str(factuality)
                                claim = detail.get("claim", "")
if factuality == "-1" or factuality == "False": |
|
detail_text += f'##### :red[{str(i+1) + ". " + extract_text(claim)}]' |
|
detail_text += "\n" |
|
claims += 1 |
|
false_claims += 1 |
|
elif factuality == "1" or factuality == "True": |
|
detail_text += f'##### :green[{str(i+1) + ". " + extract_text(claim)}]' |
|
detail_text += "\n" |
|
claims += 1 |
|
true_claims += 1 |
|
elif factuality == "0": |
|
detail_text += f'##### :orange[{str(i+1) + ". " + extract_text(claim)}]' |
|
detail_text += "\n" |
|
claims += 1 |
|
controversial_claims += 1 |
|
else: |
|
detail_text += f'##### :purple[{str(i+1) + ". " + extract_text(claim)}]' |
|
detail_text += "\n" |
|
claims += 1 |
|
unverified_claims += 1 |
|
                            else:
                                st.error("Factuality not found in the verifier output.")

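                            # Append any error, reasoning, and correction returned by the verifier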
if detail.get("error", None) != "None": |
|
detail_text += f"- **Error**: {detail.get('error', '')}" |
|
detail_text += "\n" |
|
|
|
|
|
if detail.get("reasoning", None) != "None": |
|
detail_text += f"- **Reasoning**: {detail.get('reasoning', '')}" |
|
detail_text += "\n" |
|
|
|
|
|
if detail.get("correction", None) != "": |
|
detail_text += f"- **Correction**: {detail.get('correction', '')}" |
|
detail_text += "\n" |
|
|
|
|
|
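                            # Group the retrieved evidences by the question they answer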
if detail.get("evidences", None) != "": |
|
evidence_text = "" |
|
questions_evidences = {} |
|
for evidence in detail.get("evidences", []): |
|
if evidence[0] not in questions_evidences: |
|
questions_evidences[evidence[0]] = [] |
|
questions_evidences[evidence[0]].append(evidence[1]) |
|
for question, evidences in questions_evidences.items(): |
|
evidence_text += f"- **Evidences against Question**: :orange[{question}]" |
|
evidence_text += "\n" |
|
for evidence in evidences: |
|
evidence_text += f" - {evidence}\n" |
|
detail_text += evidence_text |
|
|
|
|
|
formatted_text = "### Factuality Detail\n" |
|
formatted_text += "Factuality of each claim is color-coded (:red[red means false], :green[green means true], :orange[orange means controversial], :violet[violet means unverified]).\n" |
|
formatted_text += f"{detail_text}\n" |
|
formatted_text += "\n" |
|
|
|
|
|
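                        # Show per-category claim counts in the metrics column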
                        with col2:
                            metric_card(
                                label="Supported Claims",
                                value=true_claims,
                                background_color="#D1ECF1",
                                border_left_color="#17A2B8",
                            )
                            metric_card(
                                label="Conflicted Claims",
                                value=false_claims,
                                background_color="#D1ECF1",
                                border_left_color="#17A2B8",
                            )
                            metric_card(
                                label="Controversial Claims",
                                value=controversial_claims,
                                background_color="#D1ECF1",
                                border_left_color="#17A2B8",
                            )
                            metric_card(
                                label="Unverified Claims",
                                value=unverified_claims,
                                background_color="#D1ECF1",
                                border_left_color="#17A2B8",
                            )

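                        # Overall factuality label reported by the verifier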
                        overall_factuality = output_text.get("label", "Unknown")
                        with col2:
                            with st.container():
                                if overall_factuality:
                                    metric_card(
                                        label="Overall Factuality",
                                        value="True",
                                        background_color="#D4EDDA",
                                        border_left_color="#28A745",
                                    )
                                elif not overall_factuality:
                                    metric_card(
                                        label="Overall Factuality",
                                        value="False",
                                        background_color="#F8D7DA",
                                        border_left_color="#DC3545",
                                    )

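                        # Overall credibility: fraction of true claims among all labeled claims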
                        overall_credibility = true_claims / claims if claims > 0 else 0
                        with col2:
                            if overall_credibility > 0.75 and overall_credibility <= 1:
                                metric_card(
                                    label="Overall Credibility",
                                    value=f"{overall_credibility:.2%}",
                                    background_color="#D4EDDA",
                                    border_left_color="#28A745",
                                )
                            elif overall_credibility > 0.25 and overall_credibility <= 0.75:
                                metric_card(
                                    label="Overall Credibility",
                                    value=f"{overall_credibility:.2%}",
                                    background_color="#FFF3CD",
                                    border_left_color="#FFC107",
                                )
                            else:
                                metric_card(
                                    label="Overall Credibility",
                                    value=f"{overall_credibility:.2%}",
                                    background_color="#F8D7DA",
                                    border_left_color="#DC3545",
                                )

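                        # Stream the verifier's formatted detail word by word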
                        for word in formatted_text.split(" "):
                            yield word + " "
                            time.sleep(0.01)

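            # Render the streamed solver outputs in the main column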
            st.write_stream(process_stream(response))

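# Hypothetical usage sketch (an assumption, not part of this module): this page
# function expects an already-configured OpenFactCheck instance and is meant to be
# called from the app's Streamlit entry point, e.g.:
#
#     ofc = OpenFactCheck(...)  # construct/configure as the app does elsewhere
#     evaluate_response(ofc)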