创建自定义环境

本页简要概述了如何使用 Gymnasium 创建自定义环境。如需包含渲染的更完整教程,请在阅读本页之前阅读完整教程,并阅读基本用法

我们将实现一个非常简单的游戏,名为 GridWorldEnv,它由固定大小的二维正方形网格组成。智能体可以在每个时间步中在网格单元之间垂直或水平移动,智能体的目标是导航到网格上在剧集开始时随机放置的目标。

关于游戏的基本信息

  • 观测提供目标和智能体的位置。

  • 我们的环境中有 4 个离散动作,分别对应于“右”、“上”、“左”和“下”的移动。

  • 当智能体导航到目标所在的网格单元时,环境结束(终止)。

  • 只有当智能体到达目标时才会被奖励,即当智能体到达目标时奖励为 1,否则为 0。

环境 __init__

与所有环境一样,我们的自定义环境将继承自 gymnasium.Env,它定义了环境的结构。环境的要求之一是定义观测空间和动作空间,它们声明了环境可能输入(动作)和输出(观测)的通用集合。正如我们在关于游戏的基本信息中概述的那样,我们的智能体有四个离散动作,因此我们将使用具有四个选项的 Discrete(4) 空间。

对于我们的观测,有几种选择,在本教程中,我们将假设我们的观测看起来像 {"agent": array([1, 0]), "target": array([0, 3])},其中数组元素表示智能体或目标的 x 和 y 位置。表示观测的替代选项是使用 2d 网格,其值表示网格上的智能体和目标,或使用 3d 网格,其中每个“层”仅包含智能体或目标信息。因此,我们将声明观测空间为 Dict,其中智能体和目标空间是 Box,允许 int 类型的数组输出。

有关可用于环境的可能空间的完整列表,请参阅空间

from typing import Optional
import numpy as np
import gymnasium as gym


class GridWorldEnv(gym.Env):

    def __init__(self, size: int = 5):
        # The size of the square grid
        self.size = size

        # Define the agent and target location; randomly chosen in `reset` and updated in `step`
        self._agent_location = np.array([-1, -1], dtype=np.int32)
        self._target_location = np.array([-1, -1], dtype=np.int32)

        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`-1}^2
        self.observation_space = gym.spaces.Dict(
            {
                "agent": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = gym.spaces.Discrete(4)
        # Dictionary maps the abstract actions to the directions on the grid
        self._action_to_direction = {
            0: np.array([1, 0]),  # right
            1: np.array([0, 1]),  # up
            2: np.array([-1, 0]),  # left
            3: np.array([0, -1]),  # down
        }

构建观测

由于我们需要在 Env.reset()Env.step() 中计算观测,因此通常方便地拥有一个 _get_obs 方法,该方法将环境的状态转换为观测。但是,这不是强制性的,您可以分别在 Env.reset()Env.step() 中计算观测。

    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}

我们还可以为 Env.reset()Env.step() 返回的辅助信息实现类似的方法。在我们的例子中,我们希望提供智能体和目标之间的曼哈顿距离

    def _get_info(self):
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }

通常,info 还将包含一些仅在 Env.step() 方法内部可用的数据(例如,单独的奖励项)。在这种情况下,我们将不得不更新 _get_infoEnv.step() 中返回的字典。

Reset 函数

reset() 的目的是为环境启动一个新剧集,并具有两个参数:seedoptionsseed 可用于将随机数生成器初始化为确定性状态,options 可用于指定 reset 中使用的值。在 reset 的第一行,您需要调用 super().reset(seed=seed),这将初始化随机数生成器(np_random)以在 reset() 的其余部分中使用。

在我们的自定义环境中,reset() 需要随机选择智能体和目标的位置(如果它们的位置相同,我们会重复此操作)。reset() 的返回类型是初始观测和任何辅助信息的元组。因此,我们可以使用我们之前为此实现的 _get_obs_get_info 方法

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

Step 函数

step() 方法通常包含您环境的大部分逻辑,它接受一个 action,并在应用该动作后计算环境的状态,返回下一个观测、结果奖励、环境是否已终止、环境是否已截断以及辅助信息的元组。

对于我们的环境,在 step 函数期间需要发生几件事

  • 我们使用 self._action_to_direction 将离散动作(例如,2)转换为具有智能体位置的网格方向。为了防止智能体超出网格边界,我们裁剪智能体的位置以保持在边界内。

  • 我们通过检查智能体的当前位置是否等于目标的位置来计算智能体的奖励。

  • 由于环境在内部不会截断(我们可以在 make() 期间将时间限制包装器应用于环境),因此我们将 truncated 永久设置为 False。

  • 我们再次使用 _get_obs 和 _get_info 来获取智能体的观测和辅助信息。

    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid bounds
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # An environment is completed if and only if the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        truncated = False
        reward = 1 if terminated else 0  # the agent is only reached at the end of the episode
        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

注册和创建环境

虽然现在可以直接使用您的新自定义环境,但更常见的是使用 gymnasium.make() 初始化环境。在本节中,我们将解释如何注册自定义环境,然后对其进行初始化。

环境 ID 由三个组件组成,其中两个是可选的:一个可选的命名空间(此处:gymnasium_env)、一个强制性名称(此处:GridWorld)和一个可选但推荐的版本(此处:v0)。它也可能已注册为 GridWorld-v0(推荐方法)、GridWorldgymnasium_env/GridWorld,然后在环境创建期间应使用适当的 ID。

入口点可以是字符串或函数,由于本教程不是 python 项目的一部分,因此我们不能使用字符串,但对于大多数环境,这是指定入口点的常用方法。

Register 还有其他参数,可用于指定环境的关键字参数,例如,是否应用时间限制包装器等。有关更多信息,请参见 gymnasium.register()

gym.register(
    id="gymnasium_env/GridWorld-v0",
    entry_point=GridWorldEnv,
)

有关注册自定义环境(包括字符串入口点)的更完整指南,请阅读完整的创建环境教程。

环境注册后,您可以通过 gymnasium.pprint_registry() 进行检查,这将输出所有已注册的环境,然后可以使用 gymnasium.make() 初始化该环境。可以使用 gymnasium.make_vec() 实例化环境的向量化版本,该版本具有并行运行的同一环境的多个实例。

import gymnasium as gym
>>> gym.make("gymnasium_env/GridWorld-v0")
<OrderEnforcing<PassiveEnvChecker<GridWorld<gymnasium_env/GridWorld-v0>>>>
>>> gym.make("gymnasium_env/GridWorld-v0", max_episode_steps=100)
<TimeLimit<OrderEnforcing<PassiveEnvChecker<GridWorld<gymnasium_env/GridWorld-v0>>>>>
>>> env = gym.make("gymnasium_env/GridWorld-v0", size=10)
>>> env.unwrapped.size
10
>>> gym.make_vec("gymnasium_env/GridWorld-v0", num_envs=3)
SyncVectorEnv(gymnasium_env/GridWorld-v0, num_envs=3)

使用包装器

通常,我们希望使用自定义环境的不同变体,或者我们希望修改 Gymnasium 或其他方提供的环境的行为。包装器允许我们在不更改环境实现或添加任何样板代码的情况下执行此操作。查看包装器文档,了解如何使用包装器的详细信息以及有关实现自己的说明。在我们的示例中,观测不能直接在学习代码中使用,因为它们是字典。但是,我们实际上不需要修改我们的环境实现来解决这个问题!我们可以简单地在环境实例之上添加一个包装器,以将观测展平为单个数组

>>> from gymnasium.wrappers import FlattenObservation

>>> env = gym.make('gymnasium_env/GridWorld-v0')
>>> env.observation_space
Dict('agent': Box(0, 4, (2,), int64), 'target': Box(0, 4, (2,), int64))
>>> env.reset()
({'agent': array([4, 1]), 'target': array([2, 4])}, {'distance': 5.0})
>>> wrapped_env = FlattenObservation(env)
>>> wrapped_env.observation_space
Box(0, 4, (4,), int64)
>>> wrapped_env.reset()
(array([3, 0, 2, 1]), {'distance': 2.0})