"""
This module contains Multi Layer Perceptrons (MLP), also known as feedforward artificial
neural networks, and deep neural networks (DNNs) for estimating the state transition function.
"""
5import pickle
6import numpy as np
7from epyt_flow.topology import NetworkTopology
8from sklearn.neural_network import MLPRegressor
9from sklearn.preprocessing import StandardScaler
10import torch
11from torch import nn
12from torch.utils.data import TensorDataset, DataLoader
13
14from .surrogates import StateTransitionModel
15
16
[docs]
class SimpleMlpStateTransitionModel(StateTransitionModel):
    """
    Multi-layer perceptron state transition model.
    Implemented in `scikit-learn <https://scikit-learn.org/stable/index.html>`_.

    Parameters
    ----------
    hidden_layer_sizes : `list[int]`, optional
        Dimensionality of the hidden layers.

        The default is [128].
    activation : `str`, optional
        Activation function for the hidden layers.

        The default is 'tanh'
    max_iter : `int`, optional
        Maximum number of training iterations.

        The default is 500.
    normalize : `bool`, optional
        If True, the concatenated input (current state and time varying quantities)
        is passed through a standard scaler before it is given to the MLP.
        The target (next state) is never scaled.

        The default is True.
    """
    def __init__(self, hidden_layer_sizes: list[int] = None,
                 activation: str = "tanh", max_iter: int = 500, normalize: bool = True, **kwds):
        # None stands for the documented default; a fresh list is created per instance
        # so that no mutable default object is shared with the estimator.
        if hidden_layer_sizes is None:
            hidden_layer_sizes = [128]

        self._wdn_topology = None
        self._input_size = None
        self._state_size = None
        self._normalize = normalize

        # Only the literal True enables scaling -- the same test is used in all methods below.
        self._scaler = StandardScaler() if self._normalize is True else None

        self._mlp = MLPRegressor(hidden_layer_sizes=hidden_layer_sizes,
                                 activation=activation, max_iter=max_iter)

        super().__init__(**kwds)

    def init(self, wdn_topology: NetworkTopology, input_size: int, state_size: int) -> None:
        """
        Stores the network topology and the input and state dimensionality.

        Parameters
        ----------
        wdn_topology : `epyt_flow.topology.NetworkTopology`
            Topology of the water distribution network.
        input_size : `int`
            Dimensionality of the model input.
        state_size : `int`
            Dimensionality of the state.
        """
        self._wdn_topology = wdn_topology
        self._input_size = input_size
        self._state_size = state_size

    def fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray,
            next_state: np.ndarray) -> None:
        """
        Fits the scaler (if enabled) and the MLP from scratch.

        Parameters
        ----------
        cur_state : `numpy.ndarray`
            Current states -- concatenated with `next_time_varying_quantity` along axis 1.
        next_time_varying_quantity : `numpy.ndarray`
            Time varying quantities that are relevant for evolving the state.
        next_state : `numpy.ndarray`
            Next states -- i.e. the regression targets.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)

        if self._normalize is True:
            X = self._scaler.fit_transform(X)

        self._mlp.fit(X, next_state)

    def partial_fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray,
                    next_state: np.ndarray) -> None:
        """
        Incrementally updates the scaler (if enabled) and the MLP with a new batch of data.

        Parameters
        ----------
        cur_state : `numpy.ndarray`
            Current states -- concatenated with `next_time_varying_quantity` along axis 1.
        next_time_varying_quantity : `numpy.ndarray`
            Time varying quantities that are relevant for evolving the state.
        next_state : `numpy.ndarray`
            Next states -- i.e. the regression targets.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)

        if self._normalize is True:
            # Update the running statistics first, then scale the batch with them.
            self._scaler.partial_fit(X)
            X = self._scaler.transform(X)

        self._mlp.partial_fit(X, next_state)

    def predict(self, cur_state: np.ndarray,
                next_time_varying_quantity: np.ndarray) -> np.ndarray:
        """
        Predicts the next states.

        Parameters
        ----------
        cur_state : `numpy.ndarray`
            Current states -- concatenated with `next_time_varying_quantity` along axis 1.
        next_time_varying_quantity : `numpy.ndarray`
            Time varying quantities that are relevant for evolving the state.

        Returns
        -------
        `numpy.ndarray`
            Output of the MLP for the (optionally scaled) input.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)

        if self._normalize is True:
            X = self._scaler.transform(X)

        return self._mlp.predict(X)
86
87
[docs]
class DnnStateTransitionModel(StateTransitionModel):
    """
    Neural network state transition model.
    Implemented in `PyTorch <https://pytorch.org/>`_.

    Parameters
    ----------
    hidden_layers_size : `list[int]`, optional
        Dimensionality of the hidden layers.

        The default is [128].
    activation : `str`, optional
        Activation function for the hidden layers.
        Supported values are 'relu' and 'tanh'.

        The default is 'tanh'
    last_layer_activation : `str`, optional
        Activation function of the last layer.
        If None, no activation function will be applied in the last layer.

        The default is None.
    max_iter : `int`, optional
        Maximum number of training iterations (passes over the training data).

        The default is 200.
    device : `str`, optional
        Device used for the computation.

        The default is 'cpu'
    normalization_layer : `bool`, optional
        If True, the first layer is a normalization layer.

        The default is True.
    normalize_input_output : `bool`, optional
        If True, input is scaled and the target as well -- i.e. the scaled state is predicted.
        Can not be used in conjunction with 'normalization_layer'.

        The default is False.
    dropout : `float`, optional
        Specifies the dropout probability in the input layer.
        If 0, no dropout will be used.

        The default is 0.
    batch_size : `int`, optional
        Batch size for training. Be aware that the batch size might have an influence
        on the normalization layer.

        The default is 128

    Raises
    ------
    ValueError
        If both 'normalization_layer' and 'normalize_input_output' are True.
    """
    def __init__(self, hidden_layers_size: list[int] = None,
                 activation: str = "tanh", last_layer_activation: str = None,
                 max_iter: int = 200, device: str = "cpu", normalization_layer: bool = True,
                 normalize_input_output: bool = False,
                 dropout: float = 0., batch_size: int = 128,
                 **kwds):
        # The two normalization mechanisms are mutually exclusive.
        if normalization_layer is True and normalize_input_output is True:
            raise ValueError("'normalization_layer' and 'normalize_input_output' " +
                             "can not be used at the same time.")

        # None stands for the documented default; avoids a shared mutable default argument.
        self._hidden_layers_size = [128] if hidden_layers_size is None else hidden_layers_size
        self._activation = activation
        self._last_layer_activation = last_layer_activation
        self._max_iter = max_iter
        self._device = device
        self._normalization_layer = normalization_layer
        self._normalize_input_output = normalize_input_output
        self._dropout = dropout
        self._batch_size = batch_size
        self._model = None
        self._wdn_topology = None
        self._input_size = None
        self._state_size = None
        self._scaler = StandardScaler() if normalize_input_output is True else None

        super().__init__(**kwds)

    def _get_activation_func(self, activation_desc: str) -> nn.Module:
        """
        Maps the name of an activation function to a new PyTorch module.

        Raises
        ------
        ValueError
            If the name is neither 'relu' nor 'tanh'.
        """
        if activation_desc == "relu":
            return nn.ReLU()
        if activation_desc == "tanh":
            return nn.Tanh()

        # Fail early with a clear message instead of handing None to nn.Sequential.
        raise ValueError(f"Unknown activation function '{activation_desc}' -- " +
                         "supported are 'relu' and 'tanh'.")

    def _to_tensor(self, data: np.ndarray) -> torch.Tensor:
        """
        Converts an array into a float tensor located on the configured device.
        """
        return torch.Tensor(data).to(self._device)

    def init(self, wdn_topology: NetworkTopology, input_size: int, state_size: int) -> None:
        """
        Stores the topology and dimensions, and builds a new (untrained) network
        on the configured device.

        Parameters
        ----------
        wdn_topology : `epyt_flow.topology.NetworkTopology`
            Topology of the water distribution network.
        input_size : `int`
            Dimensionality of the network input.
        state_size : `int`
            Dimensionality of the state -- i.e. of the network output.
        """
        self._wdn_topology = wdn_topology
        self._input_size = input_size
        self._state_size = state_size

        layers = []

        if self._normalization_layer is True:
            layers.append(nn.BatchNorm1d(self._input_size))

        if self._dropout > 0.:
            layers.append(nn.Dropout(p=self._dropout))

        # Hidden part: one (Linear, activation) pair per hidden layer.
        layer_dims = [self._input_size, *self._hidden_layers_size]
        for dim_in, dim_out in zip(layer_dims[:-1], layer_dims[1:]):
            layers.append(nn.Linear(dim_in, dim_out))
            layers.append(self._get_activation_func(self._activation))

        # Output layer.
        layers.append(nn.Linear(layer_dims[-1], self._state_size))

        if self._last_layer_activation is not None:
            layers.append(self._get_activation_func(self._last_layer_activation))

        # Batches are moved to the device during training, so the model must live there too.
        self._model = nn.Sequential(*layers).to(self._device)

    def load_from_file(self, file_path: str) -> None:
        """
        Loads the model and (if 'normalize_input_output' is True) the standard scaler
        from a given file. The scaler is read from '<file_path>.pickle'.

        Both files are unpickled, which can execute arbitrary code --
        only load files from trusted sources.

        Parameters
        ----------
        file_path : `str`
            File path.
        """
        # NOTE: weights_only=False unpickles a complete module object (see warning above).
        self._model = torch.load(file_path, map_location=self._device, weights_only=False)

        if self._normalize_input_output is True:
            with open(f"{file_path}.pickle", "rb") as f_in:
                self._scaler = pickle.load(f_in)

    def save_to_file(self, file_path: str) -> None:
        """
        Saves the model and (if 'normalize_input_output' is True) the standard scaler
        to a file. The scaler is written to '<file_path>.pickle'.

        Parameters
        ----------
        file_path : `str`
            File path.
        """
        torch.save(self._model, file_path)

        if self._normalize_input_output is True:
            with open(f"{file_path}.pickle", "wb") as f_out:
                pickle.dump(self._scaler, f_out)

    def _forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Applies the network to a given input tensor.
        """
        return self._model(x)

    def compute_jacobian(self, cur_state: np.ndarray,
                         next_time_varying_quantity: np.ndarray) -> np.ndarray:
        """
        Computes the Jacobian w.r.t. a given state (incl. control signals).

        If 'normalize_input_output' is True, the Jacobian is computed w.r.t. the scaled input.

        Parameters
        ----------
        cur_state : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Current state of the system.
        next_time_varying_quantity : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Time varying events (incl. control signals) that are relevant for evolving the state.

        Returns
        -------
        `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Jacobian as returned by `torch.autograd.functional.jacobian` for the
            concatenated input.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)

        if self._normalize_input_output:
            X = self._scaler.transform(X)

        jac = torch.autograd.functional.jacobian(self._forward, self._to_tensor(X))

        return jac.detach().cpu().numpy()

    def _fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Trains the network on the given data by minimizing the mean squared error
        using the Adam optimizer. The network is left in evaluation mode afterwards.
        """
        # Wrap data
        train_data_set = TensorDataset(torch.Tensor(X), torch.Tensor(y))
        train_data_loader = DataLoader(train_data_set, shuffle=True, batch_size=self._batch_size)

        # Loss function and optimizer
        loss_func = nn.MSELoss()
        optimizer = torch.optim.Adam(self._model.parameters())

        # Run training
        self._model.train()
        for _ in range(self._max_iter):
            for batch, (X_batch, y_batch) in enumerate(train_data_loader):
                X_batch, y_batch = X_batch.to(self._device), y_batch.to(self._device)

                # Compute prediction error
                pred = self._forward(X_batch)
                loss = loss_func(pred, y_batch)

                # Backpropagation
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                # Report the loss of every 100th batch.
                if batch % 100 == 0:
                    print(f"loss: {loss.item():>7f}")

        self._model.train(False)

    def fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray,
            next_state: np.ndarray) -> None:
        """
        Fits the network (and the scaler if 'normalize_input_output' is True).

        Parameters
        ----------
        cur_state : `numpy.ndarray`
            Current states -- concatenated with `next_time_varying_quantity` along axis 1.
        next_time_varying_quantity : `numpy.ndarray`
            Time varying quantities that are relevant for evolving the state.
        next_state : `numpy.ndarray`
            Next states -- i.e. the regression targets.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)

        if self._normalize_input_output:
            X = self._scaler.fit_transform(X)

            # The scaler was fitted on the full input. To scale the target with the statistics
            # of the leading (state) columns, pad it to the input width, transform,
            # and cut the padding off again.
            state_dim = next_state.shape[1]
            dummy_next_flows = np.zeros((next_state.shape[0], X.shape[1] - state_dim))
            next_state_ = self._scaler.transform(np.concatenate((next_state, dummy_next_flows),
                                                                axis=1))
            next_state = next_state_[:, :state_dim]

        self._fit(X, next_state)

    def partial_fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray,
                    next_state: np.ndarray) -> None:
        """
        Incremental training is not supported by this model.

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError()

    def predict(self, cur_state: np.ndarray,
                next_time_varying_quantity: np.ndarray,
                invert_output_scaling: bool = False) -> np.ndarray:
        """
        Predicts the next states.

        Parameters
        ----------
        cur_state : `numpy.ndarray`
            Current states -- concatenated with `next_time_varying_quantity` along axis 1.
        next_time_varying_quantity : `numpy.ndarray`
            Time varying quantities that are relevant for evolving the state.
        invert_output_scaling : `bool`, optional
            If True, the scaling of the predicted states is inverted before they are returned --
            only valid if 'normalize_input_output' is True.

            The default is False.

        Returns
        -------
        `numpy.ndarray`
            Predicted next states.
        """
        X = np.concatenate((cur_state, next_time_varying_quantity), axis=1)
        if self._normalize_input_output:
            X = self._scaler.transform(X)

        # No gradients are needed for inference.
        with torch.no_grad():
            Y_pred = self._forward(self._to_tensor(X)).detach().cpu().numpy()

        if invert_output_scaling is True:
            return self.invert_output_scaling(Y_pred)

        return Y_pred

    def invert_output_scaling(self, Y_pred: np.ndarray) -> np.ndarray:
        """
        Inverts the scaling of the output (i.e. predicted state).

        Parameters
        ----------
        Y_pred : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Predicted state.

        Returns
        -------
        `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Unscaled predicted state.

        Raises
        ------
        ValueError
            If 'normalize_input_output' is not True -- i.e. the output is not scaled.
        """
        if self._normalize_input_output is not True:
            raise ValueError("Output is not scaled!")

        # The scaler expects the full input width: pad the prediction with dummy columns,
        # invert the scaling, and keep the leading (state) columns only.
        state_dim = Y_pred.shape[1]
        dummy_control = np.ones((Y_pred.shape[0], self._scaler.n_features_in_ - state_dim))
        Y_pred_ = self._scaler.inverse_transform(np.concatenate((Y_pred, dummy_control), axis=1))
        return Y_pred_[:, :state_dim]
330 Y_pred : numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
331 Predicted state.
332
333 Returns
334 -------
335 numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
336 Unscaled predicted state.
337 """
338 if self._normalize_input_output is not True:
339 raise ValueError("Output is not scaled!")
340
341 dummy_control = np.ones((Y_pred.shape[0], self._scaler.n_features_in_ - Y_pred.shape[1]))
342 Y_pred_ = self._scaler.inverse_transform(np.concatenate((Y_pred, dummy_control), axis=1))
343 return Y_pred_[:, :Y_pred.shape[1]]