Source code for epyt_control.envs.advanced_quality_control_env

"""
This module contains base classes for EPANET-MSX control environments --
i.e. controlling the injection and reaction of one or multiple species in an
EPANET-MSX scenario (no control over pumps, valves, etc.).
"""
  6import os
  7import uuid
  8from typing import Optional, Any
  9import warnings
 10import numpy as np
 11from epyt_flow.simulation import ScenarioConfig, ScenarioSimulator, ScadaData
 12from epyt_flow.utils import get_temp_folder
 13from gymnasium.spaces import Dict
 14from gymnasium.spaces.utils import flatten_space
 15from gymnasium import Env
 16
 17from .actions.quality_actions import SpeciesInjectionAction
 18from .rl_env import RlEnv
 19
 20
class EpanetMsxControlEnv(RlEnv):
    """
    Base class for advanced quality control scenarios -- i.e. EPANET-MSX control scenarios.

    Parameters
    ----------
    scenario_config : `epyt_flow.simulation.ScenarioConfig <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.html#epyt_flow.simulation.scenario_config.ScenarioConfig>`_
        Configuration of the scenario.
    action_space : list[:class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction`]
        The action spaces (i.e. list of species injections) that have to be controlled by the agent.
    rerun_hydraulics_when_reset : `bool`, optional
        If True, the hydraulic simulation is going to be re-run when the environment is reset,
        otherwise the hydraulics from the initial run are re-used and the scenario will
        also not be reloaded -- i.e. reload_scenario_when_reset=False.

        The default is True.
    hyd_file_in : `str`, optional
        Path to an EPANET .hyd file containing the simulated hydraulics.
        Can only be used in conjunction with 'hyd_scada_in'.
        If set, hydraulics will not be simulated but taken from the specified file.

        The default is None.
    hyd_scada_in : `epyt_flow.simulation.ScadaData <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.scada.html#epyt_flow.simulation.scada.scada_data.ScadaData>`_, optional
        ScadaData instance containing the simulated hydraulics -- must match the hydraulics
        from 'hyd_file_in'. Can only be used in conjunction with 'hyd_file_in'.

        The default is None.
    **kwds
        Additional keyword arguments that are passed on to the parent class.
        If 'reload_scenario_when_reset' is not contained and
        'rerun_hydraulics_when_reset' is False, 'reload_scenario_when_reset=False' is added.

    Raises
    ------
    TypeError
        If 'action_space' is not a list of
        :class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction` instances,
        or if 'rerun_hydraulics_when_reset' is not a bool.
    ValueError
        If 'action_space' is empty, if 'reload_scenario_when_reset=True' is combined with
        'rerun_hydraulics_when_reset=False', if only one of 'hyd_file_in' and 'hyd_scada_in'
        is given, or if pre-computed hydraulics are combined with
        'rerun_hydraulics_when_reset=True'.
    """
    def __init__(self, scenario_config: ScenarioConfig,
                 action_space: list[SpeciesInjectionAction],
                 rerun_hydraulics_when_reset: bool = True,
                 hyd_file_in: Optional[str] = None,
                 hyd_scada_in: Optional[ScadaData] = None, **kwds):
        if not isinstance(action_space, list):
            raise TypeError("'action_space' must be an instance of " +
                            "`list[SpeciesInjectionAction]` " +
                            f"but not of '{type(action_space)}'")
        if any(not isinstance(action_desc, SpeciesInjectionAction)
               for action_desc in action_space):
            raise TypeError("All items in 'action_space' must be an instance of " +
                            "'epyt_control.actions.quality_actions.SpeciesInjectionAction'")
        if len(action_space) == 0:
            raise ValueError("Empty action space")
        if not isinstance(rerun_hydraulics_when_reset, bool):
            raise TypeError("'rerun_hydraulics_when_reset' must be an instance of 'bool' " +
                            f"but not of '{type(rerun_hydraulics_when_reset)}'")

        if "reload_scenario_when_reset" in kwds:
            # Reloading the scenario while re-using old hydraulics is contradictory
            if kwds["reload_scenario_when_reset"] is True and not rerun_hydraulics_when_reset:
                raise ValueError("'rerun_hydraulics_when_reset' must be True " +
                                 "if 'reload_scenario_when_reset=True'")
        elif not rerun_hydraulics_when_reset:
            # Re-using the hydraulics implies that the scenario is not reloaded
            kwds["reload_scenario_when_reset"] = False

        # Both arguments are documented as "can only be used in conjunction" -- fail early
        # instead of failing later with an obscure error when the environment is reset.
        if (hyd_file_in is None) != (hyd_scada_in is None):
            raise ValueError("'hyd_file_in' and 'hyd_scada_in' can only be used in conjunction")
        if hyd_file_in is not None and rerun_hydraulics_when_reset:
            raise ValueError("'rerun_hydraulics_when_reset' must be False " +
                             "if pre-computed hydraulics are provided")

        self._rerun_hydraulics_when_reset = rerun_hydraulics_when_reset
        # Unique temporary file the hydraulics are exported to (input of the MSX simulation)
        self._hyd_export = os.path.join(get_temp_folder(),
                                        f"epytcontrol_env_MSX_{uuid.uuid4()}.hyd")

        # One sub-space per injection, keyed by "<species_id>-<node_id>", flattened afterwards
        gym_action_space = flatten_space(
            Dict({f"{action.species_id}-{action.node_id}": action.to_gym_action_space()
                  for action in action_space}))

        super().__init__(scenario_config=scenario_config, gym_action_space=gym_action_space,
                         action_space=action_space, hyd_scada_in=hyd_scada_in,
                         hyd_file_in=hyd_file_in, **kwds)

    def reset(self, seed: Optional[int] = None, options: Optional[dict[str, Any]] = None
              ) -> tuple[np.ndarray, dict]:
        """
        Resets the environment and starts a new simulation run.

        If 'rerun_hydraulics_when_reset' is True, the reset of the parent class is used.
        Otherwise, the hydraulics are computed (or taken from the pre-computed hydraulics)
        only when a new scenario simulator is created, and a new EPANET-MSX simulation
        is started on top of those hydraulics.

        Parameters
        ----------
        seed : `int`, optional
            Seed that is passed on to `gymnasium.Env.reset`.

            The default is None.
        options : `dict[str, Any]`, optional
            Not used by this implementation.

            The default is None.

        Returns
        -------
        `tuple[numpy.ndarray, dict]`
            Observation (as computed by `_get_observation`) and an info dictionary containing
            the SCADA data obtained at reset as "scada_data".
        """
        Env.reset(self, seed=seed)

        if self._rerun_hydraulics_when_reset is True:
            scada_data = super().reset()
        else:
            if self._scenario_sim is None or self._reload_scenario_when_reset:
                self._scenario_sim = ScenarioSimulator(scenario_config=self._scenario_config)

                # Hydraulics are only needed once per simulator instance
                if self._hyd_file_in is not None:
                    # Use the pre-computed hydraulics instead of simulating them
                    self._hyd_export = self._hyd_file_in
                    self._hydraulic_scada_data = self._hyd_scada_in
                else:
                    run_hydraulics = self._scenario_sim.run_hydraulic_simulation
                    self._hydraulic_scada_data = run_hydraulics(
                        hyd_export=self._hyd_export,
                        reapply_uncertainties=self.reapply_uncertainties_at_reset)
            elif self._sim_generator is not None:
                # Abort the current simulation if any is running -- skipped if no
                # simulation was started yet (there is nothing to abort then).
                try:
                    next(self._sim_generator)
                    self._sim_generator.send(True)
                except StopIteration:
                    pass

            # Run advanced quality analysis (EPANET-MSX) on top of the computed hydraulics
            run_msx = self._scenario_sim.run_advanced_quality_simulation_as_generator
            self._sim_generator = run_msx(
                self._hyd_export, support_abort=True,
                reapply_uncertainties=self.reapply_uncertainties_at_reset)

            scada_data = self._next_sim_itr()

        # The SCADA data might come as a tuple -- only the first item is used here
        current_scada = scada_data
        if isinstance(current_scada, tuple):
            current_scada, _ = current_scada

        # Merge the hydraulics of the current time step into the quality readings
        cur_time = int(current_scada.sensor_readings_time[0])
        hyd_scada = self._hydraulic_scada_data.extract_time_window(start_time=cur_time,
                                                                   end_time=cur_time)
        current_scada.join(hyd_scada)

        observation = self._get_observation(current_scada)

        return observation, {"scada_data": scada_data}
# Alternative name under which EpanetMsxControlEnv is also available.
AdvancedQualityControlEnv = EpanetMsxControlEnv
class MultiConfigEpanetMsxControlEnv(EpanetMsxControlEnv):
    """
    Base class for advanced quality control scenarios (i.e. EPANET-MSX control scenarios) that can
    handle multiple scenario configurations -- those scenarios are utilized in a round-robin
    scheduling scheme (i.e. autoreset=True).

    Note that all scenarios must share the same action and observation space.

    Parameters
    ----------
    scenario_configs : list[`epyt_flow.simulation.ScenarioConfig <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.html#epyt_flow.simulation.scenario_config.ScenarioConfig>`_]
        Configurations of the scenarios. Note that all scenarios must share the same action and
        observation space.
    action_space : list[:class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction`]
        The action spaces (i.e. list of species injections) that have to be controlled by the agent.
        Must be the same for all scenarios specified in 'scenario_configs'.
    rerun_hydraulics_when_reset : `bool`, optional
        If True, the hydraulic simulation is going to be re-run when the environment is reset,
        otherwise the hydraulics from the initial run are re-used and the scenario will
        also not be reloaded -- i.e. reload_scenario_when_reset=False.

        The default is True.
    precomputed_hydraulics : list[tuple[str, `epyt_flow.simulation.ScadaData <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.scada.html#epyt_flow.simulation.scada.scada_data.ScadaData>`_]], optional
        Pre-computed hydraulics -- i.e., for each scenario in 'scenario_configs', a tuple of a
        path to an EPANET generated .hyd file and a ScadaData instance -- that are used instead
        of re-running the hydraulic simulation.
        If used, 'rerun_hydraulics_when_reset' must be False.

        The default is None.
    **kwds
        Additional keyword arguments that are passed on to the parent class.

    Raises
    ------
    TypeError
        If 'scenario_configs' or 'precomputed_hydraulics' are of the wrong type.
    ValueError
        If 'scenario_configs' is empty, if the length of 'precomputed_hydraulics' does not
        match the number of scenarios, or if pre-computed hydraulics are combined with
        'rerun_hydraulics_when_reset=True'.
    """
    def __init__(self, scenario_configs: list[ScenarioConfig],
                 action_space: list[SpeciesInjectionAction],
                 rerun_hydraulics_when_reset: bool = True,
                 precomputed_hydraulics: Optional[list[tuple[str, ScadaData]]] = None, **kwds):
        if not isinstance(scenario_configs, list):
            raise TypeError("'scenario_configs' must be an instance of " +
                            "list[epyt_flow.simulation.ScenarioConfig] but " +
                            f"not of '{type(scenario_configs)}'")
        if any(not isinstance(scenario_config, ScenarioConfig)
               for scenario_config in scenario_configs):
            raise TypeError("All items in 'scenario_configs' must be instances of " +
                            "epyt_flow.simulation.ScenarioConfig")
        if len(scenario_configs) == 0:
            # Without this check, an empty list would surface as an IndexError below
            raise ValueError("'scenario_configs' must contain at least one scenario")

        if len(scenario_configs) > 10:
            warnings.warn("You are using many scenarios. You might face issues w.r.t. " +
                          "memory consumption as well as with the maximum number of open files " +
                          "allowed by the operating system.", UserWarning)

        n_scenarios = len(scenario_configs)
        self._scenario_configs = scenario_configs
        self._scenario_sims = [None] * n_scenarios
        self._current_scenario_idx = 0
        self._use_precomputed_hydraulics = precomputed_hydraulics is not None

        if precomputed_hydraulics is None:
            # One unique temporary .hyd export file per scenario
            temp_folder = get_temp_folder()
            self._hyd_exports = [os.path.join(temp_folder,
                                              f"epytcontrol_env_MSX_{uuid.uuid4()}.hyd")
                                 for _ in scenario_configs]
            self._hydraulic_scada_datas = [None] * n_scenarios
        else:
            def _raise_type_error():
                raise TypeError("'precomputed_hydraulics' must be an instance of " +
                                "'list[tuple[str, epyt_flow.simulation.ScadaData]]'")

            if not isinstance(precomputed_hydraulics, list):
                _raise_type_error()
            if any(not isinstance(hyd, tuple) or len(hyd) != 2
                   for hyd in precomputed_hydraulics):
                _raise_type_error()
            if any(not isinstance(hyd[0], str) or not isinstance(hyd[1], ScadaData)
                   for hyd in precomputed_hydraulics):
                _raise_type_error()
            if len(precomputed_hydraulics) != n_scenarios:
                raise ValueError("Length of 'precomputed_hydraulics' must be equal to the " +
                                 "number of scenarios in 'scenario_configs'")
            if rerun_hydraulics_when_reset is True:
                raise ValueError("'rerun_hydraulics_when_reset' must be False if " +
                                 "pre-computed hydraulics are used")

            self._hyd_exports = [hyd_file_in for hyd_file_in, _ in precomputed_hydraulics]
            self._hydraulic_scada_datas = [scada_hyd_in
                                           for _, scada_hyd_in in precomputed_hydraulics]

        super().__init__(self._scenario_configs[self._current_scenario_idx],
                         action_space, rerun_hydraulics_when_reset,
                         autoreset=True, **kwds)
        # Must happen after the parent's initialization, which sets its own '_hyd_export'
        self._activate_hydraulics(self._current_scenario_idx)

    def _activate_hydraulics(self, scenario_idx: int) -> None:
        """
        Makes the hydraulics of the given scenario the ones used by the next simulation run.

        Parameters
        ----------
        scenario_idx : `int`
            Index of the scenario in 'scenario_configs'.
        """
        self._hyd_export = self._hyd_exports[scenario_idx]
        self._hydraulic_scada_data = self._hydraulic_scada_datas[scenario_idx]

        if self._use_precomputed_hydraulics:
            # The inherited reset() only skips the hydraulic simulation if '_hyd_file_in' is
            # set -- without this, the pre-computed hydraulics would be ignored and the given
            # .hyd file would be used as the export target of a new hydraulic simulation.
            self._hyd_file_in = self._hyd_export
            self._hyd_scada_in = self._hydraulic_scada_data

    def reset(self, seed: Optional[int] = None, options: Optional[dict[str, Any]] = None
              ) -> tuple[np.ndarray, dict]:
        """
        Stores the state of the current scenario, moves on to the next scenario
        (round-robin), and resets the environment for that scenario.

        Parameters
        ----------
        seed : `int`, optional
            Seed that is passed on to the reset of the parent class.

            The default is None.
        options : `dict[str, Any]`, optional
            Options that are passed on to the reset of the parent class.

            The default is None.

        Returns
        -------
        `tuple[numpy.ndarray, dict]`
            Return value of the reset of the parent class.
        """
        # Back up current simulation
        self._scenario_sims[self._current_scenario_idx] = self._scenario_sim
        self._hydraulic_scada_datas[self._current_scenario_idx] = self._hydraulic_scada_data

        # Move on to next scenario
        next_idx = (self._current_scenario_idx + 1) % len(self._scenario_configs)
        self._current_scenario_idx = next_idx
        self._scenario_config = self._scenario_configs[next_idx]
        self._scenario_sim = self._scenario_sims[next_idx]
        self._activate_hydraulics(next_idx)

        return super().reset(seed, options)
# Alternative name under which MultiConfigEpanetMsxControlEnv is also available.
MultiConfigAdvancedQualityControlEnv = MultiConfigEpanetMsxControlEnv