File size: 6,848 Bytes
755ac75 adc9f0c 755ac75 adc9f0c 755ac75 bc28645 755ac75 bc28645 755ac75 adc9f0c 755ac75 02f17d0 bc28645 755ac75 bc28645 755ac75 bc28645 755ac75 bc28645 755ac75 bc28645 755ac75 bc28645 755ac75 bc28645 755ac75 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import random
from datetime import datetime, timedelta, date, time
import pandas as pd
import numpy as np
from typing import List, Iterator, Dict, Any, Optional
def generate_random_data(
date: date,
start_time: time,
end_time: time,
count: int,
response_time_range: (int, int),
null_percentage: float
) -> pd.DataFrame:
start_datetime: datetime = datetime.combine(date, start_time)
end_datetime: datetime = datetime.combine(date, end_time)
random_timestamps: List[datetime] = [
start_datetime + timedelta(seconds=random.randint(0, int((end_datetime - start_datetime).total_seconds())))
for _ in range(count)
random_response_times: List[Optional[int]] = [
random.randint(response_time_range[0], response_time_range[1]) for _ in range(count)
null_count: int = int(null_percentage * count)
null_indices: List[int] = random.sample(range(count), null_count)
for idx in null_indices:
random_response_times[idx] = None
data: Dict[str, Any] = {
'Timestamp': random_timestamps,
'ResponseTime(ms)': random_response_times
df: pd.DataFrame = pd.DataFrame(data)
return df
def calculate_percentile(
df: pd.DataFrame,
freq: str,
percentile: float
) -> pd.DataFrame:
percentile_df: pd.DataFrame = df.groupby(pd.Grouper(key='Timestamp', freq=freq))["ResponseTime(ms)"]\
.quantile(percentile).reset_index(name=f"p{int(percentile * 100)}_ResponseTime(ms)")
percentile_df.replace(to_replace=np.nan, value=None, inplace=True)
return percentile_df
def aggregate_data(
df: pd.DataFrame,
period_length: str,
) -> pd.DataFrame:
if df.empty:
return pd.DataFrame() # Return an empty DataFrame if input is empty
aggregation_funcs = {
'p50': lambda x: np.percentile(x.dropna(), 50) if not x.dropna().empty else np.nan,
'p95': lambda x: np.percentile(x.dropna(), 95) if not x.dropna().empty else np.nan,
'p99': lambda x: np.percentile(x.dropna(), 99) if not x.dropna().empty else np.nan,
'max': lambda x: np.max(x.dropna()) if not x.dropna().empty else np.nan,
'min': lambda x: np.min(x.dropna()) if not x.dropna().empty else np.nan,
'average': lambda x: np.mean(x.dropna()) if not x.dropna().empty else np.nan
summary_df = df.groupby(pd.Grouper(key='Timestamp', freq=period_length)).agg(
p50=('ResponseTime(ms)', aggregation_funcs['p50']),
p95=('ResponseTime(ms)', aggregation_funcs['p95']),
p99=('ResponseTime(ms)', aggregation_funcs['p99']),
max=('ResponseTime(ms)', aggregation_funcs['max']),
min=('ResponseTime(ms)', aggregation_funcs['min']),
average=('ResponseTime(ms)', aggregation_funcs['average']),
return summary_df
def re_aggregate_data(
df: pd.DataFrame,
period_length: str,
) -> pd.DataFrame:
if df.empty:
return pd.DataFrame() # Return an empty DataFrame if input is empty
aggregation_funcs = {
'p50': lambda x: np.percentile(x.dropna(), 50) if not x.dropna().empty else np.nan,
'p95': lambda x: np.percentile(x.dropna(), 95) if not x.dropna().empty else np.nan,
'p99': lambda x: np.percentile(x.dropna(), 99) if not x.dropna().empty else np.nan,
'max': lambda x: np.max(x.dropna()) if not x.dropna().empty else np.nan,
'min': lambda x: np.min(x.dropna()) if not x.dropna().empty else np.nan,
'average': lambda x: np.mean(x.dropna()) if not x.dropna().empty else np.nan
summary_df = df.groupby(pd.Grouper(key='Timestamp', freq=period_length)).agg(
p50=('p50', aggregation_funcs['p50']),
p95=('p95', aggregation_funcs['p95']),
p99=('p99', aggregation_funcs['p99']),
max=('max', aggregation_funcs['max']),
min=('min', aggregation_funcs['min']),
average=('average', aggregation_funcs['average']),
return summary_df
def chunk_list(input_list: List[Any], size: int = 3) -> Iterator[List[Any]]:
while input_list:
chunk: List[Any] = input_list[:size]
yield chunk
input_list = input_list[size:]
def evaluate_alarm_state(
summary_df: pd.DataFrame,
threshold: int,
datapoints_to_alarm: int,
evaluation_range: int,
aggregation_function: str,
alarm_condition: str
) -> pd.DataFrame:
data_points: List[Optional[float]] = list(summary_df[aggregation_function].values)
data_table_dict: Dict[str, List[Any]] = {
"DataPoints": [],
"# of data points that must be filled": [],
"MISSING": [],
"IGNORE": [],
def check_condition(value, threshold, condition):
if condition == '>':
return value > threshold
elif condition == '>=':
return value >= threshold
elif condition == '<':
return value < threshold
elif condition == '<=':
return value <= threshold
for chunk in chunk_list(input_list=data_points, size=evaluation_range):
data_point_repr: str = ''
num_dp_that_must_be_filled: int = 0
for dp in chunk:
if str(dp).lower() == "nan":
dp_symbol = '⚫️'
elif check_condition(dp, threshold, alarm_condition):
dp_symbol = '🔴'
dp_symbol = '🟢'
data_point_repr += dp_symbol
if len(chunk) < evaluation_range:
data_point_repr += '⚫️' * (evaluation_range - len(chunk))
if data_point_repr.count('⚫️') > (evaluation_range - datapoints_to_alarm):
num_dp_that_must_be_filled = datapoints_to_alarm - sum([data_point_repr.count('🟢'), data_point_repr.count('🔴')])
data_table_dict["# of data points that must be filled"].append(num_dp_that_must_be_filled)
if num_dp_that_must_be_filled > 0:
data_table_dict["MISSING"].append("INSUFFICIENT_DATA" if data_point_repr.count('⚫️') == evaluation_range else "Retain current state")
data_table_dict["IGNORE"].append("Retain current state")
data_table_dict["NOT BREACHING"].append("OK")
data_table_dict["IGNORE"].append("Retain current state")
data_table_dict["BREACHING"].append("ALARM" if '🔴' * datapoints_to_alarm in data_point_repr else "OK")
data_table_dict["NOT BREACHING"].append("ALARM" if '🟢' * datapoints_to_alarm not in data_point_repr else "OK")
return pd.DataFrame(data_table_dict)