Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import pandas as pd | |
import seaborn as sns | |
import matplotlib.pyplot as plt | |
import datetime | |
from utilities import set_header,initialize_data,load_local_css | |
from scipy.optimize import curve_fit | |
import statsmodels.api as sm | |
from plotly.subplots import make_subplots | |
st.set_page_config( | |
page_title="Data Validation", | |
page_icon=":shark:", | |
layout="wide", | |
initial_sidebar_state='collapsed' | |
) | |
load_local_css('styles.css') | |
set_header() | |
def format_numbers(x): | |
if abs(x) >= 1e6: | |
# Format as millions with one decimal place and commas | |
return f'{x/1e6:,.1f}M' | |
elif abs(x) >= 1e3: | |
# Format as thousands with one decimal place and commas | |
return f'{x/1e3:,.1f}K' | |
else: | |
# Format with one decimal place and commas for values less than 1000 | |
return f'{x:,.1f}' | |
def format_axis(x): | |
if isinstance(x, tuple): | |
x = x[0] # Extract the numeric value from the tuple | |
if abs(x) >= 1e6: | |
return f'{x / 1e6:.0f}M' | |
elif abs(x) >= 1e3: | |
return f'{x / 1e3:.0f}k' | |
else: | |
return f'{x:.0f}' | |
attributred_app_installs=pd.read_csv("attributed_app_installs.csv") | |
attributred_app_installs_tactic=pd.read_excel('attributed_app_installs_tactic.xlsx') | |
data=pd.read_excel('Channel_wise_imp_click_spends.xlsx') | |
data['Date']=pd.to_datetime(data['Date']) | |
st.header('Saturation Curves') | |
# st.dataframe(data.head(2)) | |
st.markdown('Data QC') | |
st.markdown('Channel wise summary') | |
summary_df=data.groupby(data['Date'].dt.strftime('%B %Y')).sum() | |
summary_df=summary_df.sort_index(key=lambda x: pd.to_datetime(x, format='%B %Y')) | |
st.dataframe(summary_df.applymap(format_numbers)) | |
def line_plot_target(df,target,title): | |
df=df | |
df['Date_unix'] = df['Date'].apply(lambda x: x.timestamp()) | |
# Perform polynomial fitting | |
coefficients = np.polyfit(df['Date_unix'], df[target], 1) | |
# st.dataframe(df) | |
coefficients = np.polyfit(df['Date'].view('int64'), df[target], 1) | |
trendline = np.poly1d(coefficients) | |
fig = go.Figure() | |
fig.add_trace(go.Scatter(x=df['Date'], y=df[target], mode='lines', name=target,line=dict(color='#11B6BD'))) | |
trendline_x = df['Date'] | |
trendline_y = trendline(df['Date'].view('int64')) | |
fig.add_trace(go.Scatter(x=trendline_x, y=trendline_y, mode='lines', name='Trendline', line=dict(color='#739FAE'))) | |
fig.update_layout( | |
title=title, | |
xaxis=dict(type='date') | |
) | |
for year in df['Date'].dt.year.unique()[1:]: | |
january_1 = pd.Timestamp(year=year, month=1, day=1) | |
fig.add_shape( | |
go.layout.Shape( | |
type="line", | |
x0=january_1, | |
x1=january_1, | |
y0=0, | |
y1=1, | |
xref="x", | |
yref="paper", | |
line=dict(color="grey", width=1.5, dash="dash"), | |
) | |
) | |
return fig | |
channels_d= data.columns[:28] | |
channels=list(set([col.replace('_impressions','').replace('_clicks','').replace('_spend','') for col in channels_d if col.lower()!='date'])) | |
channel= st.selectbox('Select Channel_name',channels) | |
target_column = st.selectbox('Select Channel)',[col for col in data.columns if col.startswith(channel)]) | |
fig=line_plot_target(data, target=str(target_column), title=f'{str(target_column)} Over Time') | |
st.plotly_chart(fig, use_container_width=True) | |
# st.markdown('## Saturation Curve') | |
st.header('Build saturation curve') | |
# Your data | |
# st.write(len(attributred_app_installs)) | |
# st.write(len(data)) | |
# col=st.columns(3) | |
# with col[0]: | |
col=st.columns(2) | |
with col[0]: | |
if st.checkbox('Cap Outliers'): | |
x = data[target_column] | |
x.index=data['Date'] | |
# st.write(x) | |
result = sm.tsa.seasonal_decompose(x, model='additive') | |
x_resid=result.resid | |
# fig = make_subplots(rows=1, cols=1, shared_xaxes=True, vertical_spacing=0.02) | |
# trace_x = go.Scatter(x=data['Date'], y=x, mode='lines', name='x') | |
# fig.add_trace(trace_x) | |
# trace_x_resid = go.Scatter(x=data['Date'], y=x_resid, mode='lines', name='x_resid', yaxis='y2',line=dict(color='orange')) | |
# fig.add_trace(trace_x_resid) | |
# fig.update_layout(title='', | |
# xaxis=dict(title='Date'), | |
# yaxis=dict(title='x', side='left'), | |
# yaxis2=dict(title='x_resid', side='right')) | |
# st.title('') | |
# st.plotly_chart(fig) | |
# x=result.resid | |
# x=x.fillna(0) | |
x_mean = np.mean(x) | |
x_std = np.std(x) | |
x_scaled = (x - x_mean) / x_std | |
lower_threshold = -2.0 | |
upper_threshold = 2.0 | |
x_scaled = np.clip(x_scaled, lower_threshold, upper_threshold) | |
else: | |
x = data[target_column] | |
x_mean = np.mean(x) | |
x_std = np.std(x) | |
x_scaled = (x - x_mean) / x_std | |
with col[1]: | |
if st.checkbox('Attributed'): | |
column=[col for col in attributred_app_installs.columns if col in target_column] | |
data['app_installs_appsflyer']=attributred_app_installs[column] | |
y=data['app_installs_appsflyer'] | |
title='Attributed-App_installs_appsflyer' | |
# st.dataframe(y) | |
# st.dataframe(x) | |
# st.dataframe(x_scaled) | |
else: | |
y=data["app_installs_appsflyer"] | |
title='App_installs_appsflyer' | |
# st.write(len(y)) | |
# Curve fitting function | |
def sigmoid(x, K, a, x0): | |
return K / (1 + np.exp(-a * (x - x0))) | |
initial_K = np.max(y) | |
initial_a = 1 | |
initial_x0 = 0 | |
columns=st.columns(3) | |
with columns[0]: | |
K = st.number_input('K (Amplitude)', min_value=0.01, max_value=2.0 * np.max(y), value=float(initial_K), step=5.0) | |
with columns[1]: | |
a = st.number_input('a (Slope)', min_value=0.01, max_value=5.0, value=float(initial_a), step=0.5) | |
with columns[2]: | |
x0 = st.number_input('x0 (Center)', min_value=float(min(x_scaled)), max_value=float(max(x_scaled)), value=float(initial_x0), step=2.0) | |
params, _ = curve_fit(sigmoid, x_scaled, y, p0=[K, a, x0], maxfev=20000) | |
x_slider = st.slider('X Value', min_value=float(min(x)), max_value=float(max(x))+1, value=float(x_mean), step=1.) | |
# Calculate the corresponding value on the fitted curve | |
x_slider_scaled = (x_slider - x_mean) / x_std | |
y_slider_fit = sigmoid(x_slider_scaled, *params) | |
# Display the corresponding value | |
st.write(f'{target_column}: {format_numbers(x_slider)}') | |
st.write(f'Corresponding App_installs: {format_numbers(y_slider_fit)}') | |
# Scatter plot of your data | |
fig = px.scatter(data_frame=data, x=x_scaled, y=y, labels={'x': f'{target_column}', 'y': 'App Installs'}, title=title) | |
# Add the fitted sigmoid curve to the plot | |
x_fit = np.linspace(min(x_scaled), max(x_scaled), 100) # Generate x values for the curve | |
y_fit = sigmoid(x_fit, *params) | |
fig.add_trace(px.line(x=x_fit, y=y_fit).data[0]) | |
fig.data[1].update(line=dict(color='orange')) | |
fig.add_vline(x=x_slider_scaled, line_dash='dash', line_color='red', annotation_text=f'{format_numbers(x_slider)}') | |
x_tick_labels = {format_axis(x_scaled[i]): format_axis(x[i]) for i in range(len(x_scaled))} | |
num_points = 30 # Number of points you want to select | |
keys = list(x_tick_labels.keys()) | |
values = list(x_tick_labels.values()) | |
spacing = len(keys) // num_points # Calculate the spacing | |
if spacing==0: | |
spacing=15 | |
selected_keys = keys[::spacing] | |
selected_values = values[::spacing] | |
else: | |
selected_keys = keys[::spacing] | |
selected_values = values[::spacing] | |
# Update the x-axis ticks with the selected keys and values | |
fig.update_xaxes(tickvals=selected_keys, ticktext=selected_values) | |
fig.update_xaxes(tickvals=list(x_tick_labels.keys()), ticktext=list(x_tick_labels.values())) | |
# Show the plot using st.plotly_chart | |
fig.update_xaxes(showgrid=False) | |
fig.update_yaxes(showgrid=False) | |
fig.update_layout( | |
width=600, # Adjust the width as needed | |
height=600 # Adjust the height as needed | |
) | |
st.plotly_chart(fig) | |
st.markdown('Tactic level') | |
if channel=='paid_social': | |
tactic_data=pd.read_excel("Tatcic_paid.xlsx",sheet_name='paid_social_impressions') | |
else: | |
tactic_data=pd.read_excel("Tatcic_paid.xlsx",sheet_name='digital_app_display_impressions') | |
target_column = st.selectbox('Select Channel)',[col for col in tactic_data.columns if col!='Date' and col!='app_installs_appsflyer']) | |
fig=line_plot_target(tactic_data, target=str(target_column), title=f'{str(target_column)} Over Time') | |
st.plotly_chart(fig, use_container_width=True) | |
if st.checkbox('Cap Outliers',key='tactic1'): | |
x = tactic_data[target_column] | |
x_mean = np.mean(x) | |
x_std = np.std(x) | |
x_scaled = (x - x_mean) / x_std | |
lower_threshold = -2.0 | |
upper_threshold = 2.0 | |
x_scaled = np.clip(x_scaled, lower_threshold, upper_threshold) | |
else: | |
x = tactic_data[target_column] | |
x_mean = np.mean(x) | |
x_std = np.std(x) | |
x_scaled = (x - x_mean) / x_std | |
if st.checkbox('Attributed',key='tactic2'): | |
column=[col for col in attributred_app_installs_tactic.columns if col in target_column] | |
tactic_data['app_installs_appsflyer']=attributred_app_installs_tactic[column] | |
y=tactic_data['app_installs_appsflyer'] | |
title='Attributed-App_installs_appsflyer' | |
# st.dataframe(y) | |
# st.dataframe(x) | |
# st.dataframe(x_scaled) | |
else: | |
y=data["app_installs_appsflyer"] | |
title='App_installs_appsflyer' | |
# st.write(len(y)) | |
# Curve fitting function | |
def sigmoid(x, K, a, x0): | |
return K / (1 + np.exp(-a * (x - x0))) | |
# Curve fitting | |
# st.dataframe(x_scaled.head(3)) | |
# # y=y.astype(float) | |
# st.dataframe(y.head(3)) | |
initial_K = np.max(y) | |
initial_a = 1 | |
initial_x0 = 0 | |
K = st.number_input('K (Amplitude)', min_value=0.01, max_value=2.0 * np.max(y), value=float(initial_K), step=5.0,key='tactic3') | |
a = st.number_input('a (Slope)', min_value=0.01, max_value=5.0, value=float(initial_a), step=2.0,key='tactic41') | |
x0 = st.number_input('x0 (Center)', min_value=float(min(x_scaled)), max_value=float(max(x_scaled)), value=float(initial_x0), step=2.0,key='tactic4') | |
params, _ = curve_fit(sigmoid, x_scaled, y, p0=[K, a, x0], maxfev=20000) | |
# Slider to vary x | |
x_slider = st.slider('X Value', min_value=float(min(x)), max_value=float(max(x)), value=float(x_mean), step=1.,key='tactic7') | |
# Calculate the corresponding value on the fitted curve | |
x_slider_scaled = (x_slider - x_mean) / x_std | |
y_slider_fit = sigmoid(x_slider_scaled, *params) | |
# Display the corresponding value | |
st.write(f'{target_column}: {format_axis(x_slider)}') | |
st.write(f'Corresponding App_installs: {format_axis(y_slider_fit)}') | |
# Scatter plot of your data | |
fig = px.scatter(data_frame=data, x=x_scaled, y=y, labels={'x': f'{target_column}', 'y': 'App Installs'}, title=title) | |
# Add the fitted sigmoid curve to the plot | |
x_fit = np.linspace(min(x_scaled), max(x_scaled), 100) # Generate x values for the curve | |
y_fit = sigmoid(x_fit, *params) | |
fig.add_trace(px.line(x=x_fit, y=y_fit).data[0]) | |
fig.data[1].update(line=dict(color='orange')) | |
fig.add_vline(x=x_slider_scaled, line_dash='dash', line_color='red', annotation_text=f'{format_numbers(x_slider)}') | |
x_tick_labels = {format_axis((x_scaled[i],0)): format_axis(x[i]) for i in range(len(x_scaled))} | |
num_points = 50 # Number of points you want to select | |
keys = list(x_tick_labels.keys()) | |
values = list(x_tick_labels.values()) | |
spacing = len(keys) // num_points # Calculate the spacing | |
if spacing==0: | |
spacing=2 | |
selected_keys = keys[::spacing] | |
selected_values = values[::spacing] | |
else: | |
selected_keys = keys[::spacing] | |
selected_values = values[::spacing] | |
# Update the x-axis ticks with the selected keys and values | |
fig.update_xaxes(tickvals=selected_keys, ticktext=selected_values) | |
# Round the x-axis and y-axis tick values to zero decimal places | |
fig.update_xaxes(tickformat=".f") # Format x-axis ticks to zero decimal places | |
fig.update_yaxes(tickformat=".f") # Format y-axis ticks to zero decimal places | |
# Show the plot using st.plotly_chart | |
fig.update_xaxes(showgrid=False) | |
fig.update_yaxes(showgrid=False) | |
fig.update_layout( | |
width=600, # Adjust the width as needed | |
height=600 # Adjust the height as needed | |
) | |
st.plotly_chart(fig) |