AsyncVectorEnv¶
- class gymnasium.vector.AsyncVectorEnv(env_fns: Sequence[Callable[[], Env]], shared_memory: bool = True, copy: bool = True, context: str | None = None, daemon: bool = True, worker: Callable[[int, Callable[[], Env], Connection, Connection, bool, Queue], None] | None = None, observation_mode: str | Space = 'same')[source]¶
并行运行多个环境的矢量化环境。
它使用
multiprocessing
进程,以及用于通信的管道。示例
>>> import gymnasium as gym >>> envs = gym.make_vec("Pendulum-v1", num_envs=2, vectorization_mode="async") >>> envs AsyncVectorEnv(Pendulum-v1, num_envs=2) >>> envs = gym.vector.AsyncVectorEnv([ ... lambda: gym.make("Pendulum-v1", g=9.81), ... lambda: gym.make("Pendulum-v1", g=1.62) ... ]) >>> envs AsyncVectorEnv(num_envs=2) >>> observations, infos = envs.reset(seed=42) >>> observations array([[-0.14995256, 0.9886932 , -0.12224312], [ 0.5760367 , 0.8174238 , -0.91244936]], dtype=float32) >>> infos {} >>> _ = envs.action_space.seed(123) >>> observations, rewards, terminations, truncations, infos = envs.step(envs.action_space.sample()) >>> observations array([[-0.1851753 , 0.98270553, 0.714599 ], [ 0.6193494 , 0.7851154 , -1.0808398 ]], dtype=float32) >>> rewards array([-2.96495728, -1.00214607]) >>> terminations array([False, False]) >>> truncations array([False, False]) >>> infos {}
- 参数:
env_fns – 创建环境的函数。
shared_memory – 如果为
True
,则来自工作进程的观察结果通过共享变量传递回来。如果观察结果很大(例如图像),这可以提高效率。copy – 如果为
True
,则AsyncVectorEnv.reset()
和AsyncVectorEnv.step()
方法返回观察结果的副本。context – multiprocessing 的上下文。如果为
None
,则使用默认上下文。daemon – 如果为
True
,则子进程的daemon
标志被打开;也就是说,如果主进程退出,它们也会退出。但是,daemon=True
会阻止子进程生成子进程,因此对于某些环境,您可能需要将其设置为False
。worker – 如果设置,则在子进程中使用该工作进程,而不是默认工作进程。这对于覆盖某些内部矢量化环境逻辑很有用,例如,如何处理终止或截断时的重置。
observation_mode – 定义环境观察空间如何进行批处理。‘same’ 定义应该有
n
个相同空间的副本。‘different’ 定义可能有多个观察空间,但参数不同,不过需要相同的形状和数据类型,警告,可能会出现意外错误。传递Tuple[Space, Space]
对象可以定义自定义single_observation_space
和observation_space
,警告,可能会出现意外错误。
警告
worker 是一个高级模式选项。它提供了高度的灵活性和很大的自毁风险;因此,如果您正在编写自己的工作进程,建议您从
_worker
(或_async_worker
)方法的代码开始,并添加更改。- 引发:
RuntimeError – 如果某个子环境的观察空间与 observation_space 不匹配(或者,默认情况下,与第一个子环境的观察空间不匹配)。
ValueError – 如果 observation_space 是一个自定义空间(即不是 Gym 中的默认空间,例如 gymnasium.spaces.Box、gymnasium.spaces.Discrete 或 gymnasium.spaces.Dict)并且 shared_memory 为 True。
- reset(*, seed: int | list[int] | None = None, options: dict[str, Any] | None = None) tuple[ObsType, dict[str, Any]] [source]¶
并行重置所有子环境,并返回一批串联的观察结果和信息。
- 参数:
seed – 环境重置种子
options – 是否返回选项
- 返回:
来自矢量化环境的一批观测结果和信息。
- step(actions: ActType) tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]] [source]¶
对每个并行环境执行一个动作。
- 参数:
actions –
action_space
的元素,动作批次。- 返回:
Batch of (observations, rewards, terminations, truncations, infos)
- close(**kwargs: Any)¶
关闭所有并行环境并释放资源。
它还会关闭所有现有的图像查看器,然后调用
close_extras()
并将closed
设置为True
。警告
此函数本身不会关闭环境,应在
close_extras()
中处理。 这对于同步和异步矢量化环境都是通用的。注意
这将在垃圾回收或程序退出时自动调用。
- 参数:
**kwargs – 传递给
close_extras()
的关键字参数
- call(name: str, *args: Any, **kwargs: Any) tuple[Any, ...] [source]¶
使用 args 和 kwargs 从每个并行环境调用一个方法。
- 参数:
name (str) – 要调用的方法或属性的名称。
*args – 应用于方法调用的位置参数。
**kwargs – 应用于方法调用的关键字参数。
- 返回:
每个环境对方法或属性的单个调用的结果列表。
- get_attr(name: str) tuple[Any, ...] [source]¶
从每个并行环境获取属性。
- 参数:
name (str) – 要从每个单独环境获取的属性的名称。
- 返回:
具有名称的属性
- set_attr(name: str, values: list[Any] | tuple[Any] | object)[source]¶
设置子环境的属性。
- 参数:
name – 要在每个单独环境中设置的属性的名称。
values – 要设置的属性的值。如果
values
是一个列表或元组,则它对应于每个单独环境的值,否则所有环境都设置单个值。
- 引发:
ValueError – 值必须是一个长度等于环境数量的列表或元组。
AlreadyPendingCallError – 在等待挂起调用完成时调用
set_attr()
。
其他方法¶
- property AsyncVectorEnv.np_random: tuple[Generator, ...]¶
返回包装环境的 numpy 随机数生成器的元组。
- property AsyncVectorEnv.np_random_seed: tuple[int, ...]¶
返回所有包装环境的 np_random 种子的元组。