Source code for epyt_control.envs.actions.actuator_state_actions

  1"""
  2This module contains classes for modeling actuator state action spaces --
  3i.e. controlling pump and valve states.
  4"""
  5from copy import deepcopy
  6from epyt_flow.gym import ScenarioControlEnv
  7from epyt_flow.simulation.events import ActuatorConstants
  8from gymnasium.spaces import Space, Discrete
  9
 10from .actions import Action
 11
 12
[docs] 13class ActuatorStateAction(Action): 14 """ 15 Base class of an actuator state action. 16 17 Parameters 18 ---------- 19 state_space : `list[int]` 20 List of possible actions that can be taken by the agent. 21 """ 22 def __init__(self, state_space: list[int], **kwds): 23 if not isinstance(state_space, list): 24 raise TypeError("'state_space' must be an instance of 'list[int]' " + 25 f"but not of '{type(state_space)}'") 26 if any(not isinstance(state, int) for state in state_space): 27 raise TypeError("All states in 'state_space' must be an instance of 'int'") 28 self._action_space = state_space 29 30 super().__init__(**kwds) 31 32 def __eq__(self, other) -> bool: 33 return super().__eq__(other) and self._action_space == other.action_space 34 35 def __str__(self) -> str: 36 return super().__str__() + f"Action space: {self._action_space}" 37 38 @property 39 def action_space(self) -> list[int]: 40 """ 41 Returns the list of possible actions that an agent can take. 42 43 Returns 44 ------- 45 `list[int]` 46 List of possible actions. 47 """ 48 return deepcopy(self._action_space) 49
[docs] 50 def to_gym_action_space(self) -> Space: 51 return Discrete(len(self._action_space))
52 53
[docs] 54class ValveStateAction(ActuatorStateAction): 55 """ 56 Action for controlling the state of a valve. 57 58 Parameters 59 ---------- 60 valve_id : `str` 61 ID of the valve. 62 """ 63 def __init__(self, valve_id: str, **kwds): 64 self._valve_id = valve_id 65 66 super().__init__(state_space=[ActuatorConstants.EN_OPEN, ActuatorConstants.EN_CLOSED], 67 **kwds) 68 69 def __eq__(self, other) -> bool: 70 return super().__eq__(other) and self.valve_id == other.valve_id 71 72 def __str__(self) -> str: 73 return super().__str__() + f"Valve ID: {self._valve_id}" 74 75 @property 76 def valve_id(self) -> str: 77 """ 78 Returns the ID of the valve. 79 80 Returns 81 ------- 82 `str` 83 ID of the valve. 84 """ 85 return self._valve_id 86
[docs] 87 def apply(self, env: ScenarioControlEnv, action_value: int) -> None: 88 env.set_valve_status(self._valve_id, action_value)
89 90
[docs] 91class PumpStateAction(ActuatorStateAction): 92 """ 93 Action for controling the state of a pump. 94 95 Parameters 96 ---------- 97 pump_id : `str` 98 ID of the pump. 99 """ 100 def __init__(self, pump_id: str, **kwds): 101 self._pump_id = pump_id 102 103 super().__init__(state_space=[ActuatorConstants.EN_OPEN, ActuatorConstants.EN_CLOSED], 104 **kwds) 105 106 def __eq__(self, other) -> bool: 107 return super().__eq__(other) and self._pump_id == other.pump_id 108 109 def __str__(self) -> str: 110 return super().__str__() + f"Pump ID: {self._pump_id}" 111 112 @property 113 def pump_id(self) -> str: 114 """ 115 Return the ID of the pump. 116 117 Returns 118 ------- 119 `str` 120 ID of the pump. 121 """ 122 return self._pump_id 123
[docs] 124 def apply(self, env: ScenarioControlEnv, action_value) -> None: 125 env.set_pump_status(self._pump_id, action_value)