
Actor-Critic: A2C and A3C

✍️ AI Master · 📅 Created 2026-04-12 · 📖 20 min read
💡

Summary

Combining value-based methods with policy gradients: understanding the advantages of the Actor-Critic architecture

1. The Actor-Critic Architecture: Policy Network + Value Network

Actor-Critic is a classic reinforcement learning architecture that combines value-based methods with policy gradient methods. It has two core components: the Actor (policy network) selects actions given the current state and is essentially a parameterized stochastic policy pi(a|s; theta); the Critic (value network) evaluates how good the Actor's chosen actions are, typically by estimating the state value V(s; w) or the action value Q(s,a; w).

Compared with pure policy gradient methods such as REINFORCE, the biggest advantage of Actor-Critic is that the Critic reduces the variance of the gradient estimate. REINFORCE relies on full-episode returns, which makes it high-variance and slow to learn. The Critic supplies an immediate evaluation signal, so the Actor can update incrementally instead of waiting for the episode to end. At the same time, Actor-Critic keeps the natural strengths of policy gradient methods: it handles continuous action spaces, and the policy itself is a parameterized probability distribution.

python
import torch
import torch.nn as nn
import torch.nn.functional as F

class ActorCriticNetwork(nn.Module):
    """Actor-Critic network with a shared feature extractor"""
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )
        self.actor_head = nn.Linear(hidden_dim, action_dim)
        self.critic_head = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        action_probs = F.softmax(self.actor_head(features), dim=-1)
        state_value = self.critic_head(features)
        return action_probs, state_value
python
class ActorCriticAgent:
    def __init__(self, state_dim: int, action_dim: int, lr: float = 3e-4):
        self.network = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, state_value = self.network(state_tensor)
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action), state_value

    def compute_returns(self, rewards, dones, gamma=0.99):
        """Compute discounted returns for Critic evaluation"""
        returns = []
        R = 0
        for reward, done in zip(reversed(rewards), reversed(dones)):
            if done:
                R = 0
            R = reward + gamma * R
            returns.insert(0, R)
        return returns
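As a quick sanity check, the backward accumulation in compute_returns can be reproduced on a toy trajectory (a standalone sketch of the same logic):

```python
def discounted_returns(rewards, dones, gamma=0.99):
    # Mirror of compute_returns above: iterate backwards,
    # resetting the running return at episode boundaries.
    returns, R = [], 0.0
    for reward, done in zip(reversed(rewards), reversed(dones)):
        if done:
            R = 0.0
        R = reward + gamma * R
        returns.insert(0, R)
    return returns

# A single three-step episode, gamma = 0.5:
rets = discounted_returns([1, 1, 1], [False, False, True], gamma=0.5)
print(rets)  # [1.75, 1.5, 1.0]
```

Note how the terminal flag cuts the accumulation: two back-to-back one-step episodes each get only their own reward.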
| Component | Network output | Training objective | Classical counterpart |
| --- | --- | --- | --- |
| Actor | action distribution pi(a\|s) | maximize advantage-weighted log-probabilities | policy gradient methods |
| Critic | state value V(s) or Q(s,a) | minimize the temporal-difference error | value-based methods (TD / Q-learning) |
| Shared feature layers | state representation vector | updated by gradients from both heads | shared representations in multi-task learning |
| Advantage A(s,a) | derived from the Critic | measures an action's quality relative to the average | REINFORCE with a baseline |

The shared feature extractor is a key design choice in Actor-Critic. The Actor and Critic need to understand the same environment information, and a shared representation improves sample efficiency. When their objectives diverge too much, however, separate networks are worth considering.

The learning rates of the Actor and Critic need careful tuning. If the Critic learns too fast, the Actor's gradient signal becomes unstable; if it learns too slowly, it cannot provide useful evaluation feedback. In practice the Critic's learning rate is usually set slightly higher than the Actor's.

2. The Advantage Function: A Baseline Trick for Variance Reduction

The advantage function A(s,a) is one of the most central concepts in the Actor-Critic architecture. It is defined as A(s,a) = Q(s,a) - V(s): the advantage of taking action a in state s relative to the average for that state. When A(s,a) > 0, the action is better than average and its probability should be increased; when A(s,a) < 0, the action is worse than average and its probability should be decreased.

Introducing a baseline into the policy gradient theorem significantly reduces the variance of the gradient estimate. Mathematically, the policy gradient can be written as: gradient = E[grad log pi(a|s) * A(s,a)]. The choice of baseline does not bias the gradient (because E[grad log pi(a|s)] = 0), but a well-chosen baseline can reduce the variance substantially. In practice the most common baseline is V(s), learned by the Critic network.
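The unbiasedness claim can be checked with the standard score-function identity (a textbook derivation, stated here for completeness):

```latex
\mathbb{E}_{a\sim\pi(\cdot|s)}\big[\nabla_\theta \log\pi(a|s)\, b(s)\big]
  = b(s)\sum_a \pi(a|s)\,\nabla_\theta \log\pi(a|s)
  = b(s)\sum_a \nabla_\theta \pi(a|s)
  = b(s)\,\nabla_\theta \sum_a \pi(a|s)
  = b(s)\,\nabla_\theta 1 = 0
```

Any state-dependent baseline b(s), including V(s), therefore leaves the expected gradient unchanged while reducing its variance when chosen well.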

python
class AdvantageCalculator:
    """Compute different forms of advantage function"""

    @staticmethod
    def td_advantage(rewards, values, dones, gamma=0.99):
        """Single-step TD error as advantage estimate
        A(s,a) = r + gamma*V(s') - V(s) = TD error
        """
        advantages = []
        for t in range(len(rewards)):
            if dones[t]:
                adv = rewards[t] - values[t]
            else:
                adv = rewards[t] + gamma * values[t + 1] - values[t]
            advantages.append(adv)
        return advantages

    @staticmethod
    def nstep_advantage(rewards, values, dones, gamma=0.99, n=5):
        """n-step advantage: trade-off between bias and variance
        Larger n reduces bias but increases variance
        """
        advantages = []
        for t in range(len(rewards)):
            G = 0
            for i in range(n):
                if t + i >= len(rewards):
                    break
                if i > 0 and dones[t + i - 1]:
                    break
                G += (gamma ** i) * rewards[t + i]
            if t + n < len(values) and not dones[t + n - 1]:
                G += (gamma ** n) * values[t + n]
            advantages.append(G - values[t])
        return advantages
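The single-step TD advantage above can be verified by hand on a two-step toy trajectory (a standalone sketch of the same logic):

```python
def td_advantage(rewards, values, dones, gamma=0.99):
    # A(s_t, a_t) = r_t + gamma * V(s_{t+1}) - V(s_t); no bootstrap at terminals.
    advantages = []
    for t in range(len(rewards)):
        if dones[t]:
            advantages.append(rewards[t] - values[t])
        else:
            advantages.append(rewards[t] + gamma * values[t + 1] - values[t])
    return advantages

# V = [1.0, 2.0], r = [0.5, 1.0], second step terminal, gamma = 0.5:
# A_0 = 0.5 + 0.5 * 2.0 - 1.0 = 0.5;  A_1 = 1.0 - 2.0 = -1.0
advs = td_advantage([0.5, 1.0], [1.0, 2.0], [False, True], gamma=0.5)
print(advs)  # [0.5, -1.0]
```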
python
def actor_critic_update(
    log_probs: list,
    advantages: list,
    critic_losses: list,
    optimizer: torch.optim.Optimizer,
    entropy_coef: float = 0.01
):
    """Execute one Actor-Critic update step

    Args:
        log_probs: log pi(a|s) for each step
        advantages: advantage estimate for each step
        critic_losses: Critic MSE loss for each step
        optimizer: optimizer
        entropy_coef: entropy regularization coefficient
    """
    # Detach the advantage so actor gradients do not flow into the Critic;
    # detaching log_prob instead would zero out the actor gradient entirely.
    actor_loss = -torch.stack([
        lp * torch.as_tensor(adv).detach() for lp, adv in zip(log_probs, advantages)
    ]).mean()

    critic_loss = torch.stack(critic_losses).mean()

    # Crude entropy estimate from the sampled actions' log-probs only;
    # prefer dist.entropy() when the full distribution is available.
    entropy = -torch.stack([
        torch.sum(torch.exp(lp) * lp) for lp in log_probs
    ]).mean()

    total_loss = actor_loss + critic_loss - entropy_coef * entropy

    optimizer.zero_grad()
    total_loss.backward()
    torch.nn.utils.clip_grad_norm_(optimizer.param_groups[0]["params"], 0.5)
    optimizer.step()

    return actor_loss.item(), critic_loss.item(), entropy.item()
| Advantage estimator | Formula | Bias | Variance | When to use |
| --- | --- | --- | --- | --- |
| Single-step TD | r + gamma\*V(s') - V(s) | high | low | online learning, fast updates |
| Monte Carlo | G_t - V(s) | unbiased | very high | updates at episode end |
| n-step return | G_t^{(n)} - V(s) | medium | medium | general purpose; n is a tunable hyperparameter |
| GAE | weighted multi-step TD errors | tunable | tunable | state-of-the-art Actor-Critic algorithms |
| V as baseline | Q(s,a) - V(s) | depends on Q accuracy | lower than MC | common in theoretical derivations |

Updating the policy with advantages is more stable than updating with raw returns. The intuition: the advantage tells the agent whether an action was better or worse than average, rather than just reporting the magnitude of the return.

If the Critic's V(s) estimate is very inaccurate (e.g., early in training), the advantage introduces systematic bias. Early on, consider a larger Critic learning rate or a warm-up phase.

3. A2C: Synchronous Advantage Actor-Critic

A2C (Advantage Actor-Critic) is the synchronous variant of Actor-Critic, introduced by OpenAI in 2017. Its core idea is to run several parallel environment copies (typically 8-32) that collect experience simultaneously, then update the shared Actor-Critic network synchronously.

Unlike A3C's asynchronous updates, each A2C worker collects a fixed number of steps, waits for all workers to finish, and then a single global update is made from the averaged gradient. This synchronous scheme avoids the stale-gradient problem caused by A3C's asynchronous updates, and it batches efficiently on GPUs. A2C shows strong performance and stability on Atari games and MuJoCo continuous-control tasks.

python
import numpy as np

class A2CAgent:
    def __init__(self, state_dim, action_dim, num_envs=8,
                 n_steps=5, lr=7e-4, gamma=0.99,
                 entropy_coef=0.01, value_coef=0.5):
        self.network = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)
        self.num_envs = num_envs
        self.n_steps = n_steps
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef

    def collect_rollout(self, envs):
        """Synchronous rollout collection from multiple envs"""
        states = envs.reset()
        states_list, actions_list, rewards_list = [], [], []
        values_list, log_probs_list, dones_list = [], [], []

        for _ in range(self.n_steps):
            states_tensor = torch.FloatTensor(states)
            probs, values = self.network(states_tensor)
            values = values.squeeze(-1)

            dist = torch.distributions.Categorical(probs)
            actions = dist.sample()
            log_probs = dist.log_prob(actions)

            states_list.append(states.copy())
            actions_list.append(actions.numpy().copy())
            values_list.append(values.detach().numpy().copy())
            log_probs_list.append(log_probs.detach().numpy().copy())

            next_states, rewards, terminated, truncated, _ = envs.step(actions.numpy())
            rewards_list.append(rewards.copy())
            dones_list.append((terminated | truncated).copy())
            states = next_states

        last_values = self.network(torch.FloatTensor(states))[1]
        last_values = last_values.squeeze(-1).detach().numpy()
        return (states_list, actions_list, rewards_list,
                values_list, log_probs_list, dones_list, last_values)
python
    def compute_gae(self, rewards, values, dones, last_values,
                    gamma=0.99, gae_lambda=0.95):
        """Generalized Advantage Estimation"""
        advantages = []
        gae = 0
        for t in reversed(range(len(rewards))):
            next_val = last_values if t == len(rewards) - 1 else values[t + 1]
            mask = 1.0 - dones[t]
            delta = rewards[t] + gamma * next_val * mask - values[t]
            gae = delta + gamma * gae_lambda * mask * gae
            advantages.insert(0, gae)
        advantages = np.array(advantages)
        returns = advantages + np.array(values)
        return advantages, returns

    def update(self, rollout_data):
        """Update network using collected experience"""
        states, actions, rewards, values, log_probs, dones, last_values = rollout_data
        advantages, returns = self.compute_gae(
            rewards, values, dones, last_values, self.gamma
        )

        states_t = torch.FloatTensor(np.array(states).reshape(-1, states[0].shape[-1]))
        actions_t = torch.LongTensor(np.array(actions).flatten())
        advantages_t = torch.FloatTensor(advantages.flatten())
        returns_t = torch.FloatTensor(returns.flatten())

        probs, state_values = self.network(states_t)
        dist = torch.distributions.Categorical(probs)
        new_log_probs = dist.log_prob(actions_t)
        entropy = dist.entropy().mean()

        actor_loss = -(new_log_probs * advantages_t).mean()
        critic_loss = F.mse_loss(state_values.squeeze(-1), returns_t)
        total_loss = actor_loss + self.value_coef * critic_loss - self.entropy_coef * entropy

        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()

        return actor_loss.item(), critic_loss.item()
| Hyperparameter | Typical value | Effect | Tuning advice |
| --- | --- | --- | --- |
| n_steps | 5 | rollout length per update | smaller values update more often; larger values approach MC |
| num_envs | 8-16 | number of parallel environments | more envs increase sample diversity but use more memory |
| gamma | 0.99 | discount factor | use values close to 1 for long-horizon tasks |
| gae_lambda | 0.95 | GAE bias-variance trade-off | closer to 1 lowers bias but raises variance |
| entropy_coef | 0.01 | entropy regularization strength | prevents premature convergence; increase for complex tasks |
| value_coef | 0.5 | weight of the Critic loss | controls how fast the value function learns |

The choice of num_envs matters in A2C: too few makes training unstable (high sample variance), too many wastes compute. Start with 8 environments and adjust for task complexity.

A2C's synchronous updates mean a slow worker stalls everyone. Keep episode lengths across environments similar, or significant time is wasted waiting.

4. A3C: Asynchronous Advantage Actor-Critic

A3C (Asynchronous Advantage Actor-Critic) is a landmark algorithm introduced by DeepMind in 2016. Its key innovation is a set of asynchronous workers, each of which independently interacts with its environment, computes gradients, and updates a globally shared network. This asynchronous architecture needs no GPU and runs efficiently on multi-core CPUs.

Asynchronous updates have an implicit regularization effect: different workers explore different parts of the environment, so their gradients are naturally diverse, effectively injecting noise into the policy update. Like the stochasticity of mini-batch SGD, this helps escape local optima. It also causes the stale-gradient problem: by the time a worker's gradient arrives, the global parameters may already have been updated many times by other workers. The original paper used up to 16 asynchronous workers.

python
import threading

class A3CWorker(threading.Thread):
    """A3C asynchronous worker thread"""

    def __init__(self, worker_id, global_network, optimizer,
                 env_id, state_dim, action_dim,
                 max_steps=1000, gamma=0.99):
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.global_network = global_network
        # Dimensions are passed explicitly: ActorCriticNetwork does not store them.
        self.local_network = ActorCriticNetwork(state_dim, action_dim)
        self.local_network.load_state_dict(global_network.state_dict())
        self.optimizer = optimizer
        self.env = gym.make(env_id)
        self.max_steps = max_steps
        self.gamma = gamma

    def run(self):
        state, _ = self.env.reset()
        total_steps = 0
        while total_steps < self.max_steps:
            states, actions, rewards, values, log_probs = [], [], [], [], []
            done = False

            while not done:
                probs, value = self.local_network(torch.FloatTensor(state).unsqueeze(0))
                dist = torch.distributions.Categorical(probs.squeeze(0))
                action = dist.sample()

                next_state, reward, terminated, truncated, _ = self.env.step(action.item())
                done = terminated or truncated

                states.append(state)
                actions.append(action.item())
                rewards.append(reward)
                values.append(value.item())
                log_probs.append(dist.log_prob(action))
                state = next_state

            R = 0 if done else self.local_network(
                torch.FloatTensor(state).unsqueeze(0)
            )[1].item()
            returns = self._compute_returns(rewards, R)
            self._update_network(states, actions, log_probs, values, returns)
            self.local_network.load_state_dict(self.global_network.state_dict())
            total_steps += len(rewards)
            state, _ = self.env.reset()  # start the next episode from a fresh state
python
    def _compute_returns(self, rewards, last_value):
        returns = []
        R = last_value
        for r in reversed(rewards):
            R = r + self.gamma * R
            returns.insert(0, R)
        return returns

    def _update_network(self, states, actions, log_probs, values, returns):
        all_states = torch.FloatTensor(np.array(states))
        probs, state_vals = self.local_network(all_states)
        dist = torch.distributions.Categorical(probs)
        new_log_probs = dist.log_prob(torch.LongTensor(actions))

        advantages = np.array(returns) - np.array(values)
        advantages_t = torch.FloatTensor(advantages)
        returns_t = torch.FloatTensor(returns)

        actor_loss = -(new_log_probs * advantages_t).mean()
        critic_loss = F.mse_loss(state_vals.squeeze(-1), returns_t)
        entropy = dist.entropy().mean()
        total_loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy

        self.local_network.zero_grad()
        total_loss.backward()

        # Push local gradients into the global network (the classic A3C update).
        # Without a lock this write can race with other workers; see the caveat below.
        for local_param, global_param in zip(
            self.local_network.parameters(),
            self.global_network.parameters()
        ):
            global_param.grad = local_param.grad

        self.optimizer.step()

    def stop(self):
        self.env.close()
| Dimension | A3C (asynchronous) | A2C (synchronous) |
| --- | --- | --- |
| Update scheme | workers update the global network independently | experience collected in sync, then one batched update |
| Hardware | multi-core CPU (no GPU required) | GPU-friendly (batched computation) |
| Stale gradients | present (workers use outdated parameters) | absent (single parameter snapshot) |
| Implicit regularization | asynchronous noise acts as a regularizer | relies on explicit entropy regularization |
| Code complexity | high (thread management, locking) | low (standard batched training) |
| Training efficiency | efficient on CPU clusters | more efficient on GPUs |
| Status today | largely superseded by A2C/PPO | mainstream baseline algorithm |

In modern reinforcement learning, A2C and PPO have largely replaced A3C. The value of studying A3C is understanding asynchronous training, which remains important in distributed settings.

A3C's thread safety is an often-overlooked pitfall. Writing to parameter.grad from multiple threads in PyTorch can corrupt gradient accumulation. Use a parameter lock, or switch to A2C's synchronous updates.

5. GAE: Generalized Advantage Estimation

GAE (Generalized Advantage Estimation), introduced by Schulman et al. in 2015, is the standard way to compute advantages in modern Actor-Critic algorithms (PPO, A3C, etc.). Its core idea is an exponentially weighted average of TD errors over different horizons, with a parameter lambda that trades off bias against variance.

When lambda = 0, GAE reduces to the single-step TD error: high bias, low variance. When lambda = 1, GAE equals the Monte Carlo return minus the value function: unbiased but high variance. Choosing lambda between 0.9 and 0.95 strikes an excellent balance between the two. Mathematically, GAE is: A_GAE(s_t, a_t) = sum_{l=0}^{inf} (gamma * lambda)^l * delta_{t+l}, where delta_t = r_t + gamma * V(s_{t+1}) - V(s_t) is the TD error.

python
import numpy as np

def compute_gae(rewards, values, dones, last_value,
                gamma=0.99, gae_lambda=0.95):
    """Generalized Advantage Estimation (GAE)

    Args:
        rewards: T-step rewards (T,)
        values: T-step state value estimates (T,)
        dones: T-step termination flags (T,)
        last_value: V(s) estimate for the last step
        gamma: discount factor
        gae_lambda: GAE bias-variance trade-off parameter

    Returns:
        advantages: GAE advantages (T,)
        returns: return targets (T,)
    """
    T = len(rewards)
    advantages = np.zeros(T)
    gae = 0

    for t in reversed(range(T)):
        if t == T - 1:
            next_value = last_value
        else:
            next_value = values[t + 1]

        non_terminal = 1.0 - dones[t]
        delta = rewards[t] + gamma * next_value * non_terminal - values[t]
        gae = delta + gamma * gae_lambda * non_terminal * gae
        advantages[t] = gae

    returns = advantages + np.array(values)
    return advantages, returns
python
def analyze_gae_tradeoff(rewards, values, dones, last_value, gamma=0.99):
    """Analyze GAE statistical properties under different lambda values"""
    lambdas = [0.0, 0.5, 0.9, 0.95, 1.0]
    results = {}

    for lam in lambdas:
        advs, _ = compute_gae(rewards, values, dones, last_value,
                              gamma=gamma, gae_lambda=lam)
        results[lam] = {
            "mean_advantage": np.mean(advs),
            "std_advantage": np.std(advs),
            "max_abs_advantage": np.max(np.abs(advs)),
            "smoothness": np.mean(np.abs(np.diff(advs))),
        }

    print(f"{'lambda':>8} | {'Mean':>10} | {'Std':>10} | {'Max|A|':>10} | {'Smooth':>10}")
    print("-" * 58)
    for lam, stats in results.items():
        print(f"{lam:>8.2f} | {stats['mean_advantage']:>10.4f} | "
              f"{stats['std_advantage']:>10.4f} | "
              f"{stats['max_abs_advantage']:>10.4f} | "
              f"{stats['smoothness']:>10.4f}")
    return results


def explain_gae():
    """Explain the mathematical intuition of GAE"""
    # lambda=0: single-step TD error only
    # A = delta_t = r_t + gamma*V(s_{t+1}) - V(s_t)
    # High bias (only looks one step ahead), low variance

    # lambda=1: full multi-step return
    # A = sum_{l=0}^{T-t-1} gamma^l * delta_{t+l}
    # Unbiased (considers all future rewards), high variance

    # lambda=0.95: exponentially decaying weights
    # A = delta_t + 0.95*gamma*delta_{t+1} + 0.95^2*gamma^2*delta_{t+2} + ...
    # Recent TD errors weighted more, distant ones decay
    # Best balance of bias and variance
    pass
| Property | lambda=0 | lambda=0.5 | lambda=0.95 | lambda=1.0 |
| --- | --- | --- | --- | --- |
| Bias | high (looks one step ahead) | medium | low | lowest (equivalent to MC) |
| Variance | lowest | medium | medium-high | highest |
| Compute cost | O(T) | O(T) | O(T) | O(T) |
| Dependence on V | strong (inaccurate V means large bias) | medium | medium | weak (no bootstrapping) |
| Recommended for | fast updates early in training | simple tasks | most complex tasks (recommended) | bias-critical settings |

lambda = 0.95 is an empirical sweet spot for GAE that works well on most tasks. Only tune it if you know bias or variance is your bottleneck.

Handling the dones flags correctly is critical in GAE. If the bootstrap is not cut at episode ends (by setting non_terminal = 0), value estimates leak across episodes and training suffers badly.
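The bootstrap pollution described above is easy to demonstrate: running GAE with and without the terminal mask over an episode boundary changes the advantages of every step at or before the boundary (a standalone numpy sketch):

```python
import numpy as np

def gae(rewards, values, dones, last_value, gamma=0.99, lam=0.95, use_mask=True):
    T = len(rewards)
    adv, running = np.zeros(T), 0.0
    for t in reversed(range(T)):
        next_v = last_value if t == T - 1 else values[t + 1]
        # mask = 0 cuts the bootstrap and the accumulated GAE at a terminal
        mask = (1.0 - dones[t]) if use_mask else 1.0
        delta = rewards[t] + gamma * next_v * mask - values[t]
        running = delta + gamma * lam * mask * running
        adv[t] = running
    return adv

rewards = [1.0, 1.0, 1.0, 1.0]
values  = [0.5, 0.5, 0.5, 0.5]
dones   = [0.0, 1.0, 0.0, 0.0]   # episode boundary after step 1

masked   = gae(rewards, values, dones, last_value=0.5)
unmasked = gae(rewards, values, dones, last_value=0.5, use_mask=False)
# Steps at and before the boundary pick up polluted bootstrap values
# when the mask is dropped; later steps are unaffected:
print(masked[:2], unmasked[:2])
```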

6. Continuous Action Spaces: From Discrete to Continuous

The A2C and A3C discussed so far target discrete action spaces and output a probability distribution over actions. But in many real control problems (robot joint control, the steering angle in autonomous driving), actions are continuous. The policy network's output must then change from a categorical distribution to a continuous one, usually a multivariate Gaussian.

A continuous policy pi(a|s) is parameterized by a Gaussian: mean mu(s) and standard deviation sigma(s). The network outputs the mean mu directly from a linear layer; the standard deviation can be a learnable parameter or state-dependent. Actions are sampled as a = mu + sigma * epsilon with epsilon ~ N(0,1). The reparameterization trick makes this sampling step differentiable, so gradients can be computed with standard backpropagation.
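Why reparameterization helps can be illustrated without autodiff. For the toy objective E[a^2] with a ~ N(mu, sigma), the true gradient with respect to mu is 2*mu; the pathwise (reparameterized) estimator and the score-function estimator both target it, but with very different variance (a numpy sketch, not part of the article's code):

```python
import numpy as np

rng = np.random.default_rng(0)
mu, sigma, n = 1.5, 1.0, 100_000
eps = rng.standard_normal(n)
a = mu + sigma * eps                  # reparameterized sample: a = mu + sigma * eps

# d/dmu E[a^2] = 2*mu = 3.0
pathwise = 2.0 * a                    # d(a^2)/dmu = 2a, since da/dmu = 1
score = a**2 * (a - mu) / sigma**2    # f(a) * d log N(a; mu, sigma) / dmu

print(pathwise.mean(), score.mean())  # both estimators are close to 3.0
print(pathwise.std(), score.std())    # the pathwise estimator has much lower spread
```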

python
class ContinuousActorCriticNetwork(nn.Module):
    """Actor-Critic network for continuous action spaces"""
    def __init__(self, state_dim: int, action_dim: int,
                 hidden_dim: int = 256, log_std_init: float = -0.5):
        super().__init__()
        self.action_dim = action_dim

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        self.mu_head = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Parameter(torch.ones(action_dim) * log_std_init)

        self.critic_head = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, state):
        features = self.shared(state)
        mu = self.mu_head(features)
        std = torch.exp(self.log_std).expand_as(mu)
        state_value = self.critic_head(features)
        return mu, std, state_value

    def get_action(self, state, deterministic=False):
        mu, std, value = self.forward(state)
        if deterministic:
            return torch.tanh(mu), torch.tensor(0.0), value
        dist = torch.distributions.Normal(mu, std)
        action = dist.rsample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        # NOTE: squashing with tanh changes the density; this log-prob omits
        # the Jacobian correction log(1 - tanh(a)^2) discussed later.
        return torch.tanh(action), log_prob, value
python
class ContinuousA2C:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.network = ContinuousActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)
        self.gamma = 0.99
        self.gae_lambda = 0.95

    def compute_loss(self, states, actions, advantages, returns):
        mu, std, values = self.network(states)
        dist = torch.distributions.Normal(mu, std)

        new_log_probs = dist.log_prob(actions).sum(dim=-1)
        actor_loss = -(new_log_probs * advantages).mean()

        critic_loss = F.mse_loss(values.squeeze(-1), returns)
        entropy = dist.entropy().sum(dim=-1).mean()
        total_loss = actor_loss + 0.5 * critic_loss - 0.005 * entropy

        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()

        return actor_loss.item(), critic_loss.item(), entropy.item()

    def train_on_batch(self, batch):
        states = torch.FloatTensor(batch["states"])
        actions = torch.FloatTensor(batch["actions"])
        advantages = torch.FloatTensor(batch["advantages"])
        returns = torch.FloatTensor(batch["returns"])
        return self.compute_loss(states, actions, advantages, returns)
| Aspect | Discrete actions (categorical) | Continuous actions (Gaussian) |
| --- | --- | --- |
| Network output | action probabilities (softmax) | mean mu + standard deviation sigma |
| Action sampling | Categorical(probs).sample() | Normal(mu, std).rsample() |
| Log-probability | log P(a) | log N(a; mu, sigma) |
| Reparameterization | not differentiable through sampling | differentiable (rsample supports it) |
| Typical environments | Atari, CartPole | Pendulum, MuJoCo, Humanoid |
| Exploration | stochasticity of the distribution | standard deviation controls exploration range |

The initial standard deviation of a continuous policy matters: too large makes early exploration too random, too small traps the policy in local optima. Initializing log_std between -0.5 and 0.0 is a reasonable default.

After squashing actions with tanh, an extra correction term (the log-determinant of the Jacobian) is needed to compute the correct log-probability. Without it, the policy gradient can point in the wrong direction and destabilize training.
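The correction can be sketched in numpy: for a = tanh(u) with u ~ N(mu, sigma), the change-of-variables formula subtracts log(1 - tanh(u)^2) per action dimension from the Gaussian log-density (a standalone sketch; the function name is illustrative):

```python
import numpy as np

def squashed_gaussian_log_prob(u, mu, sigma, eps=1e-6):
    # log N(u; mu, sigma): the pre-squash Gaussian log-density per dimension
    gauss = -0.5 * ((u - mu) / sigma) ** 2 - np.log(sigma) - 0.5 * np.log(2 * np.pi)
    # Jacobian of a = tanh(u) is da/du = 1 - tanh(u)^2; subtract its log
    correction = np.log(1.0 - np.tanh(u) ** 2 + eps)
    return (gauss - correction).sum(axis=-1)

u = np.array([0.5, -1.0])
lp = squashed_gaussian_log_prob(u, mu=np.zeros(2), sigma=np.ones(2))
# The correction terms are negative, so the corrected log-prob is always
# larger than the raw Gaussian log-density of u.
```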

7. PyTorch in Practice: The Pendulum Environment

Pendulum is one of the classic continuous-control environments in Gymnasium. The task is to swing a pendulum from hanging to upright and keep it balanced by applying torque. The state contains the pendulum angle (represented as cos and sin) and the angular velocity; the action is the applied torque (range -2 to +2); the reward is -(theta^2 + 0.1*theta_dot^2 + 0.001*action^2), which is higher the closer the pendulum is to upright.
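A quick numeric check of the reward formula (a standalone sketch; the constants follow the formula quoted above):

```python
import numpy as np

def pendulum_reward(theta, theta_dot, torque):
    # theta is the angle from upright, normalized to [-pi, pi]
    return -(theta ** 2 + 0.1 * theta_dot ** 2 + 0.001 * torque ** 2)

upright = pendulum_reward(0.0, 0.0, 0.0)    # best case: 0.0
hanging = pendulum_reward(np.pi, 0.0, 0.0)  # worst angle: about -9.87
print(upright, hanging)
```

The per-step reward is bounded between 0 and roughly -16.3 (worst angle, max speed, max torque), which explains the episode-return ranges reported in the training table below.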

We will train A2C end to end on Pendulum. The implementation covers environment setup, GAE advantage computation, batched updates, and the training loop. Pendulum is moderately difficult; A2C usually learns a basic swing-up-and-balance policy within 1000-2000 episodes.

python
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# === Environment Setup ===
def make_env(env_id="Pendulum-v1", seed=42):
    env = gym.make(env_id)
    return env

# === Network Definition ===
class A2CPendulumNetwork(nn.Module):
    def __init__(self, state_dim=3, action_dim=1, hidden_dim=128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
        )
        self.mu_head = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        x = self.shared(state)
        mu = 2.0 * torch.tanh(self.mu_head(x))
        std = torch.exp(self.log_std).clamp(0.1, 2.0)
        value = self.value_head(x)
        return mu, std, value

# === Agent ===
class A2CPendulumAgent:
    def __init__(self, lr=3e-4, gamma=0.99, gae_lambda=0.95):
        self.network = A2CPendulumNetwork()
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.rollout_states = []
        self.rollout_actions = []
        self.rollout_rewards = []
        self.rollout_log_probs = []
        self.rollout_values = []
        self.rollout_dones = []
python
    def select_action(self, state):
        state_t = torch.FloatTensor(state).unsqueeze(0)
        mu, std, value = self.network(state_t)
        dist = torch.distributions.Normal(mu, std)
        action = dist.rsample().clamp(-2.0, 2.0)
        log_prob = dist.log_prob(action).sum(-1)
        # Detach so the action can be converted to numpy for env.step();
        # gradients are recomputed from the stored states in update().
        return action.squeeze(0).detach(), log_prob.item(), value.item()

    def compute_gae(self, last_value):
        rewards, values, dones = (self.rollout_rewards,
                                   self.rollout_values,
                                   self.rollout_dones)
        T = len(rewards)
        advantages = np.zeros(T)
        gae = 0
        for t in reversed(range(T)):
            next_v = last_value if t == T - 1 else values[t + 1]
            non_term = 1.0 - dones[t]
            delta = rewards[t] + self.gamma * next_v * non_term - values[t]
            gae = delta + self.gamma * self.gae_lambda * non_term * gae
            advantages[t] = gae
        returns = advantages + np.array(values)
        return advantages, returns

    def update(self):
        states = torch.FloatTensor(np.array(self.rollout_states))
        actions = torch.FloatTensor(np.array(self.rollout_actions))
        # Approximation: this bootstraps from the last state stored in the
        # rollout rather than the state that follows it (s_T is not stored).
        last_v = self.network(torch.FloatTensor(self.rollout_states[-1]))[2].item()
        advantages, returns = self.compute_gae(last_v)
        advantages_t = torch.FloatTensor(advantages)
        returns_t = torch.FloatTensor(returns)

        mu, std, values = self.network(states)
        dist = torch.distributions.Normal(mu, std)
        new_log_probs = dist.log_prob(actions).sum(-1)
        entropy = dist.entropy().sum(-1).mean()

        actor_loss = -(new_log_probs * advantages_t).mean()
        critic_loss = F.mse_loss(values.squeeze(-1), returns_t)
        total_loss = actor_loss + 0.5 * critic_loss - 0.005 * entropy

        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()

        self.rollout_states.clear()
        self.rollout_actions.clear()
        self.rollout_rewards.clear()
        self.rollout_log_probs.clear()
        self.rollout_values.clear()
        self.rollout_dones.clear()

        return actor_loss.item(), critic_loss.item(), entropy.item()


# === Training Loop ===
def train_a2c_pendulum(episodes=1500, rollout_steps=2048, log_interval=50):
    env = make_env()
    agent = A2CPendulumAgent()
    episode_rewards = []
    state, _ = env.reset(seed=42)
    current_ep_reward = 0

    for step in range(episodes * 200):  # Pendulum-v1 truncates episodes at 200 steps
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action.numpy())
        done = terminated or truncated

        agent.rollout_states.append(state)
        agent.rollout_actions.append(action.numpy())
        agent.rollout_rewards.append(reward)
        agent.rollout_log_probs.append(log_prob)
        agent.rollout_values.append(value)
        agent.rollout_dones.append(float(done))

        current_ep_reward += reward
        state = next_state

        if done:
            episode_rewards.append(current_ep_reward)
            if len(episode_rewards) % log_interval == 0:
                avg = np.mean(episode_rewards[-log_interval:])
                print(f"Episode {len(episode_rewards)}, Avg Reward: {avg:.1f}")
            current_ep_reward = 0
            state, _ = env.reset()

        if len(agent.rollout_states) >= rollout_steps:
            agent.update()

    env.close()
    return episode_rewards, agent


if __name__ == "__main__":
    rewards, trained_agent = train_a2c_pendulum(episodes=1500)
    print(f"Final avg reward: {np.mean(rewards[-100:]):.1f}")
| Training phase | Episode range | Average reward | Behavior |
| --- | --- | --- | --- |
| Random exploration | 0-100 | -1600 to -1200 | fully random torques; pendulum hangs down |
| Early learning | 100-300 | -1200 to -800 | starts applying torque; occasional swings |
| Swing-up learning | 300-600 | -800 to -500 | learns to swing up to high positions |
| Balance attempts | 600-1000 | -500 to -300 | reaches the top but remains unstable |
| Stable balancing | 1000+ | -300 to -150 | roughly maintains upright balance |

Pendulum represents the angle with cos(theta) and sin(theta) rather than the raw angle theta, which preserves periodicity. This is a common state-encoding trick in continuous-control tasks.
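The benefit of the (cos, sin) encoding is easy to see numerically: two angles just either side of the 0 / 2*pi wrap look far apart as raw numbers but are correctly close in encoded space (a small numpy sketch):

```python
import numpy as np

def encode(theta):
    # Periodic encoding: maps an angle to a point on the unit circle
    return np.array([np.cos(theta), np.sin(theta)])

a, b = 0.05, 2 * np.pi - 0.05                    # physically only ~0.1 rad apart
raw_gap = abs(a - b)                             # ~6.18: looks far apart
enc_gap = np.linalg.norm(encode(a) - encode(b))  # ~0.1: correctly close
print(raw_gap, enc_gap)
```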

A2C can be unstable on Pendulum because policy gradient methods are sensitive to hyperparameters. If the reward does not improve, try lowering the learning rate (1e-4), raising GAE lambda (0.98), or increasing rollout_steps (4096).
