Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
from datetime import time, date | |
from utils import generate_random_data, evaluate_alarm_state, aggregate_data, re_aggregate_data | |
from textwrap import dedent | |
from matplotlib import pyplot as plt | |
# Constants | |
HARD_CODED_DATE = date(2024, 7, 26) | |
def main(): | |
st.title("Streamlit App for Data Generation and Analysis") | |
# Initialize session state | |
initialize_session_state() | |
# Section 1 - Generate random data | |
st.header("Section 1 - Generate Random Data") | |
generate_data_form() | |
if not st.session_state.df.empty: | |
display_dataframe("Raw Event Data", st.session_state.df) | |
st.line_chart(st.session_state.df.set_index("Timestamp")) | |
# Section 2 - Calculate Aggregations | |
st.header("Section 2 - Calculate Aggregations") | |
aggregation_form() | |
if not st.session_state.aggregated_df.empty: | |
display_dataframe("Aggregated Summary Data (Storage)", st.session_state.aggregated_df) | |
aggregation_function_input__storage = st.selectbox( | |
"Aggregation Function (Storage)", | |
['p50', 'p95', 'p99', 'max', 'min', 'average'], | |
key='aggregation_function_input__storage', | |
help="Select the aggregation function for visualizing the data." | |
) | |
st.line_chart(st.session_state.aggregated_df.set_index("Timestamp")[aggregation_function_input__storage]) | |
# Section 3 - Summary Data Aggregated by Period | |
st.header("Section 3 - Summary Data Aggregated by Period") | |
summary_by_period_form() | |
if not st.session_state.summary_by_period_df.empty: | |
display_dataframe("Summary Data Aggregated by Period (for Alarm)", st.session_state.summary_by_period_df) | |
aggregation_function_input__alarm = st.selectbox( | |
"Aggregation Function (Alarm)", | |
['p50', 'p95', 'p99', 'max', 'min', 'average'], | |
key='aggregation_function_input__alarm', | |
help="Select the aggregation function for visualizing the data." | |
) | |
st.line_chart(st.session_state.summary_by_period_df.set_index("Timestamp")[aggregation_function_input__alarm]) | |
# Section 4 - Evaluate Alarm State | |
st.header("Section 4 - Evaluate Alarm State") | |
alarm_state_form() | |
if not st.session_state.alarm_state_df.empty: | |
plot_time_series(st.session_state.summary_by_period_df, st.session_state.threshold_input, st.session_state.alarm_condition_input, st.session_state.evaluation_range_input) | |
display_alarm_state_evaluation(st.session_state.alarm_state_df) | |
display_key_tables() | |
def initialize_session_state() -> None: | |
if 'df' not in st.session_state: | |
st.session_state.df = pd.DataFrame() | |
if 'aggregated_df' not in st.session_state: | |
st.session_state.aggregated_df = pd.DataFrame() | |
if 'summary_by_period_df' not in st.session_state: | |
st.session_state.summary_by_period_df = pd.DataFrame() | |
if 'alarm_state_df' not in st.session_state: | |
st.session_state.alarm_state_df = pd.DataFrame() | |
def generate_data_form() -> None: | |
with st.form(key='generate_data_form'): | |
start_time_input = st.time_input("Start Time", time(12, 0), help="Select the start time for generating random data.") | |
end_time_input = st.time_input("End Time", time(12, 30), help="Select the end time for generating random data.") | |
count_input = st.slider("Count", min_value=1, max_value=200, value=60, help="Specify the number of data points to generate.") | |
response_time_range_input = st.slider("Response Time Range (ms)", min_value=50, max_value=300, value=(100, 250), help="Select the range of response times in milliseconds.") | |
null_percentage_input = st.slider("Null Percentage", min_value=0.0, max_value=1.0, value=0.5, help="Select the percentage of null values in the generated data.") | |
submit_button = st.form_submit_button(label='Generate Data') | |
if submit_button: | |
st.session_state.df = generate_random_data( | |
date=HARD_CODED_DATE, | |
start_time=start_time_input, | |
end_time=end_time_input, | |
count=count_input, | |
response_time_range=response_time_range_input, | |
null_percentage=null_percentage_input | |
) | |
def aggregation_form() -> None: | |
freq_input = st.selectbox("Period (bin)", ['1min', '5min', '15min'], key='freq_input', help="Select the frequency for aggregating the data.") | |
if not st.session_state.df.empty: | |
st.session_state.aggregated_df = aggregate_data(st.session_state.df, freq_input) | |
def summary_by_period_form() -> None: | |
period_length_input = st.selectbox("Period Length", ['1min', '5min', '15min'], key='period_length_input', help="Select the period length for aggregating the summary data.") | |
if not st.session_state.aggregated_df.empty: | |
st.session_state.summary_by_period_df = re_aggregate_data(st.session_state.aggregated_df, period_length_input) | |
def alarm_state_form() -> None: | |
threshold_input = st.slider("Threshold (ms)", min_value=50, max_value=300, value=150, key='threshold_input', help="Specify the threshold value for evaluating the alarm state.") | |
datapoints_to_alarm_input = st.number_input("Datapoints to Alarm", min_value=1, value=3, key='datapoints_to_alarm_input', help="Specify the number of data points required to trigger an alarm.") | |
evaluation_range_input = st.number_input("Evaluation Range", min_value=1, value=5, key='evaluation_range_input', help="Specify the range of data points to evaluate for alarm state.") | |
alarm_condition_input = st.selectbox( | |
"Alarm Condition", | |
['>', '>=', '<', '<='], | |
key='alarm_condition_input', | |
help="Select the condition for evaluating the alarm state." | |
) | |
if not st.session_state.summary_by_period_df.empty: | |
st.session_state.alarm_state_df = evaluate_alarm_state( | |
summary_df=st.session_state.summary_by_period_df, | |
threshold=threshold_input, | |
datapoints_to_alarm=datapoints_to_alarm_input, | |
evaluation_range=evaluation_range_input, | |
aggregation_function=st.session_state.aggregation_function_input__alarm, | |
alarm_condition=alarm_condition_input | |
) | |
def display_dataframe(title: str, df: pd.DataFrame) -> None: | |
st.write(title) | |
st.dataframe(df) | |
def plot_time_series(df: pd.DataFrame, threshold: int, alarm_condition: str, evaluation_range: int) -> None: | |
timestamps = df['Timestamp'] | |
response_times = df[st.session_state.aggregation_function_input__alarm] | |
segments = [] | |
current_segment = {'timestamps': [], 'values': []} | |
for timestamp, value in zip(timestamps, response_times): | |
if pd.isna(value): | |
if current_segment['timestamps']: | |
segments.append(current_segment) | |
current_segment = {'timestamps': [], 'values': []} | |
else: | |
current_segment['timestamps'].append(timestamp) | |
current_segment['values'].append(value) | |
if current_segment['timestamps']: | |
segments.append(current_segment) | |
fig, ax1 = plt.subplots() | |
color = 'tab:blue' | |
ax1.set_xlabel('Timestamp') | |
ax1.set_ylabel('Response Time (ms)', color=color) | |
for segment in segments: | |
ax1.plot(segment['timestamps'], segment['values'], color=color, linewidth=0.5) | |
ax1.scatter(segment['timestamps'], segment['values'], color=color, s=10) | |
line_style = '--' if alarm_condition in ['<', '>'] else '-' | |
ax1.axhline(y=threshold, color='r', linestyle=line_style, linewidth=0.8, label='Threshold') | |
ax1.tick_params(axis='y', labelcolor=color) | |
if alarm_condition in ['<=', '<']: | |
ax1.fill_between(timestamps, 0, threshold, color='pink', alpha=0.3) | |
else: | |
ax1.fill_between(timestamps, threshold, response_times.max(), color='pink', alpha=0.3) | |
period_indices = range(len(df)) | |
ax2 = ax1.twiny() | |
ax2.set_xticks(period_indices) | |
ax2.set_xticklabels(period_indices, fontsize=8) | |
ax2.set_xlabel('Time Periods', fontsize=8) | |
ax2.xaxis.set_tick_params(width=0.5) | |
for idx in period_indices: | |
if idx % evaluation_range == 0: | |
ax1.axvline(x=df['Timestamp'].iloc[idx], color='green', linestyle='-', alpha=0.3) | |
max_value = max(filter(lambda x: x is not None, df[st.session_state.aggregation_function_input__alarm])) | |
ax1.text(df['Timestamp'].iloc[idx], max_value * 0.95, f"[{idx // evaluation_range}]", rotation=90, verticalalignment='bottom', color='grey', alpha=0.7, fontsize=8) | |
else: | |
ax1.axvline(x=df['Timestamp'].iloc[idx], color='grey', linestyle='--', alpha=0.3) | |
ax1.annotate('Alarm threshold', xy=(0.98, threshold), xycoords=('axes fraction', 'data'), ha='right', va='bottom', fontsize=8, color='red', backgroundcolor='none') | |
fig.tight_layout() | |
st.pyplot(fig) | |
def display_alarm_state_evaluation(df: pd.DataFrame) -> None: | |
st.write("Alarm State Evaluation") | |
st.dataframe(df) | |
def display_key_tables() -> None: | |
st.write("### Key") | |
# Symbols | |
st.write("#### Symbols") | |
symbol_data = { | |
"Symbol": ["🔴", "⚫️", "🟢"], | |
"Meaning": [ | |
"Breaching data point: This data point exceeds the threshold.", | |
"Missing data point: This data point is missing or not reported.", | |
"Non-breaching data point: This data point is within the threshold." | |
] | |
} | |
symbol_df = pd.DataFrame(symbol_data) | |
st.table(symbol_df) | |
# Columns | |
st.write(dedent(""" #### Columns: Strategies for handling missing data points [docs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#alarms-and-missing-data) | |
Sometimes, no metric events may have been reported during a given time period. In this case, | |
you must decide how you will treat missing data points. Ignore it? Or consider it a failure. | |
Here are the 4 supported strategies in AWS: | |
""")) | |
column_data = { | |
"Strategy": ["missing", "ignore", "breaching", "notBreaching"], | |
"Explanation": [ | |
"If all data points in the alarm evaluation range are missing, the alarm transitions to INSUFFICIENT_DATA. Possible values: INSUFFICIENT_DATA, Retain current state, ALARM, OK.", | |
"The current alarm state is maintained. Possible values: Retain current state, ALARM, OK.", | |
"Missing data points are treated as \"bad\" and breaching the threshold. Possible values: ALARM, OK.", | |
"Missing data points are treated as \"good\" and within the threshold. Possible values: ALARM, OK." | |
] | |
} | |
column_df = pd.DataFrame(column_data) | |
st.table(column_df) | |
# States | |
st.write("#### States") | |
state_data = { | |
"State": ["ALARM", "OK", "Retain current state", "INSUFFICIENT_DATA"], | |
"Description": [ | |
"Alarm state is triggered.", | |
"Everything is within the threshold.", | |
"The current alarm state is maintained.", | |
"Not enough data to make a determination." | |
] | |
} | |
state_df = pd.DataFrame(state_data) | |
st.table(state_df) | |
if __name__ == "__main__": | |
main() | |