Source code for epyt_control.signal_processing.state_forecasting.mlp_surrogate

  1"""
  2This module contains Multi Layer Perceptrons (MLP), also known as feedforward artifical
  3neural networks, and deep neural networks (DNNs) for estimating the state transition function.
  4"""
  5import pickle
  6import numpy as np
  7from epyt_flow.topology import NetworkTopology
  8from sklearn.neural_network import MLPRegressor
  9from sklearn.preprocessing import StandardScaler
 10import torch
 11from torch import nn
 12from torch.utils.data import TensorDataset, DataLoader
 13
 14from .surrogates import StateTransitionModel
 15
 16
[docs] 17class SimpleMlpStateTransitionModel(StateTransitionModel): 18 """ 19 Multi-layer perceptron state transition model. 20 Implemented in `scikit-learn <https://scikit-learn.org/stable/index.html>`_. 21 22 Parameters 23 ---------- 24 hidden_layers_size : `list[int]`, optional 25 Dimensionality of the hidden layers. 26 27 The default is [128]. 28 activation : `str`, optional 29 Activation function for the hidden layers. 30 31 The default is 'tanh' 32 max_iter : `int`, optional 33 Maximum number of training itertions. 34 35 The default is 500. 36 """ 37 def __init__(self, hidden_layer_sizes: list[int] = [128], 38 activation: str = "tanh", max_iter: int = 500, normalize: bool = True, **kwds): 39 self._wdn_topology = None 40 self._input_size = None 41 self._state_size = None 42 self._normalize = normalize 43 44 if self._normalize is True: 45 self._scaler = StandardScaler() 46 else: 47 self._scaler = None 48 49 self._mlp = MLPRegressor(hidden_layer_sizes=hidden_layer_sizes, 50 activation=activation, max_iter=max_iter) 51 52 super().__init__(**kwds) 53
[docs] 54 def init(self, wdn_topology: NetworkTopology, input_size: int, state_size: int) -> None: 55 self._wdn_topology = wdn_topology 56 self._input_size = input_size 57 self._state_size = state_size
58
[docs] 59 def fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray, 60 next_state: np.ndarray) -> None: 61 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 62 63 if self._normalize is True: 64 X = self._scaler.fit_transform(X) 65 66 self._mlp.fit(X, next_state)
67
[docs] 68 def partial_fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray, 69 next_state: np.ndarray) -> None: 70 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 71 72 if self._normalize is True: 73 self._scaler.partial_fit(X) 74 X = self._scaler.transform(X) 75 76 self._mlp.partial_fit(X, next_state)
77
[docs] 78 def predict(self, cur_state: np.ndarray, 79 next_time_varying_quantity: np.ndarray) -> np.ndarray: 80 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 81 82 if self._normalize is True: 83 X = self._scaler.transform(X) 84 85 return self._mlp.predict(X)
86 87
[docs] 88class DnnStateTransitionModel(StateTransitionModel): 89 """ 90 Neural network state transition model. 91 Implemented in `PyTorch <https://pytorch.org/>`_. 92 93 Parameters 94 ---------- 95 hidden_layers_size : `list[int]`, optional 96 Dimensionality of the hidden layers. 97 98 The default is [128]. 99 activation : `str`, optional 100 Activation function for the hidden layers. 101 102 The default is 'tanh' 103 last_layer_activation : `str`, optional 104 Activation function of the last layer. 105 If None, no acitvation function will be applied in the last layer. 106 107 The default is None. 108 max_iter : `int`, optional 109 Maximum number of training itertions. 110 111 The default is 200. 112 device : `str`, optional 113 Device used for the computation. 114 115 The default is 'cpu' 116 normalization_layer : `bool`, optional 117 If True, the first layer is a normalization layer. 118 119 The default is True. 120 normalize_input_output : `bool`, optional 121 If True, input is scaled and the target as well -- i.e. the scaled state is predicted. 122 Can not be used in conjunction with 'normalization_layer'. 123 124 The default is false. 125 dropout : `float`, optional 126 Specifies the dropout probability of in the input layer. 127 If 0, no dropout will be used. 128 129 The default is 0. 130 batch_size : `int`, optional 131 Batch size for training. Be aware that the batch size might have an influence 132 on the normalization layer. 133 134 The default is 128 135 """ 136 def __init__(self, hidden_layers_size: list[int] = [128], 137 activation: str = "tanh", last_layer_activation: str = None, 138 max_iter: int = 200, device: str = "cpu", normalization_layer: bool = True, 139 normalize_input_output: bool = False, 140 dropout: float = 0., batch_size: int = 128, 141 **kwds): 142 self._hidden_layers_size = hidden_layers_size 143 self._activation = activation 144 self._last_layer_activation = last_layer_activation 145 self._max_iter = max_iter 146 self._device = device 147 self._normalization_layer = normalization_layer 148 self._normalize_input_output = normalize_input_output 149 self._dropout = dropout 150 self._batch_size = batch_size 151 self._model = None 152 self._wdn_topology = None 153 self._input_size = None 154 self._state_size = None 155 self._scaler = None 156 157 if normalization_layer is True and normalization_layer is True: 158 raise ValueError("'normalization_layer' and 'normalization_layer' " + 159 "can not be used at the same time.") 160 161 if normalize_input_output is True: 162 self._scaler = StandardScaler() 163 164 super().__init__(**kwds) 165 166 def _get_activation_func(self, activation_desc: str) -> nn.Module: 167 if activation_desc == "relu": 168 return nn.ReLU() 169 elif activation_desc == "tanh": 170 return nn.Tanh() 171 else: 172 return None 173
[docs] 174 def init(self, wdn_topology: NetworkTopology, input_size: int, state_size: int) -> None: 175 self._wdn_topology = wdn_topology 176 self._input_size = input_size 177 self._state_size = state_size 178 179 layers = [] 180 181 if self._normalization_layer is True: 182 layers.append(nn.BatchNorm1d(self._input_size,)) 183 184 if self._dropout > 0.: 185 layers.append(nn.Dropout(p=self._dropout)) 186 187 layers.append(nn.Linear(self._input_size, self._hidden_layers_size[0])) 188 for i in range(1, len(self._hidden_layers_size)): 189 layers.append(self._get_activation_func(self._activation)) 190 layers.append(nn.Linear(self._hidden_layers_size[i-1], self._hidden_layers_size[i])) 191 192 layers.append(self._get_activation_func(self._activation)) 193 layers.append(nn.Linear(self._hidden_layers_size[-1], self._state_size)) 194 195 if self._last_layer_activation is not None: 196 layers.append(self._get_activation_func(self._last_layer_activation)) 197 198 self._model = nn.Sequential(*layers)
199
[docs] 200 def load_from_file(self, file_path: str) -> None: 201 """ 202 Loads model's weights and the standard scaler from a given file. 203 204 Parameters 205 ---------- 206 file_path : `str` 207 File path. 208 """ 209 self._model = torch.load(file_path, weights_only=False) 210 211 if self._normalize_input_output is True: 212 with open(f"{file_path}.pickle", "rb") as f_in: 213 self._scaler = pickle.load(f_in)
214
[docs] 215 def save_to_file(self, file_path: str) -> None: 216 """ 217 Saves model's weights and the standard scaler to a file. 218 219 Parameters 220 ---------- 221 file_path : `str` 222 File path. 223 """ 224 torch.save(self._model, file_path) 225 226 if self._normalize_input_output is True: 227 with open(f"{file_path}.pickle", "wb") as f_in: 228 pickle.dump(self._scaler, f_in)
229 230 def _forward(self, x: torch.Tensor) -> torch.Tensor: 231 return self._model(x) 232
[docs] 233 def compute_jacobian(self, cur_state: np.ndarray, 234 next_time_varying_quantity: np.ndarray) -> np.ndarray: 235 """ 236 Computes the Jacobian w.r.t. a given state (incl. control signals). 237 238 Parameters 239 ---------- 240 cur_state : numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 241 Current state of the system. 242 next_time_varying_quantity : numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 243 Time varying events (incl. control signals) that are relevant for evolving the state. 244 245 Returns 246 ------- 247 numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 248 Jacobian. 249 """ 250 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 251 252 if self._normalize_input_output: 253 X = self._scaler.transform(X) 254 255 jac = torch.autograd.functional.jacobian(self._forward, torch.Tensor(X)).detach().cpu().numpy() 256 257 return jac
258 259 def _fit(self, X: np.ndarray, y: np.ndarray) -> None: 260 # Wrap data 261 X_train = torch.Tensor(X) 262 y_train = torch.Tensor(y) 263 264 train_data_set = TensorDataset(X_train, y_train) 265 train_data_loader = DataLoader(train_data_set, shuffle=True, batch_size=self._batch_size) 266 267 # Loss function and optimizer 268 loss_func = nn.MSELoss() 269 optimizer = torch.optim.Adam(self._model.parameters()) 270 271 # Run training 272 self._model.train() 273 for _ in range(self._max_iter): 274 for batch, (X, y) in enumerate(train_data_loader): 275 X, y = X.to(self._device), y.to(self._device) 276 277 # Compute prediction error 278 pred = self._forward(X) 279 loss = loss_func(pred, y) 280 281 # Backpropagation 282 loss.backward() 283 optimizer.step() 284 optimizer.zero_grad() 285 286 if batch % 100 == 0: 287 loss, current = loss.item(), (batch + 1) * len(X) 288 print(f"loss: {loss:>7f}") 289 290 self._model.train(False) 291
[docs] 292 def fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray, 293 next_state: np.ndarray) -> None: 294 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 295 296 if self._normalize_input_output: 297 X = self._scaler.fit_transform(X) 298 299 dummy_next_flows = np.zeros((next_state.shape[0], X.shape[1] - next_state.shape[1])) 300 next_state_ = self._scaler.transform(np.concatenate((next_state, dummy_next_flows), 301 axis=1)) 302 next_state = next_state_[:, :next_state.shape[1]] 303 304 self._fit(X, next_state)
305
[docs] 306 def partial_fit(self, cur_state: np.ndarray, next_time_varying_quantity: np.ndarray, 307 next_state: np.ndarray) -> None: 308 raise NotImplementedError()
309
[docs] 310 def predict(self, cur_state: np.ndarray, 311 next_time_varying_quantity: np.ndarray, 312 invert_output_scaling: bool = False) -> np.ndarray: 313 X = np.concatenate((cur_state, next_time_varying_quantity), axis=1) 314 if self._normalize_input_output: 315 X = self._scaler.transform(X) 316 317 Y_pred = self._forward(torch.Tensor(X)).detach().cpu().numpy() 318 319 if invert_output_scaling is True: 320 return self.invert_output_scaling(Y_pred) 321 else: 322 return Y_pred
323
[docs] 324 def invert_output_scaling(self, Y_pred: np.ndarray) -> np.ndarray: 325 """ 326 Inverts the scaling of the output (i.e. predicted state). 327 328 Parameters 329 ---------- 330 Y_pred : numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 331 Predicted state. 332 333 Returns 334 ------- 335 numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 336 Unscaled predicted state. 337 """ 338 if self._normalize_input_output is not True: 339 raise ValueError("Output is not scaled!") 340 341 dummy_control = np.ones((Y_pred.shape[0], self._scaler.n_features_in_ - Y_pred.shape[1])) 342 Y_pred_ = self._scaler.inverse_transform(np.concatenate((Y_pred, dummy_control), axis=1)) 343 return Y_pred_[:, :Y_pred.shape[1]]