File size: 3,100 Bytes
21999ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import streamlit as st
from typing import List, TypedDict
import utils as U
from data import userActivities


class ActivityLog(TypedDict):
    id: str
    created_at: str
    updated_at: str


ActivityLogs = List[ActivityLog]


def saveLatestActivity():
    emailId = st.session_state.get("user", {}).get("email")
    if not emailId:
        U.pprint("No user email found")
        return

    activityId = st.session_state.activityId

    if activityId:
        userActivities.updateUserActivityById(
            id=activityId,
            chatHistory=st.session_state.get("chatHistory", []),
            messages=st.session_state.get("messages", []),
            buttons=st.session_state.get("buttons", [])
        )
        U.pprint("Current activity updated")
    else:
        activityId = userActivities.createUserActivity(
            emailId=emailId,
            chatHistory=st.session_state.get("chatHistory", []),
            messages=st.session_state.get("messages", []),
            buttons=st.session_state.get("buttons", [])
        )
        U.pprint("New activity created")
        st.session_state.activityId = activityId


def __convertActivitiesToLog(activities) -> ActivityLogs:
    if not activities:
        return []

    logs = []
    for activity in activities:
        logs.append({
            "id": activity.get("id"),
            "created_at": activity.get("created_at"),
            "updated_at": activity.get("updated_at"),
        })
    return logs


def __retrieveUserActivities():
    emailId = st.session_state.get("user", {}).get("email")
    if not emailId:
        U.pprint("No user email found")
        return

    activities = userActivities.getUserActivities(emailId)
    st.session_state.userActivitiesLog = __convertActivitiesToLog(activities)
    U.pprint(f"{len(activities)=}")
    return activities


def restoreUserActivity(activityId: str = None):
    activities = __retrieveUserActivities()
    if activityId:
        activities = [activity for activity in activities if activity.get("id") == activityId]

    if activities:
        latestActivity = activities[0]
        U.pprint(f"{latestActivity=}")
        activityId = latestActivity.get("id")
        messages = latestActivity.get("messages", [])
        chatHistory = latestActivity.get("chat_history", [])
        buttons = latestActivity.get("buttons", [])

        st.session_state.activityId = activityId
        st.session_state.messages = messages
        st.session_state.chatHistory = chatHistory

        if chatHistory and buttons:
            st.session_state.chatHistory[-1]["buttons"] = buttons

        st.rerun()


def resetActivity():
    keysToDelete = [
        "activityId",
        "messages",
        "chatHistory",
        "userActivitiesLog",
        "buttons",
        "buttonValue",
        "isStoryChosen",
        "selectedStory",
        "selectedStoryTitle",
        "startMsg",
        "isStartMsgChosen"
    ]
    for key in keysToDelete:
        if key in st.session_state:
            del st.session_state[key]

    __retrieveUserActivities()
    st.rerun()