PID Control of Chlorine Injection

This example demonstrates how to use a simple PID controller for controlling the Chlorine injection in a simple EPANET scenario.

[1]:
from IPython.display import display, HTML
display(HTML('<a target=\"_blank\" href=\"https://colab.research.google.com/github/WaterFutures/EPyT-Control/blob/main/docs/examples/pid_control.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>'))
Open In Colab
[2]:
%pip install epyt-control --quiet
Note: you may need to restart the kernel to use updated packages.
[3]:
import numpy as np
from epyt_flow.data.benchmarks import load_leakdb_scenarios
from epyt_flow.simulation import ScenarioSimulator, EpanetConstants, ModelUncertainty, \
    ScenarioConfig, ScadaData, SensorConfig
from epyt_flow.uncertainty import AbsoluteGaussianUncertainty
from epyt_flow.utils import to_seconds, plot_timeseries_data

from epyt_control.envs import EpanetControlEnv
from epyt_control.envs.actions import ChemicalInjectionAction
from epyt_control.controllers import PidController

Create a simple EPANET scenario based on the Hanoi network from LeakDB where a single chlorine injection pump at the reservoir must be controlled:

[4]:
def create_scenario():
    # Create a scenario based the LeakDB Hanoi
    [scenario_config] = load_leakdb_scenarios(scenarios_id=list(range(1)), use_net1=False)
    with ScenarioSimulator(scenario_config=scenario_config) as sim:
        # Set simulation duration to 20 days
        sim.set_general_parameters(simulation_duration=to_seconds(days=20))

        # Enable chlorine simulation and place a chlorine injection pump at the reservoir
        sim.enable_chemical_analysis()

        reservoid_node_id, = sim.epanet_api.get_all_reservoirs_id()[0]
        sim.add_quality_source(node_id=reservoid_node_id,
                               pattern=np.array([1.]),
                               source_type=EpanetConstants.EN_MASS,
                               pattern_id="my-chl-injection")

        # Set initial concentration and simple (constant) reactions
        for node_idx in sim.epanet_api.get_all_nodes_idx():
            sim.epanet_api.set_node_init_quality(node_idx, 0)
        for link_idx in sim.epanet_api.get_all_links_idx():
            sim.epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_BULKORDER, -.5)
            sim.epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_WALLORDER, -.01)

        # Set flow and chlorine sensors everywhere
        sim.sensor_config = SensorConfig.create_empty_sensor_config(sim.sensor_config)
        sim.set_flow_sensors(sim.sensor_config.links)
        sim.set_node_quality_sensors(sim.sensor_config.nodes)

        # Specify uncertainties
        my_uncertainties = {"global_demand_pattern_uncertainty": AbsoluteGaussianUncertainty(mean=0, scale=.2)}
        sim.set_model_uncertainty(ModelUncertainty(**my_uncertainties))

        # Export scenario
        sim.save_to_epanet_file("cl_injection_scenario.inp")
        sim.get_scenario_config().save_to_file("cl_injection_scenario")
[5]:
create_scenario()

Create a simple environment derived from epyt_control.envs.EpanetControlEnv (equivalent to epyt_control.envs.HydraulicControlEnv) where the aforementioned chlorine injection pump at the reservoir (node ID “1”) must be controlled such that the chlorine concentration at all nodes is between \(0.2\) mg/l and \(2\) mg/l:

[6]:
class SimpleChlorineInjectionEnv(EpanetControlEnv):
    """
    A simple environment for controlling the chlorine injection.
    """
    def __init__(self):
        scenario_config_file_in = "cl_injection_scenario.epytflow_scenario_config"

        super().__init__(scenario_config=ScenarioConfig.load_from_file(scenario_config_file_in),
                         chemical_injection_actions=[ChemicalInjectionAction(node_id="1",
                                                                             pattern_id="my-chl-injection",
                                                                             source_type_id=EpanetConstants.EN_MASS,
                                                                             upper_bound=15000.)],
                         autoreset=False,
                         reload_scenario_when_reset=False)

    def _compute_reward_function(self, scada_data: ScadaData) -> float:
        """
        Computes the current reward based on the current sensors readings (i.e. SCADA data).
        The reward is zero iff all chlorine bounds are satisfied and negative otherwise.

        Parameters
        ----------
        :class:`epyt_flow.simulation.ScadaData`
            Current sensor readings.

        Returns
        -------
        `float`
            Current reward.
        """
        # Sum up (negative) residuals for out of bounds Cl concentrations at nodes -- i.e.
        # reward of zero means everythings is okay, while a negative reward
        # denotes Cl concentration bound violations
        reward = 0.

        # Regulation Limits
        upper_cl_bound = 2.  # (mg/l)
        lower_cl_bound = .3  # (mg/l)

        new_sensor_config = SensorConfig.create_empty_sensor_config(scada_data.sensor_config)
        new_sensor_config.quality_node_sensors = scada_data.sensor_config.nodes
        old_sensor_config = scada_data.sensor_config
        scada_data.change_sensor_config(new_sensor_config)

        nodes_quality = scada_data.get_data_nodes_quality()

        upper_bound_violation_idx = nodes_quality > upper_cl_bound
        reward += -1. * np.sum(nodes_quality[upper_bound_violation_idx] - upper_cl_bound)

        lower_bound_violation_idx = nodes_quality < lower_cl_bound
        reward += np.sum(nodes_quality[lower_bound_violation_idx] - lower_cl_bound)

        scada_data.change_sensor_config(old_sensor_config)
        return reward
[7]:
# Create/Load environment
env = SimpleChlorineInjectionEnv()

Create a simple PID controller for controlling the chlorine (Cl) injection. Recall that a reward of zero indicates that Cl bounds at all nodes are satisfied! Also, note that a better performance couod be achieved by properly tuning the gain coefficients:

[8]:
pid_control = PidController(proportional_gain=10., integral_gain=10.,
                            derivative_gain=0.,
                            target_value=0.,
                            action_lower_bound=float(env.action_space.low),
                            action_upper_bound=float(env.action_space.high))

/tmp/ipykernel_986/3140074456.py:4: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)
  action_lower_bound=float(env.action_space.low),
/tmp/ipykernel_986/3140074456.py:5: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)
  action_upper_bound=float(env.action_space.high))

Run the controller – i.e. execute controller on the environment:

[9]:
# Reset environment
env.reset()
reward = 0

# Run controller and environment
rewards = []
actions = []
while True:
    # Compute chlorine injection action
    act = [pid_control.step(reward)]

    # Execute Cl injection and observe a reward
    _, reward, terminated, _, _ = env.step(act)
    if terminated is True:
        break

    # Show observed reward and chosen action
    rewards.append(reward)
    actions.append(*act)

Show results – i.e. reward and action (Cl injection) over time:

[10]:
# Show reward and actions over time
plot_timeseries_data(np.array(rewards).reshape(1, -1),
                     y_axis_label="Reward",
                     x_axis_label="Time steps (30min)")
../_images/examples_pid_control_15_0.png
[10]:
<Axes: xlabel='Time steps (30min)', ylabel='Reward'>
[11]:

plot_timeseries_data(np.array(actions).reshape(1, -1), y_axis_label="Cl injection $mg$", x_axis_label="Time steps (30min)")
../_images/examples_pid_control_16_0.png
[11]:
<Axes: xlabel='Time steps (30min)', ylabel='Cl injection $mg$'>

Do not forget to close the environment by calling the close() function:

[12]:
env.close()
[ ]: