"""Save, list, restore and reset the user's chat activity via Streamlit session state."""

import streamlit as st
from typing import List, Optional, TypedDict
import utils as U
from data import userActivities


class ActivityLog(TypedDict):
    """Summary of one stored activity, as kept in ``st.session_state.userActivitiesLog``."""

    id: str
    created_at: str
    updated_at: str


ActivityLogs = List[ActivityLog]


def __getUserEmail() -> Optional[str]:
    """Return the email of the user held in session state, or None when absent."""
    # `or {}` also covers a "user" key that is present but set to None.
    user = st.session_state.get("user") or {}
    return user.get("email")


def saveLatestActivity():
    """Persist the current chat state as the user's activity.

    Updates the activity referenced by ``st.session_state["activityId"]`` when
    one is set; otherwise creates a new activity and remembers its id in
    session state. Does nothing (beyond logging) when no user email is known.
    """
    emailId = __getUserEmail()
    if not emailId:
        U.pprint("No user email found")
        return

    # The same three pieces of session state are sent on both create and update.
    snapshot = {
        "chatHistory": st.session_state.get("chatHistory", []),
        "messages": st.session_state.get("messages", []),
        "buttons": st.session_state.get("buttons", []),
    }

    # Use .get() instead of attribute access: resetActivity() deletes the
    # "activityId" key, and reading a missing key by attribute raises.
    activityId = st.session_state.get("activityId")
    if activityId:
        userActivities.updateUserActivityById(id=activityId, **snapshot)
        U.pprint("Current activity updated")
    else:
        activityId = userActivities.createUserActivity(emailId=emailId, **snapshot)
        U.pprint("New activity created")

    st.session_state.activityId = activityId


def __convertActivitiesToLog(activities) -> ActivityLogs:
    """Reduce full activity records to their id and timestamps.

    Returns an empty list when ``activities`` is empty or None.
    """
    if not activities:
        return []

    return [
        {
            "id": activity.get("id"),
            "created_at": activity.get("created_at"),
            "updated_at": activity.get("updated_at"),
        }
        for activity in activities
    ]


def __retrieveUserActivities() -> list:
    """Fetch the current user's activities and refresh the log in session state.

    Stores the id/timestamp summary under ``st.session_state.userActivitiesLog``.

    Returns:
        The activities as returned by ``userActivities.getUserActivities``, or
        an empty list when no user email is known or nothing was returned.
        Callers can therefore always iterate the result.
    """
    emailId = __getUserEmail()
    if not emailId:
        U.pprint("No user email found")
        return []

    # Normalise a None result so that len() below and callers' iteration are safe.
    activities = userActivities.getUserActivities(emailId) or []
    st.session_state.userActivitiesLog = __convertActivitiesToLog(activities)
    U.pprint(f"{len(activities)=}")
    return activities


def restoreUserActivity(activityId: Optional[str] = None):
    """Load a stored activity into session state and rerun the app.

    Args:
        activityId: Id of the activity to restore. When omitted, the first
            activity in the retrieved list is used (presumably the most recent
            one; verify the ordering of ``getUserActivities``).

    When no matching activity exists the function returns without touching
    session state and without triggering a rerun.
    """
    activities = __retrieveUserActivities()
    if activityId:
        activities = [
            activity for activity in activities if activity.get("id") == activityId
        ]

    # NOTE(review): the original indentation was lost; the state update and the
    # rerun are assumed to happen only when an activity was found.
    if not activities:
        return

    latestActivity = activities[0]
    U.pprint(f"{latestActivity=}")

    activityId = latestActivity.get("id")
    messages = latestActivity.get("messages", [])
    chatHistory = latestActivity.get("chat_history", [])
    buttons = latestActivity.get("buttons", [])

    st.session_state.activityId = activityId
    st.session_state.messages = messages
    st.session_state.chatHistory = chatHistory

    # Re-attach the saved buttons to the last chat entry.
    if chatHistory and buttons:
        st.session_state.chatHistory[-1]["buttons"] = buttons

    st.rerun()


def resetActivity():
    """Clear all per-activity session state, refresh the activity log and rerun."""
    keysToDelete = (
        "activityId",
        "messages",
        "chatHistory",
        "userActivitiesLog",
        "buttons",
        "buttonValue",
        "isStoryChosen",
        "selectedStory",
        "selectedStoryTitle",
        "startMsg",
        "isStartMsgChosen",
    )
    for key in keysToDelete:
        if key in st.session_state:
            del st.session_state[key]

    # "userActivitiesLog" was just removed; this call repopulates it for the
    # logged-in user before the rerun.
    __retrieveUserActivities()
    st.rerun()