import numpy as np
import matplotlib.pylab as plt
import ruptures as rpt
import streamlit as st
from ruptures.metrics import precision_recall
from ruptures.metrics import hausdorff
from ruptures.metrics import randindex

st.title("Change Point Detection")


def _render(fig):
    """Show a matplotlib figure on the Streamlit page, then release it."""
    st.pyplot(fig)
    # ruptures builds its figures through pyplot, which keeps every figure
    # registered until it is closed.  Streamlit re-runs this script on each
    # widget interaction, so unclosed figures would pile up in memory.
    plt.close(fig)


# Generating Signal
def pw_constant_input(n, dim, n_bkps, sigma):
    """Generate a piecewise constant signal (``rpt.pw_constant``).

    Args:
        n: number of samples.
        dim: dimension of the signal.
        n_bkps: number of change points.
        sigma: noise standard deviation.

    Returns:
        Tuple ``(signal, bkps)`` as returned by ``rpt.pw_constant``.
    """
    signal, bkps = rpt.pw_constant(n, dim, n_bkps, noise_std=sigma)
    return signal, bkps


def pw_linear_input(n, dim, n_bkps, sigma):
    """Generate a piecewise linear signal (``rpt.pw_linear``).

    Args:
        n: number of samples.
        dim: dimension of the covariates.
        n_bkps: number of change points.
        sigma: noise standard deviation.

    Returns:
        Tuple ``(signal, bkps)`` as returned by ``rpt.pw_linear``.
    """
    signal, bkps = rpt.pw_linear(n, dim, n_bkps, noise_std=sigma)
    return signal, bkps


def pw_normal_input(n, dim, n_bkps, sigma):
    """Generate a piecewise 2D Gaussian signal (``rpt.pw_normal``).

    ``dim`` and ``sigma`` are accepted so that all generators share one
    signature, but they are not used here.

    Args:
        n: number of samples.
        dim: unused.
        n_bkps: number of change points.
        sigma: unused.

    Returns:
        Tuple ``(signal, bkps)`` as returned by ``rpt.pw_normal``.
    """
    signal, bkps = rpt.pw_normal(n, n_bkps)
    return signal, bkps


def pw_wavy_input(n, dim, n_bkps, sigma):
    """Generate a piecewise sinusoidal signal (``rpt.pw_wavy``).

    ``dim`` is accepted so that all generators share one signature, but it
    is not used here.

    Args:
        n: number of samples.
        dim: unused.
        n_bkps: number of change points.
        sigma: noise standard deviation.

    Returns:
        Tuple ``(signal, bkps)`` as returned by ``rpt.pw_wavy``.
    """
    signal, bkps = rpt.pw_wavy(n, n_bkps, noise_std=sigma)
    return signal, bkps


# Maps the label shown in the select box to the generator that builds it.
_SIGNAL_GENERATORS = {
    'piecewiseConstant': pw_constant_input,
    'piecewiseLinear': pw_linear_input,
    'piecewiseNormal': pw_normal_input,
    'piecewiseSinusoidal': pw_wavy_input,
}
input_list = list(_SIGNAL_GENERATORS)
generate_signal = st.selectbox(label="Choose an input signal", options=input_list)

# The column containers get their own names so that they are not shadowed by
# the widget values read inside them.
col_n, col_dim, col_bkps, col_sigma = st.columns(4)
with col_n:
    n = st.number_input('No of Samples', min_value=100, step=1)
with col_dim:
    dim = st.number_input('No of dimesions', min_value=1, max_value=5, step=1)
with col_bkps:
    n_bkps = st.number_input('No of breakpoints', min_value=2, step=1)
with col_sigma:
    # NOTE(review): the label says "Variance" but the value is passed to the
    # generators as ``noise_std`` (a standard deviation) -- confirm wording.
    sigma = st.number_input('Variance', min_value=0, max_value=4, step=1)

signal, bkps = _SIGNAL_GENERATORS[generate_signal](n, dim, n_bkps, sigma)
fig, axarr = rpt.display(signal, bkps)
_render(fig)


def _show_detection(signal, bkps, my_bkps):
    """Plot the true (``bkps``) against the detected (``my_bkps``) breakpoints."""
    result_fig, _ = rpt.display(signal, bkps, my_bkps, figsize=(10, 2))
    _render(result_fig)


def dynp_method(signal, bkps, n_bkps):
    """Detect ``n_bkps`` change points with dynamic programming and plot them.

    Args:
        signal: signal to segment.
        bkps: true breakpoints, used only for the plot.
        n_bkps: number of change points to find.

    Returns:
        The detected breakpoints.
    """
    model = "l1"  # "l2", "rbf"
    algo = rpt.Dynp(model=model, min_size=3, jump=5).fit(signal)
    my_bkps = algo.predict(n_bkps)
    _show_detection(signal, bkps, my_bkps)
    return my_bkps


def pelt_method(signal, bkps, n_bkps):
    """Detect change points with PELT and plot them.

    Args:
        signal: signal to segment.
        bkps: true breakpoints, used only for the plot.
        n_bkps: value used both as minimum segment size and as penalty.

    Returns:
        The detected breakpoints.
    """
    model = "l1"  # "l2", "rbf"
    # NOTE(review): PELT is driven by a penalty rather than a breakpoint
    # count; ``n_bkps`` is reused here for ``min_size`` and ``pen`` -- confirm
    # that this is intended.
    algo = rpt.Pelt(model=model, min_size=n_bkps, jump=5).fit(signal)
    my_bkps = algo.predict(pen=n_bkps)
    _show_detection(signal, bkps, my_bkps)
    return my_bkps


def bin_seg_method(signal, bkps, n_bkps):
    """Detect ``n_bkps`` change points with binary segmentation and plot them.

    Args:
        signal: signal to segment.
        bkps: true breakpoints, used only for the plot.
        n_bkps: number of change points to find.

    Returns:
        The detected breakpoints.
    """
    model = "l2"  # "l1", "rbf", "linear", "normal", "ar",...
    algo = rpt.Binseg(model=model).fit(signal)
    my_bkps = algo.predict(n_bkps)
    # The detection figure itself is rendered (previously the module-level
    # input-signal figure was shown here by mistake).
    _show_detection(signal, bkps, my_bkps)
    return my_bkps


def bot_up_seg(signal, bkps, n_bkps):
    """Detect ``n_bkps`` change points with bottom-up segmentation and plot them.

    Args:
        signal: signal to segment.
        bkps: true breakpoints, used only for the plot.
        n_bkps: number of change points to find.

    Returns:
        The detected breakpoints.
    """
    model = "l2"  # "l1", "rbf", "linear", "normal", "ar",...
    # Bottom-up segmentation proper (previously Binseg was used here).
    algo = rpt.BottomUp(model=model).fit(signal)
    my_bkps = algo.predict(n_bkps)
    _show_detection(signal, bkps, my_bkps)
    return my_bkps


def win_sli_seg(signal, bkps, n_bkps):
    """Detect ``n_bkps`` change points with a sliding window and plot them.

    Args:
        signal: signal to segment.
        bkps: true breakpoints, used only for the plot.
        n_bkps: number of change points to find.

    Returns:
        The detected breakpoints.
    """
    model = "l2"  # "l1", "rbf", "linear", "normal", "ar"
    algo = rpt.Window(width=40, model=model).fit(signal)
    my_bkps = algo.predict(n_bkps)
    _show_detection(signal, bkps, my_bkps)
    return my_bkps


# Maps the label shown in the select box to the detection routine.
_SEARCH_METHODS = {
    'Dynamic Programming': dynp_method,
    'Pelt': pelt_method,
    'Binary Segmentation': bin_seg_method,
    'Bottom-up Segmentation': bot_up_seg,
    'Window sliding segmentation': win_sli_seg,
}
searchmethod_list = list(_SEARCH_METHODS)
detection_model = st.selectbox(label="Choose a Detection Method", options=searchmethod_list)
bkps1 = _SEARCH_METHODS[detection_model](signal, bkps, n_bkps)

# Compare the detected breakpoints with the true ones.
p, r = precision_recall(bkps, bkps1)
st.header('Precision and Recall')
st.write(p, r)

st.header('Hausdorff metric')
st.write(hausdorff(bkps, bkps1))

st.header('Rand index')
st.write(randindex(bkps, bkps1))

st.write("""
For a detailed description please look through our Documentation
""")

url = 'https://huggingface.co/spaces/ThirdEyeData/ChangePointDetection/blob/main/README.md'

# Render the link as plain Markdown; ``url`` is a constant, so no raw HTML
# (and no ``unsafe_allow_html``) is needed.
st.markdown(f"[Documentation]({url})")