import numpy as np from scipy.optimize import minimize, LinearConstraint, NonlinearConstraint from collections import OrderedDict import pandas as pd from numerize.numerize import numerize def class_to_dict(class_instance): attr_dict = {} if isinstance(class_instance, Channel): attr_dict["type"] = "Channel" attr_dict["name"] = class_instance.name attr_dict["dates"] = class_instance.dates attr_dict["spends"] = class_instance.actual_spends attr_dict["conversion_rate"] = class_instance.conversion_rate attr_dict["modified_spends"] = class_instance.modified_spends attr_dict["modified_sales"] = class_instance.modified_sales attr_dict["response_curve_type"] = class_instance.response_curve_type attr_dict["response_curve_params"] = class_instance.response_curve_params attr_dict["penalty"] = class_instance.penalty attr_dict["bounds"] = class_instance.bounds attr_dict["actual_total_spends"] = class_instance.actual_total_spends attr_dict["actual_total_sales"] = class_instance.actual_total_sales attr_dict["modified_total_spends"] = class_instance.modified_total_spends attr_dict["modified_total_sales"] = class_instance.modified_total_sales attr_dict["actual_mroi"] = class_instance.get_marginal_roi("actual") attr_dict["modified_mroi"] = class_instance.get_marginal_roi("modified") elif isinstance(class_instance, Scenario): attr_dict["type"] = "Scenario" attr_dict["name"] = class_instance.name channels = [] for channel in class_instance.channels.values(): channels.append(class_to_dict(channel)) attr_dict["channels"] = channels attr_dict["constant"] = class_instance.constant attr_dict["correction"] = class_instance.correction attr_dict["actual_total_spends"] = class_instance.actual_total_spends attr_dict["actual_total_sales"] = class_instance.actual_total_sales attr_dict["modified_total_spends"] = class_instance.modified_total_spends attr_dict["modified_total_sales"] = class_instance.modified_total_sales return attr_dict def class_from_dict(attr_dict): if attr_dict["type"] == "Channel": return Channel.from_dict(attr_dict) elif attr_dict["type"] == "Scenario": return Scenario.from_dict(attr_dict) class Channel: def __init__( self, name, dates, spends, response_curve_type, response_curve_params, bounds,channel_bounds_min,channel_bounds_max, conversion_rate=1, modified_spends=None, penalty=True, ): self.name = name self.dates = dates self.conversion_rate = conversion_rate self.actual_spends = spends.copy() if modified_spends is None: self.modified_spends = self.actual_spends.copy() else: self.modified_spends = modified_spends self.response_curve_type = response_curve_type self.response_curve_params = response_curve_params self.bounds = bounds self.channel_bounds_min = channel_bounds_min self.channel_bounds_max = channel_bounds_max self.penalty = penalty self.upper_limit = self.actual_spends.max() + self.actual_spends.std() self.power = np.ceil(np.log(self.actual_spends.max()) / np.log(10)) - 3 self.actual_sales = None self.actual_sales = self.response_curve(self.actual_spends) self.actual_total_spends = self.actual_spends.sum() self.actual_total_sales = self.actual_sales.sum() self.modified_sales = self.calculate_sales() self.modified_total_spends = self.modified_spends.sum() self.modified_total_sales = self.modified_sales.sum() self.delta_spends = self.modified_total_spends - self.actual_total_spends self.delta_sales = self.modified_total_sales - self.actual_total_sales def update_penalty(self, penalty): self.penalty = penalty def _modify_spends(self, spends_array, total_spends): return spends_array * total_spends / spends_array.sum() def modify_spends(self, total_spends): self.modified_spends = ( self.modified_spends * total_spends / self.modified_spends.sum() ) def calculate_sales(self): return self.response_curve(self.modified_spends) def response_curve(self, x): if self.penalty: x = np.where( x < self.upper_limit, x, self.upper_limit + (x - self.upper_limit) * self.upper_limit / x, ) if self.response_curve_type == "s-curve": if self.power >= 0: x = x / 10**self.power x = x.astype("float64") K = self.response_curve_params["K"] b = self.response_curve_params["b"] a = self.response_curve_params["a"] x0 = self.response_curve_params["x0"] sales = K / (1 + b * np.exp(-a * (x - x0))) if self.response_curve_type == "linear": beta = self.response_curve_params["beta"] sales = beta * x return sales def get_marginal_roi(self, flag): K = self.response_curve_params["K"] a = self.response_curve_params["a"] # x = self.modified_total_spends # if self.power >= 0 : # x = x / 10**self.power # x = x.astype('float64') # return K*b*a*np.exp(-a*(x-x0)) / (1 + b * np.exp(-a*(x - x0)))**2 if flag == "actual": y = self.response_curve(self.actual_spends) # spends_array = self.actual_spends # total_spends = self.actual_total_spends # total_sales = self.actual_total_sales else: y = self.response_curve(self.modified_spends) # spends_array = self.modified_spends # total_spends = self.modified_total_spends # total_sales = self.modified_total_sales # spends_inc_1 = self._modify_spends(spends_array, total_spends+1) mroi = a * (y) * (1 - y / K) return mroi.sum() / len(self.modified_spends) # spends_inc_1 = self.spends_array + 1 # new_total_sales = self.response_curve(spends_inc_1).sum() # return (new_total_sales - total_sales) / len(self.modified_spends) def update(self, total_spends): self.modify_spends(total_spends) self.modified_sales = self.calculate_sales() self.modified_total_spends = self.modified_spends.sum() self.modified_total_sales = self.modified_sales.sum() self.delta_spends = self.modified_total_spends - self.actual_total_spends self.delta_sales = self.modified_total_sales - self.actual_total_sales def update_bounds_min(self, modified_bound): self.channel_bounds_min = modified_bound def update_bounds_max(self, modified_bound): self.channel_bounds_max = modified_bound def intialize(self): self.new_spends = self.old_spends def __str__(self): return f"{self.name},{self.actual_total_sales}, {self.modified_total_spends}" @classmethod def from_dict(cls, attr_dict): return Channel( name=attr_dict["name"], dates=attr_dict["dates"], spends=attr_dict["spends"], bounds=attr_dict["bounds"], modified_spends=attr_dict["modified_spends"], response_curve_type=attr_dict["response_curve_type"], response_curve_params=attr_dict["response_curve_params"], penalty=attr_dict["penalty"], ) def update_response_curves(self, response_curve_params): self.response_curve_params = response_curve_params class Scenario: def __init__(self, name, channels, constant, correction): self.name = name self.channels = channels self.constant = constant self.correction = correction self.actual_total_spends = self.calculate_modified_total_spends() self.actual_total_sales = self.calculate_actual_total_sales() self.modified_total_sales = self.calculate_modified_total_sales() self.modified_total_spends = self.calculate_modified_total_spends() self.delta_spends = self.modified_total_spends - self.actual_total_spends self.delta_sales = self.modified_total_sales - self.actual_total_sales def update_penalty(self, value): for channel in self.channels.values(): channel.update_penalty(value) def calculate_modified_total_spends(self): total_actual_spends = 0.0 for channel in self.channels.values(): total_actual_spends += channel.actual_total_spends * channel.conversion_rate return total_actual_spends def calculate_modified_total_spends(self): total_modified_spends = 0.0 for channel in self.channels.values(): # import streamlit as st # st.write(channel.modified_total_spends ) total_modified_spends += ( channel.modified_total_spends * channel.conversion_rate ) return total_modified_spends def calculate_actual_total_sales(self): total_actual_sales = self.constant.sum() + self.correction.sum() for channel in self.channels.values(): total_actual_sales += channel.actual_total_sales return total_actual_sales def calculate_modified_total_sales(self): total_modified_sales = self.constant.sum() + self.correction.sum() for channel in self.channels.values(): total_modified_sales += channel.modified_total_sales return total_modified_sales def update(self, channel_name, modified_spends): self.channels[channel_name].update(modified_spends) self.modified_total_sales = self.calculate_modified_total_sales() self.modified_total_spends = self.calculate_modified_total_spends() self.delta_spends = self.modified_total_spends - self.actual_total_spends self.delta_sales = self.modified_total_sales - self.actual_total_sales def update_bounds_min(self, channel_name,modified_bound): # self.modify_spends(total_spends) self.channels[channel_name].update_bounds_min(modified_bound) def update_bounds_max(self, channel_name,modified_bound): # self.modify_spends(total_spends) self.channels[channel_name].update_bounds_max(modified_bound) # def optimize_spends(self, sales_percent, channels_list, algo="COBYLA"): # desired_sales = self.actual_total_sales * (1 + sales_percent / 100.0) # def constraint(x): # for ch, spends in zip(channels_list, x): # self.update(ch, spends) # return self.modified_total_sales - desired_sales # bounds = [] # for ch in channels_list: # bounds.append( # (1 + np.array([-50.0, 100.0]) / 100.0) # * self.channels[ch].actual_total_spends # ) # initial_point = [] # for bound in bounds: # initial_point.append(bound[0]) # power = np.ceil(np.log(sum(initial_point)) / np.log(10)) # constraints = [NonlinearConstraint(constraint, -1.0, 1.0)] # res = minimize( # lambda x: sum(x) / 10 ** (power), # bounds=bounds, # x0=initial_point, # constraints=constraints, # method=algo, # options={"maxiter": int(2e7), "catol": 1}, # ) # for channel_name, modified_spends in zip(channels_list, res.x): # self.update(channel_name, modified_spends) # return zip(channels_list, res.x) def optimize_spends(self, sales_percent, channels_list, algo="trust-constr"): print("%"*100) desired_sales = self.actual_total_sales * (1 + sales_percent / 100.0) def constraint(x): for ch, spends in zip(channels_list, x): self.update(ch, spends) return self.modified_total_sales - desired_sales bounds = [] for ch in channels_list: bounds.append( (1+np.array([-50.0, 100.0]) / 100.0) * self.channels[ch].actual_total_spends ) initial_point = [] for bound in bounds: initial_point.append(bound[0]) power = np.ceil(np.log(sum(initial_point)) / np.log(10)) constraints = [NonlinearConstraint(constraint, -1.0, 1.0)] res = minimize( lambda x: sum(x) / 10 ** (power), bounds=bounds, x0=initial_point, constraints=constraints, method=algo, options={"maxiter": int(2e7), "xtol": 10}, ) for channel_name, modified_spends in zip(channels_list, res.x): self.update(channel_name, modified_spends) return zip(channels_list, res.x) def optimize(self, spends_percent, channels_list): # channels_list = self.channels.keys() num_channels = len(channels_list) spends_constant = [] spends_constraint = 0.0 for channel_name in channels_list: # spends_constraint += self.channels[channel_name].modified_total_spends spends_constant.append(self.channels[channel_name].conversion_rate) spends_constraint += ( self.channels[channel_name].actual_total_spends * self.channels[channel_name].conversion_rate ) spends_constraint = spends_constraint * (1 + spends_percent / 100) # constraint= LinearConstraint(np.ones((num_channels,)), lb = spends_constraint, ub = spends_constraint) constraint = LinearConstraint( np.array(spends_constant), lb=spends_constraint, ub=spends_constraint, ) bounds = [] old_spends = [] for channel_name in channels_list: _channel_class = self.channels[channel_name] channel_bounds = _channel_class.bounds channel_actual_total_spends = _channel_class.actual_total_spends * ( (1 + spends_percent / 100) ) old_spends.append(channel_actual_total_spends) bounds.append((1+ channel_bounds / 100) * channel_actual_total_spends) def objective_function(x): for channel_name, modified_spends in zip(channels_list, x): self.update(channel_name, modified_spends) return -1 * self.modified_total_sales print(bounds) print("$"*100) res = minimize( lambda x: objective_function(x) / 1e3, method="trust-constr", x0=old_spends, constraints=constraint, bounds=bounds, options={"maxiter": int(1e7), "xtol": 50}, ) # res = dual_annealing( # objective_function, # x0=old_spends, # mi # constraints=constraint, # bounds=bounds, # tol=1e-16 # ) print(res) for channel_name, modified_spends in zip(channels_list, res.x): self.update(channel_name, modified_spends) return zip(channels_list, res.x) def save(self): details = {} actual_list = [] modified_list = [] data = {} channel_data = [] summary_rows = [] actual_list.append( { "name": "Total", "Spends": self.actual_total_spends, "Sales": self.actual_total_sales, } ) modified_list.append( { "name": "Total", "Spends": self.modified_total_spends, "Sales": self.modified_total_sales, } ) for channel in self.channels.values(): name_mod = channel.name.replace("_", " ") if name_mod.lower().endswith(" imp"): name_mod = name_mod.replace("Imp", " Impressions") summary_rows.append( [ name_mod, channel.actual_total_spends, channel.modified_total_spends, channel.actual_total_sales, channel.modified_total_sales, round(channel.actual_total_sales / channel.actual_total_spends, 2), round( channel.modified_total_sales / channel.modified_total_spends, 2, ), channel.get_marginal_roi("actual"), channel.get_marginal_roi("modified"), ] ) data[channel.name] = channel.modified_spends data["Date"] = channel.dates data["Sales"] = ( data.get("Sales", np.zeros((len(channel.dates),))) + channel.modified_sales ) actual_list.append( { "name": channel.name, "Spends": channel.actual_total_spends, "Sales": channel.actual_total_sales, "ROI": round( channel.actual_total_sales / channel.actual_total_spends, 2 ), } ) modified_list.append( { "name": channel.name, "Spends": channel.modified_total_spends, "Sales": channel.modified_total_sales, "ROI": round( channel.modified_total_sales / channel.modified_total_spends, 2, ), "Marginal ROI": channel.get_marginal_roi("modified"), } ) channel_data.append( { "channel": channel.name, "spends_act": channel.actual_total_spends, "spends_mod": channel.modified_total_spends, "sales_act": channel.actual_total_sales, "sales_mod": channel.modified_total_sales, } ) summary_rows.append( [ "Total", self.actual_total_spends, self.modified_total_spends, self.actual_total_sales, self.modified_total_sales, round(self.actual_total_sales / self.actual_total_spends, 2), round(self.modified_total_sales / self.modified_total_spends, 2), 0.0, 0.0, ] ) details["Actual"] = actual_list details["Modified"] = modified_list columns_index = pd.MultiIndex.from_product( [[""], ["Channel"]], names=["first", "second"] ) columns_index = columns_index.append( pd.MultiIndex.from_product( [["Spends", "NRPU", "ROI", "MROI"], ["Actual", "Simulated"]], names=["first", "second"], ) ) details["Summary"] = pd.DataFrame(summary_rows, columns=columns_index) data_df = pd.DataFrame(data) channel_list = list(self.channels.keys()) data_df = data_df[["Date", *channel_list, "Sales"]] details["download"] = { "data_df": data_df, "channels_df": pd.DataFrame(channel_data), "total_spends_act": self.actual_total_spends, "total_sales_act": self.actual_total_sales, "total_spends_mod": self.modified_total_spends, "total_sales_mod": self.modified_total_sales, } return details @classmethod def from_dict(cls, attr_dict): channels_list = attr_dict["channels"] channels = { channel["name"]: class_from_dict(channel) for channel in channels_list } return Scenario( name=attr_dict["name"], channels=channels, constant=attr_dict["constant"], correction=attr_dict["correction"], )