import streamlit as st from datetime import datetime import pandas as pd import plotly.express as px import firebase_admin from firebase_admin import credentials, firestore, auth, exceptions from dateutil.relativedelta import relativedelta import os import cloudinary import cloudinary.uploader from dotenv import load_dotenv # Configure page first st.set_page_config( page_title="DAILY TASK MANAGEMENT AND MONITORING", layout="wide", page_icon="📋", menu_items={ 'Get Help': 'https://github.com', 'Report a bug': "https://github.com", 'About': "# Task Management System v4.2" } ) # Custom CSS for enhanced UI st.markdown(""" """, unsafe_allow_html=True) def initialize_firebase(): if not firebase_admin._apps: cred = credentials.Certificate("firebase_credentials.json") firebase_admin.initialize_app(cred) return firestore.client(), auth # Load environment variables load_dotenv() # Configure Cloudinary cloudinary.config( cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"), api_key=os.getenv("CLOUDINARY_API_KEY"), api_secret=os.getenv("CLOUDINARY_API_SECRET") ) def create_default_admin(auth, db): try: admin_email = "admin@example.com" try: admin_user = auth.get_user_by_email(admin_email) admin_ref = db.collection("users").document(admin_user.uid) if not admin_ref.get().exists: admin_ref.set({ "email": admin_email, "is_admin": True, "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) except auth.UserNotFoundError: admin_user = auth.create_user( email=admin_email, password="Temp@2025", display_name="Admin" ) auth.set_custom_user_claims(admin_user.uid, {'admin': True}) db.collection("users").document(admin_user.uid).set({ "email": admin_email, "is_admin": True, "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) except Exception as e: st.error(f"Admin setup error: {str(e)}") def main(): # Initialize Firebase db, firebase_auth = initialize_firebase() create_default_admin(firebase_auth, db) # Session State Management if "authenticated" not in st.session_state: st.session_state.update({ "authenticated": False, "email": "", "is_admin": False, "first_login": True, "user_uid": "" }) # Authentication Flow if not st.session_state.authenticated: st.markdown("

📋 DAILY TASK MANAGEMENT AND MONITORING

Daily Task Monitoring & Productivity Tracking

", unsafe_allow_html=True) col1, col2 = st.columns(2) with col1: with st.expander("🔑 Sign In", expanded=True): with st.form("signin_form"): email = st.text_input("📧 Email") password = st.text_input("🔒 Password", type="password") if st.form_submit_button("🚀 Login"): try: user = firebase_auth.get_user_by_email(email) user_doc = db.collection("users").document(user.uid).get() is_admin = user.custom_claims.get('admin', False) if user.custom_claims else False if user_doc.exists: is_admin = user_doc.to_dict().get('is_admin', is_admin) st.session_state.update({ "authenticated": True, "email": user.email, "user_uid": user.uid, "is_admin": is_admin }) st.rerun() except Exception as e: st.error(f"🔐 Authentication failed: {str(e)}") with col2: with st.expander("📝 Sign Up", expanded=True): with st.form("signup_form"): new_email = st.text_input("📨 New Email") new_password = st.text_input("🔏 New Password", type="password") if st.form_submit_button("🌟 Register"): try: user = firebase_auth.create_user(email=new_email, password=new_password) db.collection("users").document(user.uid).set({ "email": new_email, "is_admin": False, "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) st.success("🎉 Registration successful! Please sign in.") except Exception as e: st.error(f"⚠️ Registration failed: {str(e)}") return # Main Application Interface st.markdown(f"""

📋 DAILY TASK MANAGEMENT AND MONITORING

Welcome back, {st.session_state.email} {"👑" if st.session_state.is_admin else ""}

""", unsafe_allow_html=True) # Core Functions def get_tasks(): try: tasks = db.collection("tasks").where("user", "==", st.session_state.email).stream() return [{"id": task.id, **task.to_dict()} for task in tasks] except Exception as e: st.error(f"📦 Error loading tasks: {str(e)}") return [] def delete_task(task_id): try: db.collection("tasks").document(task_id).delete() st.success("🗑️ Task deleted successfully!") st.rerun() except Exception as e: st.error(f"❌ Delete failed: {str(e)}") # Reminders System if st.session_state.first_login: with st.expander("🔔 Task Reminders", expanded=True): tasks = get_tasks() overdue = [] due_soon = [] for task in tasks: if task['status'] != "Completed": task_date = datetime.strptime(task['date'], "%Y-%m-%d").date() delta = (task_date - datetime.today().date()).days if delta < 0: overdue.append(task) elif 0 <= delta <= 3: due_soon.append(task) if overdue: st.error("##### ⚠️ Overdue Tasks") for task in overdue: st.markdown(f"**{task['task']}** ({task['project']}) - Due {task['date']}") if due_soon: st.warning("##### ⏳ Upcoming Deadlines") for task in due_soon: st.markdown(f"**{task['task']}** ({task['project']}) - Due {task['date']}") if not overdue and not due_soon: st.info("🌟 All tasks are up to date!") st.session_state.first_login = False # Navigation Menu menu_items = ["🏠 Dashboard", "📥 Task Entry", "👀 Task Explorer", "✏️ Edit Tasks", "⚙️ Settings"] if st.session_state.is_admin: menu_items.insert(3, "🏗️ Project Management") with st.sidebar: st.title("🔍 Navigation") menu = st.radio("", menu_items, label_visibility="collapsed") # Dashboard View if menu == "🏠 Dashboard": tasks = get_tasks() if tasks: df = pd.DataFrame(tasks) col1, col2, col3, col4 = st.columns(4) col1.metric("📌 Total Tasks", len(df)) col2.metric("✅ Completed", len(df[df['status'] == "Completed"])) col3.metric("⏳ In Progress", len(df[df['status'] == "In Progress"])) col4.metric("📭 Pending", len(df[df['status'] == "Pending"])) with st.container(border=True): st.subheader("📊 Task Status Distribution") fig = px.pie(df, names='status', hole=0.4, color_discrete_sequence=px.colors.qualitative.Pastel1) st.plotly_chart(fig, use_container_width=True) # New Timeline Visualization with st.container(border=True): st.subheader("⏳ Task Timeline") timeline_df = df.copy() # Convert date and create timeline visualization timeline_df['date'] = pd.to_datetime(timeline_df['date']) timeline_df['Start'] = timeline_df['date'] - pd.DateOffset(days=1) # Fake start date for visualization timeline_df['Finish'] = timeline_df['date'] # Create Gantt-style timeline fig = px.timeline( timeline_df, x_start="Start", x_end="Finish", y="task", color="status", color_discrete_map={ "Pending": "#FFE4B5", "In Progress": "#87CEEB", "Completed": "#98FB98" }, title="Task Schedule Overview", labels={"task": "Task", "date": "Due Date"}, hover_data=["project", "type"] ) # Customize layout fig.update_yaxes(autorange="reversed", title_text="Tasks") fig.update_xaxes(title_text="Timeline") fig.update_layout( height=500, showlegend=True, hovermode="closest", xaxis=dict(showgrid=True, tickformat="%b %d\n%Y"), margin=dict(l=0, r=0, t=40, b=20) ) # Add custom hover template fig.update_traces( hovertemplate="%{y}
" "Project: %{customdata[0]}
" "Type: %{customdata[1]}
" "Due Date: %{x|%b %d, %Y}" ) st.plotly_chart(fig, use_container_width=True) else: st.info("📭 No tasks found. Start by adding new tasks!") # Task Entry elif menu == "📥 Task Entry": # Step 1: Task Type Selection st.subheader("➕ Add New Task") task_type_selection = st.radio( "Select Task Type", ["One-Time Task", "Recurring Task"], horizontal=True ) # Step 2: Task Details Form with st.form(key="task_form", clear_on_submit=True): col1, col2 = st.columns(2) # Left Column with col1: task = st.text_area("📝 Task Description", height=100) task_type = st.selectbox( "📦 Task Type", ["Design", "Procurement", "Construction", "Testing", "Other"] ) # Right Column with col2: projects = [p.id for p in db.collection("projects").stream()] + ["Add New Project"] project = st.selectbox("🏗️ Project", projects) if project == "Add New Project": project = st.text_input("✨ New Project Name") status = st.selectbox("📌 Status", ["Pending", "In Progress", "Completed"]) date = st.date_input("📅 Due Date", min_value=datetime.today()) # Task Assignment user_emails = ["None"] + [user.to_dict().get('email') for user in db.collection("users").stream()] assigned_to = st.selectbox( "👤 Assign To", options=user_emails, help="Assign this task to a team member" ) # Recurrence Settings (only for recurring tasks) if task_type_selection == "Recurring Task": recurrence = st.selectbox("🔄 Repeat", ["Daily", "Weekly", "Monthly"]) end_condition = st.radio( "End Condition", ["End Date", "Number of Occurrences"], horizontal=True ) if end_condition == "End Date": end_date = st.date_input( "Repeat Until", min_value=date + relativedelta(days=1), help="Recurrence end date (inclusive)" ) else: num_occurrences = st.number_input( "Number of Occurrences", min_value=2, max_value=365, value=5, help="Total number of task instances" ) else: recurrence = "None" end_condition = None end_date = None num_occurrences = None # File Upload Section uploaded_files = st.file_uploader( "📎 Attachments", type=["pdf", "docx", "xlsx", "png", "jpg", "jpeg"], accept_multiple_files=True, help="Upload relevant files (max 10MB each)" ) # Submit Button submitted = st.form_submit_button("💾 Save Task", use_container_width=True) if submitted: if not task.strip(): st.error("❌ Task description cannot be empty!") st.stop() try: # Validate recurrence (if applicable) if task_type_selection == "Recurring Task": if end_condition == "End Date" and (not end_date or end_date <= date): st.error("❌ End date must be after initial due date") st.stop() elif end_condition == "Number of Occurrences" and num_occurrences < 2: st.error("❌ Number of occurrences must be at least 2") st.stop() # Process attachments attachments = [] if uploaded_files: for file in uploaded_files: if file.size > 10 * 1024 * 1024: # 10MB limit st.error(f"❌ File {file.name} exceeds 10MB limit") st.stop() # Upload to Cloudinary result = cloudinary.uploader.upload( file, folder=f"attachments/{st.session_state.user_uid}/", resource_type="auto" ) attachments.append({ "name": file.name, "url": result['secure_url'], "type": file.type, "size": file.size }) # Generate recurring dates (if applicable) dates = [date] if task_type_selection == "Recurring Task": current_date = date if end_condition == "End Date": while current_date < end_date: if recurrence == "Daily": current_date += relativedelta(days=1) elif recurrence == "Weekly": current_date += relativedelta(weeks=1) elif recurrence == "Monthly": current_date += relativedelta(months=1) if current_date <= end_date: dates.append(current_date) else: for _ in range(num_occurrences - 1): if recurrence == "Daily": current_date += relativedelta(days=1) elif recurrence == "Weekly": current_date += relativedelta(weeks=1) elif recurrence == "Monthly": current_date += relativedelta(months=1) dates.append(current_date) # Batch write to Firestore batch = db.batch() for idx, task_date in enumerate(dates): task_ref = db.collection("tasks").document() batch.set(task_ref, { "user": st.session_state.email, "task": task.strip(), "type": task_type, "project": project, "status": status, "date": str(task_date), "assigned_to": assigned_to if assigned_to != "None" else None, "recurrence": { "type": recurrence if task_type_selection == "Recurring Task" else "None", "original_date": str(date), "end_condition": end_condition if task_type_selection == "Recurring Task" else None, "end_date": str(end_date) if task_type_selection == "Recurring Task" and end_condition == "End Date" else None, "num_occurrences": num_occurrences if task_type_selection == "Recurring Task" and end_condition == "Number of Occurrences" else None, "sequence": idx + 1 }, "attachments": attachments, "comments": [], # Initialize comments as empty list "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) batch.commit() st.success(f"✅ Created {len(dates)} {'task' if len(dates) == 1 else 'tasks'} with {len(attachments)} attachments!") st.rerun() except cloudinary.exceptions.Error as e: st.error(f"❌ Cloudinary upload failed: {str(e)}") except Exception as e: st.error(f"❌ Error: {str(e)}") # Task Explorer (Updated Date Filter) elif menu == "👀 Task Explorer": tasks = get_tasks() if tasks: df = pd.DataFrame(tasks) # Ensure 'attachments', 'recurrence', and 'comments' columns exist if 'attachments' not in df.columns: df['attachments'] = [[] for _ in range(len(df))] if 'recurrence' not in df.columns: df['recurrence'] = [{} for _ in range(len(df))] if 'comments' not in df.columns: df['comments'] = [[] for _ in range(len(df))] # Convert NaN values to empty dictionaries df['recurrence'] = df['recurrence'].apply(lambda x: x if isinstance(x, dict) else {}) # Filters col1, col2, col3 = st.columns(3) with col1: status_filter = st.multiselect( "📌 Filter by Status", options=df['status'].unique().tolist(), default=df['status'].unique().tolist() ) with col2: project_filter = st.multiselect( "🏗️ Filter by Project", options=df['project'].unique().tolist(), default=df['project'].unique().tolist() ) with col3: enable_date_filter = st.checkbox("📅 Filter by Date") date_filter = None if enable_date_filter: date_filter = st.date_input("Select Date") # Apply filters filtered_df = df[ (df['status'].isin(status_filter)) & (df['project'].isin(project_filter)) ] if enable_date_filter and date_filter: filtered_df = filtered_df[ filtered_df['date'] == date_filter.strftime("%Y-%m-%d") ] # Display results if not filtered_df.empty: for _, row in filtered_df.iterrows(): with st.container(border=True): col1, col2 = st.columns([5,1]) # Task Details with col1: st.markdown(f"""

{row['task']}

{row['status']} 📅 {row['date']} 🏗️ {row['project']} {f"👤 {row['assigned_to']}" if row.get('assigned_to') else ""}
""", unsafe_allow_html=True) # Recurrence Details recurrence = row.get('recurrence', {}) # Safely get recurrence if recurrence.get('type') != "None": with st.expander("🔁 Recurrence Details"): st.write(f"**Type:** {recurrence.get('type', 'None')}") st.write(f"**Start Date:** {recurrence.get('original_date', 'N/A')}") if recurrence.get('end_date'): st.write(f"**End Date:** {recurrence['end_date']}") else: st.write(f"**Occurrences:** {recurrence.get('num_occurrences', 'N/A')}") # Attachments attachments = row.get('attachments', []) # Safely get attachments if attachments: with st.expander(f"📎 Attachments ({len(attachments) if isinstance(attachments, list) else 0})"): for att in (attachments if isinstance(attachments, list) else []): st.markdown(f"""
📄 {att['name']} 🔗 View
{att['type']} | {att['size'] // 1024} KB
""", unsafe_allow_html=True) # Comments comments = row.get('comments', []) # Safely get comments with st.expander(f"💬 Comments ({len(comments) if isinstance(comments, list) else 0})"): for comment in (comments if isinstance(comments, list) else []): st.markdown(f"""
{comment.get('user', 'Unknown')} {comment.get('timestamp', 'N/A')}

{comment.get('comment', '')}

""", unsafe_allow_html=True) # Add new comment new_comment = st.text_area("Add a comment", key=f"comment_{row['id']}") if st.button("💬 Post Comment", key=f"post_{row['id']}"): if new_comment.strip(): db.collection("tasks").document(row['id']).update({ "comments": firestore.ArrayUnion([{ "user": st.session_state.email, "comment": new_comment.strip(), "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }]) }) st.rerun() # Delete Button with col2: if st.button("🗑️", key=f"del_{row['id']}"): delete_task(row['id']) st.rerun() # Summary st.markdown(f"📄 Showing {len(filtered_df)} of {len(df)} tasks") else: st.info("🔍 No tasks match the current filters") else: st.info("📭 No tasks found. Start by adding new tasks!") # Edit Tasks elif menu == "✏️ Edit Tasks": tasks = get_tasks() if tasks: task_options = [f"{t['task']} ({t['project']})" for t in tasks] selected = st.selectbox("Select Task", task_options) task = next(t for t in tasks if f"{t['task']} ({t['project']})" == selected) with st.form(key="edit_form"): col1, col2 = st.columns(2) # Left Column with col1: new_task = st.text_area("Description", value=task['task'], height=100) new_type = st.selectbox( "Type", ["Design", "Procurement", "Construction", "Testing", "Other"], index=["Design", "Procurement", "Construction", "Testing", "Other"].index(task['type']) ) # Right Column with col2: projects = [p.id for p in db.collection("projects").stream()] + ["Add New Project"] new_project = st.selectbox( "Project", projects, index=projects.index(task['project']) if task['project'] in projects else 0 ) if new_project == "Add New Project": new_project = st.text_input("New Project Name") new_status = st.selectbox( "Status", ["Pending", "In Progress", "Completed"], index=["Pending", "In Progress", "Completed"].index(task['status']) ) new_date = st.date_input( "Due Date", value=datetime.strptime(task['date'], "%Y-%m-%d").date() ) # Recurrence Settings recurrence = task.get('recurrence', {}).get('type', "None") if recurrence != "None": st.subheader("🔄 Recurrence Settings") new_recurrence = st.selectbox( "Recurrence Type", ["Daily", "Weekly", "Monthly"], index=["Daily", "Weekly", "Monthly"].index(recurrence) ) end_condition = task.get('recurrence', {}).get('end_condition', "End Date") new_end_condition = st.radio( "End Condition", ["End Date", "Number of Occurrences"], index=0 if end_condition == "End Date" else 1, horizontal=True ) if new_end_condition == "End Date": new_end_date = st.date_input( "Repeat Until", value=datetime.strptime(task.get('recurrence', {}).get('end_date', str(new_date + relativedelta(days=1))), "%Y-%m-%d").date(), min_value=new_date + relativedelta(days=1), help="Recurrence end date (inclusive)" ) else: new_num_occurrences = st.number_input( "Number of Occurrences", min_value=2, max_value=365, value=task.get('recurrence', {}).get('num_occurrences', 5), help="Total number of task instances" ) else: new_recurrence = "None" new_end_condition = None new_end_date = None new_num_occurrences = None # Attachment Management st.subheader("📎 Attachments") attachments = task.get('attachments', []) if attachments: for att in attachments: col1, col2 = st.columns([4,1]) with col1: st.markdown(f"""
📄 {att['name']} 🔗 View
{att['type']} | {att['size'] // 1024} KB
""", unsafe_allow_html=True) with col2: if st.button("🗑️", key=f"del_att_{att['name']}"): # Remove attachment from task updated_attachments = [a for a in attachments if a['name'] != att['name']] db.collection("tasks").document(task['id']).update({ "attachments": updated_attachments }) st.rerun() # New Attachments new_attachments = st.file_uploader( "Add New Attachments", type=["pdf", "docx", "xlsx", "png", "jpg", "jpeg"], accept_multiple_files=True, help="Upload new files (max 10MB each)" ) # Submit Button submitted = st.form_submit_button("💾 Save Changes") if submitted: try: # Process new attachments if new_attachments: for file in new_attachments: if file.size > 10 * 1024 * 1024: # 10MB limit st.error(f"❌ File {file.name} exceeds 10MB limit") st.stop() result = cloudinary.uploader.upload( file, folder=f"attachments/{st.session_state.user_uid}/", resource_type="auto" ) attachments.append({ "name": file.name, "url": result['secure_url'], "type": file.type, "size": file.size }) # Update task db.collection("tasks").document(task['id']).update({ "task": new_task, "type": new_type, "project": new_project, "status": new_status, "date": str(new_date), "recurrence": { "type": new_recurrence, "original_date": str(new_date), "end_condition": new_end_condition if new_recurrence != "None" else None, "end_date": str(new_end_date) if new_recurrence != "None" and new_end_condition == "End Date" else None, "num_occurrences": new_num_occurrences if new_recurrence != "None" and new_end_condition == "Number of Occurrences" else None, "sequence": task.get('recurrence', {}).get('sequence', 1) }, "attachments": attachments, "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) st.success("✅ Task updated successfully!") st.rerun() except cloudinary.exceptions.Error as e: st.error(f"❌ Cloudinary upload failed: {str(e)}") except Exception as e: st.error(f"❌ Update failed: {str(e)}") else: st.info("📭 No tasks to edit") # Project Management (Admin Only) elif menu == "🏗️ Project Management" and st.session_state.is_admin: st.subheader("🏗️ Project Management") projects = [p.id for p in db.collection("projects").stream()] if projects: st.write("### Existing Projects") for project in projects: col1, col2 = st.columns([4,1]) with col1: st.write(f"🏢 {project}") with col2: if st.button(f"🗑️ {project}", key=f"del_{project}"): db.collection("projects").document(project).delete() st.rerun() with st.form("project_form"): st.write("### Add New Project") new_project = st.text_input("Project Name") if st.form_submit_button("💾 Create Project"): if new_project: db.collection("projects").document(new_project).set({ "created_by": st.session_state.email, "created_at": datetime.now().strftime("%Y-%m-%d") }) st.rerun() # Settings elif menu == "⚙️ Settings": with st.form("password_form"): st.subheader("🔒 Change Password") old_pass = st.text_input("Current Password", type="password") new_pass = st.text_input("New Password", type="password") confirm_pass = st.text_input("Confirm Password", type="password") if st.form_submit_button("🔄 Update Password"): if new_pass == confirm_pass: try: auth.update_user(st.session_state.user_uid, password=new_pass) st.success("🔑 Password updated!") except Exception as e: st.error(f"❌ Error: {str(e)}") else: st.error("🔒 Passwords don't match!") if st.button("🚪 Logout"): st.session_state.authenticated = False st.rerun() if __name__ == "__main__": main()