"""
This module contains a base class for an EPANET-MSX control environment --
i.e. controlling the injection and reaction of one or multiple species in an
EPANET-MSX scenario (no control over pumps, valves, etc.).
"""
import os
import uuid
import warnings
from typing import Optional, Any

import numpy as np
from epyt_flow.simulation import ScenarioConfig, ScenarioSimulator, ScadaData
from epyt_flow.utils import get_temp_folder
from gymnasium.spaces import Dict
from gymnasium.spaces.utils import flatten_space
from gymnasium import Env

from .actions.quality_actions import SpeciesInjectionAction
from .rl_env import RlEnv
19
20
class EpanetMsxControlEnv(RlEnv):
    """
    Base class for advanced quality control scenarios -- i.e. EPANET-MSX control scenarios.

    Parameters
    ----------
    scenario_config : `epyt_flow.simulation.ScenarioConfig <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.html#epyt_flow.simulation.scenario_config.ScenarioConfig>`_
        Configuration of the scenario.
    action_space : list[:class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction`]
        The action spaces (i.e. list of species injections) that have to be controlled by the agent.
    rerun_hydraulics_when_reset : `bool`, optional
        If True, the hydraulic simulation is going to be re-run when the environment is reset,
        otherwise the hydraulics from the initial run are re-used and the scenario will
        also not be reloaded -- i.e. reload_scenario_when_reset=False.

        The default is True.
    hyd_file_in : `str`, optional
        Path to an EPANET .hyd file containing the simulated hydraulics.
        Can only be used in conjunction with 'hyd_scada_in'.
        If set, hydraulics will not be simulated but taken from the specified file.

        The default is None.
    hyd_scada_in : `epyt_flow.simulation.ScadaData <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.scada.html#epyt_flow.simulation.scada.scada_data.ScadaData>`_, optional
        ScadaData instance containing the simulated hydraulics -- must match the hydraulics
        from 'hyd_file_in'. Can only be used in conjunction with 'hyd_file_in'.

        The default is None.

    Raises
    ------
    TypeError
        If 'action_space' is not a list of
        :class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction` instances or
        if 'rerun_hydraulics_when_reset' is not a bool.
    ValueError
        If 'action_space' is empty, if the reset flags contradict each other, or if the
        pre-computed hydraulics are incomplete or combined with
        'rerun_hydraulics_when_reset=True'.
    """
    def __init__(self, scenario_config: ScenarioConfig,
                 action_space: list[SpeciesInjectionAction],
                 rerun_hydraulics_when_reset: bool = True,
                 hyd_file_in: Optional[str] = None,
                 hyd_scada_in: Optional[ScadaData] = None, **kwds):
        if not isinstance(action_space, list):
            raise TypeError("'action_space' must be an instance of " +
                            "`list[SpeciesInjectionAction]` " +
                            f"but not of '{type(action_space)}'")
        if any(not isinstance(action_desc, SpeciesInjectionAction)
               for action_desc in action_space):
            raise TypeError("All items in 'action_space' must be an instance of " +
                            "'epyt_control.actions.quality_actions.SpeciesInjectionAction'")
        if len(action_space) == 0:
            raise ValueError("Empty action space")
        if not isinstance(rerun_hydraulics_when_reset, bool):
            raise TypeError("'rerun_hydraulics_when_reset' must be an instance of 'bool' " +
                            f"but not of '{type(rerun_hydraulics_when_reset)}'")

        if "reload_scenario_when_reset" in kwds:
            # Reloading the scenario while keeping the old hydraulics is contradictory
            if kwds["reload_scenario_when_reset"] is True and rerun_hydraulics_when_reset is False:
                raise ValueError("'rerun_hydraulics_when_reset' must be True " +
                                 "if 'reload_scenario_when_reset=True'")
        elif rerun_hydraulics_when_reset is False:
            # Not specified by the caller: re-using the hydraulics implies not reloading
            kwds["reload_scenario_when_reset"] = False

        # Pre-computed hydraulics consist of the .hyd file AND the matching ScadaData;
        # `reset` reads both once 'hyd_file_in' is set.
        if (hyd_file_in is None) != (hyd_scada_in is None):
            raise ValueError("'hyd_file_in' and 'hyd_scada_in' can only be used " +
                             "in conjunction with each other")
        if hyd_file_in is not None and rerun_hydraulics_when_reset is True:
            raise ValueError("'rerun_hydraulics_when_reset' must be False " +
                             "if pre-computed hydraulics are provided")

        self._rerun_hydraulics_when_reset = rerun_hydraulics_when_reset
        # File the hydraulics are exported to when they are simulated by this environment
        self._hyd_export = os.path.join(get_temp_folder(),
                                        f"epytcontrol_env_MSX_{uuid.uuid4()}.hyd")

        # One gym sub-space per injection, keyed by "<species_id>-<node_id>", then flattened
        gym_action_space = flatten_space(Dict({
            f"{action.species_id}-{action.node_id}": action.to_gym_action_space()
            for action in action_space}))

        super().__init__(scenario_config=scenario_config, gym_action_space=gym_action_space,
                         action_space=action_space, hyd_scada_in=hyd_scada_in,
                         hyd_file_in=hyd_file_in, **kwds)

    def reset(self, seed: Optional[int] = None, options: Optional[dict[str, Any]] = None
              ) -> tuple[np.ndarray, dict]:
        """
        Resets the environment and returns the first observation.

        If 'rerun_hydraulics_when_reset' is True, the reset is delegated to the parent class.
        Otherwise, only the EPANET-MSX simulation is (re-)started on top of the stored
        hydraulics, which are simulated once (or taken from 'hyd_file_in'/'hyd_scada_in')
        when the scenario is loaded.

        Parameters
        ----------
        seed : `int`, optional
            Seed passed on to `gymnasium.Env.reset`.

            The default is None.
        options : `dict[str, Any]`, optional
            Not used by this implementation.

            The default is None.

        Returns
        -------
        `tuple[numpy.ndarray, dict]`
            Observation and an info dictionary containing the data of the first simulation
            step under the key "scada_data".
        """
        Env.reset(self, seed=seed)

        if self._rerun_hydraulics_when_reset is True:
            scada_data = super().reset()
        else:
            if self._scenario_sim is None or self._reload_scenario_when_reset:
                self._scenario_sim = ScenarioSimulator(
                    scenario_config=self._scenario_config)

                # Run hydraulic simulation first if necessary
                if self._hyd_file_in is not None:
                    self._hyd_export = self._hyd_file_in
                    self._hydraulic_scada_data = self._hyd_scada_in
                else:
                    sim = self._scenario_sim.run_hydraulic_simulation
                    self._hydraulic_scada_data = sim(
                        hyd_export=self._hyd_export,
                        reapply_uncertainties=self.reapply_uncertainties_at_reset)
            else:
                # Abort current simulation if any is running
                try:
                    next(self._sim_generator)
                    self._sim_generator.send(True)
                except StopIteration:
                    pass

            # Run advanced quality analysis (EPANET-MSX) on top of the computed hydraulics
            gen = self._scenario_sim.run_advanced_quality_simulation_as_generator
            self._sim_generator = gen(self._hyd_export, support_abort=True,
                                      reapply_uncertainties=self.reapply_uncertainties_at_reset)

            scada_data = self._next_sim_itr()

        # The simulation step may be a tuple; its first item carries the SCADA data
        r = scada_data
        if isinstance(r, tuple):
            r, _ = r

        # Add the hydraulics of the current time step to the quality results
        cur_time = int(r.sensor_readings_time[0])
        hyd_scada = self._hydraulic_scada_data.extract_time_window(start_time=cur_time,
                                                                   end_time=cur_time)
        r.join(hyd_scada)

        r = self._get_observation(r)

        return r, {"scada_data": scada_data}
136
137
# Alternative name for EpanetMsxControlEnv
AdvancedQualityControlEnv = EpanetMsxControlEnv
139
140
class MultiConfigEpanetMsxControlEnv(EpanetMsxControlEnv):
    """
    Base class for advanced quality control scenarios (i.e. EPANET-MSX control scenarios) that can
    handle multiple scenario configurations -- those scenarios are utilized in a round-robin
    scheduling scheme (i.e. autoreset=True).

    Note that all scenarios must share the same action and observation space.

    Parameters
    ----------
    scenario_configs : list[`epyt_flow.simulation.ScenarioConfig <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.html#epyt_flow.simulation.scenario_config.ScenarioConfig>`_]
        Configuration of the scenario. Note that all scenarios must share the same action and
        observation space.
    action_space : list[:class:`~epyt_control.actions.quality_actions.SpeciesInjectionAction`]
        The action spaces (i.e. list of species injections) that have to be controlled by the agent.
        Must be the same for all scenarios specified in 'scenario_configs'.
    rerun_hydraulics_when_reset : `bool`, optional
        If True, the hydraulic simulation is going to be re-run when the environment is reset,
        otherwise the hydraulics from the initial run are re-used and the scenario will
        also not be reloaded -- i.e. reload_scenario_when_reset=False.

        The default is True.
    precomputed_hydraulics : list[tuple[str, `epyt_flow.simulation.ScadaData <https://epyt-flow.readthedocs.io/en/stable/epyt_flow.simulation.scada.html#epyt_flow.simulation.scada.scada_data.ScadaData>`_]], optional
        Pre-computed hydraulics -- i.e., for each scenario in 'scenario_configs', a tuple of a
        path to an EPANET generated .hyd file and a ScadaData instance -- that are used instead
        of re-running the hydraulic simulation.
        If used, 'rerun_hydraulics_when_reset' must be False.

        The default is None.

    Raises
    ------
    TypeError
        If 'scenario_configs' or 'precomputed_hydraulics' are of the wrong type.
    ValueError
        If 'scenario_configs' is empty, if the number of pre-computed hydraulics does not
        match the number of scenarios, or if pre-computed hydraulics are combined with
        'rerun_hydraulics_when_reset=True'.
    """
    def __init__(self, scenario_configs: list[ScenarioConfig],
                 action_space: list[SpeciesInjectionAction],
                 rerun_hydraulics_when_reset: bool = True,
                 precomputed_hydraulics: Optional[list[tuple[str, ScadaData]]] = None,
                 **kwds):
        if not isinstance(scenario_configs, list):
            raise TypeError("'scenario_configs' must be an instance of " +
                            "'list[epyt_flow.simulation.ScenarioConfig]' but " +
                            f"not of '{type(scenario_configs)}'")
        if any(not isinstance(scenario_config, ScenarioConfig)
               for scenario_config in scenario_configs):
            raise TypeError("All items in 'scenario_configs' must be instances of " +
                            "epyt_flow.simulation.ScenarioConfig")
        if len(scenario_configs) == 0:
            raise ValueError("Empty list of scenario configurations")

        if len(scenario_configs) > 10:
            warnings.warn("You are using many scenarios. You might face issues w.r.t. " +
                          "memory consumption as well as with the maximum number of open files " +
                          "allowed by the operating system.", UserWarning)

        # Per-scenario state; the entry at '_current_scenario_idx' is the active one
        self._scenario_configs = scenario_configs
        self._scenario_sims = [None] * len(scenario_configs)
        self._hyd_exports = [os.path.join(get_temp_folder(),
                                          f"epytcontrol_env_MSX_{uuid.uuid4()}.hyd")
                             for _ in range(len(scenario_configs))]
        self._hydraulic_scada_datas = [None] * len(scenario_configs)
        self._current_scenario_idx = 0
        self._use_precomputed_hydraulics = False

        if precomputed_hydraulics is not None:
            def _raise_type_error():
                raise TypeError("'precomputed_hydraulics' must be an instance of " +
                                "'list[tuple[str, epyt_flow.simulation.ScadaData]]'")

            if not isinstance(precomputed_hydraulics, list):
                _raise_type_error()
            # Check the length as well, otherwise malformed tuples fail with an
            # IndexError/unpacking error instead of the documented TypeError
            if any(not isinstance(hyd, tuple) or len(hyd) != 2
                   for hyd in precomputed_hydraulics):
                _raise_type_error()
            if any(not isinstance(hyd[0], str) or not isinstance(hyd[1], ScadaData)
                   for hyd in precomputed_hydraulics):
                _raise_type_error()
            if len(precomputed_hydraulics) != len(scenario_configs):
                raise ValueError("Length of 'precomputed_hydraulics' must be equal to the " +
                                 "number of scenarios in 'scenario_configs'")
            if rerun_hydraulics_when_reset is True:
                raise ValueError("'rerun_hydraulics_when_reset' must be False if " +
                                 "pre-computed hydraulics are used")

            # The pre-computed files and data replace the temporary export files
            self._hyd_exports = [hyd_file_in for hyd_file_in, _ in precomputed_hydraulics]
            self._hydraulic_scada_datas = [scada_hyd_in
                                           for _, scada_hyd_in in precomputed_hydraulics]

            self._use_precomputed_hydraulics = True

        super().__init__(self._scenario_configs[self._current_scenario_idx],
                         action_space, rerun_hydraulics_when_reset,
                         autoreset=True, **kwds)
        # Set after the parent constructor, which assigns its own '_hyd_export'
        self._hyd_export = self._hyd_exports[self._current_scenario_idx]
        self._hydraulic_scada_data = self._hydraulic_scada_datas[self._current_scenario_idx]

    def reset(self, seed: Optional[int] = None, options: Optional[dict[str, Any]] = None
              ) -> tuple[np.ndarray, dict]:
        """
        Stores the state of the current scenario, switches to the next scenario
        (round-robin), and resets the environment.

        Note that the scenario index is advanced before the environment is reset.

        Parameters
        ----------
        seed : `int`, optional
            Seed passed on to the reset of the parent class.

            The default is None.
        options : `dict[str, Any]`, optional
            Passed on to the reset of the parent class.

            The default is None.

        Returns
        -------
        `tuple[numpy.ndarray, dict]`
            Result of the parent's reset -- i.e. observation and info dictionary.
        """
        # Back up current simulation
        self._scenario_sims[self._current_scenario_idx] = self._scenario_sim
        self._hydraulic_scada_datas[self._current_scenario_idx] = self._hydraulic_scada_data

        # Move on to next scenario
        self._current_scenario_idx = (self._current_scenario_idx + 1) % len(self._scenario_configs)
        self._scenario_config = self._scenario_configs[self._current_scenario_idx]
        self._scenario_sim = self._scenario_sims[self._current_scenario_idx]
        self._hyd_export = self._hyd_exports[self._current_scenario_idx]
        self._hydraulic_scada_data = self._hydraulic_scada_datas[self._current_scenario_idx]

        if self._use_precomputed_hydraulics is True:
            # The parent's reset only skips the hydraulic simulation if '_hyd_file_in' is set;
            # otherwise it would re-run the hydraulics and export them to the
            # pre-computed file.
            # NOTE(review): assumes '_hyd_file_in' and '_hyd_scada_in' are plain attributes
            # of the parent class -- confirm.
            self._hyd_file_in = self._hyd_export
            self._hyd_scada_in = self._hydraulic_scada_data

        return super().reset(seed, options)
243
244
# Alternative name for MultiConfigEpanetMsxControlEnv
MultiConfigAdvancedQualityControlEnv = MultiConfigEpanetMsxControlEnv