# my_memory_logic.py | |
import os | |
# Import the PipelineRunnable from pipeline.py | |
from pipeline import pipeline_runnable | |
from langchain.schema import BaseChatMessageHistory | |
from langchain_community.chat_message_histories import ChatMessageHistory | |
from langchain.runnables.history import RunnableWithMessageHistory | |
############################################################################### | |
# 1) In-memory store: session_id -> ChatMessageHistory | |
############################################################################### | |
store = {} # e.g., { "abc123": ChatMessageHistory(...) } | |
def get_session_history(session_id: str) -> BaseChatMessageHistory: | |
""" | |
Retrieve or create a ChatMessageHistory object for the given session_id. | |
This ensures each session_id has its own conversation history. | |
""" | |
if session_id not in store: | |
store[session_id] = ChatMessageHistory() | |
return store[session_id] | |
############################################################################### | |
# 2) Create the RunnableWithMessageHistory (session-based chain) | |
############################################################################### | |
# This wraps your `pipeline_runnable` so it automatically reads/writes | |
# conversation history from get_session_history for each session. | |
conversational_rag_chain = RunnableWithMessageHistory( | |
pipeline_runnable, # the Runnable from pipeline.py | |
get_session_history, # fetches or creates ChatMessageHistory by session_id | |
input_messages_key="input", # key in the dict for user's new query | |
history_messages_key="chat_history", # key for existing chat logs | |
output_messages_key="answer" # key for final output | |
) | |
############################################################################### | |
# 3) A convenience function to run a query with session-based memory | |
############################################################################### | |
def run_with_session_memory(user_query: str, session_id: str) -> str: | |
""" | |
A helper that calls our `conversational_rag_chain` | |
with a given session_id. Returns the final 'answer'. | |
""" | |
# We invoke the chain with the user query; | |
# the chain automatically updates the session’s chat history. | |
response = conversational_rag_chain.invoke( | |
{"input": user_query}, | |
config={ | |
"configurable": { | |
"session_id": session_id | |
} | |
} | |
) | |
return response["answer"] | |