{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PID Control of Chlorine Injection\n", "\n", "This example demonstrates how to use a simple PID controller to control the chlorine injection in a simple EPANET scenario." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install epyt-control --quiet" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from epyt_flow.data.benchmarks import load_leakdb_scenarios\n", "from epyt_flow.simulation import ScenarioSimulator, EpanetConstants, ModelUncertainty, \\\n", "    ScenarioConfig, ScadaData, SensorConfig\n", "from epyt_flow.uncertainty import AbsoluteGaussianUncertainty\n", "from epyt_flow.utils import to_seconds, plot_timeseries_data\n", "\n", "from epyt_control.envs import EpanetControlEnv\n", "from epyt_control.envs.actions import ChemicalInjectionAction\n", "from epyt_control.controllers import PidController" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a simple EPANET scenario based on the Hanoi network from [LeakDB](https://waterfutures.github.io/WaterBenchmarkHub/benchmarks/KIOS-LeakDB.html) in which a single chlorine injection pump at the reservoir must be controlled:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_scenario():\n", "    # Create a scenario based on the LeakDB Hanoi network\n", "    [scenario_config] = load_leakdb_scenarios(scenarios_id=list(range(1)), use_net1=False)\n", "    with ScenarioSimulator(scenario_config=scenario_config) as sim:\n", "        # Set simulation duration to 20 days\n", "        sim.set_general_parameters(simulation_duration=to_seconds(days=20))\n", "\n", "        # Enable chlorine simulation and place a chlorine injection 
pump at the reservoir\n", "        sim.enable_chemical_analysis()\n", "\n", "        reservoir_node_id = sim.epanet_api.get_all_reservoirs_id()[0]\n", "        sim.add_quality_source(node_id=reservoir_node_id,\n", "                               pattern=np.array([1.]),\n", "                               source_type=EpanetConstants.EN_MASS,\n", "                               pattern_id=\"my-chl-injection\")\n", "\n", "        # Set initial concentration and simple (constant) reactions\n", "        for node_idx in sim.epanet_api.get_all_nodes_idx():\n", "            sim.epanet_api.set_node_init_quality(node_idx, 0)\n", "        for link_idx in sim.epanet_api.get_all_links_idx():\n", "            sim.epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_BULKORDER, -.5)\n", "            sim.epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_WALLORDER, -.01)\n", "\n", "        # Place flow sensors at all links and chlorine sensors at all nodes\n", "        sim.sensor_config = SensorConfig.create_empty_sensor_config(sim.sensor_config)\n", "        sim.set_flow_sensors(sim.sensor_config.links)\n", "        sim.set_node_quality_sensors(sim.sensor_config.nodes)\n", "\n", "        # Specify uncertainties\n", "        my_uncertainties = {\"global_demand_pattern_uncertainty\": AbsoluteGaussianUncertainty(mean=0, scale=.2)}\n", "        sim.set_model_uncertainty(ModelUncertainty(**my_uncertainties))\n", "\n", "        # Export scenario\n", "        sim.save_to_epanet_file(\"cl_injection_scenario.inp\")\n", "        sim.get_scenario_config().save_to_file(\"cl_injection_scenario\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_scenario()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a simple environment derived from [epyt_control.envs.EpanetControlEnv](https://epyt-control.readthedocs.io/en/stable/epyt_control.envs.html#epyt_control.envs.hydraulic_control_env.EpanetControlEnv) (equivalent to [epyt_control.envs.HydraulicControlEnv](https://epyt-control.readthedocs.io/en/stable/epyt_control.envs.html#epyt_control.envs.hydraulic_control_env.HydraulicControlEnv)) in which the aforementioned chlorine injection pump at the reservoir (node ID \"1\") must be 
controlled such that the chlorine concentration at all nodes is between $0.2$ mg/l and $2$ mg/l:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class SimpleChlorineInjectionEnv(EpanetControlEnv):\n", "    \"\"\"\n", "    A simple environment for controlling the chlorine injection.\n", "    \"\"\"\n", "    def __init__(self):\n", "        scenario_config_file_in = \"cl_injection_scenario.epytflow_scenario_config\"\n", "\n", "        super().__init__(scenario_config=ScenarioConfig.load_from_file(scenario_config_file_in),\n", "                         chemical_injection_actions=[ChemicalInjectionAction(node_id=\"1\",\n", "                                                                             pattern_id=\"my-chl-injection\",\n", "                                                                             source_type_id=EpanetConstants.EN_MASS,\n", "                                                                             upper_bound=15000.)],\n", "                         autoreset=False,\n", "                         reload_scenario_when_reset=False)\n", "\n", "    def _compute_reward_function(self, scada_data: ScadaData) -> float:\n", "        \"\"\"\n", "        Computes the current reward based on the current sensor readings (i.e. SCADA data).\n", "        The reward is zero iff all chlorine bounds are satisfied and negative otherwise.\n", "\n", "        Parameters\n", "        ----------\n", "        scada_data : :class:`epyt_flow.simulation.ScadaData`\n", "            Current sensor readings.\n", "\n", "        Returns\n", "        -------\n", "        `float`\n", "            Current reward.\n", "        \"\"\"\n", "        # Sum up the (negative) residuals of out-of-bounds Cl concentrations at all nodes, i.e.\n", "        # a reward of zero means everything is okay, while a negative reward\n", "        # denotes Cl concentration bound violations\n", "        reward = 0.\n", "\n", "        # Regulation limits\n", "        upper_cl_bound = 2. 
# (mg/l)\n", "        lower_cl_bound = .3  # (mg/l)\n", "\n", "        # Temporarily switch to a sensor configuration with quality sensors at all nodes\n", "        new_sensor_config = SensorConfig.create_empty_sensor_config(scada_data.sensor_config)\n", "        new_sensor_config.quality_node_sensors = scada_data.sensor_config.nodes\n", "        old_sensor_config = scada_data.sensor_config\n", "        scada_data.change_sensor_config(new_sensor_config)\n", "\n", "        nodes_quality = scada_data.get_data_nodes_quality()\n", "\n", "        # Penalize concentrations above the upper bound\n", "        upper_bound_violation_idx = nodes_quality > upper_cl_bound\n", "        reward += -1. * np.sum(nodes_quality[upper_bound_violation_idx] - upper_cl_bound)\n", "\n", "        # Penalize concentrations below the lower bound\n", "        lower_bound_violation_idx = nodes_quality < lower_cl_bound\n", "        reward += np.sum(nodes_quality[lower_bound_violation_idx] - lower_cl_bound)\n", "\n", "        # Restore the original sensor configuration\n", "        scada_data.change_sensor_config(old_sensor_config)\n", "        return reward" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create/Load environment\n", "env = SimpleChlorineInjectionEnv()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a simple [PID controller](https://epyt-control.readthedocs.io/en/stable/epyt_control.controllers.html#epyt_control.controllers.pid.PidController) to control the chlorine (Cl) injection. Recall that a reward of zero indicates that the Cl bounds are satisfied at all nodes. Also note that better performance could be achieved by properly tuning the gain coefficients:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pid_control = PidController(proportional_gain=10., integral_gain=10.,\n", "                            derivative_gain=0.,\n", "                            target_value=0.,\n", "                            action_lower_bound=float(env.action_space.low),\n", "                            action_upper_bound=float(env.action_space.high))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the controller -- i.e. 
execute the controller on the environment:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Reset environment\n", "env.reset()\n", "reward = 0\n", "\n", "# Run controller and environment\n", "rewards = []\n", "actions = []\n", "while True:\n", "    # Compute chlorine injection action\n", "    act = [pid_control.step(reward)]\n", "\n", "    # Execute Cl injection and observe a reward\n", "    _, reward, terminated, _, _ = env.step(act)\n", "    if terminated:\n", "        break\n", "\n", "    # Record observed reward and chosen action\n", "    rewards.append(reward)\n", "    actions.append(act[0])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show results -- i.e. reward and action (Cl injection) over time:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Show rewards over time\n", "plot_timeseries_data(np.array(rewards).reshape(1, -1),\n", "                     y_axis_label=\"Reward\",\n", "                     x_axis_label=\"Time steps (30min)\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Show actions (Cl injection) over time\n", "plot_timeseries_data(np.array(actions).reshape(1, -1),\n", "                     y_axis_label=\"Cl injection $mg$\",\n", "                     x_axis_label=\"Time steps (30min)\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Do not forget to close the environment by calling the [close()](https://epyt-flow.readthedocs.io/en/stable/epyt_flow.gym.html#epyt_flow.gym.scenario_control_env.ScenarioControlEnv.close) function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "env.close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.16" } }, "nbformat": 4, 
"nbformat_minor": 2 }