创建自定义环境

编码前:环境设计

创建强化学习 (RL) 环境就像设计一个视频游戏或模拟。在编写任何代码之前,你需要仔细思考想要解决的学习问题。这个设计阶段至关重要——一个设计不佳的环境将使学习变得困难甚至不可能,无论你的算法有多好。

关键设计问题

问自己这些基本问题:

🎯 智能体应该学习什么技能?

  • 穿越迷宫?

  • 平衡和控制一个系统?

  • 优化资源分配?

  • 玩策略游戏?

👀 智能体需要什么信息?

  • 位置和速度?

  • 系统当前状态?

  • 历史数据?

  • 部分可观测性还是完全可观测性?

🎮 智能体可以采取什么动作?

  • 离散选择(向上/向下/向左/向右移动)?

  • 连续控制(转向角、油门)?

  • 多个同时动作?

🏆 我们如何衡量成功?

  • 达到特定目标?

  • 最小化时间或能量?

  • 最大化得分?

  • 避免失败?

⏰ 何时结束回合?

  • 任务完成(成功/失败)?

  • 时间限制?

  • 安全约束?

网格世界 (GridWorld) 示例设计

对于我们的教程示例,我们将创建一个简单的网格世界 (GridWorld) 环境

  • 🎯 技能:高效导航到目标位置

  • 👀 信息:智能体在网格上的位置和目标位置

  • 🎮 动作:向上、向下、向左或向右移动

  • 🏆 成功:以最少步数到达目标

  • ⏰ 结束:当智能体到达目标时(或可选的时间限制)

这提供了一个清晰的学习问题,它足够简单易懂,但要最优地解决却并非微不足道。


本页提供了使用 Gymnasium 创建自定义环境的完整实现。有关包含渲染功能的完整教程

我们建议你在阅读本页之前,先熟悉基本用法

我们将把我们的网格世界游戏实现为一个固定大小的二维方格。智能体在每个时间步可以在网格单元之间垂直或水平移动,目标是在回合开始时随机放置的目标位置导航。

环境 __init__ 函数

像所有环境一样,我们的自定义环境将继承自 gymnasium.Env,它定义了所有环境必须遵循的结构。其中一个要求是定义观测空间和动作空间,它们声明了此环境的有效输入(动作)和输出(观测)。

正如我们的设计所概述的,我们的智能体有四个离散动作(向四个基本方向移动),所以我们将使用 Discrete(4) 空间。

对于我们的观测,我们有几种选择。我们可以将整个网格表示为二维数组,或者使用坐标位置,甚至使用带有独立“层”的 3D 数组来表示智能体和目标。对于本教程,我们将使用简单的字典格式,例如 {"agent": array([1, 0]), "target": array([0, 3])},其中数组表示 x,y 坐标。

这种选择使得观测结果易于人类阅读和调试。我们将此声明为一个 Dict 空间,其中智能体和目标空间为包含整数坐标的 Box 空间。

有关可用于环境的所有可能空间的完整列表,请参阅空间

from typing import Optional
import numpy as np
import gymnasium as gym


class GridWorldEnv(gym.Env):

    def __init__(self, size: int = 5):
        # The size of the square grid (5x5 by default)
        self.size = size

        # Initialize positions - will be set randomly in reset()
        # Using -1,-1 as "uninitialized" state
        self._agent_location = np.array([-1, -1], dtype=np.int32)
        self._target_location = np.array([-1, -1], dtype=np.int32)

        # Define what the agent can observe
        # Dict space gives us structured, human-readable observations
        self.observation_space = gym.spaces.Dict(
            {
                "agent": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),   # [x, y] coordinates
                "target": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),  # [x, y] coordinates
            }
        )

        # Define what actions are available (4 directions)
        self.action_space = gym.spaces.Discrete(4)

        # Map action numbers to actual movements on the grid
        # This makes the code more readable than using raw numbers
        self._action_to_direction = {
            0: np.array([1, 0]),   # Move right (positive x)
            1: np.array([0, 1]),   # Move up (positive y)
            2: np.array([-1, 0]),  # Move left (negative x)
            3: np.array([0, -1]),  # Move down (negative y)
        }

构建观测

由于我们需要在 Env.reset()Env.step() 中计算观测,因此拥有一个辅助方法 _get_obs 来将环境的内部状态转换为观测格式会很方便。这使我们的代码 DRY (不要重复自己),并使其更容易在以后修改观测格式。

    def _get_obs(self):
        """Convert internal state to observation format.

        Returns:
            dict: Observation with agent and target positions
        """
        return {"agent": self._agent_location, "target": self._target_location}

我们还可以为 Env.reset()Env.step() 返回的辅助信息实现类似的方法。在我们的例子中,我们将提供智能体和目标之间的曼哈顿距离——这对于调试和理解智能体进度可能很有用,但不应被学习算法本身使用。

    def _get_info(self):
        """Compute auxiliary information for debugging.

        Returns:
            dict: Info with distance between agent and target
        """
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }

有时 info 中会包含只在 Env.step() 内部才可用的数据(如单个奖励分量、动作成功/失败等)。在这些情况下,我们会在 step 方法中直接更新 _get_info 返回的字典。

重置函数

reset() 方法启动一个新的回合。它接受两个可选参数:用于可复现随机生成的 seed 和用于附加配置的 options。在第一行,你必须调用 super().reset(seed=seed) 以正确初始化随机数生成器。

在我们的网格世界环境中,reset() 随机地将智能体和目标放置在网格上,确保它们不会在同一个位置开始。我们以元组的形式返回初始观测和信息。

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode.

        Args:
            seed: Random seed for reproducible episodes
            options: Additional configuration (unused in this example)

        Returns:
            tuple: (observation, info) for the initial state
        """
        # IMPORTANT: Must call this first to seed the random number generator
        super().reset(seed=seed)

        # Randomly place the agent anywhere on the grid
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # Randomly place target, ensuring it's different from agent position
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

步进函数

step() 方法包含核心环境逻辑。它接收一个动作,更新环境状态,并返回结果。物理、游戏规则和奖励逻辑都在这里。

对于网格世界,我们需要:1. 将离散动作转换为移动方向 2. 更新智能体的位置(带边界检查)3. 根据是否达到目标计算奖励 4. 判断回合是否应该结束 5. 返回所有所需信息

    def step(self, action):
        """Execute one timestep within the environment.

        Args:
            action: The action to take (0-3 for directions)

        Returns:
            tuple: (observation, reward, terminated, truncated, info)
        """
        # Map the discrete action (0-3) to a movement direction
        direction = self._action_to_direction[action]

        # Update agent position, ensuring it stays within grid bounds
        # np.clip prevents the agent from walking off the edge
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # Check if agent reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)

        # We don't use truncation in this simple environment
        # (could add a step limit here if desired)
        truncated = False

        # Simple reward structure: +1 for reaching target, 0 otherwise
        # Alternative: could give small negative rewards for each step to encourage efficiency
        reward = 1 if terminated else 0

        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

常见环境设计陷阱

现在你已经了解了基本结构,让我们讨论初学者常犯的错误

奖励设计问题

问题:仅在最后才给予奖励(稀疏奖励)

# This makes learning very difficult!
reward = 1 if terminated else 0

更好:提供中间反馈

# Option 1: Small step penalty to encourage efficiency
reward = 1 if terminated else -0.01

# Option 2: Distance-based reward shaping
distance = np.linalg.norm(self._agent_location - self._target_location)
reward = 1 if terminated else -0.1 * distance

状态表示问题

问题:包含不相关信息或缺少关键细节

# Too much info - agent doesn't need grid size in every observation
obs = {"agent": self._agent_location, "target": self._target_location, "size": self.size}

# Too little info - agent can't distinguish different positions
obs = {"distance": distance}  # Missing actual positions!

更好:精确包含最佳决策所需的一切

# Just right - positions are sufficient for navigation
obs = {"agent": self._agent_location, "target": self._target_location}

动作空间问题

问题:动作不合理或无法执行

# Bad: Agent can move diagonally but environment doesn't support it
self.action_space = gym.spaces.Discrete(8)  # 8 directions including diagonals

# Bad: Continuous actions for discrete movement
self.action_space = gym.spaces.Box(-1, 1, shape=(2,))  # Continuous x,y movement

边界处理错误

问题:允许无效状态或模糊的边界行为

# Bad: Agent can go outside the grid
self._agent_location = self._agent_location + direction  # No bounds checking!

# Unclear: What happens when agent hits wall?
if np.any(self._agent_location < 0) or np.any(self._agent_location >= self.size):
    # Do nothing? Reset episode? Give penalty? Unclear!

更好:清晰、一致的边界处理

# Clear: Agent stays in place when hitting boundaries
self._agent_location = np.clip(
    self._agent_location + direction, 0, self.size - 1
)

注册和创建环境

虽然你可以立即使用你的自定义环境,但将其注册到 Gymnasium 中会更方便,这样你就可以像创建内置环境一样使用 gymnasium.make() 来创建它。

环境 ID 包含三个组成部分:可选的命名空间(此处为:gymnasium_env)、强制名称(此处为:GridWorld)以及可选但推荐的版本(此处为:v0)。你可以将其注册为 GridWorld-v0GridWorldgymnasium_env/GridWorld,但为了清晰起见,推荐使用完整格式。

由于本教程不属于 Python 包的一部分,我们直接将类作为入口点传递。在实际项目中,你通常会使用像 "my_package.envs:GridWorldEnv" 这样的字符串。

# Register the environment so we can create it with gym.make()
gym.register(
    id="gymnasium_env/GridWorld-v0",
    entry_point=GridWorldEnv,
    max_episode_steps=300,  # Prevent infinite episodes
)

有关注册自定义环境(包括使用字符串入口点)的更完整指南,请阅读完整的创建环境教程。

注册后,你可以使用 gymnasium.pprint_registry() 检查所有可用环境,并使用 gymnasium.make() 创建实例。你还可以使用 gymnasium.make_vec() 创建向量化版本。

import gymnasium as gym

# Create the environment like any built-in environment
>>> env = gym.make("gymnasium_env/GridWorld-v0")
<OrderEnforcing<PassiveEnvChecker<GridWorld<gymnasium_env/GridWorld-v0>>>>

# Customize environment parameters
>>> env = gym.make("gymnasium_env/GridWorld-v0", size=10)
>>> env.unwrapped.size
10

# Create multiple environments for parallel training
>>> vec_env = gym.make_vec("gymnasium_env/GridWorld-v0", num_envs=3)
SyncVectorEnv(gymnasium_env/GridWorld-v0, num_envs=3)

调试你的环境

当你的环境未按预期工作时,以下是常见的调试策略

检查环境有效性

from gymnasium.utils.env_checker import check_env

# This will catch many common issues
try:
    check_env(env)
    print("Environment passes all checks!")
except Exception as e:
    print(f"Environment has issues: {e}")

使用已知动作手动测试

# Test specific action sequences to verify behavior
env = gym.make("gymnasium_env/GridWorld-v0")
obs, info = env.reset(seed=42)  # Use seed for reproducible testing

print(f"Starting position - Agent: {obs['agent']}, Target: {obs['target']}")

# Test each action type
actions = [0, 1, 2, 3]  # right, up, left, down
for action in actions:
    old_pos = obs['agent'].copy()
    obs, reward, terminated, truncated, info = env.step(action)
    new_pos = obs['agent']
    print(f"Action {action}: {old_pos} -> {new_pos}, reward={reward}")

常见调试问题

# Issue 1: Forgot to call super().reset()
def reset(self, seed=None, options=None):
    # super().reset(seed=seed)  # ❌ Missing this line
    # Results in: possibly incorrect seeding

# Issue 2: Wrong action mapping
self._action_to_direction = {
    0: np.array([1, 0]),   # right
    1: np.array([0, 1]),   # up - but is this really "up" in your coordinate system?
    2: np.array([-1, 0]),  # left
    3: np.array([0, -1]),  # down
}

# Issue 3: Not handling boundaries properly
# This allows agent to go outside the grid!
self._agent_location = self._agent_location + direction  # ❌ No bounds checking

使用包装器

有时你希望修改环境的行为,而不更改核心实现。包装器 (Wrappers) 是实现此目的的完美选择——它们允许你添加功能,例如更改观测格式、添加时间限制或修改奖励,而无需触及原始环境代码。

>>> from gymnasium.wrappers import FlattenObservation

>>> # Original observation is a dictionary
>>> env = gym.make('gymnasium_env/GridWorld-v0')
>>> env.observation_space
Dict('agent': Box(0, 4, (2,), int64), 'target': Box(0, 4, (2,), int64))

>>> obs, info = env.reset()
>>> obs
{'agent': array([4, 1]), 'target': array([2, 4])}

>>> # Wrap it to flatten observations into a single array
>>> wrapped_env = FlattenObservation(env)
>>> wrapped_env.observation_space
Box(0, 4, (4,), int64)

>>> obs, info = wrapped_env.reset()
>>> obs
array([3, 0, 2, 1])  # [agent_x, agent_y, target_x, target_y]

当与期望特定输入格式(如需要一维数组而不是字典的神经网络)的算法一起工作时,这尤其有用。

高级环境特性

一旦基本功能正常,你可能想添加更复杂的功能

添加渲染

def render(self):
    """Render the environment for human viewing."""
    if self.render_mode == "human":
        # Print a simple ASCII representation
        for y in range(self.size - 1, -1, -1):  # Top to bottom
            row = ""
            for x in range(self.size):
                if np.array_equal([x, y], self._agent_location):
                    row += "A "  # Agent
                elif np.array_equal([x, y], self._target_location):
                    row += "T "  # Target
                else:
                    row += ". "  # Empty
            print(row)
        print()

参数化环境

def __init__(self, size: int = 5, reward_scale: float = 1.0, step_penalty: float = 0.0):
    self.size = size
    self.reward_scale = reward_scale
    self.step_penalty = step_penalty
    # ... rest of init ...

def step(self, action):
    # ... movement logic ...

    # Flexible reward calculation
    if terminated:
        reward = self.reward_scale  # Success reward
    else:
        reward = -self.step_penalty  # Step penalty (0 by default)

实际环境设计技巧

从简单开始,逐步增加复杂性

  1. 首先:让基本的移动和目标达成功能正常工作

  2. 然后:添加障碍物、多个目标或时间压力

  3. 最后:添加复杂动力学、部分可观测性或多智能体交互

为学习而设计

  • 明确的成功标准:智能体应该知道它何时表现良好

  • 合理的难度:不要太简单(微不足道)或太难(不可能)

  • 一致的规则:在相同状态下采取相同动作应该产生相同效果

  • 信息丰富的观测:包含最佳决策所需的一切

思考你的研究问题

  • 导航:侧重于空间推理和路径规划

  • 控制:强调动力学、稳定性和连续动作

  • 策略:包括部分信息、对手建模或长期规划

  • 优化:设计明确的权衡和资源限制

下一步

恭喜!你现在知道如何创建自定义强化学习环境了。接下来可以探索以下内容:

  1. 添加渲染以可视化你的环境(完整教程

  2. 在你的自定义环境中训练一个智能体训练指南

  3. 尝试不同的奖励函数,看看它们如何影响学习

  4. 尝试包装器组合来修改环境的行为

  5. 创建更复杂的环境,包括障碍物、多个智能体或连续动作

良好环境设计的关键在于迭代——从简单开始,彻底测试,并根据研究或应用目标逐步增加复杂性。