from datetime import timedelta |
from datetime import datetime |
import os.path |
from google.auth.transport.requests import Request |
from google.oauth2.credentials import Credentials |
from google_auth_oauthlib.flow import InstalledAppFlow |
from googleapiclient.discovery import build |
from googleapiclient.errors import HttpError |
import smtplib |
from email.mime.multipart import MIMEMultipart |
from email.mime.text import MIMEText |
import pandas as pd |
from langchain.prompts import PromptTemplate |
from langchain.chains import LLMChain |
from langchain_groq import ChatGroq |
from langchain.agents import tool |
from schemas import symptoms,bookSlot,deleteSlot,reschedule_event,listevent,checkevent |
# Groq API key. The original referenced an undefined ``API_KEY`` name, which
# raises NameError at import time; it is now read from the environment so no
# secret has to live in the source tree. None is passed through when unset.
API_KEY = os.environ.get("GROQ_API_KEY")

# Chat model shared by the tools in this module (used by the symptom triage
# chain). temperature=0 requests the least random completion.
llm = ChatGroq(
    model="llama3-8b-8192",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=API_KEY,
)

# OAuth scope requested when authorising against Google Calendar.
SCOPES = ["https://www.googleapis.com/auth/calendar"]

# Doctor directory, loaded once at import time. The code in this module reads
# the 'Specialization', 'Doctor Name', 'Contact Number' and 'Address' columns.
doctor_df = pd.read_csv('doctor_specialization_dummy_data.csv')

# Account used to send notification emails.
# NOTE(review): the sender value looks like an e-mail-obfuscation placeholder
# rather than a real address -- confirm before relying on send_email().
EMAIL_SENDER = "[email protected]"
# NOTE(review): an app password is committed here. Prefer setting the
# EMAIL_PASSWORD environment variable; the literal fallback is kept only for
# backward compatibility and should be rotated and removed.
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "wlxf poqr wgsh qvqs")


def get_service():
    """Build an authorised Google Calendar v3 client.

    Stored credentials are read from ``token.json`` when that file exists.
    If they are absent or not valid they are either refreshed (expired, with
    a refresh token) or obtained through the local-server OAuth flow backed
    by ``credentials.json``; the resulting credentials are written back to
    ``token.json``.

    Returns:
        The object produced by ``build("calendar", "v3", credentials=...)``.
    """
    print('this function is called.')

    token_file = "token.json"
    credentials = None
    if os.path.exists(token_file):
        credentials = Credentials.from_authorized_user_file(token_file, SCOPES)

    if not (credentials and credentials.valid):
        refreshable = credentials and credentials.expired and credentials.refresh_token
        if refreshable:
            credentials.refresh(Request())
        else:
            oauth_flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
            credentials = oauth_flow.run_local_server(port=0)
        # Persist whatever we ended up with so the next call can reuse it.
        with open(token_file, "w") as token_handle:
            token_handle.write(credentials.to_json())

    return build("calendar", "v3", credentials=credentials)


def send_email(to_email, subject, body):
    """Send a plain-text email notification through Gmail's SMTP server.

    Delivery is best effort: any failure is printed and swallowed so that a
    mail problem never aborts the caller.

    Args:
        to_email (str): Recipient address.
        subject (str): Subject line.
        body (str): Plain-text message body.

    Returns:
        None
    """
    try:
        message = MIMEMultipart()
        message['From'] = EMAIL_SENDER
        message['To'] = to_email
        message['Subject'] = subject
        message.attach(MIMEText(body, 'plain'))

        # The context manager closes the connection even when a step fails.
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            # Authenticate before sending; the original never logged in, so
            # EMAIL_PASSWORD was unused and the send could not succeed.
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, to_email, message.as_string())
        print(f"Email sent to {to_email}")
    except Exception as e:
        print(f"Failed to send email: {e}")


def is_valid_booking_time(start_time):
    """Check a requested start time against the booking rules.

    The rules, applied in order, are: the time must lie between now and seven
    days from now, it must fall on Monday-Friday, and its hour must be in the
    range 10 (inclusive) to 19 (exclusive).

    Args:
        start_time (str): Timestamp in ``%Y-%m-%dT%H:%M:%S%z`` form,
            e.g. '2024-09-17T14:30:00+05:30'.

    Returns:
        tuple: ``(True, None)`` when bookable, otherwise ``(False, reason)``.

    Raises:
        ValueError: If ``start_time`` does not match the expected format.
    """
    requested = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S%z")
    # "Now" is taken in the requested time's own offset.
    now = datetime.now(requested.tzinfo)
    latest_allowed = now + timedelta(days=7)

    inside_window = now <= requested <= latest_allowed
    if not inside_window:
        return False, "You can only book appointments within the next 7 days."

    falls_on_weekend = requested.weekday() in (5, 6)
    if falls_on_weekend:
        return False, "Appointments can only be booked Monday to Friday."

    if requested.hour < 10 or requested.hour >= 19:
        return False, "Appointments can only be scheduled between 10 AM and 7 PM."

    return True, None


import pytz |
def _collect_free_hours(cursor, limit):
    """Walk forward from ``cursor`` in one-hour steps without passing ``limit``.

    Args:
        cursor (datetime): First candidate slot start.
        limit (datetime): Instant that no slot may run past.

    Returns:
        tuple: ``(slots, cursor)`` where ``slots`` lists the
        ``(slot_start, slot_end)`` pairs accepted by ``is_valid_booking_time``
        and ``cursor`` is the position reached when the walk stopped.
    """
    one_hour = timedelta(hours=1)
    slots = []
    while cursor + one_hour <= limit:
        # timespec="seconds" keeps the text parseable by the validator even
        # if an event boundary carried fractional seconds.
        bookable, _ = is_valid_booking_time(cursor.isoformat(timespec="seconds"))
        if bookable:
            slots.append((cursor, cursor + one_hour))
        cursor += one_hour
    return slots, cursor


@tool("check-event", args_schema=checkevent, return_direct=True)
def check_slots(date):
    """
    This function is to check available slots for a given date, excluding times when events are booked.
    It only returns valid slots that fall on weekdays (Mon-Fri) between 10 AM to 7 PM.
    Args:
    date (str): The date for which to check availability (e.g., '2024-09-17').
    Returns:
    str: Formatted string of available 1-hour slots or a message if no slots are available.
    """
    # NOTE: the docstring above is what the agent sees as the tool
    # description, so implementation notes live in comments instead.
    start_time = f"{date}T10:00:00+05:30"
    end_time = f"{date}T19:00:00+05:30"
    last_slot_start = f"{date}T18:00:00+05:30"

    try:
        work_start = datetime.fromisoformat(start_time)
        work_end = datetime.fromisoformat(end_time)
        # Probe both ends of the working day. Checking only 10:00 rejected
        # today's date once 10:00 had passed, although later slots were still
        # bookable. Past or too-distant hours are filtered per slot below.
        probes = [is_valid_booking_time(start_time), is_valid_booking_time(last_slot_start)]
    except ValueError:
        return (
            "❌ **Invalid Date**\n\n"
            "Please provide the date in YYYY-MM-DD format (e.g., 2024-09-17)."
        )
    if not any(ok for ok, _ in probes):
        return probes[-1][1]

    try:
        events_result = get_service().events().list(
            calendarId='primary',
            timeMin=start_time,
            timeMax=end_time,
            singleEvents=True,
            orderBy='startTime'
        ).execute()
    except HttpError as error:
        return f"❌ **An error occurred:** {error}\n\nPlease try again or contact support if the issue persists."
    events = events_result.get('items', [])

    available_slots = []
    current_time = work_start
    for event in events:
        raw_start = event.get('start', {}).get('dateTime')
        raw_end = event.get('end', {}).get('dateTime')
        if not raw_start or not raw_end:
            # All-day events carry 'date' instead of 'dateTime'; indexing
            # 'dateTime' directly raised KeyError. They are skipped here.
            continue
        # A trailing "Z" is normalised so fromisoformat accepts it on Python
        # versions before 3.11; times are shown in the working-day offset.
        event_start = datetime.fromisoformat(raw_start.replace("Z", "+00:00")).astimezone(work_start.tzinfo)
        event_end = datetime.fromisoformat(raw_end.replace("Z", "+00:00")).astimezone(work_start.tzinfo)

        gap_slots, current_time = _collect_free_hours(current_time, event_start)
        available_slots.extend(gap_slots)
        # Resume after the event; max() guards against overlapping events.
        current_time = max(current_time, event_end)

    tail_slots, _ = _collect_free_hours(current_time, work_end)
    available_slots.extend(tail_slots)

    if not available_slots:
        return "😔 I'm sorry, but there are no available slots for the requested date. Would you like to check another date?"

    # The bold marker now closes before the newline so it renders as bold.
    parts = [f"📅 **Available Slots for {date}:**\n\n"]
    for slot_start, slot_end in available_slots:
        begins = slot_start.strftime('%I:%M %p')
        ends = slot_end.strftime('%I:%M %p')
        parts.append(f"\n🕒 {begins} - {ends}\n")
    parts.append("\n✨ Each slot is for a 1-hour appointment.")
    parts.append("\n\n💡 To book an appointment, please specify your preferred time slot.")
    return "".join(parts)


def check_slot_availability(start_time, end_time):
    """Report whether the primary calendar has no events in a time window.

    Args:
        start_time (str): Window start in ``%Y-%m-%dT%H:%M:%S%z`` form,
            e.g. '2024-09-16T14:00:00+05:30'.
        end_time (str): Window end in the same form.

    Returns:
        bool: True when the events listing for the window is empty.

    Raises:
        ValueError: If either timestamp does not match the expected format.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S%z"
    # Round-tripping through datetime validates the input and hands the API
    # an ISO-8601 string with a colon-separated offset.
    time_min = datetime.strptime(start_time, timestamp_format).isoformat()
    time_max = datetime.strptime(end_time, timestamp_format).isoformat()

    events_result = get_service().events().list(
        calendarId="primary",
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy="startTime"
    ).execute()
    return not events_result.get("items", [])


def find_event_by_time(start_time):
    """
    Finds an event by its start time in the user's Google Calendar.

    The calendar is queried for the hour beginning at ``start_time`` and the
    first timed event whose start is the same instant is returned. Start
    times are compared as datetimes, so the same instant written with a
    different UTC offset still matches.

    Args:
        start_time (str): The start time of the event in ISO format (e.g., '2024-09-17T14:30:00+05:30').
    Returns:
        dict or None: The event details if found, otherwise None. None is
        also returned when the lookup fails; the error is printed.
    """
    try:
        print(f"Searching for event starting at {start_time}")
        wanted_start = datetime.fromisoformat(start_time)
        window_end = (wanted_start + timedelta(hours=1)).isoformat()

        events_result = get_service().events().list(
            calendarId="primary",
            timeMin=start_time,
            timeMax=window_end,
            singleEvents=True,
            orderBy="startTime",
        ).execute()
        events = events_result.get("items", [])
        print(f"Events found: {events}")

        for event in events:
            event_start = event.get('start', {}).get('dateTime')
            if not event_start:
                # All-day events have no 'dateTime' and cannot match a time.
                continue
            # "Z" is normalised so fromisoformat accepts it before Python 3.11.
            same_instant = (
                event_start == start_time
                or datetime.fromisoformat(event_start.replace("Z", "+00:00")) == wanted_start
            )
            if same_instant:
                # .get(): untitled events have no 'summary' key; indexing it
                # raised KeyError and made a found event look "not found".
                print(f"Matching event found: {event.get('summary', '(No title)')} at {event_start}")
                return event

        print(f"No event found starting at {start_time}")
        return None
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None


@tool("list_event", args_schema=listevent, return_direct=True)
def list_upcoming_events(target_date):
    """
    Lists the upcoming events on the user's calendar for a specific date.
    Args:
    target_date (str): The date to filter events on, in 'YYYY-MM-DD' format.
    Returns:
    string: Summary of the events
    """
    # NOTE: the docstring above doubles as the tool description for the agent.
    # The original returned a (start, summary) tuple for a single event, or
    # None when nothing was found or the API failed; every path now returns
    # the string the docstring promises.
    try:
        day = datetime.strptime(target_date, "%Y-%m-%d")
    except ValueError:
        return (
            "❌ **Invalid Date**\n\n"
            "Please provide the date in YYYY-MM-DD format (e.g., 2024-09-17)."
        )

    # Day boundaries use the +05:30 offset the other calendar tools in this
    # module use; the original queried a UTC day, which shifts the window by
    # five and a half hours relative to those tools.
    next_day = day + timedelta(days=1)
    time_min = f"{day:%Y-%m-%d}T00:00:00+05:30"
    time_max = f"{next_day:%Y-%m-%d}T00:00:00+05:30"
    print(f"Getting events for {target_date}")

    try:
        events_result = (
            get_service()
            .events()
            .list(
                calendarId="primary",
                timeMin=time_min,
                timeMax=time_max,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
    except HttpError as error:
        print(f"An error occurred: {error}")
        return f"❌ **An error occurred:** {error}\n\nPlease try again or contact support if the issue persists."

    events = events_result.get("items", [])
    if not events:
        print(f"No events found for {target_date}.")
        return f"No events found for {target_date}."

    parts = [f"📅 **Events for {target_date}:**\n"]
    for event in events:
        # Timed events expose 'dateTime'; all-day events expose 'date'.
        start = event["start"].get("dateTime", event["start"].get("date"))
        title = event.get("summary", "(No title)")
        parts.append(f"\n🕒 {start} - {title}\n")
    return "".join(parts)


@tool("book-slot-tool", args_schema=bookSlot, return_direct=True)
def book_slot(start_time, end_time, summary):
    """
    This function is to book an appointment/slot for the user by creating an event on the calendar.
    Args:
    start_time (str): The start time of the slot (e.g., '2024-09-16T14:00:00+05:30').
    end_time (str): The end time of the slot (e.g., '2024-09-16T15:00:00+05:30').
    summary (str): Summary or title of the event.
    Returns:
    str: Confirmation message if the event is created or an error message if booking fails.
    """
    # NOTE: the docstring above doubles as the tool description for the agent.
    try:
        is_valid, error_message = is_valid_booking_time(start_time)
    except ValueError:
        # A malformed timestamp is reported to the user instead of raising.
        is_valid = False
        error_message = "The start time must look like '2024-09-16T14:00:00+05:30'."
    if not is_valid:
        return (
            "❌ **Invalid Booking Time**\n\n"
            f"{error_message}\n"
            "\nAppointments can only be scheduled:\n"
            "📅 Monday to Friday\n"
            "🕒 Between 10 AM and 7 PM\n"
            "📆 Within the next 7 days\n"
        )

    event = {
        'summary': summary,
        'start': {
            'dateTime': start_time,
            'timeZone': 'Asia/Kolkata',
        },
        'end': {
            'dateTime': end_time,
            'timeZone': 'Asia/Kolkata',
        },
    }

    try:
        # The availability query sits inside the try so that an API failure
        # there is reported like any other booking failure.
        if not check_slot_availability(start_time, end_time):
            return (
                "❌ **Slot Unavailable**\n\n"
                "\nThe requested slot is not available.\n"
                "\nPlease choose another time."
            )
        # The client is only built once the request has passed validation.
        get_service().events().insert(calendarId='primary', body=event).execute()
    except HttpError as error:
        return f"❌ **An error occurred:** {error}\n\nPlease try again or contact support if the issue persists."

    return (
        "✅ **Event Created Successfully!**\n\n"
        f"\n📌 **Summary:** {summary}\n"
        f"\n🕒 **Start Time:** {start_time}\n"
        f"\n🕒 **End Time:** {end_time}\n\n"
        "\n📅 The event has been added to your primary calendar."
    )


@tool("delete-slot-tool", args_schema=deleteSlot, return_direct=True)
def delete_event(start_time):
    """
    Deletes an event by start time on the user's calendar.
    Args:
    start_time (str): The start time of the event (e.g., '2024-09-20T15:30:00+05:30').
    Returns:
    str: Confirmation message if the event is deleted.
    """
    # NOTE: the docstring above doubles as the tool description for the agent.
    event = find_event_by_time(start_time)
    if not event:
        # The bold marker now opens next to its label so it renders as bold.
        return (
            "❌ **No Event Found**\n\n"
            f"\n🕒 **Requested Start Time:** {start_time}\n\n"
            "\nNo event was found for the specified time. Please check the time and try again."
        )

    try:
        get_service().events().delete(calendarId='primary', eventId=event['id']).execute()
    except HttpError as error:
        return f"❌ **An error occurred:** {error}\n\nPlease try again or contact support if the issue persists."

    # .get(): an untitled event has no 'summary' key, and indexing it raised
    # KeyError after the delete had already succeeded.
    title = event.get('summary', '(No title)')
    started = event['start']['dateTime']
    return (
        "\n🗑️ **Event Deleted Successfully!**\n\n"
        f"\n📌 **Summary:** {title}\n"
        f"\n🕒 **Start Time:** {started}\n\n"
        "\nThe event has been removed from your primary calendar."
    )


# The decorator arguments are evaluated before the ``def`` rebinds the name,
# so ``args_schema`` still receives the schema imported from ``schemas`` even
# though the function below reuses that name.
@tool("reschedule-event-tool", args_schema=reschedule_event, return_direct=True)
def reschedule_event(start_time, new_start_time, new_end_time):
    """
    Reschedules an existing event by providing new start and end times.
    Args:
    start_time (str): The start time of the existing event (e.g., '2024-09-18T14:00:00+05:30').
    new_start_time (str): The new start time of the event (e.g., '2024-09-18T12:00:00+05:30').
    new_end_time (str): The new end time of the event (e.g., '2024-09-18T14:00:00+05:30').
    Returns:
    str: Confirmation message if the event is rescheduled or an error message if rescheduling fails.
    """
    # NOTE: the docstring above doubles as the tool description for the agent.
    # Validate the NEW start time. The original tested the always-truthy
    # (bool, message) tuple of the OLD start time, so the check never fired.
    try:
        is_valid, error_message = is_valid_booking_time(new_start_time)
    except ValueError:
        is_valid = False
        error_message = "The new start time must look like '2024-09-18T12:00:00+05:30'."
    if not is_valid:
        return (
            "❌ **Invalid Booking Time**\n\n"
            f"{error_message}\n"
            "\nAppointments can only be scheduled:\n"
            "\n📅 Monday to Friday\n"
            "\n🕒 Between 10 AM and 7 PM\n"
            "\n📆 Within the next 7 days\n"
        )

    event = find_event_by_time(start_time)
    if not event:
        return (
            "❌ **No Event Found**\n\n"
            f"\n🕒 **Requested Start Time:** {start_time}\n\n"
            "\nNo event was found for the specified time. Please check the time and try again."
        )

    try:
        service = get_service()
        # Look for conflicts in the new window, ignoring the event being
        # moved: otherwise shifting an appointment to an overlapping time
        # would always be rejected by its own current booking.
        overlapping = service.events().list(
            calendarId='primary',
            timeMin=new_start_time,
            timeMax=new_end_time,
            singleEvents=True,
            orderBy='startTime',
        ).execute().get('items', [])
        if any(other.get('id') != event['id'] for other in overlapping):
            return (
                "❌ **Slot Unavailable**\n\n"
                "\nThe requested slot is not available.\n"
                "\nPlease choose another time."
            )

        event['start']['dateTime'] = new_start_time
        event['end']['dateTime'] = new_end_time
        service.events().update(calendarId='primary', eventId=event['id'], body=event).execute()
    except HttpError as error:
        return f"❌ **An error occurred:** {error}\n\nPlease try again or contact support if the issue persists."

    return (
        "✅ **Event Rescheduled Successfully!**\n\n"
        f"\n🕒 **Original Start Time:** {start_time}\n"
        f"\n🕒 **New Start Time:** {new_start_time}\n"
        f"\n🕒 **New End Time:** {new_end_time}\n\n"
        "\nYour appointment has been updated in the calendar."
    )


@tool("suggest-doctors-tool", return_direct=True)
def suggest_specialization(symptoms):
    """This function understands user's symptoms and suggests specialization and doctor details.
    Args:
    symptoms (string): symptoms of patients/user
    Returns:
    string: Formatted string with specialization and doctor details
    """
    # NOTE: the docstring above doubles as the tool description for the agent.
    prompt_template = """
    You are a highly knowledgeable Healthcare AI specialized in directing patients to the appropriate medical professionals.
    Greet the user politely. Based on the symptoms provided below, suggest the most suitable doctor's specialization for the patient to visit.
    Respond with only the name of the specialization in one word, without any additional explanations.
    Here are a few examples for reference:
    - Symptoms: skin rash, itching, dryness → Answer: "Dermatologist"
    - Symptoms: chest pain, shortness of breath → Answer: "Cardiologist"
    - Symptoms: joint pain, stiffness, swelling → Answer: "Rheumatologist"
    - Symptoms: persistent cough, fever, sore throat → Answer: "Pulmonologist"
    - Symptoms: frequent headaches, dizziness, vision problems → Answer: "Neurologist"
    Now, based on the symptoms below, provide the specialization in one word.
    Symptoms: {symptoms}
    """
    prompt = PromptTemplate(input_variables=["symptoms"], template=prompt_template)
    chain = LLMChain(llm=llm, prompt=prompt)

    no_match_message = (
        "Based on your symptoms, I recommend consulting a General Practitioner. "
        "Unfortunately, no specific doctors matching your symptoms were found in our database."
    )

    try:
        result = chain.run(symptoms=symptoms)
        # Use the last non-empty line of the reply. Stripping first stops a
        # trailing newline from yielding an empty pattern, and the quotes and
        # full stop are removed because the prompt's examples are quoted.
        specialization = result.strip().split('\n')[-1].strip().strip('"\'.').strip().lower()
        if not specialization:
            # An empty pattern would match every row in the directory.
            return no_match_message

        # regex=False: the model's reply is matched as literal text.
        matches = doctor_df['Specialization'].str.contains(
            specialization, case=False, na=False, regex=False
        )
        relevant_doctors = doctor_df[matches]
        if relevant_doctors.empty:
            return no_match_message

        article = "an" if specialization[0] in "aeiou" else "a"
        doctor = relevant_doctors.iloc[0]
        return (
            f"Based on your symptoms, I recommend consulting {article} {specialization.capitalize()}.\n\n"
            "Here are the details of a suitable doctor:\n\n"
            f"👨‍⚕️ **Doctor**: {doctor['Doctor Name']}\n"
            f"\n🏥 **Specialization**: {doctor['Specialization']}\n"
            f"\n📞 **Contact**: {doctor['Contact Number']}\n"
            f"\n🏢 **Address**: {doctor['Address']}\n"
        )
    except Exception:
        # Best effort: any failure in the chain or the lookup becomes a
        # friendly fallback message instead of an error surfaced to the user.
        return "I apologize, but I encountered an issue while processing your request. Please try describing your symptoms again, or consider consulting a General Practitioner for a comprehensive evaluation."