Jatin Mehra
Add pytest configuration and restructure test files; move tests to core directory and update imports
5b02b7b
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import time
from collections import deque
import logging
class Metrics:
def __init__(self, total_requests=0, successful_requests=0, average_response_time=0.0, uptime=0.0):
"""
Initialize the Metrics class.
Args:
total_requests (int): Total number of requests.
successful_requests (int): Number of successful requests.
average_response_time (float): Average response time.
uptime (float): The total uptime in seconds.
"""
self.total_requests = total_requests
self.successful_requests = successful_requests
self.average_response_time = average_response_time
self.uptime = uptime
self.start_time = time.time()
def to_dict(self):
"""
Convert the metrics to a dictionary format.
"""
return {
"total_requests": self.total_requests,
"successful_requests": self.successful_requests,
"average_response_time": self.average_response_time,
"uptime": self.uptime + (time.time() - self.start_time)
}
@classmethod
def from_dict(cls, metrics_dict):
"""
Create a Metrics instance from a dictionary.
Args:
metrics_dict (dict): A dictionary containing the metrics.
"""
instance = cls(
total_requests=metrics_dict.get("total_requests", 0),
successful_requests=metrics_dict.get("successful_requests", 0),
average_response_time=metrics_dict.get("average_response_time", 0.0),
uptime=metrics_dict.get("uptime", 0.0)
)
return instance
class RateLimiter:
"""
Implements rate limiting using a sliding window approach.
Attributes:
requests_per_minute (int): Maximum allowed requests per minute
requests (deque): Queue storing request timestamps
Example:
>>> limiter = RateLimiter(requests_per_minute=60)
>>> limiter.can_proceed()
True
"""
def __init__(self, requests_per_minute: int = 60):
"""
Initialize rate limiter with requests per minute limit.
Args:
requests_per_minute (int): Maximum allowed requests per minute
"""
self.requests_per_minute = requests_per_minute
self.requests = deque()
def can_proceed(self) -> bool:
"""
Check if a new request can proceed based on rate limits.
Returns:
bool: True if request can proceed, False otherwise
"""
now = time.time()
# Remove requests older than 1 minute
while self.requests and self.requests[0] < now - 60:
self.requests.popleft()
if len(self.requests) < self.requests_per_minute:
self.requests.append(now)
return True
return False
class MetricsCollector:
"""
Collects and manages application metrics and rate limiting.
Combines Metrics and RateLimiter functionality to provide
comprehensive monitoring capabilities.
Example:
>>> collector = MetricsCollector()
>>> collector.record_request(success=True, response_time=0.1, tokens_used=100)
"""
def __init__(self):
"""Initialize metrics collector with default Metrics and RateLimiter."""
self.metrics = Metrics()
self.rate_limiter = RateLimiter()
def record_request(self, success: bool, response_time: float, tokens_used: int):
"""
Record a request and update metrics.
Args:
success (bool): Whether the request was successful
response_time (float): Request response time in seconds
tokens_used (int): Number of tokens used in the request
"""
self.metrics.total_requests += 1
if success:
self.metrics.successful_requests += 1
else:
# Ensure 'failed_requests' attribute is present in the Metrics class
if not hasattr(self.metrics, "failed_requests"):
self.metrics.failed_requests = 0
self.metrics.failed_requests += 1
# Update average response time
self.metrics.average_response_time = (
(self.metrics.average_response_time * (self.metrics.total_requests - 1) + response_time)
/ self.metrics.total_requests
)
# Ensure 'total_tokens_used' attribute is present in the Metrics class
if not hasattr(self.metrics, "total_tokens_used"):
self.metrics.total_tokens_used = 0
self.metrics.total_tokens_used += tokens_used