# Spaces: Runtime error  (hosting-page status banner captured with this file; not part of the module)
import time

import numpy as np
from sklearn.neural_network import MLPRegressor
from sklearn.utils.validation import column_or_1d

import Settings as settings
from DataUtils import make_y_multi_safe
# Default network shape handed to MLPRegressor(hidden_layer_sizes=...):
# two hidden layers with five units each.
hidden_layer_sizes = (5, 5)
# Default iteration cap handed to MLPRegressor(max_iter=...).
max_iter_mlp = 500000
class MLP_Model:
    """Regression model backed by scikit-learn's ``MLPRegressor``.

    The estimator is treated as a black box: the formula accessors return a
    fixed placeholder string rather than a symbolic expression.

    Attributes:
        name: Human-readable model name.
        short_name: Abbreviated model name.
        hidden_layer_sizes: Layer sizes forwarded to ``MLPRegressor``.
        solver: Optimizer name forwarded to ``MLPRegressor``.
        max_iter: Iteration cap forwarded to ``MLPRegressor``.
        warm_start: ``warm_start`` flag forwarded to ``MLPRegressor``.
        verbose: ``verbose`` flag forwarded to ``MLPRegressor``.
        est_mlp: The current ``MLPRegressor`` instance.
    """

    def __init__(self):
        """Set the default hyper-parameters and build a fresh estimator."""
        self.name = "MLP Model"
        self.short_name = "MLP"
        self.hidden_layer_sizes = hidden_layer_sizes
        self.solver = 'adam'
        self.max_iter = max_iter_mlp
        self.warm_start = False
        self.verbose = False
        self.est_mlp = self._build_estimator()

    def _build_estimator(self):
        """Return a new, unfitted ``MLPRegressor`` using this object's settings."""
        return MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes,
                            solver=self.solver,
                            activation='relu',
                            max_iter=self.max_iter,
                            verbose=self.verbose,
                            tol=1e-7,
                            warm_start=self.warm_start,
                            n_iter_no_change=100)

    @staticmethod
    def _split_train_valid(x, y):
        """Randomly split ``x`` (and ``y``, if given) into train/validation parts.

        The training part holds ``int(len(x) * settings.quick_train_fraction
        + 0.1)`` rows drawn without replacement; the validation part holds the
        remaining rows in their original order.

        Returns:
            A tuple ``(train_x, train_y, valid_x, valid_y)``. ``train_y`` and
            ``valid_y`` are ``None`` when ``y`` is ``None``.
        """
        train_set_size = int(len(x) * settings.quick_train_fraction + 0.1)
        x = np.array(x)
        if y is not None:
            y = np.array(y)
        num_rows = x.shape[0]

        sample = np.random.choice(num_rows, size=train_set_size, replace=False)
        # Boolean mask instead of a per-index "not in sample" scan: selects the
        # same held-out rows, in the same ascending order, in linear time.
        held_out = np.ones(num_rows, dtype=bool)
        held_out[sample] = False

        train_x = x[sample]
        valid_x = x[held_out]
        if y is not None:
            train_y = y[sample]
            valid_y = y[held_out]
        else:
            train_y = None
            valid_y = None
        return train_x, train_y, valid_x, valid_y

    def get_formula_string(self):
        """Return the placeholder formula; the network is not expanded symbolically."""
        return "(neural black box)"
        # Disabled sketch of a symbolic expansion of the fitted network,
        # kept for reference:
        #
        # current_inputs = ["X{}".format(i + 1) for i in range(settings.num_features)]
        # # print(current_inputs)
        # matrices = self.est_mlp.coefs_
        # vectors = self.est_mlp.intercepts_
        # for i in range(len(matrices)):
        #     current_outputs = []
        #
        #     for j in range(matrices[i].shape[1]):
        #         current_term = [vectors[i][j]]
        #         for k in range(matrices[i].shape[0]):
        #             sys.stdout.flush()
        #             current_term.append(["*", matrices[i][k][j], current_inputs[k]])
        #
        #         current_output = current_term[-1]
        #         for k in range(len(current_term), 1, -1):
        #             current_output = ["+", current_term[k - 2], current_output]
        #         current_outputs.append(current_output)
        #     current_inputs = [["max", 0, old_out] for old_out in current_outputs]
        #
        # # [-1] since we don't do relu activation on the last layer.
        # return current_inputs[0][-1]

    def get_formula(self):
        """Return the placeholder formula string."""
        return "(neural black box)"
        # return self.get_formula_string()

    def train(self, X, Y):
        """Fit the estimator on ``X`` and ``Y``.

        ``X`` is flattened to ``(X.shape[0], -1)`` and ``Y`` is flattened to a
        1-D vector before fitting.

        Returns:
            Always ``None``.
        """
        X = np.reshape(X, [X.shape[0], -1])
        Y = np.reshape(Y, [-1, 1])
        Y = column_or_1d(Y)
        self.est_mlp.fit(X, Y)
        return None

    def predict(self, X):
        """Return the estimator's predictions for ``X``.

        1-D and more-than-2-D array-like inputs are flattened to
        ``(n_rows, -1)``, matching what ``train`` and ``test`` do. Anything
        else is passed to the estimator untouched.
        """
        as_array = np.asarray(X)
        if as_array.ndim == 1 or as_array.ndim > 2:
            X = as_array.reshape(as_array.shape[0], -1)
        return self.est_mlp.predict(X)

    # Mean square error
    def test(self, X, Y):
        """Return the mean squared error of the estimator on ``(X, Y)``.

        Args:
            X: Array whose first axis indexes samples; flattened to
                ``(X.shape[0], -1)`` before prediction.
            Y: Target values; flattened to 1-D.

        Raises:
            ValueError: If the flattened prediction and the flattened target
                differ in size, since a silent truncation would report a
                misleading error value.
        """
        X = np.reshape(X, [X.shape[0], -1])
        y_hat = np.ravel(self.est_mlp.predict(X))
        y_gold = np.ravel(Y)
        if y_hat.shape != y_gold.shape:
            raise ValueError(
                "prediction has {} values but target has {}".format(
                    y_hat.size, y_gold.size))
        return np.mean((y_hat - y_gold) ** 2)

    def reset(self):
        """Discard the current estimator and replace it with an unfitted one."""
        self.est_mlp = self._build_estimator()

    def soft_reset(self):
        """Same as ``reset``: replace the estimator with an unfitted one."""
        self.est_mlp = self._build_estimator()

    def get_simple_formula(self, digits=None):
        """Return the placeholder formula string; ``digits`` is currently unused."""
        full_formula = self.get_formula_string()
        return full_formula
        # return DataUtils.simplify_formula(full_formula, digits=digits)

    def real_repeat_train(self, x, y=None,
                          num_repeats=settings.num_train_repeat_processes,
                          test_x=None, test_y=None,
                          verbose=True):
        """Train ``num_repeats`` fresh estimators and report the best one.

        The data is split once into a training and a validation part. Each
        session resets the estimator, trains on the training part and scores
        it on the validation part. The session with the lowest validation
        error is reported.

        Note that ``self.est_mlp`` is left holding the estimator from the
        *last* session, which is not necessarily the best one.

        Args:
            x: Input samples.
            y: Targets. NOTE(review): ``None`` is accepted by the signature,
                but ``train`` and ``test`` appear to need real targets;
                confirm with callers.
            num_repeats: Number of training sessions.
            test_x: Optional held-out inputs. When given, the reported error
                is measured on these instead of the validation part.
            test_y: Targets for ``test_x``; passed through
                ``make_y_multi_safe`` before scoring.
            verbose: Print progress and timing information.

        Returns:
            A tuple ``(best_formula, best_iter, best_err)``. If no session
            ever becomes best, this is ``("", 0, 999999)``.
        """
        # we still reduce train set size if only 1 repeat
        train_x, train_y, valid_x, valid_y = self._split_train_valid(x, y)

        best_formula = ""
        best_iter = 0
        # Infinity so that any finite validation error can become the best.
        best_validation = float("inf")
        # Kept at the historical sentinel for callers that see "no result".
        best_err = 999999

        if verbose:
            print("Beginning {} repeat sessions of {} iterations each.".format(
                num_repeats, settings.num_train_steps_in_repeat_mode))
            print()
        start_time = time.time()
        old_time = start_time

        for train_iter in range(1, 1 + num_repeats):
            if verbose:
                print("Repeated train session {} of {}.".format(train_iter, num_repeats))

            self.soft_reset()
            self.train(train_x, train_y)
            valid_err = self.test(valid_x, valid_y)
            current_time = time.time()

            if verbose:
                # print(self.get_simple_formula())
                print("Attained validation error: {:.5f}".format(valid_err))

            if valid_err < best_validation:
                best_validation = valid_err
                best_formula = self.get_simple_formula()
                best_iter = train_iter
                if test_x is not None:
                    safe_test_y = make_y_multi_safe(test_y)
                    best_err = self.test(test_x, safe_test_y)
                else:
                    best_err = valid_err
                if verbose:
                    print(">>> New best model!")
                    print(best_formula)

            if verbose:
                # Multiply rather than divide by a rate, so a session that
                # finishes within timer resolution cannot divide by zero.
                elapsed_minutes = (current_time - old_time) / 60
                print("Took {:.2f} minutes.".format(elapsed_minutes))
                print("Est. {:.2f} minutes remaining.".format(
                    (num_repeats - train_iter) * elapsed_minutes))
                print()
            old_time = current_time

        if verbose:
            print("Total time for repeat process: {:.2f} minutes.".format(
                (time.time() - start_time) / 60))
        return best_formula, best_iter, best_err

    # Does not repeat train. sorry.
    def repeat_train(self, x, y=None,
                     num_repeats=settings.num_train_repeat_processes,
                     test_x=None, test_y=None,
                     verbose=True):
        """Run a single training session on a random subset of the data.

        Despite the name, this trains exactly once; ``num_repeats`` is
        accepted but ignored.

        Args:
            x: Input samples.
            y: Targets. NOTE(review): ``None`` is accepted by the signature,
                but ``train`` and ``test`` appear to need real targets;
                confirm with callers.
            num_repeats: Unused.
            test_x: Optional held-out inputs. When given, the reported error
                is measured on these instead of the validation part.
            test_y: Targets for ``test_x``; passed through
                ``make_y_multi_safe`` before scoring.
            verbose: Print the formula and timing information.

        Returns:
            A tuple ``(best_formula, 0, best_err)``.
        """
        # we still reduce train set size if only 1 repeat
        train_x, train_y, valid_x, valid_y = self._split_train_valid(x, y)

        start_time = time.time()
        self.soft_reset()
        self.train(train_x, train_y)
        current_time = time.time()

        best_formula = self.get_simple_formula()
        if test_x is not None:
            safe_test_y = make_y_multi_safe(test_y)
            best_err = self.test(test_x, safe_test_y)
        else:
            best_err = self.test(valid_x, valid_y)

        if verbose:
            print(">>> New best model!")
            print(best_formula)
            print("Took {:.2f} minutes.".format((current_time - start_time) / 60))
            print()
            print("Total time for repeat process: {:.2f} minutes.".format(
                (time.time() - start_time) / 60))
        return best_formula, 0, best_err