Custom Environments

EPyT-Control also allows the user to easily create their own control environments for developing and testing control strategies and methods.

Creating a custom control environment requires two steps:

  1. Creating a scenario

  2. Creating an environment

Creating a Scenario

Specifying the (EPANET or EPANET-MSX) scenario that will be used in the control environment. The specification is needed as an epyt_flow.simulation.ScenarioConfig instance.

Note that the observation space (i.e. input to the agent/control strategy) is automatically derived from the specified sensor configuration.

Creating an Environment

Creating a custom environment requires deriving a child class from HydraulicControlEnv (if you are dealing with an EPANET scenario) or AdvancedQualityControlEnv (if you are dealing with an EPANET-MSX scenario).

Note

Note that EpanetControlEnv is a synonym for HydraulicControlEnv and EpanetMsxControlEnv is a synonym for AdvancedQualityControlEnv.

In this child class, you have to overwrite and implement the _compute_reward_function() function. This function gets as an input the system state as a epyt_flow.simulation.ScadaData instance, and must return a reward – this function can make arbitrary changes to the given epyt_flow.simulation.ScadaData instance because it will be discarded after this function is called.

The __init__ function of the parent class requires the scenario configuration (as a epyt_flow.simulation.ScenarioConfig instance) describing the scenario for which a control strategy is required.

Note

The observation space (i.e. input to the agent/control strategy) is automatically derived from the sensor configuration specified in the scenario configuration.

Furthermore, the action space has to be specied as well. For that, HydraulicControlEnv and AdvancedQualityControlEnv provide arguments where a list of all actions (per action type) specify the action space – please see the following tables for an overview of all supported types of actions.

Possible actions in an EPANET scenario (i.e. an HydraulicControlEnv instance):

Implementation

Description

ValveStateAction

Opening/Closing a valve.

PumpStateAction

Starting/Stopping a pump.

PumpSpeedAction

Setting the speed of a pump.

ChemicalInjectionAction

Injecting a chemical.

Possible actions in an EPANET-MSX scenario (i.e. an AdvancedQualityControlEnv instance):

Implementation

Description

SpeciesInjectionAction

Injecting a specific species.

Multi-Config Environments

The environments HydraulicControlEnv and AdvancedQualityControlEnv can only handle a single EPANET or EPANET-MSX scenario.

However, the corresponding equivalents MultiConfigHydraulicControlEnv (also available as MultiConfigEpanetControlEnv) and MultiConfigAdvancedQualityControlEnv (also available as MultiConfigEpanetMsxControlEnv) support an arbitrary number of scenarios that are processed in a Round-robin scheduling scheme – i.e. the environment switches to the next scenario whenever the current scenario is finished.

Example

Example of creating an EPANET-MSX environment for controlling the chlorine (CL2) injection in the Hanoi network (given as “Hanoi.inp”), where we place a chlorine injection pump at the reservoir (node “1”). The dynamics of chlorine are described in “cl2.msx” which is given as well. The objective is to make sure that the chlorine concentration stays within a pre-defined bound.

First, we have to create a new scenario, specify the CL2 source (will be used for controlling the CL2 injection in the environment), and specify a sensor configuration from which the observation space will be derived automatically:

with ScenarioSimulator(f_inp_in="Hanoi.inp", f_msx_in="cl2.msx") as scenario:
    # Set simulation duration to 21 days -- see EPANET-MSX bug
    scenario.set_general_parameters(simulation_duration=to_seconds(days=21))

    # Place a chlorine injection pump at the reservoirs (node "1")
    scenario.add_species_injection_source(species_id="CL2",
                                          node_id="1",
                                          pattern=np.array([1]),
                                          source_type=EpanetConstants.EN_MASS,
                                          pattern_id=f"cl2-injection-at-node_1")

    # Place flow sensors everywhere
    scenario.sensor_config = SensorConfig.create_empty_sensor_config(sim.sensor_config)
    scenario.set_flow_sensors(scenario.sensor_config.links)

    # Export .inp and .msx files
    scenario.save_to_epanet_file(inp_file_path="hanoi-cl2.inp",
                                 msx_file_path="hanoi-cl2.msx")

    # Export scenario
    scenario.get_scenario_config().save_to_file("hanoi-cl2")

Second, we create the environment – there is only one action (CL2 injection at the reservoir) and we decide not to re-run the hydraulic simulation when the environment is reset:

class MyEnv(AdvancedQualityControlEnv):
    def __init__(self, scenario_config_file_in: str):
        cl_injection_action = SpeciesInjectionAction(species_id="CL2",
                                                     node_id="1",
                                                     pattern_id="cl2-injection-at-node_1",
                                                     source_type_id=EpanetConstants.EN_MASS,
                                                     upper_bound=10000.)

        scenario_config = ScenarioConfig.load_from_file(scenario_config_file_in)
        super().__init__(scenario_config=scenario_config,
                         action_space=[cl_injection_action],
                         autoreset=True,
                         rerun_hydraulics_when_reset=False)

        self.__sensor_config_reward = None

    def _compute_reward_function(self, scada_data: ScadaData) -> float:
        # Regulation Limits
        lower_cl_bound = .3  # (mg/l)
        upper_cl_bound = 2.  # (mg/l)

        # Change the sensor configuration to measure the CL2 concentration at every node
        if self.__sensor_config_reward is None:
            self.__sensor_config_reward = SensorConfig.create_empty_sensor_config(scada_data.sensor_config)
            self.__sensor_config_reward.bulk_species_node_sensors = {"CL2": scada_data.sensor_config.nodes}
        scada_data.change_sensor_config(self.__sensor_config_reward)

        nodes_quality = scada_data.get_data_bulk_species_node_concentration({"CL2": scada_data.sensor_config.nodes})

        # Sum up (negative) residuals for out-of-bounds Cl concentrations at nodes -- i.e.
        # reward of zero means everything is okay, while a negative reward
        # denotes Cl concentration bound violations
        reward = 0.

        upper_bound_violation_idx = nodes_quality > upper_cl_bound
        reward += -1. * np.sum(nodes_quality[upper_bound_violation_idx] - upper_cl_bound)

        lower_bound_violation_idx = nodes_quality < lower_cl_bound
        reward += np.sum(nodes_quality[lower_bound_violation_idx] - lower_cl_bound)

        return reward