Simulator-UOPX / response_curves_model_quality.py
Pragya Jatav
m1
ad7f3dc
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
import plotly.graph_objects as go
from utilities import (channel_name_formating)
## reading input data
df= pd.read_csv('response_curves_input_file.csv')
df.dropna(inplace=True)
df['Date'] = pd.to_datetime(df['Date'])
df.reset_index(inplace=True)
channel_cols = [
'BroadcastTV',
'CableTV',
'Connected&OTTTV',
'DisplayProspecting',
'DisplayRetargeting',
'Video',
'SocialProspecting',
'SocialRetargeting',
'SearchBrand',
'SearchNon-brand',
'DigitalPartners',
'Audio',
'Email']
spend_cols = [
'tv_broadcast_spend',
'tv_cable_spend',
'stream_video_spend',
'disp_prospect_spend',
'disp_retarget_spend',
'olv_spend',
'social_prospect_spend',
'social_retarget_spend',
'search_brand_spend',
'search_nonbrand_spend',
'cm_spend',
'audio_spend',
'email_spend']
prospect_cols = [
'Broadcast TV_Prospects',
'Cable TV_Prospects',
'Connected & OTT TV_Prospects',
'Display Prospecting_Prospects',
'Display Retargeting_Prospects',
'Video_Prospects',
'Social Prospecting_Prospects',
'Social Retargeting_Prospects',
'Search Brand_Prospects',
'Search Non-brand_Prospects',
'Digital Partners_Prospects',
'Audio_Prospects',
'Email_Prospects']
def hill_equation(x, Kd, n):
return x**n / (Kd**n + x**n)
def hill_func(x_data,y_data,x_minmax,y_minmax):
# Fit the Hill equation to the data
initial_guess = [1, 1] # Initial guess for Kd and n
params, covariance = curve_fit(hill_equation, x_data, y_data, p0=initial_guess,maxfev = 1000)
# Extract the fitted parameters
Kd_fit, n_fit = params
# Generate y values using the fitted parameters
y_fit = hill_equation(x_data, Kd_fit, n_fit)
x_data_inv = x_minmax.inverse_transform(np.array(x_data).reshape(-1,1))
y_data_inv = y_minmax.inverse_transform(np.array(y_data).reshape(-1,1))
y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1,1))
# # Plot the original data and the fitted curve
# plt.scatter(x_data_inv, y_data_inv, label='Actual Data')
# plt.scatter(x_data_inv, y_fit_inv, label='Fit Data',color='red')
# # plt.line(x_data_inv, y_fit_inv, label=f'Fitted Hill Equation (Kd={Kd_fit:.2f}, n={n_fit:.2f})', color='red')
# plt.xlabel('Ligand Concentration')
# plt.ylabel('Fraction of Binding')
# plt.title('Fitting Hill Equation to Data')
# plt.legend()
# plt.show()
return y_fit,y_fit_inv,Kd_fit, n_fit
def data_output(channel,X,y,y_fit_inv,x_ext_data,y_fit_inv_ext):
fit_col = 'Fit_Data_'+channel
plot_df = pd.DataFrame()
plot_df[f'{channel}_Spends'] = X
plot_df['Date'] = df['Date']
plot_df['MAT'] = df['MAT']
y_fit_inv_v2 = []
for i in range(len(y_fit_inv)):
y_fit_inv_v2.append(y_fit_inv[i][0])
plot_df[fit_col] = y_fit_inv_v2
# adding extra data
y_fit_inv_v2_ext = []
for i in range(len(y_fit_inv_ext)):
y_fit_inv_v2_ext.append(y_fit_inv_ext[i][0])
# # # # print(x_ext_data)
ext_df = pd.DataFrame()
ext_df[f'{channel}_Spends'] = x_ext_data
ext_df[fit_col] = y_fit_inv_v2_ext
ext_df['Date'] = [
np.datetime64('1950-01-01'),
np.datetime64('1950-06-15'),
np.datetime64('1950-12-31')
]
ext_df['MAT'] = ["ext","ext","ext"]
# # # # print(ext_df)
plot_df= plot_df.append(ext_df)
return plot_df
def input_data(df,spend_col,prospect_col):
X = np.array(df[spend_col].tolist())
y = np.array(df[prospect_col].tolist())
x_minmax = MinMaxScaler()
x_scaled = x_minmax.fit_transform(df[[spend_col]])
x_data = []
for i in range(len(x_scaled)):
x_data.append(x_scaled[i][0])
y_minmax = MinMaxScaler()
y_scaled = y_minmax.fit_transform(df[[prospect_col]])
y_data = []
for i in range(len(y_scaled)):
y_data.append(y_scaled[i][0])
return X,y,x_data,y_data,x_minmax,y_minmax
def extend_s_curve(x_max,x_minmax,y_minmax, Kd_fit, n_fit):
# # # # print(x_max)
x_ext_data = [x_max*1.2,x_max*1.3,x_max*1.5]
# x_ext_data = [1500000,2000000,2500000]
# x_ext_data = [x_max+100,x_max+200,x_max+5000]
x_scaled = x_minmax.transform(pd.DataFrame(x_ext_data))
x_data = []
for i in range(len(x_scaled)):
x_data.append(x_scaled[i][0])
# # # # print(x_data)
y_fit = hill_equation(x_data, Kd_fit, n_fit)
y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1,1))
return x_ext_data,y_fit_inv
def fit_data(spend_col,prospect_col,channel):
### getting k and n parameters
temp_df = df[df[spend_col]>0]
temp_df.reset_index(inplace=True)
X,y,x_data,y_data,x_minmax,y_minmax = input_data(temp_df,spend_col,prospect_col)
y_fit, y_fit_inv, Kd_fit, n_fit = hill_func(x_data,y_data,x_minmax,y_minmax)
# # # # print('k: ',Kd_fit)
# # # # print('n: ', n_fit)
##### extend_s_curve
x_ext_data,y_fit_inv_ext= extend_s_curve(temp_df[spend_col].max(),x_minmax,y_minmax, Kd_fit, n_fit)
plot_df = data_output(channel,X,y,y_fit_inv,x_ext_data,y_fit_inv_ext)
return plot_df
plotly_data = fit_data(spend_cols[0],prospect_cols[0],channel_cols[0])
plotly_data.tail()
for i in range(1,13):
# # # # print(i)
pdf = fit_data(spend_cols[i],prospect_cols[i],channel_cols[i])
plotly_data = plotly_data.merge(pdf,on = ["Date","MAT"],how = "left")
def response_curves(channel,x_modified,y_modified):
# Initialize the Plotly figure
fig = go.Figure()
x_col = (channel+"_Spends").replace('\xa0', '')
y_col = ("Fit_Data_"+channel).replace('\xa0', '')
# fig.add_trace(go.Scatter(
# x=plotly_data[x_col],
# y=plotly_data[y_col],
# mode='markers',
# name=x_col.replace('_Spends', '')
# ))
fig.add_trace(go.Scatter(
x=plotly_data.sort_values(by=x_col, ascending=True)[x_col],
y=plotly_data.sort_values(by=x_col, ascending=True)[y_col],
mode='lines+markers',
name=x_col.replace('_Spends', '')
))
plotly_data2 = plotly_data.copy()
plotly_data2 = plotly_data[plotly_data[x_col].isnull()==False]
# # print(plotly_data[plotly_data2['Date'] == plotly_data2['Date'].max()][x_col])
# .dropna(subset=[x_col]).reset_index(inplace = True)
fig.add_trace(go.Scatter(
x=plotly_data[plotly_data2['Date'] == plotly_data2['Date'].max()][x_col],
y=plotly_data[plotly_data2['Date'] == plotly_data2['Date'].max()][y_col],
mode='markers',
marker=dict(
size=13 # Adjust the size value to make the markers larger or smaller
, color = 'yellow'
),
name="Current Spends"
))
fig.add_trace(go.Scatter(
x=[x_modified/104],
y=[y_modified/104],
mode='markers',
marker=dict(
size=13 # Adjust the size value to make the markers larger or smaller
, color = 'blue'
),
name="Optimised Spends"
))
# Update layout with titles
fig.update_layout(
title=channel+' Response Curve',
xaxis_title='Weekly Spends',
yaxis_title='Prospects'
)
# Show the figure
return fig
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
import plotly.graph_objects as go
## reading input data
df= pd.read_csv('response_curves_input_file.csv')
df.dropna(inplace=True)
df['Date'] = pd.to_datetime(df['Date'])
df.reset_index(inplace=True)
channel_cols = [
'BroadcastTV',
'CableTV',
'Connected&OTTTV',
'DisplayProspecting',
'DisplayRetargeting',
'Video',
'SocialProspecting',
'SocialRetargeting',
'SearchBrand',
'SearchNon-brand',
'DigitalPartners',
'Audio',
'Email']
spend_cols = [
'tv_broadcast_spend',
'tv_cable_spend',
'stream_video_spend',
'disp_prospect_spend',
'disp_retarget_spend',
'olv_spend',
'social_prospect_spend',
'social_retarget_spend',
'search_brand_spend',
'search_nonbrand_spend',
'cm_spend',
'audio_spend',
'email_spend']
prospect_cols = [
'Broadcast TV_Prospects',
'Cable TV_Prospects',
'Connected & OTT TV_Prospects',
'Display Prospecting_Prospects',
'Display Retargeting_Prospects',
'Video_Prospects',
'Social Prospecting_Prospects',
'Social Retargeting_Prospects',
'Search Brand_Prospects',
'Search Non-brand_Prospects',
'Digital Partners_Prospects',
'Audio_Prospects',
'Email_Prospects']
def hill_equation(x, Kd, n):
return x**n / (Kd**n + x**n)
def hill_func(x_data,y_data,x_minmax,y_minmax):
# Fit the Hill equation to the data
initial_guess = [1, 1] # Initial guess for Kd and n
params, covariance = curve_fit(hill_equation, x_data, y_data, p0=initial_guess,maxfev = 1000)
# Extract the fitted parameters
Kd_fit, n_fit = params
# Generate y values using the fitted parameters
y_fit = hill_equation(x_data, Kd_fit, n_fit)
x_data_inv = x_minmax.inverse_transform(np.array(x_data).reshape(-1,1))
y_data_inv = y_minmax.inverse_transform(np.array(y_data).reshape(-1,1))
y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1,1))
# # Plot the original data and the fitted curve
# plt.scatter(x_data_inv, y_data_inv, label='Actual Data')
# plt.scatter(x_data_inv, y_fit_inv, label='Fit Data',color='red')
# # plt.line(x_data_inv, y_fit_inv, label=f'Fitted Hill Equation (Kd={Kd_fit:.2f}, n={n_fit:.2f})', color='red')
# plt.xlabel('Ligand Concentration')
# plt.ylabel('Fraction of Binding')
# plt.title('Fitting Hill Equation to Data')
# plt.legend()
# plt.show()
return y_fit,y_fit_inv,Kd_fit, n_fit
def data_output(channel,X,y,y_fit_inv,x_ext_data,y_fit_inv_ext):
fit_col = 'Fit_Data_'+channel
plot_df = pd.DataFrame()
plot_df[f'{channel}_Spends'] = X
plot_df['Date'] = df['Date']
plot_df['MAT'] = df['MAT']
y_fit_inv_v2 = []
for i in range(len(y_fit_inv)):
y_fit_inv_v2.append(y_fit_inv[i][0])
plot_df[fit_col] = y_fit_inv_v2
# adding extra data
y_fit_inv_v2_ext = []
for i in range(len(y_fit_inv_ext)):
y_fit_inv_v2_ext.append(y_fit_inv_ext[i][0])
# # # # print(x_ext_data)
ext_df = pd.DataFrame()
ext_df[f'{channel}_Spends'] = x_ext_data
ext_df[fit_col] = y_fit_inv_v2_ext
ext_df['Date'] = [
np.datetime64('1950-01-01'),
np.datetime64('1950-06-15'),
np.datetime64('1950-12-31')
]
ext_df['MAT'] = ["ext","ext","ext"]
# # # # print(ext_df)
plot_df= plot_df.append(ext_df)
return plot_df
def input_data(df,spend_col,prospect_col):
X = np.array(df[spend_col].tolist())
y = np.array(df[prospect_col].tolist())
x_minmax = MinMaxScaler()
x_scaled = x_minmax.fit_transform(df[[spend_col]])
x_data = []
for i in range(len(x_scaled)):
x_data.append(x_scaled[i][0])
y_minmax = MinMaxScaler()
y_scaled = y_minmax.fit_transform(df[[prospect_col]])
y_data = []
for i in range(len(y_scaled)):
y_data.append(y_scaled[i][0])
return X,y,x_data,y_data,x_minmax,y_minmax
def extend_s_curve(x_max,x_minmax,y_minmax, Kd_fit, n_fit):
# # # # print(x_max)
x_ext_data = [x_max*1.2,x_max*1.3,x_max*1.5]
# x_ext_data = [1500000,2000000,2500000]
# x_ext_data = [x_max+100,x_max+200,x_max+5000]
x_scaled = x_minmax.transform(pd.DataFrame(x_ext_data))
x_data = []
for i in range(len(x_scaled)):
x_data.append(x_scaled[i][0])
# # # # print(x_data)
y_fit = hill_equation(x_data, Kd_fit, n_fit)
y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1,1))
return x_ext_data,y_fit_inv
def fit_data(spend_col,prospect_col,channel):
### getting k and n parameters
temp_df = df[df[spend_col]>0]
temp_df.reset_index(inplace=True)
X,y,x_data,y_data,x_minmax,y_minmax = input_data(temp_df,spend_col,prospect_col)
y_fit, y_fit_inv, Kd_fit, n_fit = hill_func(x_data,y_data,x_minmax,y_minmax)
# # # # print('k: ',Kd_fit)
# # # # print('n: ', n_fit)
##### extend_s_curve
x_ext_data,y_fit_inv_ext= extend_s_curve(temp_df[spend_col].max(),x_minmax,y_minmax, Kd_fit, n_fit)
plot_df = data_output(channel,X,y,y_fit_inv,x_ext_data,y_fit_inv_ext)
return plot_df
plotly_data = fit_data(spend_cols[0],prospect_cols[0],channel_cols[0])
plotly_data.tail()
for i in range(1,13):
# # # # print(i)
pdf = fit_data(spend_cols[i],prospect_cols[i],channel_cols[i])
plotly_data = plotly_data.merge(pdf,on = ["Date","MAT"],how = "left")
def response_curves(channel,x_modified,y_modified):
# Initialize the Plotly figure
fig = go.Figure()
x_col = (channel+"_Spends").replace('\xa0', '')
y_col = ("Fit_Data_"+channel).replace('\xa0', '')
# fig.add_trace(go.Scatter(
# x=plotly_data[x_col],
# y=plotly_data[y_col],
# mode='markers',
# name=x_col.replace('_Spends', '')
# ))
plotly_data1 = plotly_data[plotly_data["MAT"]!="ext"]
fig.add_trace(go.Scatter(
x=plotly_data1.sort_values(by=x_col, ascending=True)[x_col],
y=plotly_data1.sort_values(by=x_col, ascending=True)[y_col],
mode='lines',
marker=dict(color = 'blue'),
name=x_col.replace('_Spends', '')
))
dividing_parameter = len(plotly_data1[plotly_data1[x_col].isnull()==False])
# print(dividing_parameter)
plotly_data2 = plotly_data.copy()
plotly_data2 = plotly_data[plotly_data[x_col].isnull()==False]
plotly_data2 = plotly_data2[plotly_data2["MAT"]!="ext"]
# .dropna(subset=[x_col]).reset_index(inplace = True)
fig.add_trace(go.Scatter(
x=np.array(plotly_data2[x_col].mean()),
y=np.array(plotly_data2[y_col].mean()),
mode='markers',
marker=dict(
size=13 # Adjust the size value to make the markers larger or smaller
, color = '#516DA6'
),
name="Current Spends"
))
# # print(dividing_parameter)
fig.add_trace(go.Scatter(
x=[x_modified/dividing_parameter],
y=[y_modified/dividing_parameter],
mode='markers',
marker=dict(
size=13 # Adjust the size value to make the markers larger or smaller
, color = '#4ACAD9'
),
name="Optimised Spends"
))
# Update layout with titles
fig.update_layout(
title={
'text': channel_name_formating(channel)+' Response Curve',
'font': {
'size': 24,
'family': 'Arial',
'color': 'black',
# 'bold': True
}
},
# title=channel_name_formating(channel)+' Response Curve',
xaxis_title='Weekly Spends',
yaxis_title='Prospects'
)
# Show the figure
return fig