import os from typing import List, Tuple, Dict, Union, Any import requests import numpy as np import openai import pandas as pd import streamlit as st from langchain.document_loaders import TextLoader from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.vectorstores import Chroma from scipy.spatial.distance import cosine openai.api_key = os.environ["OPENAI_API_KEY"] def call_chatgpt(prompt: str) -> str: """ Uses the OpenAI API to generate an AI response to a prompt. Args: prompt: A string representing the prompt to send to the OpenAI API. Returns: A string representing the AI's generated response. """ # Use the OpenAI API to generate a response based on the input prompt. response = openai.Completion.create( model="gpt-3.5-turbo-instruct", prompt=prompt, temperature=0.5, max_tokens=500, top_p=1, frequency_penalty=0, presence_penalty=0, ) # Extract the text from the first (and only) choice in the response output. ans = response.choices[0]["text"] # Return the generated AI response. return ans # def ai_judge(prompt: str) -> float: # """ # Uses the ChatGPT function to identify whether the content can answer the question # Args: # prompt: A string that represents the prompt # Returns: # float: A score # """ # return call_chatgpt(prompt) def ai_judge(sentence1: str, sentence2: str) -> float: HF_TOKEN = os.environ["HF_TOKEN"] API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/msmarco-distilbert-base-tas-b" headers = {"Authorization": f"Bearer {HF_TOKEN}"} def helper(payload): response = requests.post(API_URL, headers=headers, json=payload) return response.json() data = helper( { "inputs": { "source_sentence": sentence1, "sentences": [sentence2] } } ) return data def query(payload: Dict[str, Any]) -> Dict[str, Any]: """ Sends a JSON payload to a predefined API URL and returns the JSON response. Args: payload (Dict[str, Any]): The JSON payload to be sent to the API. Returns: Dict[str, Any]: The JSON response received from the API. """ # API endpoint URL API_URL = "https://sks7h7h5qkhoxwxo.us-east-1.aws.endpoints.huggingface.cloud" # Headers to indicate both the request and response formats are JSON headers = { "Accept": "application/json", "Content-Type": "application/json" } # Sending a POST request with the JSON payload and headers response = requests.post(API_URL, headers=headers, json=payload) # Returning the JSON response return response.json() def llama2_7b_ysa(prompt: str) -> str: """ Queries a model and retrieves the generated text based on the given prompt. This function sends a prompt to a model (presumably named 'llama2_7b') and extracts the generated text from the model's response. It's tailored for handling responses from a specific API or model query structure where the response is expected to be a list of dictionaries, with at least one dictionary containing a key 'generated_text'. Parameters: - prompt (str): The text prompt to send to the model. Returns: - str: The generated text response from the model. Note: - The function assumes that the 'query' function is previously defined and accessible within the same scope or module. It should send a request to the model and return the response in a structured format. - The 'parameters' dictionary is passed empty but can be customized to include specific request parameters as needed by the model API. """ # Define the query payload with the prompt and any additional parameters query_payload: Dict[str, Any] = { "inputs": prompt, "parameters": {} } # Send the query to the model and store the output response output = query(query_payload) # Extract the 'generated_text' from the first item in the response list response: str = output[0]['generated_text'] return response ## rag strategy 1 # file_names = [f"output_files/file_{i}.txt" for i in range(131)] # # file_names = [f"output_files_large/file_{i}.txt" for i in range(1310)] # # Initialize an empty list to hold all documents # all_documents = [] # this is just a copy, you don't have to use this # # Iterate over each file and load its contents # for file_name in file_names: # loader = TextLoader(file_name) # documents = loader.load() # all_documents.extend(documents) # # Split the loaded documents into chunks # text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) # docs = text_splitter.split_documents(all_documents) # # Create the open-source embedding function # embedding_function = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") # # embedding_function = SentenceTransformer("all-MiniLM-L6-v2") # # embedding_function = openai_text_embedding # # Load the documents into Chroma # db = Chroma.from_documents(docs, embedding_function) ## rag strategy 2 from datasets import load_dataset import chromadb import string dataset = load_dataset("eagle0504/youthless-homeless-shelter-web-scrape-dataset-qa-formatted") client = chromadb.Client() random_number = np.random.randint(low=1e9, high=1e10) random_string = ''.join(np.random.choice(list(string.ascii_uppercase + string.digits), size=10)) combined_string = f"{random_number}{random_string}" collection = client.create_collection(combined_string) # Embed and store the first N supports for this demo L = len(dataset["train"]['questions']) collection.add( ids=[str(i) for i in range(0, L)], # IDs are just strings documents=dataset["train"]['questions'], # Enter questions here metadatas=[{"type": "support"} for _ in range(0, L)], ) st.title("Youth Homelessness Chatbot") # Initialize chat history if "messages" not in st.session_state: st.session_state.messages = [] # Display chat messages from history on app rerun for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) st.sidebar.markdown( """ ### Instructions: This app guides you through YSA's website, utilizing a RAG-ready Q&A dataset [here](https://huggingface.co/datasets/eagle0504/youthless-homeless-shelter-web-scrape-dataset-qa-formatted) for chatbot assistance. 🤖 Enter a question, and it finds similar ones in the database, offering answers with a distance score to gauge relevance—the lower the score, the closer the match. 🎯 For better accuracy and to reduce errors, user feedback helps refine the database. ✨ """) st.sidebar.success("Please enter a distance threshold (we advise to set it around 0.2).") special_threshold = st.sidebar.number_input("Insert a number", value=0.2, placeholder="Type a number...") # 0.3 clear_button = st.sidebar.button("Clear Conversation", key="clear") if clear_button: st.session_state.messages = [] # React to user input if prompt := st.chat_input("Tell me about YSA"): # Display user message in chat message container st.chat_message("user").markdown(prompt) # Add user message to chat history st.session_state.messages.append({"role": "user", "content": prompt}) question = prompt with st.spinner("Wait for it..."): # strategy 1 # docs = db.similarity_search(question) # docs_2 = db.similarity_search_with_score(question) # docs_2_table = pd.DataFrame( # { # "source": [docs_2[i][0].metadata["source"] for i in range(len(docs))], # "content": [docs_2[i][0].page_content for i in range(len(docs))], # "distances": [docs_2[i][1] for i in range(len(docs))], # } # ) # ref_from_db_search = docs_2_table["content"] # strategy 2 results = collection.query( query_texts=question, n_results=5 ) idx = results["ids"][0] idx = [int(i) for i in idx] ref = pd.DataFrame( { "idx": idx, "questions": [dataset["train"]['questions'][i] for i in idx], "answers": [dataset["train"]['answers'][i] for i in idx], "distances": results["distances"][0] } ) # special_threshold = st.sidebar.slider('How old are you?', 0, 0.6, 0.1) # 0.3 filtered_ref = ref[ref["distances"] < special_threshold] if filtered_ref.shape[0] > 0: st.success("There are highly relevant information in our database.") ref_from_db_search = filtered_ref["answers"] final_ref = filtered_ref else: st.warning("The database may not have relevant information to help your question so please be aware of hallucinations.") ref_from_db_search = ref["answers"] final_ref = ref try: llm_response = llama2_7b_ysa(question) except: llm_response = "Sorry, the inference endpoint is temporarily down. 😔" finetuned_llm_guess = ["from_llm", question, llm_response, 0] final_ref.loc[-1] = finetuned_llm_guess final_ref.index = final_ref.index + 1 # add ai judge as additional rating independent_ai_judge_score = [] for i in range(final_ref.shape[0]): this_quest = question this_content = final_ref["answers"][i] # prompt_for_ai_judge = f""" # The user asked a question: {question} # We have found this content: {this_content} # From 0 to 10, rate how well the content answer the user's question. # Only produce a number from 0 to 10 while 10 being the best at answer user's question. # If the content is a list of questions or not related to the user's question or it says inference endpoint is down, then you should say 0, because it does not answer user's question. # """ this_score = ai_judge(question, this_content) independent_ai_judge_score.append(this_score[0]) final_ref["ai_judge"] = independent_ai_judge_score engineered_prompt = f""" Based on the context: {ref_from_db_search}, answer the user question: {question}. Answer the question directly (don't say "based on the context, ...") """ answer = call_chatgpt(engineered_prompt) response = answer # Display assistant response in chat message container with st.chat_message("assistant"): with st.spinner("Wait for it..."): st.markdown(response) with st.expander("See reference:"): st.table(final_ref) # Add assistant response to chat history st.session_state.messages.append({"role": "assistant", "content": response}) # st.session_state.messages.append( # {"role": "assistant", "content": final_ref.to_json()} # )