Chatbot2 / my_memory_logic.py
Phoenix21's picture
Update my_memory_logic.py
b752340 verified
raw
history blame
2.49 kB
# my_memory_logic.py
import os
# Import the PipelineRunnable from pipeline.py
from pipeline import pipeline_runnable
from langchain.schema import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.runnables.history import RunnableWithMessageHistory
###############################################################################
# 1) In-memory store: session_id -> ChatMessageHistory
###############################################################################
store = {} # e.g., { "abc123": ChatMessageHistory(...) }
def get_session_history(session_id: str) -> BaseChatMessageHistory:
"""
Retrieve or create a ChatMessageHistory object for the given session_id.
This ensures each session_id has its own conversation history.
"""
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
###############################################################################
# 2) Create the RunnableWithMessageHistory (session-based chain)
###############################################################################
# This wraps your `pipeline_runnable` so it automatically reads/writes
# conversation history from get_session_history for each session.
conversational_rag_chain = RunnableWithMessageHistory(
pipeline_runnable, # the Runnable from pipeline.py
get_session_history, # fetches or creates ChatMessageHistory by session_id
input_messages_key="input", # key in the dict for user's new query
history_messages_key="chat_history", # key for existing chat logs
output_messages_key="answer" # key for final output
)
###############################################################################
# 3) A convenience function to run a query with session-based memory
###############################################################################
def run_with_session_memory(user_query: str, session_id: str) -> str:
"""
A helper that calls our `conversational_rag_chain`
with a given session_id. Returns the final 'answer'.
"""
# We invoke the chain with the user query;
# the chain automatically updates the session’s chat history.
response = conversational_rag_chain.invoke(
{"input": user_query},
config={
"configurable": {
"session_id": session_id
}
}
)
return response["answer"]