使用 REINFORCE 训练 Mujoco¶

- 本教程有两个目的
理解如何从零开始实现 REINFORCE [1] 以解决 Mujoco 的 InvertedPendulum-v4
使用 Gymnasium v0.26+ 的 step() 函数实现深度强化学习算法
我们将使用 REINFORCE,这是最早的策略梯度方法之一。与先学习价值函数再从中导出策略的繁琐过程不同,REINFORCE 直接优化策略。换句话说,它被训练为最大化蒙特卡洛回报的概率。稍后会详细介绍。
倒立摆是 Mujoco 的 cartpole,但现在由 Mujoco 物理模拟器驱动 - 这允许更复杂的实验(例如改变重力的影响)。此环境涉及一个可以线性移动的小车,杆子的一端固定在小车上,另一端自由。小车可以向左或向右推动,目标是通过对小车施加力来平衡小车顶部的杆子。有关环境的更多信息,请访问 https://gymnasium.org.cn/environments/mujoco/inverted_pendulum/
训练目标:平衡小车顶部的杆子(倒立摆)
动作:智能体接受 1D 向量作为动作。动作空间是 [-3, 3]
中的连续 (action)
,其中 action 表示施加到小车的数值力(幅度表示力的大小,符号表示方向)
方法:我们使用 PyTorch 从零开始编写 REINFORCE,以训练神经网络策略来掌握倒立摆。
Gymnasium v0.26+ Env.step() 函数的解释
env.step(A)
允许我们在当前环境 ‘env’ 中采取动作 ‘A’。环境随后执行该动作并返回五个变量
next_obs
:这是智能体在采取动作后将收到的观测。
reward
:这是智能体在采取动作后将收到的奖励。
terminated
:这是一个布尔变量,指示环境是否已终止。
truncated
:这也是一个布尔变量,指示 episode 是否因提前截断而结束,即是否达到时间限制。
info
:这是一个字典,可能包含有关环境的附加信息。
from __future__ import annotations
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import gymnasium as gym
plt.rcParams["figure.figsize"] = (10, 5)
策略网络¶

我们首先构建一个智能体将使用 REINFORCE 学习的策略。策略是从当前环境观测到要采取的动作的概率分布的映射。本教程中使用的策略由神经网络参数化。它由 2 个线性层组成,这两个线性层在预测的均值和标准差之间共享。此外,单个线性层用于估计均值和标准差。 nn.Tanh
用作隐藏层之间的非线性。以下函数估计正态分布的均值和标准差,从中采样动作。因此,期望策略学习适当的权重以基于当前观测输出均值和标准差。
class Policy_Network(nn.Module):
"""Parametrized Policy Network."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes a neural network that estimates the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
super().__init__()
hidden_space1 = 16 # Nothing special with 16, feel free to change
hidden_space2 = 32 # Nothing special with 32, feel free to change
# Shared Network
self.shared_net = nn.Sequential(
nn.Linear(obs_space_dims, hidden_space1),
nn.Tanh(),
nn.Linear(hidden_space1, hidden_space2),
nn.Tanh(),
)
# Policy Mean specific Linear Layer
self.policy_mean_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
# Policy Std Dev specific Linear Layer
self.policy_stddev_net = nn.Sequential(
nn.Linear(hidden_space2, action_space_dims)
)
def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
"""Conditioned on the observation, returns the mean and standard deviation
of a normal distribution from which an action is sampled from.
Args:
x: Observation from the environment
Returns:
action_means: predicted mean of the normal distribution
action_stddevs: predicted standard deviation of the normal distribution
"""
shared_features = self.shared_net(x.float())
action_means = self.policy_mean_net(shared_features)
action_stddevs = torch.log(
1 + torch.exp(self.policy_stddev_net(shared_features))
)
return action_means, action_stddevs
构建智能体¶

现在我们已经完成了策略的构建,让我们开发 REINFORCE,它赋予策略网络生命。REINFORCE 的算法可以在上面找到。如前所述,REINFORCE 旨在最大化蒙特卡洛回报。
有趣的事实:REINFORCE 是 “ ‘RE’ward ‘I’ncrement ‘N’on-negative ‘F’actor times ‘O’ffset ‘R’einforcement times ‘C’haracteristic ‘E’ligibility” 的首字母缩写
注意:超参数的选择是为了训练一个性能尚可的智能体。没有进行广泛的超参数调整。
class REINFORCE:
"""REINFORCE algorithm."""
def __init__(self, obs_space_dims: int, action_space_dims: int):
"""Initializes an agent that learns a policy via REINFORCE algorithm [1]
to solve the task at hand (Inverted Pendulum v4).
Args:
obs_space_dims: Dimension of the observation space
action_space_dims: Dimension of the action space
"""
# Hyperparameters
self.learning_rate = 1e-4 # Learning rate for policy optimization
self.gamma = 0.99 # Discount factor
self.eps = 1e-6 # small number for mathematical stability
self.probs = [] # Stores probability values of the sampled action
self.rewards = [] # Stores the corresponding rewards
self.net = Policy_Network(obs_space_dims, action_space_dims)
self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
def sample_action(self, state: np.ndarray) -> float:
"""Returns an action, conditioned on the policy and observation.
Args:
state: Observation from the environment
Returns:
action: Action to be performed
"""
state = torch.tensor(np.array([state]))
action_means, action_stddevs = self.net(state)
# create a normal distribution from the predicted
# mean and standard deviation and sample an action
distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
action = distrib.sample()
prob = distrib.log_prob(action)
action = action.numpy()
self.probs.append(prob)
return action
def update(self):
"""Updates the policy network's weights."""
running_g = 0
gs = []
# Discounted return (backwards) - [::-1] will return an array in reverse
for R in self.rewards[::-1]:
running_g = R + self.gamma * running_g
gs.insert(0, running_g)
deltas = torch.tensor(gs)
log_probs = torch.stack(self.probs)
# Calculate the mean of log probabilities for all actions in the episode
log_prob_mean = log_probs.mean()
# Update the loss with the mean log probability and deltas
# Now, we compute the correct total loss by taking the sum of the element-wise products.
loss = -torch.sum(log_prob_mean * deltas)
# Update the policy network
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Empty / zero out all episode-centric/related variables
self.probs = []
self.rewards = []
现在让我们使用 REINFORCE 训练策略,以掌握倒立摆的任务。
以下是训练过程的概述
- 对于随机种子中的每个种子
重新初始化智能体
- 对于最大 episode 数范围内的每个 episode
- 直到 episode 完成
基于当前观测采样动作
采取动作并接收奖励和下一个观测
存储采取的动作、其概率和观测到的奖励
更新策略
注意:深度强化学习在许多常见用例中对随机种子相当敏感 (https://spinningup.openai.com/en/latest/spinningup/spinningup.html)。因此,测试各种种子非常重要,我们将这样做。
# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50) # Records episode-reward
total_num_episodes = int(5e3) # Total number of episodes
# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
rewards_over_seeds = []
for seed in [1, 2, 3, 5, 8]: # Fibonacci seeds
# set seed
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
# Reinitialize agent every seed
agent = REINFORCE(obs_space_dims, action_space_dims)
reward_over_episodes = []
for episode in range(total_num_episodes):
# gymnasium v26 requires users to set seed while resetting the environment
obs, info = wrapped_env.reset(seed=seed)
done = False
while not done:
action = agent.sample_action(obs)
# Step return type - `tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]`
# These represent the next observation, the reward from the step,
# if the episode is terminated, if the episode is truncated and
# additional info from the step
obs, reward, terminated, truncated, info = wrapped_env.step(action)
agent.rewards.append(reward)
# End the episode when either truncated or terminated is true
# - truncated: The episode duration reaches max number of timesteps
# - terminated: Any of the state space values is no longer finite.
done = terminated or truncated
reward_over_episodes.append(wrapped_env.return_queue[-1])
agent.update()
if episode % 1000 == 0:
avg_reward = int(np.mean(wrapped_env.return_queue))
print("Episode:", episode, "Average Reward:", avg_reward)
rewards_over_seeds.append(reward_over_episodes)
绘制学习曲线¶
rewards_to_plot = [[reward[0] for reward in rewards] for rewards in rewards_over_seeds]
df1 = pd.DataFrame(rewards_to_plot).melt()
df1.rename(columns={"variable": "episodes", "value": "reward"}, inplace=True)
sns.set(style="darkgrid", context="talk", palette="rainbow")
sns.lineplot(x="episodes", y="reward", data=df1).set(
title="REINFORCE for InvertedPendulum-v4"
)
plt.show()

作者:Siddarth Chandrasekar
许可证:MIT 许可证
参考文献¶
[1] Williams, Ronald J.. “Simple statistical gradient-following algorithms for connectionist reinforcement learning.” Machine Learning 8 (2004): 229-256.