File size: 6,848 Bytes
755ac75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adc9f0c
 
755ac75
 
 
 
 
adc9f0c
755ac75
bc28645
 
 
755ac75
bc28645
 
 
 
 
 
755ac75
 
 
 
 
 
 
 
 
 
 
 
adc9f0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
755ac75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
02f17d0
bc28645
755ac75
bc28645
755ac75
bc28645
 
755ac75
 
bc28645
755ac75
bc28645
 
755ac75
 
 
 
 
bc28645
755ac75
 
 
 
 
 
bc28645
 
755ac75
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import random
from datetime import datetime, timedelta, date, time
import pandas as pd
import numpy as np
from typing import List, Iterator, Dict, Any, Optional

def generate_random_data(
    date: date,
    start_time: time,
    end_time: time,
    count: int,
    response_time_range: tuple[int, int],
    null_percentage: float
) -> pd.DataFrame:
    """Generate a DataFrame of random response-time samples for one day.

    Args:
        date: Calendar day the samples fall on.
        start_time: Earliest possible sample time (inclusive).
        end_time: Latest possible sample time (inclusive).
        count: Number of rows to generate.
        response_time_range: (low, high) inclusive bounds, in ms, passed to
            ``random.randint``.  (Was annotated ``(int, int)`` — a tuple
            literal, not a type — fixed to ``tuple[int, int]``.)
        null_percentage: Fraction of rows (0.0–1.0) whose response time is
            replaced with ``None`` to simulate missing data.

    Returns:
        DataFrame with columns ``Timestamp`` (sorted ascending) and
        ``ResponseTime(ms)`` (ints with ``None`` holes; pandas will surface
        the holes as NaN).

    Raises:
        ValueError: from ``random.sample`` if ``null_percentage * count``
            exceeds ``count`` (i.e. null_percentage > 1).
    """
    start_datetime: datetime = datetime.combine(date, start_time)
    end_datetime: datetime = datetime.combine(date, end_time)

    # Uniform random offsets within the window, then sorted so the series
    # reads like a real time-ordered log.
    window_seconds: int = int((end_datetime - start_datetime).total_seconds())
    random_timestamps: List[datetime] = [
        start_datetime + timedelta(seconds=random.randint(0, window_seconds))
        for _ in range(count)
    ]
    random_timestamps.sort()

    random_response_times: List[Optional[int]] = [
        random.randint(response_time_range[0], response_time_range[1]) for _ in range(count)
    ]

    # Punch null holes at distinct random positions.
    null_count: int = int(null_percentage * count)
    null_indices: List[int] = random.sample(range(count), null_count)
    for idx in null_indices:
        random_response_times[idx] = None

    data: Dict[str, Any] = {
        'Timestamp': random_timestamps,
        'ResponseTime(ms)': random_response_times
    }
    df: pd.DataFrame = pd.DataFrame(data)
    return df

def calculate_percentile(
    df: pd.DataFrame,
    freq: str,
    percentile: float
) -> pd.DataFrame:
    """Compute one response-time percentile per time bucket.

    Groups ``df`` by its ``Timestamp`` column at frequency ``freq`` and takes
    the requested quantile of ``ResponseTime(ms)`` in each bucket.  The output
    column is labelled e.g. ``p95_ResponseTime(ms)`` for ``percentile=0.95``.
    """
    label: str = f"p{int(percentile * 100)}_ResponseTime(ms)"
    buckets = df.groupby(pd.Grouper(key='Timestamp', freq=freq))
    percentile_df: pd.DataFrame = (
        buckets["ResponseTime(ms)"].quantile(percentile).reset_index(name=label)
    )
    # Empty buckets yield NaN; normalise those to None as the original did.
    percentile_df.replace(to_replace=np.nan, value=None, inplace=True)
    return percentile_df

def aggregate_data(
    df: pd.DataFrame,
    period_length: str,
) -> pd.DataFrame:
    """Aggregate raw samples into per-period summary statistics.

    Groups ``df`` by ``Timestamp`` at frequency ``period_length`` and computes
    p50/p95/p99 percentiles, max, min and mean of ``ResponseTime(ms)`` per
    bucket.  Buckets that are empty or all-null produce NaN.

    Args:
        df: Raw data with ``Timestamp`` and ``ResponseTime(ms)`` columns.
        period_length: pandas offset alias (e.g. ``'5min'``).

    Returns:
        DataFrame with columns Timestamp, p50, p95, p99, max, min, average;
        an empty DataFrame when ``df`` is empty.
    """
    if df.empty:
        return pd.DataFrame()  # Return an empty DataFrame if input is empty

    def _nan_safe(func):
        # Wrap an aggregate so it drops nulls once and returns NaN for
        # all-null buckets (avoids the original's duplicated dropna calls).
        def wrapper(series):
            values = series.dropna()
            return func(values) if not values.empty else np.nan
        return wrapper

    aggregation_funcs = {
        'p50': _nan_safe(lambda v: np.percentile(v, 50)),
        'p95': _nan_safe(lambda v: np.percentile(v, 95)),
        'p99': _nan_safe(lambda v: np.percentile(v, 99)),
        'max': _nan_safe(np.max),
        'min': _nan_safe(np.min),
        'average': _nan_safe(np.mean),
    }

    summary_df = df.groupby(pd.Grouper(key='Timestamp', freq=period_length)).agg(
        p50=('ResponseTime(ms)', aggregation_funcs['p50']),
        p95=('ResponseTime(ms)', aggregation_funcs['p95']),
        p99=('ResponseTime(ms)', aggregation_funcs['p99']),
        max=('ResponseTime(ms)', aggregation_funcs['max']),
        min=('ResponseTime(ms)', aggregation_funcs['min']),
        average=('ResponseTime(ms)', aggregation_funcs['average']),
    ).reset_index()
    return summary_df

def re_aggregate_data(
    df: pd.DataFrame,
    period_length: str,
) -> pd.DataFrame:
    """Re-aggregate an already-summarised DataFrame into coarser periods.

    Unlike :func:`aggregate_data`, each output statistic is computed from the
    same-named input column (p50 of p50s, max of maxes, ...).  Note that a
    percentile-of-percentiles is an approximation of the true percentile over
    the raw data.

    Args:
        df: Summary data with ``Timestamp``, ``p50``, ``p95``, ``p99``,
            ``max``, ``min`` and ``average`` columns.
        period_length: pandas offset alias for the coarser bucket size.

    Returns:
        Re-aggregated DataFrame with the same columns; an empty DataFrame
        when ``df`` is empty.
    """
    if df.empty:
        return pd.DataFrame()  # Return an empty DataFrame if input is empty

    def _nan_safe(func):
        # Wrap an aggregate so it drops nulls once and returns NaN for
        # all-null buckets (avoids the original's duplicated dropna calls).
        def wrapper(series):
            values = series.dropna()
            return func(values) if not values.empty else np.nan
        return wrapper

    aggregation_funcs = {
        'p50': _nan_safe(lambda v: np.percentile(v, 50)),
        'p95': _nan_safe(lambda v: np.percentile(v, 95)),
        'p99': _nan_safe(lambda v: np.percentile(v, 99)),
        'max': _nan_safe(np.max),
        'min': _nan_safe(np.min),
        'average': _nan_safe(np.mean),
    }

    summary_df = df.groupby(pd.Grouper(key='Timestamp', freq=period_length)).agg(
        p50=('p50', aggregation_funcs['p50']),
        p95=('p95', aggregation_funcs['p95']),
        p99=('p99', aggregation_funcs['p99']),
        max=('max', aggregation_funcs['max']),
        min=('min', aggregation_funcs['min']),
        average=('average', aggregation_funcs['average']),
    ).reset_index()
    return summary_df


def chunk_list(input_list: List[Any], size: int = 3) -> Iterator[List[Any]]:
    """Yield successive slices of ``input_list`` of at most ``size`` elements.

    The final chunk may be shorter.  ``size`` must be a positive integer.
    (The original re-sliced the remainder on every iteration — O(n^2) total
    copying; this version indexes in strides for O(n) total work.)
    """
    for start in range(0, len(input_list), size):
        yield input_list[start:start + size]

def evaluate_alarm_state(
    summary_df: pd.DataFrame,
    threshold: int,
    datapoints_to_alarm: int,
    evaluation_range: int,
    aggregation_function: str,
    alarm_condition: str
) -> pd.DataFrame:
    """Simulate alarm-state evaluation over windows of aggregated data points.

    Walks ``summary_df[aggregation_function]`` in consecutive windows of
    ``evaluation_range`` points, renders each window as an emoji string
    (⚫️ missing/NaN, 🔴 breaching per ``alarm_condition`` vs ``threshold``,
    🟢 not breaching), and for each window reports the resulting alarm state
    under four missing-data treatments: MISSING, IGNORE, BREACHING and
    NOT BREACHING (mirrors CloudWatch-style alarm semantics — NOTE(review):
    presumably; confirm against the intended alarm model).

    Args:
        summary_df: Aggregated data with a column named ``aggregation_function``.
        threshold: Value each data point is compared against.
        datapoints_to_alarm: Breaching points required within a window to alarm.
        evaluation_range: Number of data points per evaluation window.
        aggregation_function: Column of ``summary_df`` to evaluate (e.g. 'p99').
        alarm_condition: One of '>', '>=', '<', '<='.

    Returns:
        DataFrame with one row per window: the emoji representation, the count
        of points that must be treated as filled, and the alarm outcome under
        each of the four missing-data strategies.
    """
    # Window source: the chosen aggregate column, NaN where data was missing.
    data_points: List[Optional[float]] = list(summary_df[aggregation_function].values)

    # Columns of the output table, built up one window at a time.
    data_table_dict: Dict[str, List[Any]] = {
        "DataPoints": [],
        "# of data points that must be filled": [],
        "MISSING": [],
        "IGNORE": [],
        "BREACHING": [],
        "NOT BREACHING": []
    }

    def check_condition(value: float, threshold: int, condition: str) -> Optional[bool]:
        # Returns None (falsy) for an unrecognised condition string, so an
        # unknown operator silently renders every point as 🟢 — intentional?
        # TODO(review): consider raising on bad input.
        if condition == '>':
            return value > threshold
        elif condition == '>=':
            return value >= threshold
        elif condition == '<':
            return value < threshold
        elif condition == '<=':
            return value <= threshold

    for chunk in chunk_list(input_list=data_points, size=evaluation_range):
        data_point_repr: str = ''
        num_dp_that_must_be_filled: int = 0

        # Render each point in the window as one emoji symbol.
        for dp in chunk:
            if str(dp).lower() == "nan":
                dp_symbol = '⚫️'  # missing data point
            elif check_condition(dp, threshold, alarm_condition):
                dp_symbol = '🔴'  # breaching
            else:
                dp_symbol = '🟢'  # not breaching
            data_point_repr += dp_symbol

        # Pad a short final window with "missing" markers so every window
        # is exactly evaluation_range symbols long.
        if len(chunk) < evaluation_range:
            data_point_repr += '⚫️' * (evaluation_range - len(chunk))

        # If there are too few real points to ever reach datapoints_to_alarm,
        # compute how many missing points would have to be filled in.
        # (str.count works here even though '⚫️' is a two-codepoint emoji.)
        if data_point_repr.count('⚫️') > (evaluation_range - datapoints_to_alarm):
            num_dp_that_must_be_filled = datapoints_to_alarm - sum([data_point_repr.count('🟢'), data_point_repr.count('🔴')])

        data_table_dict["DataPoints"].append(data_point_repr)
        data_table_dict["# of data points that must be filled"].append(num_dp_that_must_be_filled)

        if num_dp_that_must_be_filled > 0:
            # Not enough real data: outcome depends on the missing-data policy.
            data_table_dict["MISSING"].append("INSUFFICIENT_DATA" if data_point_repr.count('⚫️') == evaluation_range else "Retain current state")
            data_table_dict["IGNORE"].append("Retain current state")
            data_table_dict["BREACHING"].append("ALARM")
            data_table_dict["NOT BREACHING"].append("OK")
        else:
            # Enough real data. NOTE(review): the BREACHING/NOT BREACHING tests
            # use *consecutive-run* substring checks ('🔴' * datapoints_to_alarm
            # in the string), not total counts — confirm this matches the
            # intended alarm semantics.
            data_table_dict["MISSING"].append("OK")
            data_table_dict["IGNORE"].append("Retain current state")
            data_table_dict["BREACHING"].append("ALARM" if '🔴' * datapoints_to_alarm in data_point_repr else "OK")
            data_table_dict["NOT BREACHING"].append("ALARM" if '🟢' * datapoints_to_alarm not in data_point_repr else "OK")

    return pd.DataFrame(data_table_dict)