AsyncVectorEnv

class gymnasium.vector.AsyncVectorEnv(env_fns: Sequence[Callable[[], Env]], shared_memory: bool = True, copy: bool = True, context: str | None = None, daemon: bool = True, worker: Callable[[int, Callable[[], Env], Connection, Connection, bool, Queue], None] | None = None, observation_mode: str | Space = 'same', autoreset_mode: str | AutoresetMode = AutoresetMode.NEXT_STEP)[source]

向量化环境,并行运行多个环境。

它使用 multiprocessing 进程和管道进行通信。

示例

>>> import gymnasium as gym
>>> envs = gym.make_vec("Pendulum-v1", num_envs=2, vectorization_mode="async")
>>> envs
AsyncVectorEnv(Pendulum-v1, num_envs=2)
>>> envs = gym.vector.AsyncVectorEnv([
...     lambda: gym.make("Pendulum-v1", g=9.81),
...     lambda: gym.make("Pendulum-v1", g=1.62)
... ])
>>> envs
AsyncVectorEnv(num_envs=2)
>>> observations, infos = envs.reset(seed=42)
>>> observations
array([[-0.14995256,  0.9886932 , -0.12224312],
       [ 0.5760367 ,  0.8174238 , -0.91244936]], dtype=float32)
>>> infos
{}
>>> _ = envs.action_space.seed(123)
>>> observations, rewards, terminations, truncations, infos = envs.step(envs.action_space.sample())
>>> observations
array([[-0.1851753 ,  0.98270553,  0.714599  ],
       [ 0.6193494 ,  0.7851154 , -1.0808398 ]], dtype=float32)
>>> rewards
array([-2.96495728, -1.00214607])
>>> terminations
array([False, False])
>>> truncations
array([False, False])
>>> infos
{}
参数:
  • env_fns – 创建环境的函数。

  • shared_memory – 如果为 True,则来自工作进程的观测值将通过共享变量传回。如果观测值很大(例如图像),这可以提高效率。

  • copy – 如果为 True,则 AsyncVectorEnv.reset()AsyncVectorEnv.step() 方法返回观测值的副本。

  • contextmultiprocessing 的上下文。如果为 None,则使用默认上下文。

  • daemon – 如果为 True,则子进程的 daemon 标志将开启;也就是说,如果主进程退出,它们将退出。但是,daemon=True 阻止子进程生成子进程,因此对于某些环境,您可能希望将其设置为 False

  • worker – 如果设置,则在子进程中使用该 worker 而不是默认 worker。这对于覆盖某些内部向量环境逻辑可能很有用,例如,如何处理终止或截断时的重置。

  • observation_mode – 定义环境观测空间应如何批量处理。“same” 定义应有 n 个相同空间的副本。“different” 定义可以有具有不同参数的多个观测空间,但需要相同的形状和 dtype,警告,可能会引发意外错误。传递 Tuple[Space, Space] 对象允许定义自定义的 single_observation_spaceobservation_space,警告,可能会引发意外错误。

  • autoreset_mode – 使用的自动重置模式,有关更多信息,请参阅 https://farama.org/Vector-Autoreset-Mode

警告

worker 是一个高级模式选项。它提供了高度的灵活性和很高的自伤风险;因此,如果您正在编写自己的 worker,建议从 _worker (或 _async_worker) 方法的代码开始,并添加更改。

引发:
  • RuntimeError – 如果某些子环境的观测空间与 observation_space 不匹配(或者,默认情况下,与第一个子环境的观测空间不匹配)。

  • ValueError – 如果 observation_space 是自定义空间(即,不是 Gym 中的默认空间,例如 gymnasium.spaces.Box、gymnasium.spaces.Discrete 或 gymnasium.spaces.Dict)且 shared_memory 为 True。

reset(*, seed: int | list[int] | None = None, options: dict[str, Any] | None = None) tuple[ObsType, dict[str, Any]][source]

并行重置所有子环境,并返回一批连接的观测值和信息。

参数:
  • seed – 环境重置种子

  • options – 是否返回选项

返回:

来自向量化环境的一批观测值和信息。

step(actions: ActType) tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]][source]

为每个并行环境采取一个动作。

参数:

actionsaction_space 动作批次中的元素。

返回:

批次 (观测值、奖励、终止、截断、信息)

close(**kwargs: Any)

关闭所有并行环境并释放资源。

它还会关闭所有现有的图像查看器,然后调用 close_extras() 并将 closed 设置为 True

警告

此函数本身不关闭环境,应在 close_extras() 中处理。这对于同步和异步向量化环境都是通用的。

注意

当垃圾回收或程序退出时,将自动调用此函数。

参数:

**kwargs – 传递给 close_extras() 的关键字参数

call(name: str, *args: Any, **kwargs: Any) tuple[Any, ...][source]

使用 args 和 kwargs 从每个并行环境中调用一个方法。

参数:
  • name (str) – 要调用的方法或属性的名称。

  • *args – 应用于方法调用的位置参数。

  • **kwargs – 应用于方法调用的关键字参数。

返回:

对每个环境的方法或属性的单独调用的结果列表。

get_attr(name: str) tuple[Any, ...][source]

从每个并行环境中获取一个属性。

参数:

name (str) – 要从每个单独环境中获取的属性的名称。

返回:

具有名称的属性

set_attr(name: str, values: list[Any] | tuple[Any] | object)[source]

设置子环境的属性。

参数:
  • name – 要在每个单独环境中设置的属性的名称。

  • values – 要设置的属性的值。如果 values 是列表或元组,则它对应于每个单独环境的值,否则为所有环境设置单个值。

引发:
  • ValueError – 值必须是列表或元组,其长度等于环境的数量。

  • AlreadyPendingCallError – 在等待挂起的调用完成时调用 set_attr()

其他方法

property AsyncVectorEnv.np_random: tuple[Generator, ...]

返回包装环境的 numpy 随机数生成器元组。

property AsyncVectorEnv.np_random_seed: tuple[int, ...]

返回所有包装环境的 np_random 种子元组。