Spaces:
Sleeping
Sleeping
File size: 9,189 Bytes
4c96918 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
import os
import pickle
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import gradio as gr
# OAuth scopes requested for the Google Calendar API. Both the read-only
# scope and the full calendar scope are listed.
# NOTE(review): the full `calendar` scope presumably already covers read
# access, which would make `calendar.readonly` redundant — confirm before
# trimming, since changing scopes may require users to re-consent.
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly', 'https://www.googleapis.com/auth/calendar']
def authenticate_google():
    """Authenticate with Google and return a Calendar API service object.

    Cached credentials are read from ``token.pickle``. Expired credentials
    are refreshed when a refresh token is available; if the cache cannot be
    read, refreshed, or used, it is deleted and the interactive OAuth flow
    is run from ``credentials.json``. Credentials obtained by a refresh or
    by the interactive flow are written back to ``token.pickle``.

    Returns:
        The object returned by ``build('calendar', 'v3', credentials=...)``.

    Raises:
        Exception: With an "Authentication failed: ..." message when any
            step fails. The underlying error is attached as ``__cause__``.
    """
    token_path = 'token.pickle'
    creds = None
    needs_save = False
    try:
        if os.path.exists(token_path):
            try:
                # NOTE(security): unpickling can execute arbitrary code, so
                # token.pickle must only ever be written by this application.
                with open(token_path, 'rb') as token:
                    creds = pickle.load(token)
                if not creds or not creds.valid:
                    if creds and creds.expired and creds.refresh_token:
                        creds.refresh(Request())
                        # Persist the refreshed token so the next call does
                        # not have to refresh again.
                        needs_save = True
                    else:
                        creds = None
            except Exception:
                # Unreadable cache or failed refresh: fall back to a fresh
                # interactive login below.
                creds = None
                needs_save = False
            # Single cleanup point for every "cached token is unusable" case.
            if creds is None and os.path.exists(token_path):
                os.remove(token_path)

        if not creds:
            if not os.path.exists('credentials.json'):
                raise FileNotFoundError(
                    "credentials.json file not found. Please download it from Google Cloud Console."
                )
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
            needs_save = True

        if needs_save:
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)

        return build('calendar', 'v3', credentials=creds)
    except Exception as e:
        # Keep the original error reachable via __cause__ for debugging.
        raise Exception(f"Authentication failed: {e}") from e
def create_event(service, event_name, start_time, end_time, time_zone='Asia/Kolkata'):
    """Create an event in the primary Google Calendar.

    Args:
        service: Calendar API service object; must provide
            ``events().insert(calendarId=..., body=...).execute()``.
        event_name: Text used as the event summary.
        start_time: Object with ``isoformat()`` giving the event start.
        end_time: Object with ``isoformat()`` giving the event end.
        time_zone: Value sent as ``timeZone`` for both start and end.

    Returns:
        A confirmation message naming the created event.

    Raises:
        Exception: With a "Failed to create event: ..." message when the
            request cannot be built or executed. The underlying error is
            attached as ``__cause__``. If the error text contains
            "invalid_grant", the cached ``token.pickle`` is removed and the
            message asks the user to restart the application.
    """
    try:
        event = {
            'summary': event_name,
            'start': {
                'dateTime': start_time.isoformat(),
                'timeZone': time_zone,
            },
            'end': {
                'dateTime': end_time.isoformat(),
                'timeZone': time_zone,
            },
        }
        service.events().insert(calendarId='primary', body=event).execute()
        return f'Event "{event_name}" created successfully!'
    except Exception as e:
        error_msg = f"Failed to create event: {e}"
        if "invalid_grant" in str(e):
            # The cached token is no longer accepted; drop it so the next
            # start of the application triggers a fresh login.
            if os.path.exists('token.pickle'):
                os.remove('token.pickle')
            error_msg += "\nPlease restart the application to re-authenticate."
        raise Exception(error_msg) from e
def parse_date_time(date_str, time_str):
    """Combine a DD-MM-YYYY date and an HH:MM AM/PM time into a datetime.

    Args:
        date_str: Date text with day, month and year separated by ``-``.
        time_str: 12-hour clock time such as ``"02:30 PM"``; surrounding
            whitespace and letter case are ignored.

    Returns:
        A naive ``datetime`` for the given date, hour and minute.

    Raises:
        ValueError: If the date does not have three numeric parts, if the
            time does not match HH:MM AM/PM, or if the numbers do not form
            a real calendar date.
    """
    date_parts = date_str.split('-')
    if len(date_parts) != 3:
        raise ValueError("Date must be in DD-MM-YYYY format")
    try:
        day, month, year = (int(part) for part in date_parts)
    except ValueError:
        # Report the expected format rather than int()'s raw message.
        raise ValueError("Date must be in DD-MM-YYYY format") from None

    try:
        time_obj = datetime.strptime(time_str.strip().upper(), '%I:%M %p')
    except ValueError:
        raise ValueError("Time must be in HH:MM AM/PM format") from None

    # datetime() itself raises ValueError for impossible dates (e.g. 31-02).
    return datetime(year, month, day, time_obj.hour, time_obj.minute)
def get_conversation_stage(history):
    """Return the conversation stage: the number of "User" entries in history.

    Args:
        history: Iterable of ``(role, message)`` pairs.

    Returns:
        The count of entries whose role is ``"User"`` (0 when there are none).
    """
    return sum(1 for speaker, _text in history if speaker == "User")
def conversational_flow(history, user_input):
    """Advance the booking conversation by one user message.

    The stage is the number of "User" entries in ``history`` after the new
    message is appended: 1 = booking request, 2 = purpose, 3 = date,
    4 = start time, 5 = end time (the event is created here), 6 = yes/no
    follow-up. Invalid answers are removed from ``history`` again so the
    same stage is retried on the next message.

    Args:
        history: List of ``(role, message)`` tuples; modified in place.
        user_input: Raw text typed by the user. Surrounding whitespace is
            ignored; empty or blank input leaves ``history`` unchanged.

    Returns:
        The same ``history`` list, updated with the user message (when
        accepted) and the bot's reply.
    """
    user_input = (user_input or "").strip()
    if not user_input:
        return history

    command = user_input.lower()

    # Check for restart command at any point
    if command in ('restart', 'start over', 'reset'):
        history.clear()
        history.append(("Bot", "Conversation restarted. Hi! How may I assist you?"))
        return history

    # Add user input to history
    history.append(("User", user_input))
    stage = get_conversation_stage(history)

    # Process based on conversation stage
    if stage == 1:  # After first input (Book an appointment)
        if "appointment" in command or "book" in command:
            history.append(("Bot", "What is the purpose of the appointment?"))
        else:
            history.pop()  # Remove invalid input
            history.append(("Bot", "I can help you book an appointment. Please say 'Book an appointment' to start."))

    elif stage == 2:  # After purpose
        event_name = user_input
        history.append(("Bot", f"Great! You want to create an event for '{event_name}'. What is the date? (DD-MM-YYYY)"))

    elif stage == 3:  # After date
        try:
            date_parts = user_input.split('-')
            if len(date_parts) != 3 or not all(part.isdigit() for part in date_parts):
                raise ValueError("Date must be in DD-MM-YYYY format")
            # Reject impossible dates (e.g. 31-02-2024) now; otherwise the
            # failure only surfaces after the end time and cannot be fixed
            # without restarting the conversation.
            day, month, year = map(int, date_parts)
            datetime(year, month, day)
            history.append(("Bot", "What is the start time? (HH:MM AM/PM)"))
        except ValueError as e:
            history.pop()  # Remove invalid date
            history.append(("Bot", f"Invalid date format: {str(e)}. Please enter the date in DD-MM-YYYY format."))

    elif stage == 4:  # After start time
        try:
            datetime.strptime(user_input.upper(), '%I:%M %p')
            history.append(("Bot", "What is the end time? (HH:MM AM/PM)"))
        except ValueError:
            history.pop()
            history.append(("Bot", "Invalid time format. Please enter the time in HH:MM AM/PM format (e.g., 02:30 PM)."))

    elif stage == 5:  # After end time
        end_time_str = user_input
        try:
            # User messages by position: 0 = request, 1 = purpose,
            # 2 = date, 3 = start time, 4 = end time.
            user_messages = [msg for role, msg in history if role == "User"]
            event_name, date_str, start_time_str = user_messages[1:4]

            # Parse start and end times
            start_time = parse_date_time(date_str, start_time_str)
            end_time = parse_date_time(date_str, end_time_str)

            # Validate end time is after start time
            if end_time <= start_time:
                raise ValueError("End time must be after start time")

            # Create event
            try:
                service = authenticate_google()
                result = create_event(service, event_name, start_time, end_time)
                history.append(("Bot", result))
                history.append(("Bot", "Do you need to book any other appointments? (yes/no)"))
            except Exception as e:
                history.append(("Bot", f"Error: {str(e)}"))
        except ValueError as e:
            history.pop()
            history.append(("Bot", f"Invalid time: {str(e)}. Please enter a valid end time in HH:MM AM/PM format."))

    elif stage == 6:  # After yes/no
        if command == "yes":
            history.clear()
            history.append(("Bot", "Hi! How may I assist you?"))
        elif command == "no":
            history.append(("Bot", "Thank you! Have a nice day."))
        else:
            history.pop()
            history.append(("Bot", "Please answer with 'yes' or 'no'."))

    else:  # Conversation already finished
        # Without this branch the message would be stored with no reply.
        history.pop()
        history.append(("Bot", "This conversation has ended. Type 'restart' to book another appointment."))

    return history
def display_chat(history):
    """Convert history into two-element rows for the chat display.

    Args:
        history: Iterable of ``(role, message)`` pairs.

    Returns:
        A list of ``[left, right]`` rows: messages whose role is "User" go
        in the first slot, every other role goes in the second slot, and
        the unused slot is ``None``.
    """
    return [
        [text, None] if speaker == "User" else [None, text]
        for speaker, text in history
    ]
# Build the Gradio UI: header, chat window, text input, and two buttons.
with gr.Blocks() as iface:
    gr.Markdown(
        "\n"
        "<h1>Google Calendar Appointment Bot</h1>\n"
        "<p>Type 'restart' at any time to start over.</p>\n"
    )
    chatbot = gr.Chatbot()
    user_input = gr.Textbox(placeholder="Type your response here...")
    send_button = gr.Button("Send")
    restart_button = gr.Button("Restart Conversation")

    def reset_conversation():
        """Return the chat display and history for a brand-new conversation."""
        fresh_history = [("Bot", "Hi! How may I assist you?")]
        return display_chat(fresh_history), fresh_history

    def handle_input(user_input, history):
        """Feed non-blank input to the conversation and clear the textbox."""
        # Blank input skips the conversation step; the display is simply
        # re-rendered from the unchanged history.
        if user_input.strip():
            history = conversational_flow(history, user_input)
        return display_chat(history), history, ""

    # Per-session conversation history as a list of (role, message) tuples.
    history_state = gr.State([])

    # The Send button and pressing Enter in the textbox do the same thing.
    send_button.click(
        handle_input,
        inputs=[user_input, history_state],
        outputs=[chatbot, history_state, user_input],
    )
    user_input.submit(
        handle_input,
        inputs=[user_input, history_state],
        outputs=[chatbot, history_state, user_input],
    )

    # Both the Restart button and the initial page load show the greeting.
    restart_button.click(reset_conversation, outputs=[chatbot, history_state])
    iface.load(reset_conversation, outputs=[chatbot, history_state])
# Start the Gradio app only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()