Spaces:
Sleeping
Sleeping
# src/tools/quality_tools.py | |
from typing import Dict, List, Optional, Any | |
from typing_extensions import TypedDict # If using TypedDict | |
from langchain_core.tools import tool | |
from datetime import datetime, timedelta | |
from ..utils.logger import setup_logger | |
logger = setup_logger(__name__) | |
class QualityTools: | |
def analyze_patient_satisfaction( | |
self, | |
satisfaction_scores: List[float], | |
feedback_comments: List[str], | |
department: Optional[str] = None | |
) -> Dict: | |
"""Analyze patient satisfaction scores and feedback""" | |
try: | |
analysis = { | |
"metrics": { | |
"average_score": sum(satisfaction_scores) / len(satisfaction_scores), | |
"total_responses": len(satisfaction_scores), | |
"score_distribution": {}, | |
"trend": "stable" | |
}, | |
"feedback_analysis": { | |
"positive_themes": [], | |
"negative_themes": [], | |
"improvement_areas": [] | |
}, | |
"recommendations": [] | |
} | |
# Analyze score distribution | |
for score in satisfaction_scores: | |
category = int(score) | |
analysis["metrics"]["score_distribution"][category] = \ | |
analysis["metrics"]["score_distribution"].get(category, 0) + 1 | |
# Basic sentiment analysis of feedback | |
positive_keywords = ["great", "excellent", "good", "satisfied", "helpful"] | |
negative_keywords = ["poor", "bad", "slow", "unhappy", "dissatisfied"] | |
for comment in feedback_comments: | |
comment_lower = comment.lower() | |
# Analyze positive feedback | |
for keyword in positive_keywords: | |
if keyword in comment_lower: | |
analysis["feedback_analysis"]["positive_themes"].append(keyword) | |
# Analyze negative feedback | |
for keyword in negative_keywords: | |
if keyword in comment_lower: | |
analysis["feedback_analysis"]["negative_themes"].append(keyword) | |
# Generate recommendations | |
if analysis["metrics"]["average_score"] < 7.0: | |
analysis["recommendations"].append("Implement immediate satisfaction improvement plan") | |
return analysis | |
except Exception as e: | |
logger.error(f"Error analyzing patient satisfaction: {str(e)}") | |
raise | |
def monitor_clinical_outcomes( | |
self, | |
outcomes_data: List[Dict], | |
benchmark_metrics: Dict[str, float] | |
) -> Dict: | |
"""Monitor and analyze clinical outcomes against benchmarks""" | |
try: | |
analysis = { | |
"outcome_metrics": {}, | |
"benchmark_comparison": {}, | |
"critical_deviations": [], | |
"success_areas": [] | |
} | |
# Analyze outcomes by category | |
for outcome in outcomes_data: | |
category = outcome["category"] | |
if category not in analysis["outcome_metrics"]: | |
analysis["outcome_metrics"][category] = { | |
"success_rate": 0, | |
"complication_rate": 0, | |
"readmission_rate": 0, | |
"total_cases": 0 | |
} | |
# Update metrics | |
metrics = analysis["outcome_metrics"][category] | |
metrics["total_cases"] += 1 | |
metrics["success_rate"] = (metrics["success_rate"] * (metrics["total_cases"] - 1) + | |
outcome["success"]) / metrics["total_cases"] | |
# Compare with benchmarks | |
if category in benchmark_metrics: | |
benchmark = benchmark_metrics[category] | |
deviation = metrics["success_rate"] - benchmark | |
if deviation < -0.1: # More than 10% below benchmark | |
analysis["critical_deviations"].append({ | |
"category": category, | |
"deviation": deviation, | |
"current_rate": metrics["success_rate"], | |
"benchmark": benchmark | |
}) | |
elif deviation > 0.05: # More than 5% above benchmark | |
analysis["success_areas"].append({ | |
"category": category, | |
"improvement": deviation, | |
"current_rate": metrics["success_rate"] | |
}) | |
return analysis | |
except Exception as e: | |
logger.error(f"Error monitoring clinical outcomes: {str(e)}") | |
raise | |
def track_compliance_metrics( | |
self, | |
compliance_data: List[Dict], | |
audit_period: str | |
) -> Dict: | |
"""Track and analyze compliance with medical standards and regulations""" | |
try: | |
analysis = { | |
"compliance_rate": 0, | |
"violations": [], | |
"risk_areas": [], | |
"audit_summary": { | |
"period": audit_period, | |
"total_checks": len(compliance_data), | |
"passed_checks": 0, | |
"failed_checks": 0 | |
} | |
} | |
# Analyze compliance checks | |
for check in compliance_data: | |
if check["compliant"]: | |
analysis["audit_summary"]["passed_checks"] += 1 | |
else: | |
analysis["audit_summary"]["failed_checks"] += 1 | |
analysis["violations"].append({ | |
"standard": check["standard"], | |
"severity": check["severity"], | |
"date": check["date"] | |
}) | |
# Identify risk areas | |
if check["severity"] == "high" or check.get("repeat_violation", False): | |
analysis["risk_areas"].append({ | |
"area": check["standard"], | |
"risk_level": "high", | |
"recommendations": ["Immediate action required", | |
"Staff training needed"] | |
}) | |
# Calculate overall compliance rate | |
total_checks = analysis["audit_summary"]["total_checks"] | |
if total_checks > 0: | |
analysis["compliance_rate"] = (analysis["audit_summary"]["passed_checks"] / | |
total_checks * 100) | |
return analysis | |
except Exception as e: | |
logger.error(f"Error tracking compliance metrics: {str(e)}") | |
raise# quality_tools implementation | |