SFL / MLP_Model.py
amirhosseinkarami's picture
Add code files
bae498f
import time
import numpy as np
from sklearn.neural_network import MLPRegressor
from sklearn.utils.validation import column_or_1d
import Settings as settings
from DataUtils import make_y_multi_safe
hidden_layer_sizes = (5, 5)
max_iter_mlp = 500000
class MLP_Model:
def __init__(self):
self.name = "MLP Model"
self.short_name = "MLP"
self.hidden_layer_sizes = hidden_layer_sizes
self.solver = 'adam'
self.max_iter = max_iter_mlp
self.warm_start = False
self.verbose = False
self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes,
solver=self.solver,
activation='relu',
max_iter=self.max_iter,
verbose=self.verbose,
tol=1e-7,
warm_start=self.warm_start,
n_iter_no_change=100)
def get_formula_string(self):
return "(neural black box)"
# current_inputs = ["X{}".format(i + 1) for i in range(settings.num_features)]
# # print(current_inputs)
# matrices = self.est_mlp.coefs_
# vectors = self.est_mlp.intercepts_
# for i in range(len(matrices)):
# current_outputs = []
#
# for j in range(matrices[i].shape[1]):
# current_term = [vectors[i][j]]
# for k in range(matrices[i].shape[0]):
# sys.stdout.flush()
# current_term.append(["*", matrices[i][k][j], current_inputs[k]])
#
# current_output = current_term[-1]
# for k in range(len(current_term), 1, -1):
# current_output = ["+", current_term[k - 2], current_output]
# current_outputs.append(current_output)
# current_inputs = [["max", 0, old_out] for old_out in current_outputs]
#
# # [-1] since we don't do relu activation on the last layer.
# return current_inputs[0][-1]
def get_formula(self):
return "(neural black box)"
# return self.get_formula_string()
def train(self, X, Y):
X = np.reshape(X, [X.shape[0], -1])
Y = np.reshape(Y, [-1, 1])
Y = column_or_1d(Y)
self.est_mlp.fit(X, Y)
return None
def predict(self, X):
return self.est_mlp.predict(X)
# Mean square error
def test(self, X, Y):
X = np.reshape(X, [X.shape[0], -1])
y_hat = np.reshape(self.est_mlp.predict(X), [1, -1])[0]
y_gold = np.reshape(Y, [1, -1])[0]
our_sum = 0
for i in range(len(y_gold)):
our_sum += (y_hat[i] - y_gold[i]) ** 2
return our_sum / len(y_gold)
def reset(self):
self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes,
solver=self.solver,
activation='relu',
max_iter=self.max_iter,
verbose=self.verbose,
tol=1e-7,
warm_start=self.warm_start,
n_iter_no_change=100)
def soft_reset(self):
self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes,
solver=self.solver,
activation='relu',
max_iter=self.max_iter,
verbose=self.verbose,
tol=1e-7,
warm_start=self.warm_start,
n_iter_no_change=100)
def get_simple_formula(self, digits=None):
full_formula = self.get_formula_string()
return full_formula
# return DataUtils.simplify_formula(full_formula, digits=digits)
def real_repeat_train(self, x, y=None,
num_repeats=settings.num_train_repeat_processes,
test_x=None, test_y=None,
verbose=True):
# we still reduce train set size if only 1 repeat
train_set_size = int(len(x) * settings.quick_train_fraction + 0.1)
x = np.array(x)
if y is not None:
y = np.array(y)
sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False)
train_x = x[sample][:]
if y is not None:
train_y = y[sample]
else:
train_y = None
out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample]
valid_x = x[out_sample][:]
if y is not None:
valid_y = y[out_sample]
# valid_y = self.make_y_multi_safe(valid_y)
else:
valid_y = None
best_formula = ""
best_iter = 0
best_validation = 999999
best_err = 999999
old_time = time.time()
if verbose:
print("Beginning {} repeat sessions of {} iterations each.".format(num_repeats,
settings.num_train_steps_in_repeat_mode))
print()
start_time = time.time()
old_time = start_time
for train_iter in range(1, 1 + num_repeats):
if verbose:
print("Repeated train session {} of {}.".format(train_iter, num_repeats))
self.soft_reset()
self.train(train_x, train_y)
valid_err = self.test(valid_x, valid_y)
current_time = time.time()
if verbose:
# print(self.get_simple_formula())
print("Attained validation error: {:.5f}".format(valid_err))
if valid_err < best_validation:
best_validation = valid_err
best_formula = self.get_simple_formula()
best_iter = train_iter
if test_x is not None:
safe_test_y = make_y_multi_safe(test_y)
best_err = self.test(test_x, safe_test_y)
else:
best_err = valid_err
if verbose:
print(">>> New best model!")
print(best_formula)
if verbose:
iters_per_minute = 60.0 / (current_time - old_time)
print("Took {:.2f} minutes.".format((current_time - old_time) / 60))
print("Est. {:.2f} minutes remaining.".format((num_repeats - train_iter) / iters_per_minute))
print()
old_time = current_time
if verbose:
print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60))
return best_formula, best_iter, best_err
# Does not repeat train. sorry.
def repeat_train(self, x, y=None,
num_repeats=settings.num_train_repeat_processes,
test_x=None, test_y=None,
verbose=True):
# we still reduce train set size if only 1 repeat
train_set_size = int(len(x) * settings.quick_train_fraction + 0.1)
x = np.array(x)
if y is not None:
y = np.array(y)
sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False)
train_x = x[sample][:]
if y is not None:
train_y = y[sample]
else:
train_y = None
out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample]
valid_x = x[out_sample][:]
if y is not None:
valid_y = y[out_sample]
# valid_y = self.make_y_multi_safe(valid_y)
else:
valid_y = None
if verbose:
start_time = time.time()
old_time = start_time
self.soft_reset()
self.train(train_x, train_y)
current_time = time.time()
best_formula = self.get_simple_formula()
if test_x is not None:
safe_test_y = make_y_multi_safe(test_y)
best_err = self.test(test_x, safe_test_y)
else:
best_err = self.test(valid_x, valid_y)
if verbose:
print(">>> New best model!")
print(best_formula)
if verbose:
print("Took {:.2f} minutes.".format((current_time - old_time) / 60))
print()
if verbose:
print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60))
return best_formula, 0, best_err