{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Pump Control\n", "\n", "This example demonstrates how to build and apply reinforcement learning to a continous pump speed control environment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import display, HTML\n", "display(HTML('\\\"Open'))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install epyt-control --quiet" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", "from stable_baselines3 import SAC\n", "from gymnasium.wrappers import RescaleAction, NormalizeObservation\n", "from epyt_flow.simulation import ScenarioSimulator, ScenarioConfig, ScadaData\n", "from epyt_control.envs import HydraulicControlEnv\n", "from epyt_control.envs.actions import PumpSpeedAction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a control environment based on a special version of [Anytown](https://waterfutures.github.io/WaterBenchmarkHub/benchmarks/network-Anytown.html) where three (parallel) pumps next to a reservoir have to be controlled such that some pressure constraints at all nodes are satisfied.\n", "The observations (i.e. input to the controller) are the pressure at every junction in the network, as well as the efficiency of every pump." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_scenario(f_inp_in: str) -> tuple[ScenarioConfig, list[str]]:\n", " \"\"\"\n", " Creates a new scenario for a given .inp file.\n", " Note that pressure sensors are placed at every junction.\n", " \"\"\"\n", " with ScenarioSimulator(f_inp_in=f_inp_in) as scenario:\n", " # Sensors = input to the agent (control strategy)\n", " # Place pressure sensors at all junctions\n", " junctions = scenario.sensor_config.nodes\n", " for tank_id in scenario.sensor_config.tanks:\n", " junctions.remove(tank_id)\n", " scenario.set_pressure_sensors(sensor_locations=junctions)\n", "\n", " # Place pump efficiency sensors at every pump\n", " scenario.place_pump_efficiency_sensors_everywhere()\n", "\n", " # Place flow sensors at every pump and tank connection\n", " topo = scenario.get_topology()\n", " tank_connections = []\n", " for tank in topo.get_all_tanks():\n", " for link, _ in topo.get_adjacent_links(tank):\n", " tank_connections.append(link)\n", "\n", " flow_sensors = tank_connections + scenario.sensor_config.pumps\n", " scenario.set_flow_sensors(flow_sensors)\n", "\n", " # Return the scenario config and tank connections\n", " return scenario.get_scenario_config(), tank_connections" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class ContinuousPumpControlEnv(HydraulicControlEnv):\n", " \"\"\"\n", " Class implementing a continous pump speed environment --\n", " i.e. a continous action space for the pump speed.\n", " \"\"\"\n", " def __init__(self):\n", " f_inp_in = \"Anytown.inp\"\n", " scenario_config, tank_connections = create_scenario(f_inp_in)\n", "\n", " self._tank_connections = tank_connections\n", " self._network_constraints = {\"min_pressure\": 28.1227832,\n", " \"max_pressure\": 70,\n", " \"max_pump_efficiencies\": pd.Series({\"b1\": .65,\n", " \"b2\": .65,\n", " \"b3\": .65})}\n", " self._objective_weights = {\"pressure_violation\": .9,\n", " \"abs_tank_flow\": .02,\n", " \"pump_efficiency\": .08}\n", "\n", " super().__init__(scenario_config=scenario_config,\n", " pumps_speed_actions=[PumpSpeedAction(pump_id=p_id,\n", " speed_upper_bound=4.0)\n", " for p_id in scenario_config.sensor_config.pumps],\n", " autoreset=True,\n", " reload_scenario_when_reset=False)\n", "\n", " def _compute_reward_function(self, scada_data: ScadaData) -> float:\n", " # Compute different objectives and final reward\n", " pressure_data = scada_data.get_data_pressures()\n", " tanks_flow_data = scada_data.get_data_flows(sensor_locations=self._tank_connections)\n", " pumps_flow_data = scada_data.get_data_flows(sensor_locations=scada_data.sensor_config.pumps)\n", " pump_efficiency = scada_data.get_data_pumps_efficiency()\n", "\n", " pressure_violations = np.logical_or(\n", " pressure_data > self._network_constraints[\"max_pressure\"],\n", " pressure_data < self._network_constraints[\"min_pressure\"]\n", " ).any(axis=0).sum()\n", " n_sensors = pressure_data.shape[1]\n", " pressure_obj = float(1 - pressure_violations / n_sensors)\n", "\n", " total_abs_tank_flow = np.abs(tanks_flow_data).sum(axis=None)\n", " total_pump_flow = pumps_flow_data.sum(axis=None)\n", " tank_obj = float(total_pump_flow / (total_pump_flow + total_abs_tank_flow))\n", "\n", " pump_efficiencies = pd.Series(\n", " pump_efficiency.mean(axis=0),\n", " index=scada_data.sensor_config.pumps\n", " )\n", " max_pump_efficiencies = self._network_constraints[\"max_pump_efficiencies\"]\n", " normalized_pump_efficiencies = pump_efficiencies / max_pump_efficiencies\n", " pump_efficiency_obj = normalized_pump_efficiencies.mean()\n", "\n", " reward = self._objective_weights[\"pressure_violation\"] * pressure_obj + \\\n", " self._objective_weights[\"abs_tank_flow\"] * tank_obj + \\\n", " self._objective_weights[\"pump_efficiency\"] * pump_efficiency_obj\n", "\n", " return reward" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "env = ContinuousPumpControlEnv()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Improve learning by appling some standard wrapper to the environment for [normalizing the observations](https://gymnasium.farama.org/api/wrappers/observation_wrappers/#gymnasium.wrappers.NormalizeObservation) and [re-scaling the action space](https://gymnasium.farama.org/api/wrappers/action_wrappers/#gymnasium.wrappers.RescaleAction):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wrap environment\n", "env = NormalizeObservation(env)\n", "env = RescaleAction(env, min_action=-1, max_action=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the [Soft Actor Critic (SAC)](https://stable-baselines3.readthedocs.io/en/master/modules/sac.html) method for learning a policy (i.e. control strategy).\n", "\n", "Note that inceasing the number of time steps might improve the performance." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Apply a simple policy learner\n", "# You might want to add wrappers (e.g. normalizing inputs, rewards, etc.) and logging here\n", "# Also, inceasing the number of time steps might help as well\n", "model = SAC(\"MlpPolicy\", env)\n", "model.learn(total_timesteps=100)\n", "model.save(\"my_model_pumpspeed.zip\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Do not forget to close the environment by calling the [close()](https://epyt-flow.readthedocs.io/en/stable/epyt_flow.gym.html#epyt_flow.gym.scenario_control_env.ScenarioControlEnv.close) function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "env.close()" ] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.16" } }, "nbformat": 4, "nbformat_minor": 2 }