-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnn.py
104 lines (74 loc) · 3.42 KB
/
nn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import tensorflow as tf
import tensorflow.keras as keras
import numpy as np
import numpy.random as rng
def zscore(X):
    """Column-wise z-score X using its own statistics.

    Returns (Z, mu, std) as produced by zscore_mu_std, where mu/std are
    computed per column (axis=0) with the sample std (ddof=1).
    """
    col_mu = np.mean(X, axis=0, keepdims=True)
    col_std = np.std(X, axis=0, ddof=1, keepdims=True)
    return zscore_mu_std(X, col_mu, col_std)
def zscore_mu_std(X, mu, std):
    """Normalize X with the supplied statistics.

    A small epsilon (1e-6) is added to std so columns with zero variance
    do not divide by zero. Returns (Z, mu, std) so callers can reuse the
    statistics on other data.
    """
    normalized = (X - mu) / (std + 1e-6)
    return normalized, mu, std
class FeedforwardNN:
    """Feedforward regression network with z-scored inputs and outputs.

    cfg keys read by this class: n_hidden, hidden_activation, n_output,
    dropout_p, stochastic, batch_size_p, n_epochs, patience, verbose,
    and optionally scramble. n_input is filled in from the training data.
    When stochastic is true, dropout stays active at inference time
    (Monte Carlo dropout) so predict() can draw multiple samples.
    """

    def __init__(self, cfg):
        self._cfg = cfg

    def train(self, Xtrain, Ytrain, Xvalid=None, Yvalid=None):
        """Fit the model on (Xtrain, Ytrain).

        Inputs and outputs are z-scored with training-set statistics; the
        optional validation pair is normalized with those same statistics.
        Early stopping on val_loss is enabled only when validation data is
        supplied.
        """
        self._cfg['n_input'] = Xtrain.shape[1]
        self._make_model()

        # Normalize inputs and outputs for better convergence; keep the
        # training statistics so predict()/evaluate() can reuse them.
        Xtrain, self._Xtrain_mu, self._Xtrain_std = zscore(Xtrain)
        if Xvalid is not None:
            Xvalid, _, _ = zscore_mu_std(Xvalid, self._Xtrain_mu, self._Xtrain_std)
        Ytrain, self._Ytrain_mu, self._Ytrain_std = zscore(Ytrain)
        # Fix: guard on Yvalid (original checked Xvalid), so passing Xvalid
        # without Yvalid no longer crashes inside zscore_mu_std.
        if Yvalid is not None:
            Yvalid, _, _ = zscore_mu_std(Yvalid, self._Ytrain_mu, self._Ytrain_std)

        # Optional null-model control: permute X rows to destroy the X-Y
        # pairing. Intentionally leaves Ytrain in its original order.
        if self._cfg.get('scramble', False):
            Xtrain = Xtrain[rng.permutation(Xtrain.shape[0]), :]

        has_validation = Xvalid is not None and Yvalid is not None
        callbacks = []
        if has_validation:
            callbacks.append(tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=self._cfg['patience'],
                restore_best_weights=True))

        self._model.fit(
            Xtrain,
            Ytrain,
            # Clamp to >= 1: int(n * batch_size_p) can be 0 on small datasets.
            batch_size=max(1, int(Xtrain.shape[0] * self._cfg['batch_size_p'])),
            # None (not an empty tuple) is Keras' "no validation" value.
            validation_data=(Xvalid, Yvalid) if has_validation else None,
            epochs=self._cfg['n_epochs'],
            verbose=self._cfg['verbose'],
            callbacks=callbacks)

    def _make_model(self):
        """Build and compile the Keras model described by self._cfg."""
        cfg = self._cfg
        keras.backend.clear_session()
        input_layer = keras.layers.Input(shape=(cfg['n_input'],))
        layer = input_layer
        # NOTE(review): source indentation was lost upstream; dropout is
        # assumed to apply only after the hidden layer — confirm intent.
        if cfg['n_hidden'] > 0:
            layer = keras.layers.Dense(
                cfg['n_hidden'],
                activation=cfg['hidden_activation'])(layer)
            layer = self._make_dropout(layer)
        # Linear output with MAE loss: plain regression head.
        output_layer = keras.layers.Dense(cfg['n_output'], activation='linear')(layer)
        model = keras.Model(input_layer, output_layer)
        model.compile(
            loss=keras.losses.MeanAbsoluteError(),
            optimizer=keras.optimizers.Nadam())
        self._model = model

    def _make_dropout(self, input_layer):
        """Attach a dropout layer.

        In stochastic mode the layer is built with training=True so dropout
        remains active at inference time (MC dropout sampling).
        """
        dropout = keras.layers.Dropout(self._cfg['dropout_p'])
        if self._cfg['stochastic']:
            return dropout(input_layer, training=True)
        return dropout(input_layer)

    def predict(self, X, n_samples=1):
        """Predict targets for X in original (un-normalized) output units.

        Deterministic mode returns a single prediction array. Stochastic
        mode returns an array of n_samples MC-dropout draws stacked on a
        new leading axis.
        """
        X, _, _ = zscore_mu_std(X, self._Xtrain_mu, self._Xtrain_std)
        if not self._cfg['stochastic']:
            return self._predict(X)
        return np.array([self._predict(X) for _ in range(n_samples)])

    def _predict(self, X):
        # Single full-batch forward pass, then undo output normalization.
        return self._model.predict(X, batch_size=X.shape[0]) * self._Ytrain_std + self._Ytrain_mu

    def evaluate(self, X, Y):
        """Return the model loss on (X, Y), computed in normalized units."""
        X, _, _ = zscore_mu_std(X, self._Xtrain_mu, self._Xtrain_std)
        # Fix: use the same epsilon-guarded normalization as training
        # (original divided by std directly, risking division by zero and
        # a mismatch with how Ytrain was scaled).
        Y, _, _ = zscore_mu_std(Y, self._Ytrain_mu, self._Ytrain_std)
        return self._model.evaluate(X, Y)