File size: 3,100 Bytes
21999ba |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
from typing import List, Optional, TypedDict

import streamlit as st

import utils as U
from data import userActivities
class ActivityLog(TypedDict):
    """Lightweight summary of one stored user activity.

    Only the identifying and timestamp fields are kept; the chat payload
    itself is not part of this summary.
    """

    # Identifier of the activity record.
    id: str
    # Creation / last-update markers, carried as strings
    # (exact format is set by the data layer — not visible here).
    created_at: str
    updated_at: str


# A list of activity summaries, the shape kept in
# ``st.session_state.userActivitiesLog``.
ActivityLogs = List[ActivityLog]
def saveLatestActivity():
    """Persist the current chat session for the logged-in user.

    The user's email is read from ``st.session_state["user"]``. If the
    session already tracks an ``activityId``, that record is updated via
    ``userActivities.updateUserActivityById``. Otherwise a new record is
    created with ``userActivities.createUserActivity`` and the id it returns
    is stored in ``st.session_state.activityId``.

    When no user email is available the function only logs and returns.
    """
    # ``or {}`` also covers a "user" entry that is present but set to None.
    emailId = (st.session_state.get("user") or {}).get("email")
    if not emailId:
        U.pprint("No user email found")
        return

    # Build the payload once so both branches persist exactly the same data.
    payload = {
        "chatHistory": st.session_state.get("chatHistory", []),
        "messages": st.session_state.get("messages", []),
        "buttons": st.session_state.get("buttons", []),
    }

    # Use .get() rather than attribute access: reading a missing key as an
    # attribute raises, which would make the "create" branch unreachable
    # whenever "activityId" has not been initialised (or was deleted).
    activityId = st.session_state.get("activityId")
    if activityId:
        userActivities.updateUserActivityById(id=activityId, **payload)
        U.pprint("Current activity updated")
        return

    activityId = userActivities.createUserActivity(emailId=emailId, **payload)
    U.pprint("New activity created")
    # Remember the new record so later saves update it instead of creating
    # another one.
    st.session_state.activityId = activityId
def __convertActivitiesToLog(activities) -> ActivityLogs:
    """Reduce full activity records to their id and timestamp fields.

    Args:
        activities: Iterable of mapping-like activity records, or a falsy
            value (``None`` / empty).

    Returns:
        One ``{"id", "created_at", "updated_at"}`` dict per record, in input
        order. A falsy ``activities`` yields ``[]``; a key missing from a
        record comes through as ``None``.
    """
    logFields = ("id", "created_at", "updated_at")
    return [
        {field: record.get(field) for field in logFields}
        for record in (activities or ())
    ]
def __retrieveUserActivities():
    """Fetch the current user's activities and refresh the session's log.

    The user's email is read from ``st.session_state["user"]``; activities
    are loaded with ``userActivities.getUserActivities`` and their summaries
    are stored in ``st.session_state.userActivitiesLog``.

    Returns:
        The activities for the user (``[]`` when the lookup yields nothing),
        or ``None`` when no user email is available.
    """
    # ``or {}`` also covers a "user" entry that is present but set to None.
    emailId = (st.session_state.get("user") or {}).get("email")
    if not emailId:
        U.pprint("No user email found")
        return None

    # Normalise a falsy result (e.g. None) to an empty list so the len()
    # call below and any caller iterating the result cannot fail on None.
    activities = userActivities.getUserActivities(emailId) or []
    st.session_state.userActivitiesLog = __convertActivitiesToLog(activities)
    U.pprint(f"{len(activities)=}")
    return activities
def restoreUserActivity(activityId: Optional[str] = None):
    """Load a stored activity into the session and rerun the app.

    Args:
        activityId: Id of the activity to restore. When omitted, the first
            activity returned for the user is restored (presumably the most
            recent one — depends on the ordering of the data layer).

    When a matching activity exists, its id, messages and chat history are
    written to ``st.session_state`` and ``st.rerun()`` is called. When
    nothing matches, or no user is logged in, no activity is restored.
    """
    # The retrieval helper returns None when there is no logged-in user;
    # fall back to an empty list so the filter below never iterates None.
    activities = __retrieveUserActivities() or []
    if activityId:
        activities = [
            activity for activity in activities
            if activity.get("id") == activityId
        ]
    if not activities:
        return

    latestActivity = activities[0]
    U.pprint(f"{latestActivity=}")
    chatHistory = latestActivity.get("chat_history", [])
    buttons = latestActivity.get("buttons", [])

    st.session_state.activityId = latestActivity.get("id")
    st.session_state.messages = latestActivity.get("messages", [])
    st.session_state.chatHistory = chatHistory
    if chatHistory and buttons:
        # Buttons are stored apart from the history; attach them to the
        # last history entry.
        st.session_state.chatHistory[-1]["buttons"] = buttons
    # NOTE(review): rerun is issued only after a successful restore —
    # confirm this matches the intended flow.
    st.rerun()
def resetActivity():
    """Clear the per-activity session keys, reload the activity log, rerun.

    Each listed key is removed from ``st.session_state`` if present. The
    user's activities are then fetched again (which repopulates
    ``userActivitiesLog``) and ``st.rerun()`` is called.
    """
    staleKeys = (
        "activityId",
        "messages",
        "chatHistory",
        "userActivitiesLog",
        "buttons",
        "buttonValue",
        "isStoryChosen",
        "selectedStory",
        "selectedStoryTitle",
        "startMsg",
        "isStartMsgChosen",
    )
    for name in staleKeys:
        if name not in st.session_state:
            continue
        del st.session_state[name]

    __retrieveUserActivities()
    st.rerun()
|