Spaces:
Running
Running
import numpy as np | |
from scipy.optimize import minimize, LinearConstraint, NonlinearConstraint | |
from collections import OrderedDict | |
import pandas as pd | |
from numerize.numerize import numerize | |
# from gekko import GEKKO | |
def class_to_dict(class_instance):
    """Serialise a ``Channel`` or ``Scenario`` into a plain dictionary.

    The returned dictionary carries a ``"type"`` tag (``"Channel"`` or
    ``"Scenario"``) so that ``class_from_dict`` can pick the right class.
    A scenario's channels are serialised recursively into a list under
    ``"channels"``.

    For channels the dictionary also includes ``"sales"``,
    ``"channel_bounds_min"`` and ``"channel_bounds_max"``: these are required
    arguments of ``Channel.__init__`` and therefore have to be present for the
    channel to be rebuilt from the dictionary.

    Args:
        class_instance: The object to serialise.

    Returns:
        dict: The attribute dictionary; empty when ``class_instance`` is
        neither a ``Channel`` nor a ``Scenario``.
    """
    attr_dict = {}
    if isinstance(class_instance, Channel):
        attr_dict["type"] = "Channel"
        attr_dict["name"] = class_instance.name
        attr_dict["dates"] = class_instance.dates
        attr_dict["spends"] = class_instance.actual_spends
        # Actual sales are a required constructor argument of Channel.
        attr_dict["sales"] = class_instance.actual_sales
        attr_dict["conversion_rate"] = class_instance.conversion_rate
        attr_dict["modified_spends"] = class_instance.modified_spends
        attr_dict["modified_sales"] = class_instance.modified_sales
        attr_dict["response_curve_type"] = class_instance.response_curve_type
        attr_dict["response_curve_params"] = class_instance.response_curve_params
        attr_dict["penalty"] = class_instance.penalty
        attr_dict["bounds"] = class_instance.bounds
        attr_dict["channel_bounds_min"] = class_instance.channel_bounds_min
        attr_dict["channel_bounds_max"] = class_instance.channel_bounds_max
        attr_dict["actual_total_spends"] = class_instance.actual_total_spends
        attr_dict["actual_total_sales"] = class_instance.actual_total_sales
        attr_dict["modified_total_spends"] = class_instance.modified_total_spends
        attr_dict["modified_total_sales"] = class_instance.modified_total_sales
    elif isinstance(class_instance, Scenario):
        attr_dict["type"] = "Scenario"
        attr_dict["name"] = class_instance.name
        attr_dict["channels"] = [
            class_to_dict(channel) for channel in class_instance.channels.values()
        ]
        attr_dict["constant"] = class_instance.constant
        attr_dict["correction"] = class_instance.correction
        attr_dict["actual_total_spends"] = class_instance.actual_total_spends
        attr_dict["actual_total_sales"] = class_instance.actual_total_sales
        attr_dict["modified_total_spends"] = class_instance.modified_total_spends
        attr_dict["modified_total_sales"] = class_instance.modified_total_sales
    return attr_dict
def class_from_dict(attr_dict):
    """Rebuild a ``Channel`` or ``Scenario`` from its dictionary form.

    The ``"type"`` entry of ``attr_dict`` selects the class whose
    ``from_dict`` constructor is called.

    Args:
        attr_dict: Dictionary holding a ``"type"`` key plus the attributes
            the selected ``from_dict`` reads.

    Returns:
        The object returned by the matching ``from_dict``, or ``None`` when
        the type tag is neither ``"Channel"`` nor ``"Scenario"``.
    """
    kind = attr_dict["type"]
    if kind == "Channel":
        return Channel.from_dict(attr_dict)
    if kind == "Scenario":
        return Scenario.from_dict(attr_dict)
    return None
class Channel:
    """A single marketing channel with actual and simulated spends/sales.

    The channel keeps a copy of the actual spends and sales, a set of
    modified (simulated) spends, and the response curve used to turn spends
    into sales. Totals and deltas are cached as attributes and refreshed by
    ``update``.
    """

    def __init__(
        self,
        name,
        dates,
        spends,
        sales,
        response_curve_type,
        response_curve_params,
        bounds,
        channel_bounds_min,
        channel_bounds_max,
        conversion_rate=1,
        modified_spends=None,
        penalty=True,
    ):
        """Create the channel and compute its initial totals.

        Args:
            name: Channel name.
            dates: Dates matching the spends; stored as given.
            spends: Actual spends; must support ``copy``, ``sum``, ``max``
                and ``std`` (assumed to be a NumPy array or pandas Series --
                TODO confirm against callers).
            sales: Actual sales; must support ``copy`` and ``sum``.
            response_curve_type: ``"hill-eq"``, ``"s-curve"`` or ``"linear"``.
            response_curve_params: Mapping with the parameters the selected
                curve reads (see ``response_curve``).
            bounds: Stored as given; read by ``Scenario.optimize``.
            channel_bounds_min: Stored as given.
            channel_bounds_max: Stored as given.
            conversion_rate: Multiplier applied to this channel's spends when
                scenario totals are computed.
            modified_spends: Simulated spends; defaults to a copy of the
                actual spends. A supplied object is stored without copying.
            penalty: Whether spends above ``upper_limit`` are compressed
                before the response curve is applied.
        """
        self.name = name
        self.dates = dates
        self.conversion_rate = conversion_rate
        self.actual_spends = spends.copy()
        self.actual_sales = sales.copy()

        if modified_spends is None:
            self.modified_spends = self.actual_spends.copy()
        else:
            self.modified_spends = modified_spends

        self.response_curve_type = response_curve_type
        self.response_curve_params = response_curve_params
        self.bounds = bounds
        self.channel_bounds_min = channel_bounds_min
        self.channel_bounds_max = channel_bounds_max
        self.penalty = penalty

        # Threshold above which the penalty compresses spends.
        self.upper_limit = self.actual_spends.max() + self.actual_spends.std()
        # Power of ten used to rescale spends in the s-curve branch.
        self.power = np.ceil(np.log(self.actual_spends.max()) / np.log(10)) - 3

        self.actual_total_spends = self.actual_spends.sum()
        self.actual_total_sales = self.actual_sales.sum()
        self.modified_sales = self.calculate_sales()
        self.modified_total_spends = self.modified_spends.sum()
        self.modified_total_sales = self.modified_sales.sum()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_penalty(self, penalty):
        """Switch the over-spend penalty on or off."""
        self.penalty = penalty

    def _modify_spends(self, spends_array, total_spends):
        """Return ``spends_array`` rescaled so that it sums to ``total_spends``."""
        return spends_array * total_spends / spends_array.sum()

    def modify_spends(self, total_spends):
        """Rescale the modified spends so that they sum to ``total_spends``."""
        self.modified_spends = self._modify_spends(self.modified_spends, total_spends)

    def calculate_sales(self):
        """Return the response curve evaluated on the modified spends."""
        return self.response_curve(self.modified_spends)

    @staticmethod
    def hill_equation(x, Kd, n):
        """Return the Hill function ``x**n / (Kd**n + x**n)``.

        Declared static so that it can be called both as
        ``Channel.hill_equation(x, Kd, n)`` and on an instance.
        """
        return x**n / (Kd**n + x**n)

    def response_curve(self, x):
        """Map spends ``x`` to sales using the configured response curve.

        When ``self.penalty`` is truthy, values of ``x`` at or above
        ``self.upper_limit`` are first replaced by
        ``upper_limit + (x - upper_limit) * upper_limit / x``.

        Parameters read from ``self.response_curve_params``:
            * ``"hill-eq"``: ``Kd``, ``n``, ``x_min``, ``x_max``, ``y_min``,
              ``y_max``
            * ``"s-curve"``: ``Kd``, ``b``, ``a``, ``x0``
            * ``"linear"``: ``beta``

        Args:
            x: Spends to evaluate.

        Returns:
            The sales computed for ``x``.

        Raises:
            ValueError: If ``self.response_curve_type`` is not one of the
                supported curve types.
        """
        if self.penalty:
            x = np.where(
                x < self.upper_limit,
                x,
                self.upper_limit + (x - self.upper_limit) * self.upper_limit / x,
            )

        if self.response_curve_type == "hill-eq":
            # A single-element input is divided by 104 before the curve is
            # applied and the result multiplied back afterwards
            # (presumably 104 periods in the modelled window -- TODO confirm).
            dividing_rate = 104 if len(x) == 1 else 1

            Kd = self.response_curve_params["Kd"]
            n = self.response_curve_params["n"]
            x_min = self.response_curve_params["x_min"]
            x_max = self.response_curve_params["x_max"]
            y_min = self.response_curve_params["y_min"]
            y_max = self.response_curve_params["y_max"]

            x_inp = (x / dividing_rate - x_min) / (x_max - x_min)
            x_out = x_inp**n / (Kd**n + x_inp**n)
            x_val_inv = x_out * x_max + (1 - x_out) * x_min
            sales = (x_val_inv * y_min / y_max) * dividing_rate
            sales[np.isnan(sales)] = 0
        elif self.response_curve_type == "s-curve":
            if self.power >= 0:
                x = x / 10**self.power
                x = x.astype("float64")
            K = self.response_curve_params["Kd"]
            b = self.response_curve_params["b"]
            a = self.response_curve_params["a"]
            x0 = self.response_curve_params["x0"]
            sales = K / (1 + b * np.exp(-a * (x - x0)))
        elif self.response_curve_type == "linear":
            beta = self.response_curve_params["beta"]
            sales = beta * x
        else:
            # Previously this fell through to an UnboundLocalError on `sales`.
            raise ValueError(
                f"Unsupported response curve type: {self.response_curve_type!r}"
            )
        return sales

    def get_marginal_roi(self, flag):
        """Return the mean of ``a * y * (1 - y / K)`` over the spends.

        ``y`` is the response curve evaluated on the actual spends when
        ``flag == "actual"`` and on the modified spends otherwise.

        ``K`` is read from the ``"K"`` parameter; when that key is absent the
        ``"Kd"`` parameter is used instead, which is where the s-curve branch
        of ``response_curve`` reads the same quantity from.

        NOTE(review): the formula needs the ``"a"`` parameter, which the
        hill-eq and linear branches of ``response_curve`` do not read --
        confirm which curve types this method is meant to support.

        Raises:
            KeyError: If ``"a"`` is missing, or both ``"K"`` and ``"Kd"`` are.
        """
        params = self.response_curve_params
        K = params["K"] if "K" in params else params["Kd"]
        a = params["a"]

        if flag == "actual":
            y = self.response_curve(self.actual_spends)
        else:
            y = self.response_curve(self.modified_spends)

        mroi = a * y * (1 - y / K)
        return mroi.sum() / len(self.modified_spends)

    def update(self, total_spends):
        """Rescale modified spends to ``total_spends`` and refresh all totals."""
        self.modify_spends(total_spends)
        self.modified_sales = self.calculate_sales()
        self.modified_total_spends = self.modified_spends.sum()
        self.modified_total_sales = self.modified_sales.sum()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_bounds_min(self, modified_bound):
        """Replace ``channel_bounds_min``."""
        self.channel_bounds_min = modified_bound

    def update_bounds_max(self, modified_bound):
        """Replace ``channel_bounds_max``."""
        self.channel_bounds_max = modified_bound

    def intialize(self):
        """Copy ``old_spends`` into ``new_spends``.

        NOTE(review): ``old_spends`` is not assigned anywhere in this class,
        so this raises AttributeError unless a caller sets it first. The
        (misspelled) name is kept for compatibility.
        """
        self.new_spends = self.old_spends

    def __str__(self):
        return f"{self.name},{self.actual_total_sales}, {self.modified_total_spends}"

    @classmethod
    def from_dict(cls, attr_dict):
        """Build a channel from a dictionary of attributes.

        Args:
            attr_dict: Mapping with the keys ``name``, ``dates``, ``spends``,
                ``sales``, ``bounds``, ``modified_spends``,
                ``response_curve_type``, ``response_curve_params`` and
                ``penalty``. ``conversion_rate`` (default ``1``),
                ``channel_bounds_min`` and ``channel_bounds_max`` (default
                ``None``) are optional.

        Returns:
            Channel: The reconstructed channel.

        Raises:
            KeyError: If a required key is missing.
        """
        return cls(
            name=attr_dict["name"],
            dates=attr_dict["dates"],
            spends=attr_dict["spends"],
            sales=attr_dict["sales"],
            response_curve_type=attr_dict["response_curve_type"],
            response_curve_params=attr_dict["response_curve_params"],
            bounds=attr_dict["bounds"],
            channel_bounds_min=attr_dict.get("channel_bounds_min"),
            channel_bounds_max=attr_dict.get("channel_bounds_max"),
            conversion_rate=attr_dict.get("conversion_rate", 1),
            modified_spends=attr_dict["modified_spends"],
            penalty=attr_dict["penalty"],
        )

    def update_response_curves(self, response_curve_params):
        """Replace the response curve parameters (cached sales are not refreshed)."""
        self.response_curve_params = response_curve_params
class Scenario:
    """A named collection of channels with aggregated spends and sales.

    ``channels`` maps channel names to channel objects. Scenario-level totals
    and deltas are cached as attributes and refreshed by ``update``.
    """

    def __init__(self, name, channels, constant, correction):
        """Store the inputs and compute the scenario totals.

        Args:
            name: Scenario name.
            channels: Mapping of channel name to channel object.
            constant: Stored as given (not included in the sales totals).
            correction: Stored as given (not included in the sales totals).
        """
        self.name = name
        self.channels = channels
        self.constant = constant
        self.correction = correction
        # Actual totals must come from the channels' *actual* spends; using
        # the modified spends here would force delta_spends to zero whenever
        # the channels arrive already modified.
        self.actual_total_spends = self.calculate_actual_total_spends()
        self.actual_total_sales = self.calculate_actual_total_sales()
        self.modified_total_sales = self.calculate_modified_total_sales()
        self.modified_total_spends = self.calculate_modified_total_spends()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_penalty(self, value):
        """Set the over-spend penalty flag on every channel."""
        for channel in self.channels.values():
            channel.update_penalty(value)

    def calculate_actual_total_spends(self):
        """Return the sum of ``actual_total_spends * conversion_rate`` over channels."""
        total_actual_spends = 0.0
        for channel in self.channels.values():
            total_actual_spends += channel.actual_total_spends * channel.conversion_rate
        return total_actual_spends

    def calculate_modified_total_spends(self):
        """Return the sum of ``modified_total_spends * conversion_rate`` over channels."""
        total_modified_spends = 0.0
        for channel in self.channels.values():
            total_modified_spends += (
                channel.modified_total_spends * channel.conversion_rate
            )
        return total_modified_spends

    def calculate_actual_total_sales(self):
        """Return the sum of the channels' ``actual_total_sales``."""
        total_actual_sales = 0
        for channel in self.channels.values():
            total_actual_sales += channel.actual_total_sales
        return total_actual_sales

    def calculate_modified_total_sales(self):
        """Return the sum of the channels' ``modified_total_sales``."""
        total_modified_sales = 0
        for channel in self.channels.values():
            total_modified_sales += channel.modified_total_sales
        return total_modified_sales

    def update(self, channel_name, modified_spends):
        """Update one channel's total spends and refresh the scenario totals.

        Args:
            channel_name: Key of the channel in ``self.channels``.
            modified_spends: New total spends, passed to the channel's
                ``update`` method.
        """
        self.channels[channel_name].update(modified_spends)
        self.modified_total_sales = self.calculate_modified_total_sales()
        self.modified_total_spends = self.calculate_modified_total_spends()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_bounds_min(self, channel_name, modified_bound):
        """Forward a new minimum bound to the named channel."""
        self.channels[channel_name].update_bounds_min(modified_bound)

    def update_bounds_max(self, channel_name, modified_bound):
        """Forward a new maximum bound to the named channel."""
        self.channels[channel_name].update_bounds_max(modified_bound)

    def optimize_spends(self, sales_percent, channels_list, algo="trust-constr"):
        """Minimise total spends subject to a target sales level.

        The target is ``actual_total_sales * (1 + sales_percent / 100)``; the
        constraint keeps the modified total sales within +/-1 of it. Each
        channel's total spend is bounded to between 50% and 200% of its
        actual total spend, and the search starts from the lower bounds.

        The constraint function updates the scenario in place on every
        evaluation; after the solver returns, the scenario is updated with
        the solution.

        Args:
            sales_percent: Desired percentage change in total sales.
            channels_list: Names of the channels to optimise.
            algo: ``method`` passed to ``scipy.optimize.minimize``.

        Returns:
            An iterator of ``(channel_name, optimised_total_spend)`` pairs.
        """
        desired_sales = self.actual_total_sales * (1 + sales_percent / 100.0)

        def constraint(x):
            for ch, spends in zip(channels_list, x):
                self.update(ch, spends)
            return self.modified_total_sales - desired_sales

        bounds = []
        for ch in channels_list:
            bounds.append(
                (1 + np.array([-50.0, 100.0]) / 100.0)
                * self.channels[ch].actual_total_spends
            )

        initial_point = [bound[0] for bound in bounds]

        # Scale the objective by the order of magnitude of the starting point.
        power = np.ceil(np.log(sum(initial_point)) / np.log(10))

        constraints = [NonlinearConstraint(constraint, -1.0, 1.0)]

        res = minimize(
            lambda x: sum(x) / 10 ** (power),
            bounds=bounds,
            x0=initial_point,
            constraints=constraints,
            method=algo,
            options={"maxiter": int(2e7), "xtol": 10},
        )

        for channel_name, modified_spends in zip(channels_list, res.x):
            self.update(channel_name, modified_spends)

        return zip(channels_list, res.x)

    def optimize(self, spends_percent, channels_list):
        """Maximise total sales for a fixed total budget.

        The budget is the conversion-rate-weighted sum of the listed
        channels' actual total spends, scaled by
        ``1 + spends_percent / 100`` and imposed as an equality constraint.
        Each channel's bounds are ``(1 + channel.bounds / 100)`` times its
        scaled actual total spend.

        The objective updates the scenario in place on every evaluation;
        after the solver returns, the scenario is updated with the solution.

        Args:
            spends_percent: Percentage change applied to the total budget.
            channels_list: Names of the channels to optimise.

        Returns:
            An iterator of ``(channel_name, optimised_total_spend)`` pairs.
        """
        spends_constant = []
        spends_constraint = 0.0
        for channel_name in channels_list:
            spends_constant.append(self.channels[channel_name].conversion_rate)
            spends_constraint += (
                self.channels[channel_name].actual_total_spends
                * self.channels[channel_name].conversion_rate
            )
        spends_constraint = spends_constraint * (1 + spends_percent / 100)

        constraint = LinearConstraint(
            np.array(spends_constant),
            lb=spends_constraint,
            ub=spends_constraint,
        )

        bounds = []
        old_spends = []
        for channel_name in channels_list:
            _channel_class = self.channels[channel_name]
            channel_bounds = _channel_class.bounds
            channel_actual_total_spends = _channel_class.actual_total_spends * (
                (1 + spends_percent / 100)
            )
            old_spends.append(channel_actual_total_spends)
            bounds.append((1 + channel_bounds / 100) * channel_actual_total_spends)

        def objective_function(x):
            for channel_name, modified_spends in zip(channels_list, x):
                self.update(channel_name, modified_spends)
            return -1 * self.modified_total_sales

        res = minimize(
            lambda x: objective_function(x) / 1e3,
            method="trust-constr",
            x0=old_spends,
            constraints=constraint,
            bounds=bounds,
            options={"maxiter": int(1e7), "xtol": 50},
        )

        for channel_name, modified_spends in zip(channels_list, res.x):
            self.update(channel_name, modified_spends)

        return zip(channels_list, res.x)

    def hill_equation(self, x, Kd, n):
        """Return the Hill function ``x**n / (Kd**n + x**n)``."""
        return x**n / (Kd**n + x**n)

    def cost_func(self, channel, x):
        """Evaluate the Hill-based response for ``channel`` at spend ``x``.

        The parameters are read from ``response_curves_parameters.xlsx``
        (indexed by its ``channel`` column) on every call.

        Args:
            channel: Row label in the parameter workbook.
            x: Spend value; divided by 104 before normalisation.

        Returns:
            ``(y_max - y_min) * (hill(x_norm) + y_min) * 104`` for the
            channel's parameters.
        """
        response_curve_params = pd.read_excel(
            "response_curves_parameters.xlsx", index_col="channel"
        )
        param_dicts = {
            col: response_curve_params[col].to_dict()
            for col in response_curve_params.columns
        }

        x_inp = (x / 104 - param_dicts["x_min"][channel]) / (
            param_dicts["x_max"][channel] - param_dicts["x_min"][channel]
        )
        x_out = self.hill_equation(
            x_inp, param_dicts["Kd"][channel], param_dicts["n"][channel]
        )
        return (
            (param_dicts["y_max"][channel] - param_dicts["y_min"][channel])
            * (x_out + param_dicts["y_min"][channel])
            * 104
        )

    def save(self):
        """Collect the scenario's results into a dictionary of report data.

        Returns:
            dict: With the keys

            * ``"Actual"`` / ``"Modified"``: lists of per-channel dicts,
              each preceded by a ``"Total"`` entry;
            * ``"Summary"``: a DataFrame with one row per channel plus a
              ``"Total"`` row, under a two-level column index;
            * ``"download"``: the per-date DataFrame (``Date``, one column
              per channel, ``Sales``), a per-channel DataFrame and the four
              scenario totals.
        """
        details = {}
        actual_list = []
        modified_list = []
        data = {}
        channel_data = []
        summary_rows = []

        actual_list.append(
            {
                "name": "Total",
                "Spends": self.actual_total_spends,
                "Sales": self.actual_total_sales,
            }
        )
        modified_list.append(
            {
                "name": "Total",
                "Spends": self.modified_total_spends,
                "Sales": self.modified_total_sales,
            }
        )

        for channel in self.channels.values():
            # Display name: underscores become spaces, a trailing " imp" is
            # expanded to "Impressions".
            name_mod = channel.name.replace("_", " ")
            if name_mod.lower().endswith(" imp"):
                name_mod = name_mod.replace("Imp", " Impressions")

            actual_roi = round(
                channel.actual_total_sales / channel.actual_total_spends, 2
            )
            modified_roi = round(
                channel.modified_total_sales / channel.modified_total_spends, 2
            )

            summary_rows.append(
                [
                    name_mod,
                    channel.actual_total_spends,
                    channel.modified_total_spends,
                    channel.actual_total_sales,
                    channel.modified_total_sales,
                    actual_roi,
                    modified_roi,
                    channel.get_marginal_roi("actual"),
                    channel.get_marginal_roi("modified"),
                ]
            )

            data[channel.name] = channel.modified_spends
            data["Date"] = channel.dates
            # Running per-date sum of the channels' modified sales.
            data["Sales"] = (
                data.get("Sales", np.zeros((len(channel.dates),)))
                + channel.modified_sales
            )

            actual_list.append(
                {
                    "name": channel.name,
                    "Spends": channel.actual_total_spends,
                    "Sales": channel.actual_total_sales,
                    "ROI": actual_roi,
                }
            )
            modified_list.append(
                {
                    "name": channel.name,
                    "Spends": channel.modified_total_spends,
                    "Sales": channel.modified_total_sales,
                    "ROI": modified_roi,
                    "Marginal ROI": channel.get_marginal_roi("modified"),
                }
            )
            channel_data.append(
                {
                    "channel": channel.name,
                    "spends_act": channel.actual_total_spends,
                    "spends_mod": channel.modified_total_spends,
                    "sales_act": channel.actual_total_sales,
                    "sales_mod": channel.modified_total_sales,
                }
            )

        summary_rows.append(
            [
                "Total",
                self.actual_total_spends,
                self.modified_total_spends,
                self.actual_total_sales,
                self.modified_total_sales,
                round(self.actual_total_sales / self.actual_total_spends, 2),
                round(self.modified_total_sales / self.modified_total_spends, 2),
                0.0,
                0.0,
            ]
        )

        details["Actual"] = actual_list
        details["Modified"] = modified_list

        columns_index = pd.MultiIndex.from_product(
            [[""], ["Channel"]], names=["first", "second"]
        )
        columns_index = columns_index.append(
            pd.MultiIndex.from_product(
                [["Spends", "NRPU", "ROI", "MROI"], ["Actual", "Simulated"]],
                names=["first", "second"],
            )
        )
        details["Summary"] = pd.DataFrame(summary_rows, columns=columns_index)

        data_df = pd.DataFrame(data)
        channel_list = list(self.channels.keys())
        data_df = data_df[["Date", *channel_list, "Sales"]]

        details["download"] = {
            "data_df": data_df,
            "channels_df": pd.DataFrame(channel_data),
            "total_spends_act": self.actual_total_spends,
            "total_sales_act": self.actual_total_sales,
            "total_spends_mod": self.modified_total_spends,
            "total_sales_mod": self.modified_total_sales,
        }
        return details

    @classmethod
    def from_dict(cls, attr_dict):
        """Build a scenario from a dictionary of attributes.

        Args:
            attr_dict: Mapping with the keys ``name``, ``channels`` (a list of
                channel dictionaries, each rebuilt with ``class_from_dict``),
                ``constant`` and ``correction``.

        Returns:
            Scenario: The reconstructed scenario, with channels keyed by name.
        """
        channels_list = attr_dict["channels"]
        channels = {
            channel["name"]: class_from_dict(channel) for channel in channels_list
        }
        return cls(
            name=attr_dict["name"],
            channels=channels,
            constant=attr_dict["constant"],
            correction=attr_dict["correction"],
        )