封装器¶
- class gymnasium.vector.VectorWrapper(env: VectorEnv)[source]¶
封装向量化环境以实现模块化转换。
此类是所有向量化环境封装器的基类。子类可以重写某些方法来改变原始向量化环境的行为,而无需修改原始代码。
注意
如果子类重写了
__init__()
,请不要忘记调用super().__init__(env)
。- 参数:
env – 要封装的环境
- step(actions: ActType) tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]] [source]¶
使用动作逐步遍历所有环境,并返回批处理数据。
- class gymnasium.vector.VectorObservationWrapper(env: VectorEnv)[source]¶
封装向量化环境以实现观测值的模块化转换。
相当于向量化环境的
gymnasium.ObservationWrapper
。- 参数:
env – 向量环境。
- class gymnasium.vector.VectorActionWrapper(env: VectorEnv)[source]¶
封装向量化环境以实现动作的模块化转换。
相当于向量化环境的
gymnasium.ActionWrapper
。- 参数:
env – 要封装的环境
- class gymnasium.vector.VectorRewardWrapper(env: VectorEnv)[source]¶
封装向量化环境以实现奖励的模块化转换。
相当于向量化环境的
gymnasium.RewardWrapper
。- 参数:
env – 要封装的环境
仅用于向量环境的封装器¶
- class gymnasium.wrappers.vector.DictInfoToList(env: VectorEnv)[source]¶
将向量化环境的信息从
dict
转换为List[dict]
。此封装器将向量环境的信息格式从字典转换为字典列表。此封装器旨在用于向量环境。如果使用其他对信息执行操作(例如 RecordEpisodeStatistics)的封装器,则此封装器需要是外层封装器。
即
DictInfoToList(RecordEpisodeStatistics(vector_env))
示例
>>> import numpy as np >>> dict_info = { ... "k": np.array([0., 0., 0.5, 0.3]), ... "_k": np.array([False, False, True, True]) ... } ... >>> list_info = [{}, {}, {"k": 0.5}, {"k": 0.3}]
- 向量环境示例
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3) >>> obs, info = envs.reset(seed=123) >>> info {} >>> envs = DictInfoToList(envs) >>> obs, info = envs.reset(seed=123) >>> info [{}, {}, {}]
- 另一个向量环境示例
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("HalfCheetah-v4", num_envs=2) >>> _ = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> _, _, _, _, infos = envs.step(envs.action_space.sample()) >>> infos {'x_position': array([0.03332211, 0.10172355]), '_x_position': array([ True, True]), 'x_velocity': array([-0.06296527, 0.89345848]), '_x_velocity': array([ True, True]), 'reward_run': array([-0.06296527, 0.89345848]), '_reward_run': array([ True, True]), 'reward_ctrl': array([-0.24503504, -0.21944423], dtype=float32), '_reward_ctrl': array([ True, True])} >>> envs = DictInfoToList(envs) >>> _ = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> _, _, _, _, infos = envs.step(envs.action_space.sample()) >>> infos [{'x_position': np.float64(0.0333221090036294), 'x_velocity': np.float64(-0.06296527291998574), 'reward_run': np.float64(-0.06296527291998574), 'reward_ctrl': np.float32(-0.24503504)}, {'x_position': np.float64(0.10172354684460168), 'x_velocity': np.float64(0.8934584807363618), 'reward_run': np.float64(0.8934584807363618), 'reward_ctrl': np.float32(-0.21944423)}]
- 更改日志
v0.24.0 - 最初添加为
VectorListInfo
v1.0.0 - 重命名为
DictInfoToList
- 参数:
env (Env) – 要应用封装器的环境
- class gymnasium.wrappers.vector.VectorizeTransformObservation(env: VectorEnv, wrapper: type[TransformObservation], **kwargs: Any)[source]¶
将单智能体观测转换封装器向量化,用于向量环境。
大多数用于单智能体环境的 lambda 观测封装器都有向量化实现,建议用户直接从 gymnasium.wrappers.vector… 导入并使用它们。以下示例说明了需要自定义 lambda 观测封装器的情况。
- 示例 - 正常观测
>>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> envs.close() >>> obs array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], [ 0.02852531, 0.02858594, 0.0469136 , 0.02480598], [ 0.03517495, -0.000635 , -0.01098382, -0.03203924]], dtype=float32)
- 示例 - 应用自定义 lambda 观测封装器,复制环境中的观测值
>>> import numpy as np >>> import gymnasium as gym >>> from gymnasium.spaces import Box >>> from gymnasium.wrappers import TransformObservation >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> old_space = envs.single_observation_space >>> new_space = Box(low=np.array([old_space.low, old_space.low]), high=np.array([old_space.high, old_space.high])) >>> envs = VectorizeTransformObservation(envs, wrapper=TransformObservation, func=lambda x: np.array([x, x]), observation_space=new_space) >>> obs, info = envs.reset(seed=123) >>> envs.close() >>> obs array([[[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], [ 0.01823519, -0.0446179 , -0.02796401, -0.03156282]], [[ 0.02852531, 0.02858594, 0.0469136 , 0.02480598], [ 0.02852531, 0.02858594, 0.0469136 , 0.02480598]], [[ 0.03517495, -0.000635 , -0.01098382, -0.03203924], [ 0.03517495, -0.000635 , -0.01098382, -0.03203924]]], dtype=float32)
- 参数:
env – 要封装的向量环境。
wrapper – 要向量化的封装器
**kwargs – 封装器的关键字参数
- class gymnasium.wrappers.vector.VectorizeTransformAction(env: VectorEnv, wrapper: type[TransformAction], **kwargs: Any)[source]¶
将单智能体动作转换封装器向量化,用于向量环境。
- 示例 - 无动作转换
>>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) >>> envs.close() >>> obs array([[-4.6343064e-01, 9.8971417e-05], [-4.4488689e-01, -1.9375233e-03], [-4.3118435e-01, -1.5342437e-03]], dtype=float32)
- 示例 - 添加一个对动作应用 ReLU 的转换
>>> import gymnasium as gym >>> from gymnasium.wrappers import TransformAction >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = VectorizeTransformAction(envs, wrapper=TransformAction, func=lambda x: (x > 0.0) * x, action_space=envs.single_action_space) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) >>> envs.close() >>> obs array([[-4.6343064e-01, 9.8971417e-05], [-4.4354835e-01, -5.9898634e-04], [-4.3034542e-01, -6.9532328e-04]], dtype=float32)
- 参数:
env – 要封装的向量环境
wrapper – 要向量化的封装器
**kwargs – LambdaAction 封装器的参数
- class gymnasium.wrappers.vector.VectorizeTransformReward(env: VectorEnv, wrapper: type[TransformReward], **kwargs: Any)[source]¶
将单智能体奖励转换封装器向量化,用于向量环境。
- 一个对奖励应用 ReLU 的示例
>>> import gymnasium as gym >>> from gymnasium.wrappers import TransformReward >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = VectorizeTransformReward(envs, wrapper=TransformReward, func=lambda x: (x > 0.0) * x) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) >>> envs.close() >>> rew array([-0., -0., -0.])
- 参数:
env – 要封装的向量环境。
wrapper – 要向量化的封装器
**kwargs – 封装器的关键字参数
向量化通用封装器¶
- class gymnasium.wrappers.vector.RecordEpisodeStatistics(env: VectorEnv, buffer_length: int = 100, stats_key: str = 'episode')[source]¶
此封装器将跟踪累积奖励和回合长度。
在向量化环境中的任何回合结束时,回合统计数据将使用键
episode
添加到info
中,并且_episode
键用于指示已终止或截断回合的环境索引。>>> infos = { ... ... ... "episode": { ... "r": "<array of cumulative reward for each done sub-environment>", ... "l": "<array of episode length for each done sub-environment>", ... "t": "<array of elapsed time since beginning of episode for each done sub-environment>" ... }, ... "_episode": "<boolean array of length num-envs>" ... }
此外,最近的奖励和回合长度存储在缓冲区中,可以通过
wrapped_env.return_queue
和wrapped_env.length_queue
分别访问。- 变量:
return_queue – 最近
deque_size
个回合的累积奖励length_queue – 最近
deque_size
个回合的长度
示例
>>> from pprint import pprint >>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3) >>> envs = RecordEpisodeStatistics(envs) >>> obs, info = envs.reset(123) >>> _ = envs.action_space.seed(123) >>> end = False >>> while not end: ... obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) ... end = term.any() or trunc.any() ... >>> envs.close() >>> pprint(info) {'_episode': array([ True, False, False]), '_final_info': array([ True, False, False]), '_final_observation': array([ True, False, False]), 'episode': {'l': array([11, 0, 0], dtype=int32), 'r': array([11., 0., 0.], dtype=float32), 't': array([0.007812, 0. , 0. ], dtype=float32)}, 'final_info': array([{}, None, None], dtype=object), 'final_observation': array([array([ 0.11448676, 0.9416149 , -0.20946532, -1.7619033 ], dtype=float32), None, None], dtype=object)}
- 参数:
env (Env) – 要应用封装器的环境
buffer_length – 缓冲区
return_queue
、length_queue
和time_queue
的大小stats_key – 用于保存数据的信息键
已实现的观测封装器¶
- class gymnasium.wrappers.vector.TransformObservation(env: VectorEnv, func: Callable[[ObsType], Any], observation_space: Space | None = None, single_observation_space: Space | None = None)[source]¶
通过提供给封装器的函数来转换观测值。
此函数允许手动指定向量观测函数以及单观测函数。例如,当可以并行处理向量观测或通过其他更优化的方法进行处理时,这是可取的。否则,应使用
VectorizeTransformObservation
,其中只需定义single_func
。- 示例 - 无观测转换
>>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], [ 0.02852531, 0.02858594, 0.0469136 , 0.02480598], [ 0.03517495, -0.000635 , -0.01098382, -0.03203924]], dtype=float32) >>> envs.close()
- 示例 - 有观测转换
>>> import gymnasium as gym >>> from gymnasium.spaces import Box >>> def scale_and_shift(obs): ... return (obs - 1.0) * 2.0 ... >>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> new_obs_space = Box(low=envs.observation_space.low, high=envs.observation_space.high) >>> envs = TransformObservation(envs, func=scale_and_shift, observation_space=new_obs_space) >>> obs, info = envs.reset(seed=123) >>> obs array([[-1.9635296, -2.0892358, -2.055928 , -2.0631256], [-1.9429494, -1.9428282, -1.9061728, -1.9503881], [-1.9296501, -2.00127 , -2.0219676, -2.0640786]], dtype=float32) >>> envs.close()
- 参数:
env – 要封装的向量环境
func – 一个将转换向量观测值的函数。如果此转换后的观测值超出
env.observation_space
的观测空间,则提供一个observation_space
。observation_space – 封装器的观测空间。如果为 None,则从
single_observation_space
计算。如果single_observation_space
也未提供,则假定与env.observation_space
相同。single_observation_space – 非向量化环境的观测空间。如果为 None,则假定与
env.single_observation_space
相同。
- class gymnasium.wrappers.vector.FilterObservation(env: VectorEnv, filter_keys: Sequence[str | int])[source]¶
用于过滤字典或元组观测空间的向量封装器。
- 示例 - 创建一个带有字典空间的向量化环境,演示如何过滤键
>>> import numpy as np >>> import gymnasium as gym >>> from gymnasium.spaces import Dict, Box >>> from gymnasium.wrappers import TransformObservation >>> from gymnasium.wrappers.vector import VectorizeTransformObservation, FilterObservation >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> make_dict = lambda x: {"obs": x, "junk": np.array([0.0])} >>> new_space = Dict({"obs": envs.single_observation_space, "junk": Box(low=-1.0, high=1.0)}) >>> envs = VectorizeTransformObservation(env=envs, wrapper=TransformObservation, func=make_dict, observation_space=new_space) >>> envs = FilterObservation(envs, ["obs"]) >>> obs, info = envs.reset(seed=123) >>> envs.close() >>> obs {'obs': array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], [ 0.02852531, 0.02858594, 0.0469136 , 0.02480598], [ 0.03517495, -0.000635 , -0.01098382, -0.03203924]], dtype=float32)}
- 参数:
env – 要封装的向量环境
filter_keys – 要包含的子空间,对于
Dict
和Tuple
空间分别使用字符串列表或整数列表
- class gymnasium.wrappers.vector.FlattenObservation(env: VectorEnv)[source]¶
将观测值展平的观测封装器。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 96, 96, 3) >>> envs = FlattenObservation(envs) >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 27648) >>> envs.close()
- 参数:
env – 要封装的向量环境
- class gymnasium.wrappers.vector.GrayscaleObservation(env: VectorEnv, keep_dim: bool = False)[source]¶
将 RGB 图像转换为灰度图的观测封装器。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 96, 96, 3) >>> envs = GrayscaleObservation(envs) >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 96, 96) >>> envs.close()
- 参数:
env – 要封装的向量环境
keep_dim – 是否在观测中保留通道,如果为
True
,则obs.shape == 3
,否则obs.shape == 2
- class gymnasium.wrappers.vector.ResizeObservation(env: VectorEnv, shape: tuple[int, ...])[source]¶
使用 OpenCV 将图像观测值调整为指定形状。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 96, 96, 3) >>> envs = ResizeObservation(envs, shape=(28, 28)) >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 28, 28, 3) >>> envs.close()
- 参数:
env – 要封装的向量环境
shape – 调整后的观测形状
- class gymnasium.wrappers.vector.ReshapeObservation(env: VectorEnv, shape: int | tuple[int, ...])[source]¶
将基于数组的观测值重塑为指定形状。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 96, 96, 3) >>> envs = ReshapeObservation(envs, shape=(9216, 3)) >>> obs, info = envs.reset(seed=123) >>> obs.shape (3, 9216, 3) >>> envs.close()
- 参数:
env – 要封装的向量环境
shape – 重塑后的观测空间
- class gymnasium.wrappers.vector.RescaleObservation(env: VectorEnv, min_obs: floating | integer | ndarray, max_obs: floating | integer | ndarray)[source]¶
将观测值线性重新缩放到最小值和最大值之间。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("MountainCar-v0", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.min() np.float32(-0.46352962) >>> obs.max() np.float32(0.0) >>> envs = RescaleObservation(envs, min_obs=-5.0, max_obs=5.0) >>> obs, info = envs.reset(seed=123) >>> obs.min() np.float32(-0.90849805) >>> obs.max() np.float32(0.0) >>> envs.close()
- 参数:
env – 要封装的向量环境
min_obs – 新的最小观测边界
max_obs – 新的最大观测边界
- class gymnasium.wrappers.vector.DtypeObservation(env: VectorEnv, dtype: Any)[source]¶
用于转换观测值数据类型的观测封装器。
示例
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> obs.dtype dtype('float32') >>> envs = DtypeObservation(envs, dtype=np.float64) >>> obs, info = envs.reset(seed=123) >>> obs.dtype dtype('float64') >>> envs.close()
- 参数:
env – 要封装的向量环境
dtype – 观测值的新数据类型
- class gymnasium.wrappers.vector.NormalizeObservation(env: VectorEnv, epsilon: float = 1e-8)[source]¶
此封装器将标准化观测值,使得每个坐标都以单位方差为中心。
属性 _update_running_mean 允许冻结/继续观测统计信息的运行平均值计算。如果为 True(默认),RunningMeanStd 将在每次步进和重置调用时更新。如果为 False,则使用计算出的统计信息,但不再更新;这可以在评估期间使用。
注意
归一化取决于过去的轨迹,如果封装器是新实例化的或策略最近发生了变化,观测值将不会正确归一化。
- 没有归一化奖励封装器的示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> obs, info = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> for _ in range(100): ... obs, *_ = envs.step(envs.action_space.sample()) >>> np.mean(obs) np.float32(0.024251968) >>> np.std(obs) np.float32(0.62259156) >>> envs.close()
- 有归一化奖励封装器的示例
>>> import gymnasium as gym >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync") >>> envs = NormalizeObservation(envs) >>> obs, info = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> for _ in range(100): ... obs, *_ = envs.step(envs.action_space.sample()) >>> np.mean(obs) np.float32(-0.2359734) >>> np.std(obs) np.float32(1.1938739) >>> envs.close()
- 参数:
env (Env) – 要应用封装器的环境
epsilon – 在缩放观测值时使用的稳定性参数。
已实现的动作封装器¶
- class gymnasium.wrappers.vector.TransformAction(env: VectorEnv, func: Callable[[ActType], Any], action_space: Space | None = None, single_action_space: Space | None = None)[source]¶
通过提供给封装器的函数来转换动作。
函数
func
将应用于所有向量动作。如果func
的观测值超出env
动作空间的边界,则提供一个action_space
来指定向量化环境的动作空间。- 示例 - 无动作转换
>>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> for _ in range(10): ... obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) ... >>> envs.close() >>> obs array([[-0.46553135, -0.00142543], [-0.498371 , -0.00715587], [-0.46515748, -0.00624371]], dtype=float32)
- 示例 - 有动作转换
>>> import gymnasium as gym >>> from gymnasium.spaces import Box >>> def shrink_action(act): ... return act * 0.3 ... >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> new_action_space = Box(low=shrink_action(envs.action_space.low), high=shrink_action(envs.action_space.high)) >>> envs = TransformAction(env=envs, func=shrink_action, action_space=new_action_space) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> for _ in range(10): ... obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) ... >>> envs.close() >>> obs array([[-0.48468155, -0.00372536], [-0.47599354, -0.00545912], [-0.46543318, -0.00615723]], dtype=float32)
- 参数:
env – 要封装的向量环境
func – 一个将转换动作的函数。如果此转换后的动作超出
env.action_space
的动作空间,则提供一个action_space
。action_space – 封装器的动作空间。如果为 None,则从
single_action_space
计算。如果single_action_space
也未提供,则假定与env.action_space
相同。single_action_space – 非向量化环境的动作空间。如果为 None,则假定与
env.single_action_space
相同。
- class gymnasium.wrappers.vector.ClipAction(env: VectorEnv)[source]¶
将连续动作剪裁到有效的
Box
观测空间边界内。- 示例 - 将超出边界的动作传递给环境进行剪裁。
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = ClipAction(envs) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> obs, rew, term, trunc, info = envs.step(np.array([5.0, -5.0, 2.0])) >>> envs.close() >>> obs array([[-0.4624777 , 0.00105192], [-0.44504836, -0.00209899], [-0.42884544, 0.00080468]], dtype=float32)
- 参数:
env – 要封装的向量环境
- class gymnasium.wrappers.vector.RescaleAction(env: VectorEnv, min_action: float | int | ndarray, max_action: float | int | ndarray)[source]¶
将环境的连续动作空间仿射重新缩放到范围 [min_action, max_action]。
- 示例 - 无动作缩放
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> for _ in range(10): ... obs, rew, term, trunc, info = envs.step(0.5 * np.ones((3, 1))) ... >>> envs.close() >>> obs array([[-0.44799727, 0.00266526], [-0.4351738 , 0.00133522], [-0.42683297, 0.00048403]], dtype=float32)
- 示例 - 有动作缩放
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = RescaleAction(envs, 0.0, 1.0) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> for _ in range(10): ... obs, rew, term, trunc, info = envs.step(0.5 * np.ones((3, 1))) ... >>> envs.close() >>> obs array([[-0.48657528, -0.00395268], [-0.47377947, -0.00529102], [-0.46546045, -0.00614867]], dtype=float32)
- 参数:
env (Env) – 要封装的向量环境
min_action (float, int or np.ndarray) – 每个动作的最小值。这可以是 numpy 数组或标量。
max_action (float, int or np.ndarray) – 每个动作的最大值。这可以是 numpy 数组或标量。
已实现的奖励封装器¶
- class gymnasium.wrappers.vector.TransformReward(env: VectorEnv, func: Callable[[ArrayType], ArrayType])[source]¶
一个奖励封装器,允许自定义函数修改步进奖励。
- 有奖励转换的示例
>>> import gymnasium as gym >>> from gymnasium.spaces import Box >>> def scale_and_shift(rew): ... return (rew - 1.0) * 2.0 ... >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = TransformReward(env=envs, func=scale_and_shift) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample()) >>> envs.close() >>> obs array([[-4.6343064e-01, 9.8971417e-05], [-4.4488689e-01, -1.9375233e-03], [-4.3118435e-01, -1.5342437e-03]], dtype=float32)
- 参数:
env (Env) – 要封装的向量环境
func – (可调用):应用于奖励的函数
- class gymnasium.wrappers.vector.ClipReward(env: VectorEnv, min_reward: float | ndarray | None = None, max_reward: float | ndarray | None = None)[source]¶
一个封装器,用于将环境的奖励剪裁到上限和下限之间。
- 有剪裁奖励的示例
>>> import numpy as np >>> import gymnasium as gym >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3) >>> envs = ClipReward(envs, 0.0, 2.0) >>> _ = envs.action_space.seed(123) >>> obs, info = envs.reset(seed=123) >>> for _ in range(10): ... obs, rew, term, trunc, info = envs.step(0.5 * np.ones((3, 1))) ... >>> envs.close() >>> rew array([0., 0., 0.])
- 参数:
env – 要封装的向量环境
min_reward – 每一步的最小奖励
max_reward – 每一步的最大奖励
- class gymnasium.wrappers.vector.NormalizeReward(env: VectorEnv, gamma: float = 0.99, epsilon: float = 1e-8)[source]¶
此封装器将缩放奖励,使其指数移动平均值具有近似固定的方差。
属性 _update_running_mean 允许冻结/继续奖励统计信息的运行平均值计算。如果为 True(默认),RunningMeanStd 将在每次调用 self.normalize() 时更新。如果为 False,则使用计算出的统计信息,但不再更新;这可以在评估期间使用。
注意
缩放取决于过去的轨迹,如果封装器是新实例化的或策略最近发生了变化,奖励将不会正确缩放。
- 没有归一化奖励封装器的示例
>>> import gymnasium as gym >>> import numpy as np >>> envs = gym.make_vec("MountainCarContinuous-v0", 3) >>> _ = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> episode_rewards = [] >>> for _ in range(100): ... observation, reward, *_ = envs.step(envs.action_space.sample()) ... episode_rewards.append(reward) ... >>> envs.close() >>> np.mean(episode_rewards) np.float64(-0.03359492141887935) >>> np.std(episode_rewards) np.float64(0.029028230434438706)
- 有归一化奖励封装器的示例
>>> import gymnasium as gym >>> import numpy as np >>> envs = gym.make_vec("MountainCarContinuous-v0", 3) >>> envs = NormalizeReward(envs) >>> _ = envs.reset(seed=123) >>> _ = envs.action_space.seed(123) >>> episode_rewards = [] >>> for _ in range(100): ... observation, reward, *_ = envs.step(envs.action_space.sample()) ... episode_rewards.append(reward) ... >>> envs.close() >>> np.mean(episode_rewards) np.float64(-0.1598639586606745) >>> np.std(episode_rewards) np.float64(0.27800309628058434)
- 参数:
env (env) – 要应用封装器的环境
epsilon (float) – 一个稳定性参数
gamma (float) – 在指数移动平均中使用的折扣因子。
已实现的数据转换封装器¶
- class gymnasium.wrappers.vector.ArrayConversion(env: VectorEnv, env_xp: ModuleType | str, target_xp: ModuleType | str, env_device: Any | None = None, target_device: Any | None = None)[source]¶
封装一个返回与 Array API 兼容数组的向量环境,以便可以通过特定框架与其交互。
流行的 Array API 框架包括
numpy
、torch
、jax.numpy
、cupy
等。通过此封装器,您可以将环境的输出转换为这些框架中的任何一个。相反,如果可能且无需移动数据或进行设备传输,动作会自动映射回环境框架。注意
`gymnasium.wrappers.ArrayConversion` 的向量化版本。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("JaxEnv-vx", 3) >>> envs = ArrayConversion(envs, xp=np)
- 参数:
env – 要封装的 Array API 兼容环境
env_xp – 环境所使用的 Array API 框架
target_xp – 要转换到的 Array API 框架
env_device – 环境所在的设备
target_device – 应返回数组的设备
- class gymnasium.wrappers.vector.JaxToNumpy(env: VectorEnv)[source]¶
封装一个 jax 向量环境,以便可以通过 numpy 数组与其交互。
注意
`gymnasium.wrappers.JaxToNumpy` 的向量化版本。
动作必须以 numpy 数组形式提供,观测值、奖励、终止和截断将以 numpy 数组形式返回。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("JaxEnv-vx", 3) >>> envs = JaxToNumpy(envs)
- 参数:
env – 要封装的向量 jax 环境
- class gymnasium.wrappers.vector.JaxToTorch(env: VectorEnv, device: str | device | None = None)[source]¶
封装一个基于 Jax 的向量环境,以便可以通过 PyTorch 张量与其交互。
动作必须以 PyTorch 张量形式提供,观测值、奖励、终止和截断将以 PyTorch 张量形式返回。
示例
>>> import gymnasium as gym >>> envs = gym.make_vec("JaxEnv-vx", 3) >>> envs = JaxToTorch(envs)
- 参数:
env – 要封装的基于 Jax 的向量环境
device – torch 张量应移动到的设备
- class gymnasium.wrappers.vector.NumpyToTorch(env: VectorEnv, device: str | device | None = None)[source]¶
封装一个基于 numpy 的环境,以便可以通过 PyTorch 张量与其交互。
示例
>>> import torch >>> import gymnasium as gym >>> from gymnasium.wrappers.vector import NumpyToTorch >>> envs = gym.make_vec("CartPole-v1", 3) >>> envs = NumpyToTorch(envs) >>> obs, _ = envs.reset(seed=123) >>> type(obs) <class 'torch.Tensor'> >>> action = torch.tensor(envs.action_space.sample()) >>> obs, reward, terminated, truncated, info = envs.step(action) >>> envs.close() >>> type(obs) <class 'torch.Tensor'> >>> type(reward) <class 'torch.Tensor'> >>> type(terminated) <class 'torch.Tensor'> >>> type(truncated) <class 'torch.Tensor'>
- 参数:
env – 要封装的基于 NumPy 的向量环境
device – torch 张量应移动到的设备