Simulator-UOPX / pages /3_Saved_Scenarios.py
Pragya Jatav
m1
803ac82
raw
history blame
26.4 kB
import streamlit as st
from numerize.numerize import numerize
import io
import pandas as pd
from utilities import (format_numbers,decimal_formater,
channel_name_formating,
load_local_css,set_header,
initialize_data,
load_authenticator)
from openpyxl import Workbook
from openpyxl.styles import Alignment,Font,PatternFill
import pickle
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader
from classes import class_from_dict
import plotly.graph_objects as go
from pptx import Presentation
from pptx.util import Inches
from io import BytesIO
import plotly.io as pio
st.set_page_config(layout='wide')
load_local_css('styles.css')
set_header()
scenarios_to_compare = []
st.title("Saved Scenarios")
# for k, v in st.session_state.items():
# if k not in ['logout', 'login','config'] and not k.startswith('FormSubmitter'):
# st.session_state[k] = v
def comparison_scenarios_df():
## create summary page
if len(scenarios_to_compare) == 0:
return
summary_df_spend = None
summary_df_prospect = None
# summary_df_efficiency = None
#=# # print(scenarios_to_download)
for scenario_name in scenarios_to_compare:
scenario_dict = st.session_state['saved_scenarios'][scenario_name]
_spends = []
column_names = ['Date']
_sales = None
dates = None
summary_rows_spend = []
summary_rows_prospects = []
for channel in scenario_dict['channels']:
if dates is None:
dates = channel.get('dates')
_spends.append(dates)
if _sales is None:
_sales = channel.get('modified_sales')
else:
_sales += channel.get('modified_sales')
_spends.append(channel.get('modified_spends') * channel.get('conversion_rate'))
column_names.append(channel.get('name'))
name_mod = channel_name_formating(channel['name'])
summary_rows_spend.append([name_mod,
channel.get('modified_total_spends') * channel.get('conversion_rate')])
summary_rows_prospects.append([name_mod,
channel.get('modified_total_sales')])
_spends.append(_sales)
# column_names.append('NRPU')
# scenario_df = pd.DataFrame(_spends).T
# scenario_df.columns = column_names
# summary_rows.append(['Total',
# scenario_dict.get('modified_total_spends') ,
# scenario_dict.get('modified_total_sales'),
# scenario_dict.get('modified_total_sales') / scenario_dict.get('modified_total_spends'),
# '-',
# scenario_dict.get('modified_total_spends') / scenario_dict.get('modified_total_sales')])
# columns_index = pd.MultiIndex.from_product([[''],['Channel']], names=["first", "second"])
# columns_index = columns_index.append(pd.MultiIndex.from_product([[scenario_name],['Spends','NRPU','ROI','MROI','Spends per NRPU']], names=["first", "second"]))
columns_index = ['Channel',scenario_name]
if summary_df_spend is None:
summary_df_spend = pd.DataFrame(summary_rows_spend, columns = columns_index)
summary_df_spend = summary_df_spend.set_index('Channel')
else:
_df = pd.DataFrame(summary_rows_spend, columns = columns_index)
_df = _df.set_index('Channel')
summary_df_spend = summary_df_spend.merge(_df, left_index=True, right_index=True)
if summary_df_prospect is None:
summary_df_prospect = pd.DataFrame(summary_rows_prospects, columns = columns_index)
summary_df_prospect = summary_df_prospect.set_index('Channel')
else:
_df = pd.DataFrame(summary_rows_prospects, columns = columns_index)
_df = _df.set_index('Channel')
summary_df_prospect = summary_df_prospect.merge(_df, left_index=True, right_index=True)
st.session_state['disable_download_button'] = False
efficiency_df = pd.DataFrame(index = summary_df_prospect.index)
for c in summary_df_spend.columns:
efficiency_df[c] = (summary_df_prospect[c]/summary_df_prospect[c].sum())/(summary_df_spend[c]/summary_df_spend[c].sum())
efficiency_df[c] = efficiency_df[c].round(2)
return summary_df_spend,summary_df_prospect,efficiency_df
import matplotlib.colors as mcolors
import plotly.colors as pc
def rgb_to_hex(rgb):
"""Convert RGB tuple to hex color."""
return mcolors.to_hex(rgb)
def generate_color_gradient(start_color, end_color, num_colors):
"""Generate a list of hex color codes transitioning from start_color to end_color."""
if num_colors == 1:
return [start_color]
# Define the color scale from start to end color using hex codes
colorscale = [[0, start_color], [1, end_color]]
# Generate the colors
colors = pc.sample_colorscale(
colorscale,
[i / (num_colors - 1) for i in range(num_colors)],
colortype='hex' # Use 'rgb' to get colors in RGB format
)
# print(colors)
# Convert RGB tuples to hex
# hex_colors = [rgb_to_hex(color) for color in colors]
return colors
# def generate_color_gradient(start_color, end_color, num_colors):
# import plotly.express as px
# """Generate a list of hex color codes transitioning from start_color to end_color."""
# colors = px.colors.sequential.Plasma # Using a built-in color scale
# custom_colors = px.colors.sample_colorscale(
# colorscale=[[0, start_color], [1, end_color]],
# n_colors=num_colors
# )
# return custom_colors
def plot_comparison_chart(df,metric,custom_colors):
# print(metric)
# print(custom_colors)
custom_colors = [
"#4169E1", # Royal Blue
"#ADD8E6", # Light Blue
"#FF7F50" , # Coral
"#87CEEB", # Sky Blue
"#FA8072", # Salmon
"#1E90FF", # Dodger Blue
"#00008B" ,
"#F08080", # Light Coral
"#FF8C00", # Dark Orange
"#FFA500", # Orange
]
# Create traces for each column
traces = []
for i,column in enumerate(df.columns):
# print(i)
# print(custom_colors[i])
traces.append(go.Bar(
x=df.index,
y=df[column],
name=column,
text=df[column].apply(numerize), # Adding text for each point
textposition='auto',
hoverinfo='x+y+text',
marker_color = custom_colors[i]
))
# Create the layout
layout = go.Layout(
title='Comparing '+ metric,
xaxis_title="Channels",
yaxis_title=metric,
barmode='group'
)
# Create the figure
fig = go.Figure(data=traces, layout=layout)
fig.update_layout(
legend=dict(
orientation="h", # Horizontal orientation
yanchor="top", # Anchor the legend at the top
y=-0.45, # Position the legend below the plot area
xanchor="center", # Center the legend horizontally
x=0.5 # Center the legend on the x-axis
)
)
return fig
def save_ppt_file(fig1,fig2,fig3):
# Initialize PowerPoint presentation
prs = Presentation()
# Helper function to add Plotly figure to slide
def add_plotly_chart_to_slide(slide, fig, left, top, width, height):
img_stream = BytesIO()
pio.write_image(fig, img_stream, format='png',engine="orca")
slide.shapes.add_picture(img_stream, left, top, width, height)
slide_1 = prs.slides.add_slide(prs.slide_layouts[6])
# title_1 = slide_1.shapes.title
# title_1.text = "Comparing Spends"
add_plotly_chart_to_slide(slide_1, fig1, Inches(0), Inches(0.25), width=Inches(10), height=Inches(6))
slide_2 = prs.slides.add_slide(prs.slide_layouts[6])
# title_2 = slide_2.shapes.title
# title_2.text = "Comparing Contributions"
add_plotly_chart_to_slide(slide_2, fig2, Inches(0), Inches(0.25), width=Inches(10), height=Inches(6))
slide_3 = prs.slides.add_slide(prs.slide_layouts[6])
# title_3 = slide_3.shapes.title
# title_3.text = "Comparing Efficiency"
add_plotly_chart_to_slide(slide_3, fig3, Inches(0), Inches(0.25), width=Inches(10), height=Inches(6))
ppt_stream = BytesIO()
prs.save(ppt_stream)
ppt_stream.seek(0)
return ppt_stream.getvalue()
def create_comparison_plots():
# comparison_scenarios_df()
spends_df, prospects_df, efficiency_df = comparison_scenarios_df()
# st.dataframe(spends_df)
blue = "#0000FF" # Blue
green = "#00FF00" # Green
red = "#FF0000" # Red
custom_colors = generate_color_gradient(blue, red, spends_df.shape[1])
st.plotly_chart(plot_comparison_chart(spends_df,"Spends",custom_colors),use_container_width=True)
st.plotly_chart(plot_comparison_chart(prospects_df,"Contributions",custom_colors),use_container_width=True)
st.plotly_chart(plot_comparison_chart(efficiency_df,"Efficiency",custom_colors),use_container_width=True)
fig1 = plot_comparison_chart(spends_df,"Spends",custom_colors)
fig2 = plot_comparison_chart(prospects_df,"Contributions",custom_colors)
fig3 = plot_comparison_chart(efficiency_df,"Efficiency",custom_colors)
ppt_file = save_ppt_file(fig1,fig2,fig3)
# Add a download button
st.download_button(
label="Download Comparision Analysis",
data=ppt_file,
file_name="MMM_Scenario_Comparision.pptx",
mime="application/vnd.openxmlformats-officedocument.presentationml.presentation"
)
def create_scenario_summary(scenario_dict):
summary_rows = []
actual_total_spends = scenario_dict.get('actual_total_spends'),
modified_total_spends = scenario_dict.get('modified_total_spends'),
actual_total_sales = scenario_dict.get('actual_total_sales'),
modified_total_sales = scenario_dict.get('modified_total_sales')
# st.write(modified_total_spends[0])
# st.write(actual_total_spends[0])
# st.write(modified_total_sales)
# st.write(actual_total_sales[0])
# st.write(modified_total_spends[0])
for channel_dict in scenario_dict['channels']:
# st.write(channel_dict['name'])
name_mod = channel_name_formating(channel_dict['name'])
summary_rows.append([name_mod,
channel_dict.get('actual_total_spends') * channel_dict.get('conversion_rate'),
channel_dict.get('modified_total_spends') * channel_dict.get('conversion_rate'),
channel_dict.get('actual_total_sales') ,
channel_dict.get('modified_total_sales'),
# channel_dict.get('modified_total_sales')/modified_total_spends[0],
# channel_dict.get('modified_total_sales')/modified_total_spends[0]
# 1,2
(channel_dict.get('actual_total_sales') /actual_total_sales[0])/(channel_dict.get('actual_total_spends') /actual_total_spends[0] ),
(channel_dict.get('modified_total_sales') /modified_total_sales )/(channel_dict.get('modified_total_spends') /modified_total_spends[0] )
# # # channel_dict.get('actual_mroi'),
# channel_dict.get('modified_mroi'),
# channel_dict.get('actual_total_spends') * channel_dict.get('conversion_rate') / channel_dict.get('actual_total_sales'),
# channel_dict.get('modified_total_spends') * channel_dict.get('conversion_rate') / channel_dict.get('modified_total_sales')
])
summary_rows.append(['Total',
scenario_dict.get('actual_total_spends'),
scenario_dict.get('modified_total_spends'),
scenario_dict.get('actual_total_sales'),
scenario_dict.get('modified_total_sales'),
1.0,
1.0
# scenario_dict.get('actual_total_sales') / scenario_dict.get('actual_total_spends'),
# scenario_dict.get('modified_total_sales') / scenario_dict.get('modified_total_spends'),
# '-',
# '-',
# scenario_dict.get('actual_total_spends') / scenario_dict.get('actual_total_sales'),
# scenario_dict.get('modified_total_spends') / scenario_dict.get('modified_total_sales')
])
adf = pd.DataFrame(summary_rows)
# st.write(adf.columns)
adf.columns = ["1","2","3","4","5","6","7"]
adf.index = adf["1"].to_list() #["1","2","3","4","5","6","7","8","9","10","11","12","13","14"]
adf.drop(columns= ["1"],inplace= True)
# columns_index = pd.MultiIndex.from_product([[''],['Channel']], names=["",""])
# columns_index = columns_index.append(pd.MultiIndex.from_product([['Spends','Prospects',"Efficiency"],['Actual','Simulated']], names=["",""]))
columns_index = pd.MultiIndex.from_product([['Spends','Prospects',"Efficiency"],['Actual','Simulated']], names=["",""])
adf.columns = columns_index
return adf # pd.DataFrame(summary_rows, columns=columns_index)
def summary_df_to_worksheet(df, ws):
heading_fill = PatternFill(fill_type='solid',start_color='FFFFFFFF',end_color='FFFFFFFF')
# Define border style
border_style = Border(
left=Side(border_style='thin', color='00000000'),
right=Side(border_style='thin', color='00000000'),
top=Side(border_style='thin', color='00000000'),
bottom=Side(border_style='thin', color='00000000')
)
number_format = '0.00'
for j,header in enumerate(df.columns.values):
col = j + 1
for i in range(1,3):
ws.cell(row=i, column=j + 1, value=header[i - 1]).font = Font(bold=True, color='00000000')
ws.cell(row=i,column=j+1).fill = heading_fill
# ws.cell.border = border_style
# if col > 1 and (col - 6)%5==0:
# ws.merge_cells(start_row=1, end_row=1, start_column = col-3, end_column=col)
# ws.cell(row=1,column=col).alignment = Alignment(horizontal='center')
# # ws.cell.border = border_style
# Apply borders to all cells, including empty cells
for row in ws.iter_rows():
for cell in row:
cell.border = border_style
for i,row in enumerate(df.itertuples()):
for j,value in enumerate(row):
if j == 0:
continue
# elif (j-2)%4 == 0 or (j-3)%4 == 0:
# ws.cell(row=i+3, column = j, value=value)
# # cell.border = border_style
# # .number_format = '$#,##0.0'
# if isinstance(value, (int, float)):
# cell.number_format = number_format
else:
ws.cell(row=i+3, column = j, value=value)
# cell.border = border_style
if isinstance(value, (int, float)):
cell.number_format = '$#,##0.0'
# cell.number_format = number_format
# Auto-size columns
for col in ws.columns:
max_length = 15
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(cell.value)
except:
pass
adjusted_width = (max_length + 2)
ws.column_dimensions[column].width = adjusted_width
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, PatternFill
from openpyxl.styles import PatternFill, Font, Alignment, Border, Side
import logging
def scenario_df_to_worksheet(df, ws):
heading_fill = PatternFill(start_color='FF11B6BD', end_color='FF11B6BD', fill_type='solid')
for j, header in enumerate(df.columns.values):
cell = ws.cell(row=1, column=j + 1, value=header)
cell.font = Font(bold=True, color='FF11B6BD')
cell.fill = heading_fill
for i, row in enumerate(df.itertuples()):
for j, value in enumerate(row[1:], start=1): # Start from index 1 to skip the index column
try:
cell = ws.cell(row=i + 2, column=j, value=value)
if isinstance(value, (int, float)):
cell.number_format = '$#,##0.0'
elif isinstance(value, str):
cell.value = value[:32767]
else:
cell.value = str(value)
except ValueError as e:
logging.error(f"Error assigning value '{value}' to cell {get_column_letter(j)}{i+2}: {e}")
cell.value = None # Assign None to the cell where the error occurred
return ws
def download_scenarios():
"""
Makes a excel with all saved scenarios and saves it locally
"""
## create summary page
if len(scenarios_to_download) == 0:
return
wb = Workbook()
wb.iso_dates = True
wb.remove(wb.active)
st.session_state['xlsx_buffer'] = io.BytesIO()
summary_df = None
## # print(scenarios_to_download)
for scenario_name in scenarios_to_download:
scenario_dict = st.session_state['saved_scenarios'][scenario_name]
_spends = []
column_names = ['Date']
_sales = None
dates = None
summary_rows = []
for channel in scenario_dict['channels']:
if dates is None:
dates = channel.get('dates')
_spends.append(dates)
if _sales is None:
_sales = channel.get('modified_sales')
else:
_sales += channel.get('modified_sales')
_spends.append(channel.get('modified_spends') * channel.get('conversion_rate'))
column_names.append(channel.get('name'))
name_mod = channel_name_formating(channel['name'])
summary_rows.append([name_mod,
channel.get('modified_total_spends') * channel.get('conversion_rate') ,
channel.get('modified_total_sales'),
# channel.get('modified_total_sales') / channel.get('modified_total_spends') * channel.get('conversion_rate'),
# channel.get('modified_mroi'),
# channel.get('modified_total_sales') / channel.get('modified_total_spends') * channel.get('conversion_rate')
])
_spends.append(_sales)
column_names.append('NRPU')
scenario_df = pd.DataFrame(_spends).T
scenario_df.columns = column_names
## write to sheet
# ws = wb.create_sheet(scenario_name)
# scenario_df_to_worksheet(scenario_df, ws)
summary_rows.append(['Total',
scenario_dict.get('modified_total_spends') ,
scenario_dict.get('modified_total_sales'),
# scenario_dict.get('modified_total_sales') / scenario_dict.get('modified_total_spends'),
# '-',
# scenario_dict.get('modified_total_spends') / scenario_dict.get('modified_total_sales')
])
columns_index = pd.MultiIndex.from_product([[''],['Channel']], names=["first", "second"])
columns_index = columns_index.append(pd.MultiIndex.from_product([[scenario_name],['Spends','Prospects',
# 'ROI','MROI','Spends per NRPU'
]], names=["first", "second"]))
if summary_df is None:
summary_df = pd.DataFrame(summary_rows, columns = columns_index)
summary_df = summary_df.set_index(('','Channel'))
else:
_df = pd.DataFrame(summary_rows, columns = columns_index)
_df = _df.set_index(('','Channel'))
summary_df = summary_df.merge(_df, left_index=True, right_index=True)
ws = wb.create_sheet('Summary',0)
summary_df_to_worksheet(summary_df.reset_index(), ws)
wb.save(st.session_state['xlsx_buffer'])
st.session_state['disable_download_button'] = False
def disable_download_button():
st.session_state['disable_download_button'] =True
def transform(x):
if x.name == ("",'Channel'):
return x
elif x.name[0] == 'Efficiency' or x.name[0] == 'MROI':
return x.apply(lambda y : y if isinstance(y,str) else decimal_formater(format_numbers(y,include_indicator=False,n_decimals=2),n_decimals=2))
else:
return x.apply(lambda y : y if isinstance(y,str) else format_numbers(y))
def delete_scenario():
if selected_scenario in st.session_state['saved_scenarios']:
del st.session_state['saved_scenarios'][selected_scenario]
with open('../saved_scenarios.pkl', 'wb') as f:
pickle.dump(st.session_state['saved_scenarios'],f)
def load_scenario():
if selected_scenario in st.session_state['saved_scenarios']:
st.session_state['scenario'] = class_from_dict(selected_scenario_details)
authenticator = st.session_state.get('authenticator')
if authenticator is None:
authenticator = load_authenticator()
name, authentication_status, username = authenticator.login('Login', 'main')
auth_status = st.session_state.get('authentication_status')
if auth_status == True:
is_state_initiaized = st.session_state.get('initialized',False)
if not is_state_initiaized:
## # print("Scenario page state reloaded")
initialize_data(target_file = "Overview_data_test_panel@#prospects.xlsx")
saved_scenarios = st.session_state['saved_scenarios']
if len(saved_scenarios) ==0:
st.header('No saved scenarios')
else:
with st.sidebar:
with st.expander('View Scenario Details'):
st.markdown("""<hr>""", unsafe_allow_html=True)
selected_scenario = st.selectbox('Select the scenario',list(saved_scenarios.keys()))
# selected_scenario = st.radio(
# 'Pick a scenario to view details',
# list(saved_scenarios.keys())
# )
st.button('Delete scenario', on_click=delete_scenario)
with st.expander('Download Scenario'):
st.markdown("""<hr>""", unsafe_allow_html=True)
scenarios_to_download = st.multiselect('Select scenarios to download',
list(saved_scenarios.keys()))
st.button('Prepare download',on_click=download_scenarios)
st.download_button(
label="Download Scenarios",
data=st.session_state['xlsx_buffer'].getvalue(),
file_name="scenarios.xlsx",
mime="application/vnd.ms-excel",
disabled= st.session_state['disable_download_button'],
on_click= disable_download_button
)
with st.expander('Compare Scenarios'):
st.markdown("""<hr>""", unsafe_allow_html=True)
scenarios_to_compare = st.multiselect('Select scenarios to compare',
list(saved_scenarios.keys()))
st.button('Compare')
column_1, column_2,column_3 = st.columns((6,1,1))
with column_1:
st.markdown(f'<span style="font-size:28px"><strong>Selected Scenario:</strong> {selected_scenario}</span>', unsafe_allow_html=True)
# st.header(f"Selected Scenario: {selected_scenario}")
# with column_3:
# st.write("")
# st.button('Delete scenario', on_click=delete_scenario)
# with column_3:
# st.button('Load Scenario', on_click=load_scenario)
selected_scenario_details = saved_scenarios[selected_scenario]
pd.set_option('display.max_colwidth', 100)
# st.table(create_scenario_summary(selected_scenario_details))
# st.table(create_scenario_summary(selected_scenario_details).transform(transform))
adf = create_scenario_summary(selected_scenario_details).transform(transform)
# adf1 = adf[('Spends', 'Actual'),
# ( 'Spends', 'Simulated'),
# ( 'Prospects','Actual'),
# ( 'Prospects', 'Simulated')].transform(transform)
# adf2 = adf[('Efficiency', 'Actual'),
# ('Efficiency', 'Simulated')].round(2)
# st.write(adf.columns)
# adf = adf.set_index([('', 'Channel')])#, inplace=True)
# st.table(adf)
st.markdown(adf.style.set_table_styles(
[
# {
# 'selector': 'th',
# 'props': [('background-color', '#1167bd')]
# },
# {
# 'selector' : 'tr:nth-child(even)',
# 'props' : [('background-color', '#11B6BD')]
# }
]).to_html(),unsafe_allow_html=True)
st.markdown("<br><br>", unsafe_allow_html=True)
with st.expander('Scenario Comparison'):
st.header("Scenario Comparison")
if len(scenarios_to_compare)== 0:
st.write("")
else:
create_comparison_plots()
elif auth_status == False:
st.error('Username/Password is incorrect')
if auth_status != True:
try:
username_forgot_pw, email_forgot_password, random_password = authenticator.forgot_password('Forgot password')
if username_forgot_pw:
st.success('New password sent securely')
# Random password to be transferred to user securely
elif username_forgot_pw == False:
st.error('Username not found')
except Exception as e:
st.error(e)
# create_comparison_plots()