Spaces:
Running
Running
File size: 8,297 Bytes
14fcd71 7c4edb2 14fcd71 7c4edb2 14fcd71 b020941 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
import os
import streamlit as st
import requests
import msal
# π€ Load environment variables (Ensure these are set!)
APPLICATION_ID_KEY = os.getenv('APPLICATION_ID_KEY')
CLIENT_SECRET_KEY = os.getenv('CLIENT_SECRET_KEY')
AUTHORITY_URL = 'https://login.microsoftonline.com/common' # Use 'common' for multi-tenant apps
REDIRECT_URI = 'http://localhost:8501' # π Make sure this matches your app's redirect URI
# π― Define the scopes your app will need
SCOPES = ['User.Read', 'Calendars.ReadWrite']
# π οΈ Initialize the MSAL client
def get_msal_app():
return msal.ConfidentialClientApplication(
client_id=APPLICATION_ID_KEY,
client_credential=CLIENT_SECRET_KEY,
authority=AUTHORITY_URL
)
# π Acquire access token using authorization code
def get_access_token(code):
client_instance = get_msal_app()
result = client_instance.acquire_token_by_authorization_code(
code=code,
scopes=SCOPES,
redirect_uri=REDIRECT_URI
)
if 'access_token' in result:
return result['access_token']
else:
st.error('Error acquiring token: ' + str(result.get('error_description')))
st.stop()
# πββοΈ Main application function
def main():
st.title("π¦ Simple Streamlit App with MS Graph API")
# π Sidebar navigation
st.sidebar.title("Navigation")
menu = st.sidebar.radio("Go to", [
"1οΈβ£ Dashboard with Widgets",
"π Landing Page",
"π
Upcoming Events",
"π Schedule",
"π Agenda",
"π Event Details",
"β Add Event",
"π Filter By"
])
# π Authentication
if 'access_token' not in st.session_state:
# π΅οΈββοΈ Check for authorization code in query parameters
query_params = st.get_query_params()
if 'code' in query_params:
code = query_params['code'][0]
st.write('π Acquiring access token...')
access_token = get_access_token(code)
st.session_state['access_token'] = access_token
st.rerun() # Reload the app to clear the code from URL
else:
# π’ Prompt user to log in
client_instance = get_msal_app()
authorization_url = client_instance.get_authorization_request_url(
scopes=SCOPES,
redirect_uri=REDIRECT_URI
)
st.write('π Please [click here]({}) to log in and authorize the app.'.format(authorization_url))
st.stop()
else:
# π₯³ User is authenticated, proceed with the app
access_token = st.session_state['access_token']
headers = {'Authorization': 'Bearer ' + access_token}
# π€ Greet the user
response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers)
if response.status_code == 200:
user_info = response.json()
st.sidebar.write(f"π Hello, {user_info['displayName']}!")
else:
st.error('Failed to fetch user info.')
st.write(response.text)
# ποΈ Handle menu options
if menu == "1οΈβ£ Dashboard with Widgets":
st.header("1οΈβ£ Dashboard with Widgets")
st.write("Widgets will be displayed here. ποΈ")
# Add your widgets here
elif menu == "π Landing Page":
st.header("π Landing Page")
st.write("Welcome to the app! π₯³")
# Add landing page content here
elif menu == "π
Upcoming Events":
st.header("π
Upcoming Events")
events = get_upcoming_events(access_token)
for event in events:
st.write(f"π {event['subject']} on {event['start']['dateTime']}")
# Display upcoming events
elif menu == "π Schedule":
st.header("π Schedule")
schedule = get_schedule(access_token)
st.write(schedule)
# Display schedule
elif menu == "π Agenda":
st.header("π Agenda")
st.write("Your agenda for today. π")
# Display agenda
elif menu == "π Event Details":
st.header("π Event Details")
event_id = st.text_input("Enter Event ID")
if event_id:
event_details = get_event_details(access_token, event_id)
st.write(event_details)
# Display event details based on ID
elif menu == "β Add Event":
st.header("β Add Event")
event_subject = st.text_input("Event Subject")
event_start = st.date_input("Event Start Date")
event_start_time = st.time_input("Event Start Time")
event_end = st.date_input("Event End Date")
event_end_time = st.time_input("Event End Time")
if st.button("Add Event"):
event_details = {
"subject": event_subject,
"start": {
"dateTime": f"{event_start}T{event_start_time}",
"timeZone": "UTC"
},
"end": {
"dateTime": f"{event_end}T{event_end_time}",
"timeZone": "UTC"
}
}
add_event(access_token, event_details)
st.success("Event added successfully! π")
# Form to add new event
elif menu == "π Filter By":
st.header("π Filter Events")
filter_criteria = st.text_input("Enter filter criteria")
if filter_criteria:
filtered_events = filter_events(access_token, filter_criteria)
for event in filtered_events:
st.write(f"π
{event['subject']} on {event['start']['dateTime']}")
# Filter events based on criteria
else:
st.write("Please select a menu option.")
# π
Function to get upcoming events
def get_upcoming_events(access_token):
headers = {'Authorization': 'Bearer ' + access_token}
response = requests.get('https://graph.microsoft.com/v1.0/me/events?$orderby=start/dateTime&$top=10', headers=headers)
if response.status_code == 200:
events = response.json().get('value', [])
return events
else:
st.error('Failed to fetch upcoming events.')
st.write(response.text)
return []
# π Function to get schedule (Placeholder)
def get_schedule(access_token):
# Implement API call to get schedule
return "π Your schedule goes here."
# β Function to add a new event
def add_event(access_token, event_details):
headers = {
'Authorization': 'Bearer ' + access_token,
'Content-Type': 'application/json'
}
response = requests.post('https://graph.microsoft.com/v1.0/me/events', headers=headers, json=event_details)
if response.status_code == 201:
st.success('Event created successfully! π')
else:
st.error('Failed to create event.')
st.write(response.text)
# π Function to get event details
def get_event_details(access_token, event_id):
headers = {'Authorization': 'Bearer ' + access_token}
response = requests.get(f'https://graph.microsoft.com/v1.0/me/events/{event_id}', headers=headers)
if response.status_code == 200:
event = response.json()
return event
else:
st.error('Failed to fetch event details.')
st.write(response.text)
return {}
# π Function to filter events
def filter_events(access_token, filter_criteria):
headers = {'Authorization': 'Bearer ' + access_token}
# Implement filtering logic based on criteria
response = requests.get(f"https://graph.microsoft.com/v1.0/me/events?$filter=startswith(subject,'{filter_criteria}')", headers=headers)
if response.status_code == 200:
events = response.json().get('value', [])
return events
else:
st.error('Failed to filter events.')
st.write(response.text)
return []
# π Run the main function
if __name__ == "__main__":
main()
|