|
import re |
|
import time |
|
import pandas as pd |
|
import streamlit as st |
|
|
|
from openfactcheck.core.base import OpenFactCheck |
|
from openfactcheck.app.utils import metric_card |
|
|
|
def extract_text(claim):
    """
    Return the human-readable text of a claim.

    A claim may arrive as a plain string, as a dictionary with a ``"text"``
    key, or as the string representation of such a dictionary
    (e.g. ``"{'text': 'Paris is in France'}"``).

    Args:
        claim: The claim in any of the forms above. ``None`` is treated as an
            empty claim; any other non-string value is converted with ``str``.

    Returns:
        str: The extracted text, or the (stringified) claim itself when no
        ``'text'`` entry can be found.
    """
    if claim is None:
        return ""

    if isinstance(claim, dict) and "text" in claim:
        return str(claim["text"])

    if not isinstance(claim, str):
        # re.search only accepts strings; fall back to the repr-like form.
        claim = str(claim)

    # Python renders a dict value in double quotes when the text itself
    # contains an apostrophe, so both quoting styles have to be recognised.
    match = re.search(r"'text': (?:'([^']+)'|\"([^\"]+)\")", claim)
    if match:
        return match.group(1) or match.group(2)
    return claim
|
|
|
|
|
_WORD_DELAY_SECONDS = 0.01  # pause between streamed words to mimic typing


def _select_solver(label, options, state_key):
    """
    Render a selectbox for one pipeline stage and store the choice in session state.

    The previously stored choice, if any, is pre-selected. When that choice is
    no longer part of ``options`` the first option is used instead of raising
    ``ValueError`` from ``list.index``.
    """
    choices = list(options)
    previous = st.session_state[state_key] if state_key in st.session_state else None
    index = choices.index(previous) if previous in choices else 0
    st.session_state[state_key] = st.selectbox(label, choices, index=index)


def _stream_words(text):
    """Yield ``text`` one space-separated word at a time, pausing briefly after each."""
    for word in text.split(" "):
        yield word + " "
        time.sleep(_WORD_DELAY_SECONDS)


def _has_value(value):
    """Return True unless ``value`` is None, blank, or the placeholder string "None"."""
    return value is not None and str(value).strip() not in ("", "None")


def _format_verification_details(details):
    """
    Build the markdown for the per-claim verifier details and count the verdicts.

    Args:
        details: Iterable of per-claim dictionaries. The keys read here are
            ``factuality``, ``claim``, ``error``, ``reasoning``, ``correction``
            and ``evidences``.

    Returns:
        tuple: ``(markdown, total_claims, true_claims, false_claims)``. Only
        entries that carry a ``factuality`` value are counted.
    """
    parts = []
    total_claims = 0
    true_claims = 0
    false_claims = 0

    for number, detail in enumerate(details, start=1):
        factuality = detail.get("factuality", None)
        if factuality is None:
            st.error("Factuality not found in the verifier output.")
        else:
            # -1 is rendered as false (red), 1 as true (green), anything else as unknown.
            if factuality == -1:
                color = "red"
                false_claims += 1
            elif factuality == 1:
                color = "green"
                true_claims += 1
            else:
                color = "yellow"
            total_claims += 1
            claim_text = extract_text(detail.get("claim", ""))
            parts.append(f"##### :{color}[{number}. {claim_text}]\n")

        # Optional fields are listed only when they carry real content.
        for title, key in (("Error", "error"), ("Reasoning", "reasoning"), ("Correction", "correction")):
            value = detail.get(key, None)
            if _has_value(value):
                parts.append(f"- **{title}**: {value}\n")

        evidences = detail.get("evidences", None) or []
        if evidences:
            # NOTE(review): each evidence is indexed with [1]; presumably a
            # (query, text) pair -- confirm against the retriever output.
            evidence_text = "".join(f" - {evidence[1]}\n" for evidence in evidences)
            parts.append(f"- **Evidence**:\n{evidence_text}")

    return "".join(parts), total_claims, true_claims, false_claims


def evaluate_response(ofc: OpenFactCheck):
    """
    This function creates a Streamlit app to evaluate the factuality of a LLM response.

    The page lets the user pick a claim processor, a retriever and a verifier,
    enter a response text, and run the resulting pipeline. Intermediate results
    are streamed into the left column while summary metric cards are drawn in
    the right column.

    Args:
        ofc: The OpenFactCheck instance used to list the available solvers,
            configure the pipeline and evaluate the entered text.
    """

    # Make the available solvers accessible to the widgets below.
    st.session_state.claimprocessors = ofc.list_claimprocessors()
    st.session_state.retrievers = ofc.list_retrievers()
    st.session_state.verifiers = ofc.list_verifiers()

    st.write("This is where you can check factuality of a LLM response.")

    st.write("Customize FactChecker")

    # One dropdown per pipeline stage, side by side.
    col1, col2, col3 = st.columns(3)
    with col1:
        _select_solver("Select Claim Processor", st.session_state.claimprocessors, "claimprocessor")
    with col2:
        _select_solver("Select Retriever", st.session_state.retrievers, "retriever")
    with col3:
        _select_solver("Select Verifier", st.session_state.verifiers, "verifier")

    # Text input, pre-filled with the last entered text when there is one.
    if "input_text" not in st.session_state:
        default_text = "This is a sample LLM response."
    else:
        default_text = st.session_state.input_text["text"]
    st.session_state.input_text = {"text": st.text_area("Enter LLM response here", default_text)}

    if not st.button("Check Factuality"):
        return

    pipeline = [st.session_state.claimprocessor, st.session_state.retriever, st.session_state.verifier]

    with st.status("Checking factuality...", expanded=True) as status:
        st.write("Configuring pipeline...")
        ofc.init_pipeline_manually(pipeline)
        st.write("Pipeline configured...")

        st.write("Evaluating response...")
        response_stream = ofc(st.session_state.input_text, stream=True)
        st.write("Response evaluated...")

        status.update(label="Factuality checked...", state="complete", expanded=False)

    pipeline_str = " ┈➤ ".join(pipeline)
    st.info(f"""**Pipeline**: \n{pipeline_str}""")

    st.session_state.final_response = None

    col1, col2 = st.columns([3, 1])
    with col1:

        def process_stream(responses):
            """
            Process each response from the stream as a simulated chat output.
            This function yields each word from the formatted text of the response,
            adding a slight delay to simulate typing in a chat.
            """
            for step in responses:
                solver_name = step["solver_name"]
                output = step["output"]

                if "claimprocessor" in solver_name:
                    detected_claims = output.get("claims", [])

                    formatted_text = "### Detected Claims\n"
                    formatted_text += "\n".join(
                        f"{i}. {extract_text(claim)}" for i, claim in enumerate(detected_claims, start=1)
                    )
                    formatted_text += "\n"

                    with col2:
                        metric_card(label="Detected Claims", value=len(detected_claims))

                    yield from _stream_words(formatted_text)

                    st.session_state.claimprocessor_flag = True

                elif "retriever" in solver_name:
                    evidences = []
                    for claim_evidences in output.get("claims_with_evidences", {}).values():
                        for evidence in claim_evidences:
                            evidences.append(evidence[1])

                    # Only the count is shown; the evidence text is not streamed.
                    with col2:
                        metric_card(label="Retrieved Evidences", value=len(evidences))

                elif "verifier" in solver_name:
                    details = output.get("detail", None)
                    if details is None:
                        detail_text = "The verifier did not provide any detail. Please use other verifiers for more information."
                        # Counters must exist even without details; the cards below read them.
                        claims = true_claims = false_claims = 0
                    else:
                        detail_text, claims, true_claims, false_claims = _format_verification_details(details)

                    formatted_text = "### Factuality Detail\n"
                    formatted_text += "Factuality of each claim is color-coded (:red[red means false], :green[green means true], :yellow[yellow means unknown]) as follows:\n"
                    formatted_text += f"{detail_text}\n"
                    formatted_text += "\n"

                    with col2:
                        metric_card(label="Supported Claims", value=true_claims, background_color="#D1ECF1", border_left_color="#17A2B8")
                        metric_card(label="Conflicted Claims", value=false_claims, background_color="#D1ECF1", border_left_color="#17A2B8")

                    # Equality (not identity) is kept on purpose so that any
                    # bool-like label still selects a card; other values,
                    # including the "Unknown" default, draw nothing.
                    overall_factuality = output.get("label", "Unknown")
                    with col2:
                        with st.container():
                            if overall_factuality == True:  # noqa: E712
                                metric_card(label="Overall Factuality", value="True", background_color="#D4EDDA", border_left_color="#28A745")
                            elif overall_factuality == False:  # noqa: E712
                                metric_card(label="Overall Factuality", value="False", background_color="#F8D7DA", border_left_color="#DC3545")

                    # Share of supported claims among all claims that had a verdict.
                    overall_credibility = true_claims / claims if claims > 0 else 0
                    if 0.75 < overall_credibility <= 1:
                        background, border = "#D4EDDA", "#28A745"
                    elif 0.25 < overall_credibility <= 0.75:
                        background, border = "#FFF3CD", "#FFC107"
                    else:
                        background, border = "#F8D7DA", "#DC3545"
                    with col2:
                        metric_card(label="Overall Credibility", value=f"{overall_credibility:.2%}", background_color=background, border_left_color=border)

                    yield from _stream_words(formatted_text)

        st.write_stream(process_stream(response_stream))
|
|