创建自定义环境¶
编码前:环境设计¶
创建强化学习 (RL) 环境就像设计一个视频游戏或模拟。在编写任何代码之前,你需要仔细思考想要解决的学习问题。这个设计阶段至关重要——一个设计不佳的环境将使学习变得困难甚至不可能,无论你的算法有多好。
关键设计问题¶
问自己这些基本问题:
🎯 智能体应该学习什么技能?
穿越迷宫?
平衡和控制一个系统?
优化资源分配?
玩策略游戏?
👀 智能体需要什么信息?
位置和速度?
系统当前状态?
历史数据?
部分可观测性还是完全可观测性?
🎮 智能体可以采取什么动作?
离散选择(向上/向下/向左/向右移动)?
连续控制(转向角、油门)?
多个同时动作?
🏆 我们如何衡量成功?
达到特定目标?
最小化时间或能量?
最大化得分?
避免失败?
⏰ 何时结束回合?
任务完成(成功/失败)?
时间限制?
安全约束?
网格世界 (GridWorld) 示例设计¶
对于我们的教程示例,我们将创建一个简单的网格世界 (GridWorld) 环境
🎯 技能:高效导航到目标位置
👀 信息:智能体在网格上的位置和目标位置
🎮 动作:向上、向下、向左或向右移动
🏆 成功:以最少步数到达目标
⏰ 结束:当智能体到达目标时(或可选的时间限制)
这提供了一个清晰的学习问题,它足够简单易懂,但要最优地解决却并非微不足道。
本页提供了使用 Gymnasium 创建自定义环境的完整实现。有关包含渲染功能的完整教程。
我们建议你在阅读本页之前,先熟悉基本用法!
我们将把我们的网格世界游戏实现为一个固定大小的二维方格。智能体在每个时间步可以在网格单元之间垂直或水平移动,目标是在回合开始时随机放置的目标位置导航。
环境 __init__
函数¶
像所有环境一样,我们的自定义环境将继承自 gymnasium.Env
,它定义了所有环境必须遵循的结构。其中一个要求是定义观测空间和动作空间,它们声明了此环境的有效输入(动作)和输出(观测)。
正如我们的设计所概述的,我们的智能体有四个离散动作(向四个基本方向移动),所以我们将使用 Discrete(4)
空间。
对于我们的观测,我们有几种选择。我们可以将整个网格表示为二维数组,或者使用坐标位置,甚至使用带有独立“层”的 3D 数组来表示智能体和目标。对于本教程,我们将使用简单的字典格式,例如 {"agent": array([1, 0]), "target": array([0, 3])}
,其中数组表示 x,y 坐标。
这种选择使得观测结果易于人类阅读和调试。我们将此声明为一个 Dict
空间,其中智能体和目标空间为包含整数坐标的 Box
空间。
有关可用于环境的所有可能空间的完整列表,请参阅空间
from typing import Optional
import numpy as np
import gymnasium as gym
class GridWorldEnv(gym.Env):
def __init__(self, size: int = 5):
# The size of the square grid (5x5 by default)
self.size = size
# Initialize positions - will be set randomly in reset()
# Using -1,-1 as "uninitialized" state
self._agent_location = np.array([-1, -1], dtype=np.int32)
self._target_location = np.array([-1, -1], dtype=np.int32)
# Define what the agent can observe
# Dict space gives us structured, human-readable observations
self.observation_space = gym.spaces.Dict(
{
"agent": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int), # [x, y] coordinates
"target": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int), # [x, y] coordinates
}
)
# Define what actions are available (4 directions)
self.action_space = gym.spaces.Discrete(4)
# Map action numbers to actual movements on the grid
# This makes the code more readable than using raw numbers
self._action_to_direction = {
0: np.array([1, 0]), # Move right (positive x)
1: np.array([0, 1]), # Move up (positive y)
2: np.array([-1, 0]), # Move left (negative x)
3: np.array([0, -1]), # Move down (negative y)
}
构建观测¶
由于我们需要在 Env.reset()
和 Env.step()
中计算观测,因此拥有一个辅助方法 _get_obs
来将环境的内部状态转换为观测格式会很方便。这使我们的代码 DRY (不要重复自己),并使其更容易在以后修改观测格式。
def _get_obs(self):
"""Convert internal state to observation format.
Returns:
dict: Observation with agent and target positions
"""
return {"agent": self._agent_location, "target": self._target_location}
我们还可以为 Env.reset()
和 Env.step()
返回的辅助信息实现类似的方法。在我们的例子中,我们将提供智能体和目标之间的曼哈顿距离——这对于调试和理解智能体进度可能很有用,但不应被学习算法本身使用。
def _get_info(self):
"""Compute auxiliary information for debugging.
Returns:
dict: Info with distance between agent and target
"""
return {
"distance": np.linalg.norm(
self._agent_location - self._target_location, ord=1
)
}
有时 info 中会包含只在 Env.step()
内部才可用的数据(如单个奖励分量、动作成功/失败等)。在这些情况下,我们会在 step 方法中直接更新 _get_info
返回的字典。
重置函数¶
reset()
方法启动一个新的回合。它接受两个可选参数:用于可复现随机生成的 seed
和用于附加配置的 options
。在第一行,你必须调用 super().reset(seed=seed)
以正确初始化随机数生成器。
在我们的网格世界环境中,reset()
随机地将智能体和目标放置在网格上,确保它们不会在同一个位置开始。我们以元组的形式返回初始观测和信息。
def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
"""Start a new episode.
Args:
seed: Random seed for reproducible episodes
options: Additional configuration (unused in this example)
Returns:
tuple: (observation, info) for the initial state
"""
# IMPORTANT: Must call this first to seed the random number generator
super().reset(seed=seed)
# Randomly place the agent anywhere on the grid
self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)
# Randomly place target, ensuring it's different from agent position
self._target_location = self._agent_location
while np.array_equal(self._target_location, self._agent_location):
self._target_location = self.np_random.integers(
0, self.size, size=2, dtype=int
)
observation = self._get_obs()
info = self._get_info()
return observation, info
步进函数¶
step()
方法包含核心环境逻辑。它接收一个动作,更新环境状态,并返回结果。物理、游戏规则和奖励逻辑都在这里。
对于网格世界,我们需要:1. 将离散动作转换为移动方向 2. 更新智能体的位置(带边界检查)3. 根据是否达到目标计算奖励 4. 判断回合是否应该结束 5. 返回所有所需信息
def step(self, action):
"""Execute one timestep within the environment.
Args:
action: The action to take (0-3 for directions)
Returns:
tuple: (observation, reward, terminated, truncated, info)
"""
# Map the discrete action (0-3) to a movement direction
direction = self._action_to_direction[action]
# Update agent position, ensuring it stays within grid bounds
# np.clip prevents the agent from walking off the edge
self._agent_location = np.clip(
self._agent_location + direction, 0, self.size - 1
)
# Check if agent reached the target
terminated = np.array_equal(self._agent_location, self._target_location)
# We don't use truncation in this simple environment
# (could add a step limit here if desired)
truncated = False
# Simple reward structure: +1 for reaching target, 0 otherwise
# Alternative: could give small negative rewards for each step to encourage efficiency
reward = 1 if terminated else 0
observation = self._get_obs()
info = self._get_info()
return observation, reward, terminated, truncated, info
常见环境设计陷阱¶
现在你已经了解了基本结构,让我们讨论初学者常犯的错误
奖励设计问题¶
问题:仅在最后才给予奖励(稀疏奖励)
# This makes learning very difficult!
reward = 1 if terminated else 0
更好:提供中间反馈
# Option 1: Small step penalty to encourage efficiency
reward = 1 if terminated else -0.01
# Option 2: Distance-based reward shaping
distance = np.linalg.norm(self._agent_location - self._target_location)
reward = 1 if terminated else -0.1 * distance
状态表示问题¶
问题:包含不相关信息或缺少关键细节
# Too much info - agent doesn't need grid size in every observation
obs = {"agent": self._agent_location, "target": self._target_location, "size": self.size}
# Too little info - agent can't distinguish different positions
obs = {"distance": distance} # Missing actual positions!
更好:精确包含最佳决策所需的一切
# Just right - positions are sufficient for navigation
obs = {"agent": self._agent_location, "target": self._target_location}
动作空间问题¶
问题:动作不合理或无法执行
# Bad: Agent can move diagonally but environment doesn't support it
self.action_space = gym.spaces.Discrete(8) # 8 directions including diagonals
# Bad: Continuous actions for discrete movement
self.action_space = gym.spaces.Box(-1, 1, shape=(2,)) # Continuous x,y movement
边界处理错误¶
问题:允许无效状态或模糊的边界行为
# Bad: Agent can go outside the grid
self._agent_location = self._agent_location + direction # No bounds checking!
# Unclear: What happens when agent hits wall?
if np.any(self._agent_location < 0) or np.any(self._agent_location >= self.size):
# Do nothing? Reset episode? Give penalty? Unclear!
更好:清晰、一致的边界处理
# Clear: Agent stays in place when hitting boundaries
self._agent_location = np.clip(
self._agent_location + direction, 0, self.size - 1
)
注册和创建环境¶
虽然你可以立即使用你的自定义环境,但将其注册到 Gymnasium 中会更方便,这样你就可以像创建内置环境一样使用 gymnasium.make()
来创建它。
环境 ID 包含三个组成部分:可选的命名空间(此处为:gymnasium_env
)、强制名称(此处为:GridWorld
)以及可选但推荐的版本(此处为:v0)。你可以将其注册为 GridWorld-v0
、GridWorld
或 gymnasium_env/GridWorld
,但为了清晰起见,推荐使用完整格式。
由于本教程不属于 Python 包的一部分,我们直接将类作为入口点传递。在实际项目中,你通常会使用像 "my_package.envs:GridWorldEnv"
这样的字符串。
# Register the environment so we can create it with gym.make()
gym.register(
id="gymnasium_env/GridWorld-v0",
entry_point=GridWorldEnv,
max_episode_steps=300, # Prevent infinite episodes
)
有关注册自定义环境(包括使用字符串入口点)的更完整指南,请阅读完整的创建环境教程。
注册后,你可以使用 gymnasium.pprint_registry()
检查所有可用环境,并使用 gymnasium.make()
创建实例。你还可以使用 gymnasium.make_vec()
创建向量化版本。
import gymnasium as gym
# Create the environment like any built-in environment
>>> env = gym.make("gymnasium_env/GridWorld-v0")
<OrderEnforcing<PassiveEnvChecker<GridWorld<gymnasium_env/GridWorld-v0>>>>
# Customize environment parameters
>>> env = gym.make("gymnasium_env/GridWorld-v0", size=10)
>>> env.unwrapped.size
10
# Create multiple environments for parallel training
>>> vec_env = gym.make_vec("gymnasium_env/GridWorld-v0", num_envs=3)
SyncVectorEnv(gymnasium_env/GridWorld-v0, num_envs=3)
调试你的环境¶
当你的环境未按预期工作时,以下是常见的调试策略
检查环境有效性¶
from gymnasium.utils.env_checker import check_env
# This will catch many common issues
try:
check_env(env)
print("Environment passes all checks!")
except Exception as e:
print(f"Environment has issues: {e}")
使用已知动作手动测试¶
# Test specific action sequences to verify behavior
env = gym.make("gymnasium_env/GridWorld-v0")
obs, info = env.reset(seed=42) # Use seed for reproducible testing
print(f"Starting position - Agent: {obs['agent']}, Target: {obs['target']}")
# Test each action type
actions = [0, 1, 2, 3] # right, up, left, down
for action in actions:
old_pos = obs['agent'].copy()
obs, reward, terminated, truncated, info = env.step(action)
new_pos = obs['agent']
print(f"Action {action}: {old_pos} -> {new_pos}, reward={reward}")
常见调试问题¶
# Issue 1: Forgot to call super().reset()
def reset(self, seed=None, options=None):
# super().reset(seed=seed) # ❌ Missing this line
# Results in: possibly incorrect seeding
# Issue 2: Wrong action mapping
self._action_to_direction = {
0: np.array([1, 0]), # right
1: np.array([0, 1]), # up - but is this really "up" in your coordinate system?
2: np.array([-1, 0]), # left
3: np.array([0, -1]), # down
}
# Issue 3: Not handling boundaries properly
# This allows agent to go outside the grid!
self._agent_location = self._agent_location + direction # ❌ No bounds checking
使用包装器¶
有时你希望修改环境的行为,而不更改核心实现。包装器 (Wrappers) 是实现此目的的完美选择——它们允许你添加功能,例如更改观测格式、添加时间限制或修改奖励,而无需触及原始环境代码。
>>> from gymnasium.wrappers import FlattenObservation
>>> # Original observation is a dictionary
>>> env = gym.make('gymnasium_env/GridWorld-v0')
>>> env.observation_space
Dict('agent': Box(0, 4, (2,), int64), 'target': Box(0, 4, (2,), int64))
>>> obs, info = env.reset()
>>> obs
{'agent': array([4, 1]), 'target': array([2, 4])}
>>> # Wrap it to flatten observations into a single array
>>> wrapped_env = FlattenObservation(env)
>>> wrapped_env.observation_space
Box(0, 4, (4,), int64)
>>> obs, info = wrapped_env.reset()
>>> obs
array([3, 0, 2, 1]) # [agent_x, agent_y, target_x, target_y]
当与期望特定输入格式(如需要一维数组而不是字典的神经网络)的算法一起工作时,这尤其有用。
高级环境特性¶
一旦基本功能正常,你可能想添加更复杂的功能
添加渲染¶
def render(self):
"""Render the environment for human viewing."""
if self.render_mode == "human":
# Print a simple ASCII representation
for y in range(self.size - 1, -1, -1): # Top to bottom
row = ""
for x in range(self.size):
if np.array_equal([x, y], self._agent_location):
row += "A " # Agent
elif np.array_equal([x, y], self._target_location):
row += "T " # Target
else:
row += ". " # Empty
print(row)
print()
参数化环境¶
def __init__(self, size: int = 5, reward_scale: float = 1.0, step_penalty: float = 0.0):
self.size = size
self.reward_scale = reward_scale
self.step_penalty = step_penalty
# ... rest of init ...
def step(self, action):
# ... movement logic ...
# Flexible reward calculation
if terminated:
reward = self.reward_scale # Success reward
else:
reward = -self.step_penalty # Step penalty (0 by default)
实际环境设计技巧¶
从简单开始,逐步增加复杂性¶
首先:让基本的移动和目标达成功能正常工作
然后:添加障碍物、多个目标或时间压力
最后:添加复杂动力学、部分可观测性或多智能体交互
为学习而设计¶
明确的成功标准:智能体应该知道它何时表现良好
合理的难度:不要太简单(微不足道)或太难(不可能)
一致的规则:在相同状态下采取相同动作应该产生相同效果
信息丰富的观测:包含最佳决策所需的一切
思考你的研究问题¶
导航:侧重于空间推理和路径规划
控制:强调动力学、稳定性和连续动作
策略:包括部分信息、对手建模或长期规划
优化:设计明确的权衡和资源限制
下一步¶
恭喜!你现在知道如何创建自定义强化学习环境了。接下来可以探索以下内容:
添加渲染以可视化你的环境(完整教程)
在你的自定义环境中训练一个智能体(训练指南)
尝试不同的奖励函数,看看它们如何影响学习
尝试包装器组合来修改环境的行为
创建更复杂的环境,包括障碍物、多个智能体或连续动作
良好环境设计的关键在于迭代——从简单开始,彻底测试,并根据研究或应用目标逐步增加复杂性。