import streamlit as st
from datetime import datetime
import pandas as pd
import plotly.express as px
import firebase_admin
from firebase_admin import credentials, firestore, auth, exceptions
from dateutil.relativedelta import relativedelta
import os
import cloudinary
import cloudinary.uploader
from dotenv import load_dotenv
# Configure page first
st.set_page_config(
page_title="DAILY TASK MANAGEMENT AND MONITORING",
layout="wide",
page_icon="📋",
menu_items={
'Get Help': 'https://github.com',
'Report a bug': "https://github.com",
'About': "# Task Management System v4.2"
}
)
# Custom CSS for enhanced UI
st.markdown("""
""", unsafe_allow_html=True)
def initialize_firebase():
if not firebase_admin._apps:
cred = credentials.Certificate("firebase_credentials.json")
firebase_admin.initialize_app(cred)
return firestore.client(), auth
# Load environment variables
load_dotenv()
# Configure Cloudinary
cloudinary.config(
cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
api_key=os.getenv("CLOUDINARY_API_KEY"),
api_secret=os.getenv("CLOUDINARY_API_SECRET")
)
def create_default_admin(auth, db):
try:
admin_email = "admin@example.com"
try:
admin_user = auth.get_user_by_email(admin_email)
admin_ref = db.collection("users").document(admin_user.uid)
if not admin_ref.get().exists:
admin_ref.set({
"email": admin_email,
"is_admin": True,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
except auth.UserNotFoundError:
admin_user = auth.create_user(
email=admin_email,
password="Temp@2025",
display_name="Admin"
)
auth.set_custom_user_claims(admin_user.uid, {'admin': True})
db.collection("users").document(admin_user.uid).set({
"email": admin_email,
"is_admin": True,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
except Exception as e:
st.error(f"Admin setup error: {str(e)}")
def main():
# Initialize Firebase
db, firebase_auth = initialize_firebase()
create_default_admin(firebase_auth, db)
# Session State Management
if "authenticated" not in st.session_state:
st.session_state.update({
"authenticated": False,
"email": "",
"is_admin": False,
"first_login": True,
"user_uid": ""
})
# Authentication Flow
if not st.session_state.authenticated:
st.markdown("
", unsafe_allow_html=True)
col1, col2 = st.columns(2)
with col1:
with st.expander("🔑 Sign In", expanded=True):
with st.form("signin_form"):
email = st.text_input("📧 Email")
password = st.text_input("🔒 Password", type="password")
if st.form_submit_button("🚀 Login"):
try:
user = firebase_auth.get_user_by_email(email)
user_doc = db.collection("users").document(user.uid).get()
is_admin = user.custom_claims.get('admin', False) if user.custom_claims else False
if user_doc.exists:
is_admin = user_doc.to_dict().get('is_admin', is_admin)
st.session_state.update({
"authenticated": True,
"email": user.email,
"user_uid": user.uid,
"is_admin": is_admin
})
st.rerun()
except Exception as e:
st.error(f"🔐 Authentication failed: {str(e)}")
with col2:
with st.expander("📝 Sign Up", expanded=True):
with st.form("signup_form"):
new_email = st.text_input("📨 New Email")
new_password = st.text_input("🔏 New Password", type="password")
if st.form_submit_button("🌟 Register"):
try:
user = firebase_auth.create_user(email=new_email, password=new_password)
db.collection("users").document(user.uid).set({
"email": new_email,
"is_admin": False,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
st.success("🎉 Registration successful! Please sign in.")
except Exception as e:
st.error(f"⚠️ Registration failed: {str(e)}")
return
# Main Application Interface
st.markdown(f"""
""", unsafe_allow_html=True)
# Core Functions
def get_tasks():
try:
tasks = db.collection("tasks").where("user", "==", st.session_state.email).stream()
return [{"id": task.id, **task.to_dict()} for task in tasks]
except Exception as e:
st.error(f"📦 Error loading tasks: {str(e)}")
return []
def delete_task(task_id):
try:
db.collection("tasks").document(task_id).delete()
st.success("🗑️ Task deleted successfully!")
st.rerun()
except Exception as e:
st.error(f"❌ Delete failed: {str(e)}")
# Reminders System
if st.session_state.first_login:
with st.expander("🔔 Task Reminders", expanded=True):
tasks = get_tasks()
overdue = []
due_soon = []
for task in tasks:
if task['status'] != "Completed":
task_date = datetime.strptime(task['date'], "%Y-%m-%d").date()
delta = (task_date - datetime.today().date()).days
if delta < 0:
overdue.append(task)
elif 0 <= delta <= 3:
due_soon.append(task)
if overdue:
st.error("##### ⚠️ Overdue Tasks")
for task in overdue:
st.markdown(f"**{task['task']}** ({task['project']}) - Due {task['date']}")
if due_soon:
st.warning("##### ⏳ Upcoming Deadlines")
for task in due_soon:
st.markdown(f"**{task['task']}** ({task['project']}) - Due {task['date']}")
if not overdue and not due_soon:
st.info("🌟 All tasks are up to date!")
st.session_state.first_login = False
# Navigation Menu
menu_items = ["🏠 Dashboard", "📥 Task Entry", "👀 Task Explorer", "✏️ Edit Tasks", "⚙️ Settings"]
if st.session_state.is_admin:
menu_items.insert(3, "🏗️ Project Management")
with st.sidebar:
st.title("🔍 Navigation")
menu = st.radio("", menu_items, label_visibility="collapsed")
# Dashboard View
if menu == "🏠 Dashboard":
tasks = get_tasks()
if tasks:
df = pd.DataFrame(tasks)
col1, col2, col3, col4 = st.columns(4)
col1.metric("📌 Total Tasks", len(df))
col2.metric("✅ Completed", len(df[df['status'] == "Completed"]))
col3.metric("⏳ In Progress", len(df[df['status'] == "In Progress"]))
col4.metric("📭 Pending", len(df[df['status'] == "Pending"]))
with st.container(border=True):
st.subheader("📊 Task Status Distribution")
fig = px.pie(df, names='status', hole=0.4,
color_discrete_sequence=px.colors.qualitative.Pastel1)
st.plotly_chart(fig, use_container_width=True)
# New Timeline Visualization
with st.container(border=True):
st.subheader("⏳ Task Timeline")
timeline_df = df.copy()
# Convert date and create timeline visualization
timeline_df['date'] = pd.to_datetime(timeline_df['date'])
timeline_df['Start'] = timeline_df['date'] - pd.DateOffset(days=1) # Fake start date for visualization
timeline_df['Finish'] = timeline_df['date']
# Create Gantt-style timeline
fig = px.timeline(
timeline_df,
x_start="Start",
x_end="Finish",
y="task",
color="status",
color_discrete_map={
"Pending": "#FFE4B5",
"In Progress": "#87CEEB",
"Completed": "#98FB98"
},
title="Task Schedule Overview",
labels={"task": "Task", "date": "Due Date"},
hover_data=["project", "type"]
)
# Customize layout
fig.update_yaxes(autorange="reversed", title_text="Tasks")
fig.update_xaxes(title_text="Timeline")
fig.update_layout(
height=500,
showlegend=True,
hovermode="closest",
xaxis=dict(showgrid=True, tickformat="%b %d\n%Y"),
margin=dict(l=0, r=0, t=40, b=20)
)
# Add custom hover template
fig.update_traces(
hovertemplate="%{y}
"
"Project: %{customdata[0]}
"
"Type: %{customdata[1]}
"
"Due Date: %{x|%b %d, %Y}"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("📭 No tasks found. Start by adding new tasks!")
# Task Entry
elif menu == "📥 Task Entry":
# Step 1: Task Type Selection
st.subheader("➕ Add New Task")
task_type_selection = st.radio(
"Select Task Type",
["One-Time Task", "Recurring Task"],
horizontal=True
)
# Step 2: Task Details Form
with st.form(key="task_form", clear_on_submit=True):
col1, col2 = st.columns(2)
# Left Column
with col1:
task = st.text_area("📝 Task Description", height=100)
task_type = st.selectbox(
"📦 Task Type",
["Design", "Procurement", "Construction", "Testing", "Other"]
)
# Right Column
with col2:
projects = [p.id for p in db.collection("projects").stream()] + ["Add New Project"]
project = st.selectbox("🏗️ Project", projects)
if project == "Add New Project":
project = st.text_input("✨ New Project Name")
status = st.selectbox("📌 Status", ["Pending", "In Progress", "Completed"])
date = st.date_input("📅 Due Date", min_value=datetime.today())
# Task Assignment
user_emails = ["None"] + [user.to_dict().get('email') for user in db.collection("users").stream()]
assigned_to = st.selectbox(
"👤 Assign To",
options=user_emails,
help="Assign this task to a team member"
)
# Recurrence Settings (only for recurring tasks)
if task_type_selection == "Recurring Task":
recurrence = st.selectbox("🔄 Repeat", ["Daily", "Weekly", "Monthly"])
end_condition = st.radio(
"End Condition",
["End Date", "Number of Occurrences"],
horizontal=True
)
if end_condition == "End Date":
end_date = st.date_input(
"Repeat Until",
min_value=date + relativedelta(days=1),
help="Recurrence end date (inclusive)"
)
else:
num_occurrences = st.number_input(
"Number of Occurrences",
min_value=2,
max_value=365,
value=5,
help="Total number of task instances"
)
else:
recurrence = "None"
end_condition = None
end_date = None
num_occurrences = None
# File Upload Section
uploaded_files = st.file_uploader(
"📎 Attachments",
type=["pdf", "docx", "xlsx", "png", "jpg", "jpeg"],
accept_multiple_files=True,
help="Upload relevant files (max 10MB each)"
)
# Submit Button
submitted = st.form_submit_button("💾 Save Task", use_container_width=True)
if submitted:
if not task.strip():
st.error("❌ Task description cannot be empty!")
st.stop()
try:
# Validate recurrence (if applicable)
if task_type_selection == "Recurring Task":
if end_condition == "End Date" and (not end_date or end_date <= date):
st.error("❌ End date must be after initial due date")
st.stop()
elif end_condition == "Number of Occurrences" and num_occurrences < 2:
st.error("❌ Number of occurrences must be at least 2")
st.stop()
# Process attachments
attachments = []
if uploaded_files:
for file in uploaded_files:
if file.size > 10 * 1024 * 1024: # 10MB limit
st.error(f"❌ File {file.name} exceeds 10MB limit")
st.stop()
# Upload to Cloudinary
result = cloudinary.uploader.upload(
file,
folder=f"attachments/{st.session_state.user_uid}/",
resource_type="auto"
)
attachments.append({
"name": file.name,
"url": result['secure_url'],
"type": file.type,
"size": file.size
})
# Generate recurring dates (if applicable)
dates = [date]
if task_type_selection == "Recurring Task":
current_date = date
if end_condition == "End Date":
while current_date < end_date:
if recurrence == "Daily":
current_date += relativedelta(days=1)
elif recurrence == "Weekly":
current_date += relativedelta(weeks=1)
elif recurrence == "Monthly":
current_date += relativedelta(months=1)
if current_date <= end_date:
dates.append(current_date)
else:
for _ in range(num_occurrences - 1):
if recurrence == "Daily":
current_date += relativedelta(days=1)
elif recurrence == "Weekly":
current_date += relativedelta(weeks=1)
elif recurrence == "Monthly":
current_date += relativedelta(months=1)
dates.append(current_date)
# Batch write to Firestore
batch = db.batch()
for idx, task_date in enumerate(dates):
task_ref = db.collection("tasks").document()
batch.set(task_ref, {
"user": st.session_state.email,
"task": task.strip(),
"type": task_type,
"project": project,
"status": status,
"date": str(task_date),
"assigned_to": assigned_to if assigned_to != "None" else None,
"recurrence": {
"type": recurrence if task_type_selection == "Recurring Task" else "None",
"original_date": str(date),
"end_condition": end_condition if task_type_selection == "Recurring Task" else None,
"end_date": str(end_date) if task_type_selection == "Recurring Task" and end_condition == "End Date" else None,
"num_occurrences": num_occurrences if task_type_selection == "Recurring Task" and end_condition == "Number of Occurrences" else None,
"sequence": idx + 1
},
"attachments": attachments,
"comments": [], # Initialize comments as empty list
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
batch.commit()
st.success(f"✅ Created {len(dates)} {'task' if len(dates) == 1 else 'tasks'} with {len(attachments)} attachments!")
st.rerun()
except cloudinary.exceptions.Error as e:
st.error(f"❌ Cloudinary upload failed: {str(e)}")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
# Task Explorer (Updated Date Filter)
elif menu == "👀 Task Explorer":
tasks = get_tasks()
if tasks:
df = pd.DataFrame(tasks)
# Ensure 'attachments', 'recurrence', and 'comments' columns exist
if 'attachments' not in df.columns:
df['attachments'] = [[] for _ in range(len(df))]
if 'recurrence' not in df.columns:
df['recurrence'] = [{} for _ in range(len(df))]
if 'comments' not in df.columns:
df['comments'] = [[] for _ in range(len(df))]
# Convert NaN values to empty dictionaries
df['recurrence'] = df['recurrence'].apply(lambda x: x if isinstance(x, dict) else {})
# Filters
col1, col2, col3 = st.columns(3)
with col1:
status_filter = st.multiselect(
"📌 Filter by Status",
options=df['status'].unique().tolist(),
default=df['status'].unique().tolist()
)
with col2:
project_filter = st.multiselect(
"🏗️ Filter by Project",
options=df['project'].unique().tolist(),
default=df['project'].unique().tolist()
)
with col3:
enable_date_filter = st.checkbox("📅 Filter by Date")
date_filter = None
if enable_date_filter:
date_filter = st.date_input("Select Date")
# Apply filters
filtered_df = df[
(df['status'].isin(status_filter)) &
(df['project'].isin(project_filter))
]
if enable_date_filter and date_filter:
filtered_df = filtered_df[
filtered_df['date'] == date_filter.strftime("%Y-%m-%d")
]
# Display results
if not filtered_df.empty:
for _, row in filtered_df.iterrows():
with st.container(border=True):
col1, col2 = st.columns([5,1])
# Task Details
with col1:
st.markdown(f"""
{row['task']}
{row['status']}
📅 {row['date']}
🏗️ {row['project']}
{f"👤 {row['assigned_to']}" if row.get('assigned_to') else ""}
""", unsafe_allow_html=True)
# Recurrence Details
recurrence = row.get('recurrence', {}) # Safely get recurrence
if recurrence.get('type') != "None":
with st.expander("🔁 Recurrence Details"):
st.write(f"**Type:** {recurrence.get('type', 'None')}")
st.write(f"**Start Date:** {recurrence.get('original_date', 'N/A')}")
if recurrence.get('end_date'):
st.write(f"**End Date:** {recurrence['end_date']}")
else:
st.write(f"**Occurrences:** {recurrence.get('num_occurrences', 'N/A')}")
# Attachments
attachments = row.get('attachments', []) # Safely get attachments
if attachments:
with st.expander(f"📎 Attachments ({len(attachments) if isinstance(attachments, list) else 0})"):
for att in (attachments if isinstance(attachments, list) else []):
st.markdown(f"""
{att['type']} | {att['size'] // 1024} KB
""", unsafe_allow_html=True)
# Comments
comments = row.get('comments', []) # Safely get comments
with st.expander(f"💬 Comments ({len(comments) if isinstance(comments, list) else 0})"):
for comment in (comments if isinstance(comments, list) else []):
st.markdown(f"""
{comment.get('user', 'Unknown')}
{comment.get('timestamp', 'N/A')}
{comment.get('comment', '')}
""", unsafe_allow_html=True)
# Add new comment
new_comment = st.text_area("Add a comment", key=f"comment_{row['id']}")
if st.button("💬 Post Comment", key=f"post_{row['id']}"):
if new_comment.strip():
db.collection("tasks").document(row['id']).update({
"comments": firestore.ArrayUnion([{
"user": st.session_state.email,
"comment": new_comment.strip(),
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}])
})
st.rerun()
# Delete Button
with col2:
if st.button("🗑️", key=f"del_{row['id']}"):
delete_task(row['id'])
st.rerun()
# Summary
st.markdown(f"📄 Showing {len(filtered_df)} of {len(df)} tasks")
else:
st.info("🔍 No tasks match the current filters")
else:
st.info("📭 No tasks found. Start by adding new tasks!")
# Edit Tasks
elif menu == "✏️ Edit Tasks":
tasks = get_tasks()
if tasks:
task_options = [f"{t['task']} ({t['project']})" for t in tasks]
selected = st.selectbox("Select Task", task_options)
task = next(t for t in tasks if f"{t['task']} ({t['project']})" == selected)
with st.form(key="edit_form"):
col1, col2 = st.columns(2)
# Left Column
with col1:
new_task = st.text_area("Description", value=task['task'], height=100)
new_type = st.selectbox(
"Type",
["Design", "Procurement", "Construction", "Testing", "Other"],
index=["Design", "Procurement", "Construction", "Testing", "Other"].index(task['type'])
)
# Right Column
with col2:
projects = [p.id for p in db.collection("projects").stream()] + ["Add New Project"]
new_project = st.selectbox(
"Project",
projects,
index=projects.index(task['project']) if task['project'] in projects else 0
)
if new_project == "Add New Project":
new_project = st.text_input("New Project Name")
new_status = st.selectbox(
"Status",
["Pending", "In Progress", "Completed"],
index=["Pending", "In Progress", "Completed"].index(task['status'])
)
new_date = st.date_input(
"Due Date",
value=datetime.strptime(task['date'], "%Y-%m-%d").date()
)
# Recurrence Settings
recurrence = task.get('recurrence', {}).get('type', "None")
if recurrence != "None":
st.subheader("🔄 Recurrence Settings")
new_recurrence = st.selectbox(
"Recurrence Type",
["Daily", "Weekly", "Monthly"],
index=["Daily", "Weekly", "Monthly"].index(recurrence)
)
end_condition = task.get('recurrence', {}).get('end_condition', "End Date")
new_end_condition = st.radio(
"End Condition",
["End Date", "Number of Occurrences"],
index=0 if end_condition == "End Date" else 1,
horizontal=True
)
if new_end_condition == "End Date":
new_end_date = st.date_input(
"Repeat Until",
value=datetime.strptime(task.get('recurrence', {}).get('end_date', str(new_date + relativedelta(days=1))), "%Y-%m-%d").date(),
min_value=new_date + relativedelta(days=1),
help="Recurrence end date (inclusive)"
)
else:
new_num_occurrences = st.number_input(
"Number of Occurrences",
min_value=2,
max_value=365,
value=task.get('recurrence', {}).get('num_occurrences', 5),
help="Total number of task instances"
)
else:
new_recurrence = "None"
new_end_condition = None
new_end_date = None
new_num_occurrences = None
# Attachment Management
st.subheader("📎 Attachments")
attachments = task.get('attachments', [])
if attachments:
for att in attachments:
col1, col2 = st.columns([4,1])
with col1:
st.markdown(f"""
{att['type']} | {att['size'] // 1024} KB
""", unsafe_allow_html=True)
with col2:
if st.button("🗑️", key=f"del_att_{att['name']}"):
# Remove attachment from task
updated_attachments = [a for a in attachments if a['name'] != att['name']]
db.collection("tasks").document(task['id']).update({
"attachments": updated_attachments
})
st.rerun()
# New Attachments
new_attachments = st.file_uploader(
"Add New Attachments",
type=["pdf", "docx", "xlsx", "png", "jpg", "jpeg"],
accept_multiple_files=True,
help="Upload new files (max 10MB each)"
)
# Submit Button
submitted = st.form_submit_button("💾 Save Changes")
if submitted:
try:
# Process new attachments
if new_attachments:
for file in new_attachments:
if file.size > 10 * 1024 * 1024: # 10MB limit
st.error(f"❌ File {file.name} exceeds 10MB limit")
st.stop()
result = cloudinary.uploader.upload(
file,
folder=f"attachments/{st.session_state.user_uid}/",
resource_type="auto"
)
attachments.append({
"name": file.name,
"url": result['secure_url'],
"type": file.type,
"size": file.size
})
# Update task
db.collection("tasks").document(task['id']).update({
"task": new_task,
"type": new_type,
"project": new_project,
"status": new_status,
"date": str(new_date),
"recurrence": {
"type": new_recurrence,
"original_date": str(new_date),
"end_condition": new_end_condition if new_recurrence != "None" else None,
"end_date": str(new_end_date) if new_recurrence != "None" and new_end_condition == "End Date" else None,
"num_occurrences": new_num_occurrences if new_recurrence != "None" and new_end_condition == "Number of Occurrences" else None,
"sequence": task.get('recurrence', {}).get('sequence', 1)
},
"attachments": attachments,
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
st.success("✅ Task updated successfully!")
st.rerun()
except cloudinary.exceptions.Error as e:
st.error(f"❌ Cloudinary upload failed: {str(e)}")
except Exception as e:
st.error(f"❌ Update failed: {str(e)}")
else:
st.info("📭 No tasks to edit")
# Project Management (Admin Only)
elif menu == "🏗️ Project Management" and st.session_state.is_admin:
st.subheader("🏗️ Project Management")
projects = [p.id for p in db.collection("projects").stream()]
if projects:
st.write("### Existing Projects")
for project in projects:
col1, col2 = st.columns([4,1])
with col1:
st.write(f"🏢 {project}")
with col2:
if st.button(f"🗑️ {project}", key=f"del_{project}"):
db.collection("projects").document(project).delete()
st.rerun()
with st.form("project_form"):
st.write("### Add New Project")
new_project = st.text_input("Project Name")
if st.form_submit_button("💾 Create Project"):
if new_project:
db.collection("projects").document(new_project).set({
"created_by": st.session_state.email,
"created_at": datetime.now().strftime("%Y-%m-%d")
})
st.rerun()
# Settings
elif menu == "⚙️ Settings":
with st.form("password_form"):
st.subheader("🔒 Change Password")
old_pass = st.text_input("Current Password", type="password")
new_pass = st.text_input("New Password", type="password")
confirm_pass = st.text_input("Confirm Password", type="password")
if st.form_submit_button("🔄 Update Password"):
if new_pass == confirm_pass:
try:
auth.update_user(st.session_state.user_uid, password=new_pass)
st.success("🔑 Password updated!")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
else:
st.error("🔒 Passwords don't match!")
if st.button("🚪 Logout"):
st.session_state.authenticated = False
st.rerun()
if __name__ == "__main__":
main()