"""Fit Hill-equation response curves per marketing channel and plot them.

At import time this module reads ``response_curves_input_file.csv``, fits one
Hill curve per channel (spend -> prospects) on min-max scaled data, and stores
the fitted points for every channel in the module-level ``plotly_data`` frame.
``response_curves`` then draws a Plotly figure for a single channel from it.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
import plotly.graph_objects as go
from utilities import (channel_name_formating)

# ---------------------------------------------------------------------------
# Input data
# ---------------------------------------------------------------------------
df = pd.read_csv('response_curves_input_file.csv')
df.dropna(inplace=True)
df['Date'] = pd.to_datetime(df['Date'])
# Keeps the previous index as an extra column, as the original script did.
df.reset_index(inplace=True)

# The three lists below are parallel: position i in each one describes the
# same channel (display name, spend column, prospects column).
channel_cols = [
    'BroadcastTV',
    'CableTV',
    'Connected&OTTTV',
    'DisplayProspecting',
    'DisplayRetargeting',
    'Video',
    'SocialProspecting',
    'SocialRetargeting',
    'SearchBrand',
    'SearchNon-brand',
    'DigitalPartners',
    'Audio',
    'Email',
]
spend_cols = [
    'tv_broadcast_spend',
    'tv_cable_spend',
    'stream_video_spend',
    'disp_prospect_spend',
    'disp_retarget_spend',
    'olv_spend',
    'social_prospect_spend',
    'social_retarget_spend',
    'search_brand_spend',
    'search_nonbrand_spend',
    'cm_spend',
    'audio_spend',
    'email_spend',
]
prospect_cols = [
    'Broadcast TV_Prospects',
    'Cable TV_Prospects',
    'Connected & OTT TV_Prospects',
    'Display Prospecting_Prospects',
    'Display Retargeting_Prospects',
    'Video_Prospects',
    'Social Prospecting_Prospects',
    'Social Retargeting_Prospects',
    'Search Brand_Prospects',
    'Search Non-brand_Prospects',
    'Digital Partners_Prospects',
    'Audio_Prospects',
    'Email_Prospects',
]

# Placeholder dates / label attached to the three extrapolated points so they
# can be told apart from observed rows (``MAT == "ext"``).
_EXT_DATES = [
    np.datetime64('1950-01-01'),
    np.datetime64('1950-06-15'),
    np.datetime64('1950-12-31'),
]
_EXT_LABEL = "ext"


def hill_equation(x, Kd, n):
    """Return the Hill function ``x**n / (Kd**n + x**n)``."""
    return x**n / (Kd**n + x**n)


def hill_func(x_data, y_data, x_minmax, y_minmax):
    """Fit ``hill_equation`` to scaled data and evaluate the fit.

    Args:
        x_data: scaled x values (1-D sequence).
        y_data: scaled y values (1-D sequence, same length).
        x_minmax: scaler used for x; accepted for interface compatibility,
            not used by the fit itself.
        y_minmax: fitted scaler used to map the fitted y values back to the
            original scale.

    Returns:
        Tuple ``(y_fit, y_fit_inv, Kd_fit, n_fit)`` where ``y_fit`` is the
        fitted curve on the scaled axis, ``y_fit_inv`` is the same curve
        inverse-transformed to a column array of shape ``(len(x_data), 1)``,
        and ``Kd_fit`` / ``n_fit`` are the fitted parameters.

    Raises:
        Whatever ``scipy.optimize.curve_fit`` raises when the optimisation
        does not succeed within ``maxfev=1000`` evaluations.
    """
    x_arr = np.asarray(x_data, dtype=float)
    y_arr = np.asarray(y_data, dtype=float)

    initial_guess = [1, 1]  # starting values for Kd and n
    params, _covariance = curve_fit(
        hill_equation, x_arr, y_arr, p0=initial_guess, maxfev=1000
    )
    Kd_fit, n_fit = params

    y_fit = hill_equation(x_arr, Kd_fit, n_fit)
    y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1, 1))
    return y_fit, y_fit_inv, Kd_fit, n_fit


def data_output(channel, X, y, y_fit_inv, x_ext_data, y_fit_inv_ext):
    """Assemble the per-channel frame of observed and extrapolated points.

    Args:
        channel: channel name used to build the column names
            ``f'{channel}_Spends'`` and ``'Fit_Data_' + channel``.
        X: spend values on the original scale.
        y: observed response values; accepted for interface compatibility,
            not used here.
        y_fit_inv: fitted responses for ``X`` as a column array ``(n, 1)``.
        x_ext_data: the three extrapolated spend values.
        y_fit_inv_ext: fitted responses for ``x_ext_data`` as ``(3, 1)``.

    Returns:
        DataFrame with the spend column, ``Date``, ``MAT`` and the fit column;
        the extrapolated rows are appended last with ``MAT == "ext"``.
    """
    spend_col_name = f'{channel}_Spends'
    fit_col = 'Fit_Data_' + channel

    plot_df = pd.DataFrame({spend_col_name: X})
    # NOTE(review): Date/MAT come from the module-level ``df`` and align on
    # plot_df's 0..len(X)-1 index, i.e. they are the first len(X) rows of
    # ``df``. When the caller filtered rows out before building X these are
    # not necessarily the rows X came from - confirm this is intended.
    plot_df['Date'] = df['Date']
    plot_df['MAT'] = df['MAT']
    plot_df[fit_col] = np.asarray(y_fit_inv)[:, 0]

    ext_df = pd.DataFrame({
        spend_col_name: x_ext_data,
        fit_col: np.asarray(y_fit_inv_ext)[:, 0],
    })
    ext_df['Date'] = _EXT_DATES
    ext_df['MAT'] = [_EXT_LABEL] * len(_EXT_DATES)

    # DataFrame.append was removed in pandas 2.0; concat is the replacement
    # and, like append, keeps each frame's own index labels.
    return pd.concat([plot_df, ext_df])


def input_data(df, spend_col, prospect_col):
    """Extract and min-max scale the spend / prospect columns of ``df``.

    Args:
        df: frame holding both columns.
        spend_col: name of the spend column.
        prospect_col: name of the prospects column.

    Returns:
        Tuple ``(X, y, x_data, y_data, x_minmax, y_minmax)``: the raw columns
        as arrays, the scaled columns as flat lists, and the two fitted
        ``MinMaxScaler`` instances.
    """
    X = np.array(df[spend_col].tolist())
    y = np.array(df[prospect_col].tolist())

    x_minmax = MinMaxScaler()
    x_data = x_minmax.fit_transform(df[[spend_col]])[:, 0].tolist()

    y_minmax = MinMaxScaler()
    y_data = y_minmax.fit_transform(df[[prospect_col]])[:, 0].tolist()

    return X, y, x_data, y_data, x_minmax, y_minmax


def extend_s_curve(x_max, x_minmax, y_minmax, Kd_fit, n_fit):
    """Evaluate the fitted curve at 1.2x, 1.3x and 1.5x of ``x_max``.

    Args:
        x_max: largest observed spend on the original scale.
        x_minmax: fitted scaler for spend.
        y_minmax: fitted scaler for the response.
        Kd_fit: fitted Hill ``Kd`` parameter.
        n_fit: fitted Hill ``n`` parameter.

    Returns:
        Tuple ``(x_ext_data, y_fit_inv)``: the three extrapolated spends (list)
        and their fitted responses on the original scale, shape ``(3, 1)``.
    """
    x_ext_data = [x_max * 1.2, x_max * 1.3, x_max * 1.5]
    x_scaled = x_minmax.transform(pd.DataFrame(x_ext_data))[:, 0]
    y_fit = hill_equation(np.asarray(x_scaled, dtype=float), Kd_fit, n_fit)
    y_fit_inv = y_minmax.inverse_transform(np.array(y_fit).reshape(-1, 1))
    return x_ext_data, y_fit_inv


def fit_data(spend_col, prospect_col, channel):
    """Fit one channel's response curve and return its plotting frame.

    Only rows of the module-level ``df`` with ``spend_col > 0`` are used.

    Args:
        spend_col: spend column name in ``df``.
        prospect_col: prospects column name in ``df``.
        channel: channel name used for the output column names.

    Returns:
        The DataFrame produced by ``data_output`` for this channel.
    """
    temp_df = df[df[spend_col] > 0].reset_index(drop=True)
    X, y, x_data, y_data, x_minmax, y_minmax = input_data(
        temp_df, spend_col, prospect_col
    )
    _y_fit, y_fit_inv, Kd_fit, n_fit = hill_func(
        x_data, y_data, x_minmax, y_minmax
    )
    x_ext_data, y_fit_inv_ext = extend_s_curve(
        temp_df[spend_col].max(), x_minmax, y_minmax, Kd_fit, n_fit
    )
    return data_output(channel, X, y, y_fit_inv, x_ext_data, y_fit_inv_ext)


# ---------------------------------------------------------------------------
# Fit every channel once and join the results on (Date, MAT).
# The first channel's rows define which (Date, MAT) keys survive the left join.
# ---------------------------------------------------------------------------
plotly_data = fit_data(spend_cols[0], prospect_cols[0], channel_cols[0])
for _spend_col, _prospect_col, _channel in zip(
    spend_cols[1:], prospect_cols[1:], channel_cols[1:]
):
    pdf = fit_data(_spend_col, _prospect_col, _channel)
    plotly_data = plotly_data.merge(pdf, on=["Date", "MAT"], how="left")


def response_curves(channel, x_modified, y_modified):
    """Build the Plotly response-curve figure for one channel.

    The figure has three traces: the fitted curve over the observed
    (non-``"ext"``) rows of ``plotly_data``, a "Current Spends" marker at the
    mean spend / mean fitted response, and an "Optimised Spends" marker at
    ``x_modified`` / ``y_modified`` divided by the number of observed rows
    that have a spend value for this channel.

    Args:
        channel: channel name as listed in ``channel_cols``.
        x_modified: optimised spend total for the channel.
        y_modified: optimised response total for the channel.

    Returns:
        A ``plotly.graph_objects.Figure``.

    Note:
        If the channel has no observed rows with a spend value the divisor
        is 0 and the division for the "Optimised Spends" marker fails.
    """
    x_col = (channel + "_Spends").replace('\xa0', '')
    y_col = ("Fit_Data_" + channel).replace('\xa0', '')

    observed = plotly_data[plotly_data["MAT"] != "ext"]
    curve = observed.sort_values(by=x_col, ascending=True)
    with_spend = observed[observed[x_col].notna()]
    dividing_parameter = len(with_spend)

    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=curve[x_col],
        y=curve[y_col],
        mode='lines',
        marker=dict(color='blue'),
        name=x_col.replace('_Spends', ''),
    ))

    fig.add_trace(go.Scatter(
        x=np.array(with_spend[x_col].mean()),
        y=np.array(with_spend[y_col].mean()),
        mode='markers',
        marker=dict(size=13, color='#516DA6'),
        name="Current Spends",
    ))

    fig.add_trace(go.Scatter(
        x=[x_modified / dividing_parameter],
        y=[y_modified / dividing_parameter],
        mode='markers',
        marker=dict(size=13, color='#4ACAD9'),
        name="Optimised Spends",
    ))

    fig.update_layout(
        title={
            'text': channel_name_formating(channel) + ' Response Curve',
            'font': {
                'size': 24,
                'family': 'Arial',
                'color': 'black',
            },
        },
        xaxis_title='Weekly Spends',
        yaxis_title='Prospects',
    )

    return fig