Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException, Query | |
import pandas as pd | |
from database import supabase | |
from dotenv import load_dotenv | |
from schemas import ( | |
SatisfactionRequest, PerformanceRequest, RetentionRequest, TrainingRequest | |
) | |
from utils.load_models import ( | |
satisfaction_model, performance_model, retention_model, training_model, label_enc | |
) | |
router = APIRouter()

# Load the HR dataset from Supabase once, when the module is imported.
# Any failure is reported on stdout and an empty frame is used instead, so
# the module can still be imported.
try:
    response = supabase.table("HR_analysis").select("*").execute()
    if response.data:
        data = pd.DataFrame(response.data)
    else:
        data = pd.DataFrame()
except Exception as exc:
    print(f"Error fetching data: {exc}")
    data = pd.DataFrame()

# Parse whichever date columns are present; unparseable values become NaT.
for col in ['Survey Date', 'StartDate', 'DOB']:
    if col not in data.columns:
        continue
    data[col] = pd.to_datetime(data[col], errors='coerce')

# Derive an approximate age in whole years (elapsed days floor-divided by 365).
if 'DOB' in data.columns:
    data['Age'] = (pd.to_datetime("today") - data['DOB']).dt.days // 365

# Translate textual performance ratings into numbers. Labels that are not in
# the map yield None and are then coerced to NaN by to_numeric.
score_map = {"Exceeds": 5, "Fully Meets": 4, "Needs Improvement": 3, "PIP": 2}
if 'Performance Score' in data.columns:
    data['Performance Score'] = pd.to_numeric(
        data['Performance Score'].map(lambda x: score_map.get(str(x).strip())),
        errors='coerce',
    )
def satisfaction_analysis(department: str = Query(None, description="Filter by department")):
    """
    Get the average satisfaction score for each department.

    Args:
        department (str, optional): Department name to filter by. Both the
            argument and the "DepartmentType" column are stripped and
            title-cased before comparison.

    Returns:
        list: One record per department with the keys "DepartmentType" and
            "Satisfaction Score" (the mean for that department). Empty when
            no rows match the filter.

    Raises:
        HTTPException: 500 when the required columns are missing from the
            dataset, or when any other error occurs while computing.
    """
    required_columns = ("DepartmentType", "Satisfaction Score")
    try:
        if any(column not in data.columns for column in required_columns):
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")

        # Boolean indexing and groupby never mutate the shared frame, so no
        # defensive copy is needed.
        filtered_data = data
        if department:
            department = department.strip().title()
            normalized = filtered_data["DepartmentType"].str.strip().str.title()
            filtered_data = filtered_data[normalized == department]

        if filtered_data.empty:
            return []

        result = filtered_data.groupby("DepartmentType")["Satisfaction Score"].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Let deliberate HTTP errors through untouched; the generic handler
        # below would otherwise replace their detail with str(exception).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def department_performance():
    """
    Average the performance score and current employee rating per department.

    Returns:
        list: One record per "DepartmentType" holding the mean of
            "Performance Score" and of "Current Employee Rating".

    Raises:
        HTTPException: 500 carrying the error text if the computation fails.
    """
    try:
        metric_columns = ["Performance Score", "Current Employee Rating"]
        per_department = data.groupby("DepartmentType")[metric_columns].mean()
        return per_department.reset_index().to_dict(orient="records")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def training_analytics(program_name: str = Query(None, description="Filter by training program name")):
    """
    Break down training outcomes per training program.

    Args:
        program_name (str, optional): Exact "Training Program Name" to keep.
            When omitted, every program is included.

    Returns:
        list: One record per program; besides the program name, each record
            maps every "Training Outcome" value to its share of that
            program's rows (0 when an outcome never occurs). Empty when no
            rows match.

    Raises:
        HTTPException: 500 carrying the error text if the computation fails.
    """
    try:
        if program_name is None:
            filtered_data = data
        else:
            filtered_data = data[data["Training Program Name"] == program_name]

        if filtered_data.empty:
            return []

        outcomes_by_program = filtered_data.groupby("Training Program Name")["Training Outcome"]
        outcome_shares = outcomes_by_program.value_counts(normalize=True).unstack(fill_value=0)
        return outcome_shares.reset_index().to_dict(orient="records")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def engagement_performance():
    """
    Get the correlation between engagement score and performance score.

    Returns:
        dict: {"correlation_coefficient": value}, where value is the
            correlation (pandas' default method) between the
            "Engagement Score" and "Performance Score" columns, or None
            when the correlation is undefined.

    Raises:
        HTTPException: 500 carrying the error text if the computation fails
            (for example when a column is missing).
    """
    try:
        correlation = data[['Engagement Score', 'Performance Score']].corr().iloc[0, 1]
        # NaN is not valid JSON, so report an undefined correlation (e.g. too
        # few rows or a constant column) as null instead.
        if pd.isna(correlation):
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": float(correlation)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def cost_benefit_analysis():
    """
    Calculate a per-department training ROI figure.

    The figure is the department's mean "Performance Score" divided by the
    department's total "Training Cost".

    Returns:
        list: One record per department with the keys "DepartmentType" and
            "ROI". "ROI" is None when the ratio is undefined (zero total
            cost or no usable performance scores).

    Raises:
        HTTPException: 500 carrying the error text if the computation fails
            (for example when a column is missing).
    """
    try:
        by_department = data.groupby("DepartmentType")
        mean_score = by_department["Performance Score"].mean()
        total_cost = by_department["Training Cost"].sum()

        # Mask zero totals so the division yields NaN rather than +/-inf;
        # neither inf nor NaN can be encoded as JSON, so both become None.
        roi = mean_score / total_cost.where(total_cost != 0)

        return [
            {"DepartmentType": department, "ROI": None if pd.isna(value) else float(value)}
            for department, value in roi.items()
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def training_effectiveness():
    """
    Average the performance score per training program.

    Returns:
        list: One record per "Training Program Name" with the mean
            "Performance Score" of its rows.

    Raises:
        HTTPException: 500 carrying the error text if the computation fails.
    """
    try:
        scores_by_program = data.groupby("Training Program Name")["Performance Score"]
        averages = scores_by_program.mean().reset_index()
        return averages.to_dict(orient="records")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def diversity_dashboard():
    """
    Break down the gender distribution per department.

    Returns:
        list: One record per "DepartmentType"; each record maps every
            "GenderCode" value to its share of that department's rows
            (0 when a code never occurs there).

    Raises:
        HTTPException: 500 carrying the error text if the computation fails.
    """
    try:
        gender_by_department = data.groupby("DepartmentType")["GenderCode"]
        shares = gender_by_department.value_counts(normalize=True).unstack(fill_value=0)
        diversity_metrics = shares.reset_index()
        return diversity_metrics.to_dict(orient="records")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def worklife_balance_impact():
    """
    Get the correlation between work-life balance score and performance score.

    Returns:
        dict: {"correlation_coefficient": value}, where value is the
            correlation (pandas' default method) between the
            "Work-Life Balance Score" and "Performance Score" columns,
            rounded to 3 decimals, or None when it is undefined.

    Raises:
        HTTPException: 500 when the required columns are missing from the
            dataset, or when any other error occurs while computing.
    """
    columns = ['Work-Life Balance Score', 'Performance Score']
    try:
        if any(column not in data.columns for column in columns):
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")

        correlation = data[columns].corr().iloc[0, 1]
        # NaN is not valid JSON, so report an undefined correlation (e.g. too
        # few rows or a constant column) as null instead.
        if pd.isna(correlation):
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": round(float(correlation), 3)}
    except HTTPException:
        # Let deliberate HTTP errors through untouched; the generic handler
        # below would otherwise replace their detail with str(exception).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def career_development(employee_id: str = Query(None, description="Filter by Employee ID")):
    """
    Count career movements per employee.

    A "career movement" is counted as one non-null "StartDate" entry among
    the employee's rows.

    Args:
        employee_id (str, optional): Employee ID to filter by. When omitted,
            every employee is included.

    Returns:
        list: One record per employee with the keys "Employee ID" and
            "Career Movements". Empty when no rows match.

    Raises:
        HTTPException: 500 carrying the error text if the computation fails
            (for example when a column is missing).
    """
    try:
        if employee_id is None:
            filtered_data = data
        else:
            # The filter value is a string, but the stored IDs may be numeric
            # (TODO confirm against the table schema). Comparing as text keeps
            # the filter working for both string and integer ID columns.
            ids_as_text = data["Employee ID"].astype(str)
            filtered_data = data[ids_as_text == str(employee_id)]

        career_progress = (
            filtered_data.groupby("Employee ID")["StartDate"]
            .count()
            .reset_index(name="Career Movements")
        )
        return career_progress.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ✅ Prediction Endpoints | |
def predict_satisfaction(data: SatisfactionRequest):
    """
    Predict an employee satisfaction score.

    Args:
        data (SatisfactionRequest): Request carrying engagement_score,
            work_life_balance_score and performance_score. Note that this
            parameter shadows the module-level dataset of the same name.

    Returns:
        dict: {'satisfaction_score': first prediction of satisfaction_model}.

    Raises:
        HTTPException: 500 carrying the error text if prediction fails.
    """
    try:
        # A single sample, with features in the order passed to the model.
        features = [[
            data.engagement_score,
            data.work_life_balance_score,
            data.performance_score,
        ]]
        predicted = satisfaction_model.predict(features)
        return {'satisfaction_score': predicted[0]}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def predict_performance(data: PerformanceRequest):
    """
    Predict an employee performance score.

    Args:
        data (PerformanceRequest): Request carrying satisfaction_score,
            engagement_score, training_duration and training_cost. Note that
            this parameter shadows the module-level dataset of the same name.

    Returns:
        dict: {'performance_score': first prediction of performance_model}.

    Raises:
        HTTPException: 500 carrying the error text if prediction fails.
    """
    try:
        # A single sample, with features in the order passed to the model.
        features = [[
            data.satisfaction_score,
            data.engagement_score,
            data.training_duration,
            data.training_cost,
        ]]
        predicted = performance_model.predict(features)
        return {'performance_score': predicted[0]}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def predict_retention(data: RetentionRequest):
    """
    Predict an employee's retention risk.

    Args:
        data (RetentionRequest): Request carrying satisfaction_score,
            engagement_score and performance_score. Note that this parameter
            shadows the module-level dataset of the same name.

    Returns:
        dict: {'retention_risk': label}, where label is the first prediction
            of retention_model decoded through label_enc.

    Raises:
        HTTPException: 500 carrying the error text if prediction fails.
    """
    try:
        # A single sample, with features in the order passed to the model.
        features = [[
            data.satisfaction_score,
            data.engagement_score,
            data.performance_score,
        ]]
        encoded = retention_model.predict(features)
        # NOTE(review): predict_training decodes with this same label_enc;
        # confirm a single encoder is valid for both targets.
        labels = label_enc.inverse_transform(encoded)
        return {'retention_risk': labels[0]}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def predict_training(data: TrainingRequest):
    """
    Predict the success of a training.

    Args:
        data (TrainingRequest): Request carrying training_type,
            training_duration and training_cost. Note that this parameter
            shadows the module-level dataset of the same name.

    Returns:
        dict: {'training_success': label}, where label is the first
            prediction of training_model decoded through label_enc.

    Raises:
        HTTPException: 500 carrying the error text if prediction fails.
    """
    try:
        # A single sample, with features in the order passed to the model.
        features = [[
            data.training_type,
            data.training_duration,
            data.training_cost,
        ]]
        encoded = training_model.predict(features)
        # NOTE(review): predict_retention decodes with this same label_enc;
        # confirm a single encoder is valid for both targets.
        labels = label_enc.inverse_transform(encoded)
        return {'training_success': labels[0]}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))