文章目錄
- 前言
- 核心工具函數
- 廣義優勢估計 (Generalized Advantage Estimation, GAE)
- 案例一:TRPO 解決離散動作問題 (CartPole-v1)
- 1. 環境初始化
- 2. 網絡結構定義
- 3. TRPO 智能體實現
- 4. 訓練與可視化
- 5. 訓練主程序與結果
- 案例二:TRPO 解決連續動作問題 (Pendulum-v1)
- 1. 環境與工具函數
- 2. 網絡結構的關鍵區別
- 3. TRPO 智能體實現 (連續版)
- 4. 訓練與可視化
- 5. 訓練主程序與結果
- 總結
前言
歡迎來到深度強化學習(Deep Reinforcement Learning, DRL)的世界!DRL 是深度學習與強化學習的強大結合,它讓智能體(Agent)能夠在復雜的、高維度的環境中通過試錯來學習最優策略。近年來,DRL 在游戲、機器人控制、自然語言處理等領域取得了舉世矚目的成就。
在眾多 DRL 算法中,策略梯度(Policy Gradient, PG)方法是一類直接對策略進行優化的算法。然而,傳統的 PG 方法存在一個顯著的痛點:訓練過程不穩定。由于策略更新的步長難以確定,過大的步長可能導致新策略性能急劇下降,使得訓練過程崩潰;過小的步長則會導致收斂速度過慢。
為了解決這個問題,來自 UC Berkeley 的 John Schulman 等人提出了**信賴域策略優化(Trust Region Policy Optimization, TRPO)**算法。TRPO 的核心思想是在每次策略更新時,施加一個約束,確保新舊策略之間的差異(用 KL 散度衡量)在一個“信賴域”內。這就像我們下山時,不會一步邁出太遠,而是先在腳邊找一個可靠的落腳點,小步快走,從而保證了每一步都是在穩健地提升策略性能。
TRPO 通過復雜的數學推導,將這個帶約束的優化問題轉化為通過**共軛梯度法(Conjugate Gradient)和線性搜索(Line Search)**高效求解的問題,從而在保證穩定性的同時,實現了較高的樣本效率。
本篇博客旨在通過 PyTorch 代碼,深入淺出地剖析 TRPO 算法的實現細節。我們將從零開始,構建 TRPO 智能體,并分別在兩個經典的強化學習環境中進行實戰:
- CartPole-v1:一個經典的離散動作空間環境,目標是控制小車移動來保持桿子豎直不倒。
- Pendulum-v1:一個經典的連續動作空間環境,目標是施加力矩來將擺桿豎直向上并保持。
通過這兩個案例,我們將全面掌握 TRPO 在不同動作空間下的實現差異與核心思想。無論您是 DRL 的初學者還是有一定經驗的實踐者,相信這篇代碼驅動的博客都能為您帶來深刻的理解和啟發。
完整代碼:下載鏈接
核心工具函數
在正式進入算法實現之前,我們先介紹兩個在策略優化算法中非常實用的工具函數:廣義優勢估計(GAE)和移動平均。
廣義優勢估計 (Generalized Advantage Estimation, GAE)
在 Actor-Critic 框架中,優勢函數 A(s, a) = Q(s, a) - V(s) 用來評估在狀態 s 下采取動作 a 相對于平均水平的好壞。GAE 是一種先進的優勢函數估計方法,它通過引入一個參數 λ
來平衡 TD(0) 估計(偏差低,方差高)和蒙特卡洛估計(偏差高,方差低)之間的權衡,從而有效降低策略梯度的方差,提升訓練穩定性。
下面是 GAE 的實現代碼,包含了詳盡的維度分析和注釋。
"""
強化學習工具函數集
包含廣義優勢估計(GAE)和數據平滑處理功能
"""import torch
import numpy as npdef compute_advantage(gamma, lmbda, td_delta):"""計算廣義優勢估計(Generalized Advantage Estimation,GAE)GAE是一種在強化學習中用于減少策略梯度方差的技術,通過對時序差分誤差進行指數加權平均來估計優勢函數,平衡偏差和方差的權衡。參數:gamma (float): 折扣因子,維度: 標量取值范圍[0,1],決定未來獎勵的重要性lmbda (float): GAE參數,維度: 標量 取值范圍[0,1],控制偏差-方差權衡lmbda=0時為TD(0)單步時間差分,lmbda=1時為蒙特卡洛方法用采樣到的獎勵-狀態價值估計td_delta (torch.Tensor): 時序差分誤差序列,維度: [時間步數]包含每個時間步的TD誤差值返回:torch.Tensor: 廣義優勢估計值,維度: [時間步數]與輸入td_delta維度相同的優勢函數估計數學公式:A_t^GAE(γ,λ) = Σ_{l=0}^∞ (γλ)^l * δ_{t+l}其中 δ_t = r_t + γV(s_{t+1}) - V(s_t) 是TD誤差"""# 將PyTorch張量轉換為NumPy數組進行計算# td_delta維度: [時間步數] -> [時間步數]td_delta = td_delta.detach().numpy() # 因為A用來求g的,需要梯度,防止梯度向下傳播# 初始化優勢值列表,用于存儲每個時間步的優勢估計# advantage_list維度: 最終為[時間步數]advantage_list = []# 初始化當前優勢值,從序列末尾開始反向計算# advantage維度: 標量advantage = 0.0# 從時間序列末尾開始反向遍歷TD誤差# 反向計算是因為GAE需要利用未來的信息# delta維度: 標量(td_delta中的單個元素)for delta in td_delta[::-1]: # [::-1]實現序列反轉# GAE遞歸公式:A_t = δ_t + γλA_{t+1}# gamma * lmbda * advantage: 來自未來時間步的衰減優勢值# delta: 當前時間步的TD誤差# advantage維度: 標量advantage = gamma * lmbda * advantage + delta# 將計算得到的優勢值添加到列表中# advantage_list維度: 逐步增長到[時間步數]advantage_list.append(advantage)# 由于是反向計算,需要將結果列表反轉回正確的時間順序# advantage_list維度: [時間步數](時間順序已恢復)advantage_list.reverse()# 將NumPy列表轉換回PyTorch張量并返回# 返回值維度: [時間步數]return torch.tensor(advantage_list, dtype=torch.float)def moving_average(data, window_size):"""計算移動平均值,用于平滑獎勵曲線該函數通過滑動窗口的方式對時間序列數據進行平滑處理,可以有效減少數據中的噪聲,使曲線更加平滑美觀。常用于強化學習中對訓練過程的獎勵曲線進行可視化優化。參數:data (list): 原始數據序列,維度: [num_episodes]包含需要平滑處理的數值數據(如每輪訓練的獎勵值)window_size (int): 移動窗口大小,維度: 標量決定了平滑程度,窗口越大平滑效果越明顯但也會導致更多的數據點丟失返回:list: 移動平均后的數據,維度: [len(data) - window_size + 1]返回的數據長度會比原數據少 window_size - 1 個元素這是因為需要足夠的數據點來計算第一個移動平均值示例:>>> data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 維度: [10]>>> smoothed = moving_average(data, 3) # window_size = 3>>> print(smoothed) # 輸出: [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 維度: [8]"""# 邊界檢查:如果數據長度小于窗口大小,直接返回原數據# 這種情況下無法計算移動平均值# data維度: [num_episodes], window_size維度: 標量if len(data) < window_size:return data# 初始化移動平均值列表# moving_avg維度: 最終為[len(data) - window_size + 1]moving_avg = []# 遍歷數據,計算每個窗口的移動平均值# i的取值范圍: 0 到 len(data) - window_size# 循環次數: len(data) - window_size + 1# 每次循環處理一個滑動窗口位置for i in range(len(data) - window_size + 1):# 提取當前窗口內的數據切片# window_data維度: [window_size]# 包含從索引i開始的連續window_size個元素# 例如:當i=0, window_size=3時,提取data[0:3]window_data = data[i:i + window_size]# 計算當前窗口內數據的算術平均值# np.mean(window_data)維度: 標量# 將平均值添加到結果列表中moving_avg.append(np.mean(window_data))# 返回移動平均后的數據列表# moving_avg維度: [len(data) - window_size + 1]return moving_avg
案例一:TRPO 解決離散動作問題 (CartPole-v1)
我們首先從相對簡單的離散動作環境 CartPole-v1 開始。
1. 環境初始化
我們使用 OpenAI Gym 庫來創建 CartPole-v1 環境。該環境的狀態是4維連續向量,動作是2維離散值(向左或向右推小車)。
"""
強化學習環境初始化模塊
用于創建和配置OpenAI Gym環境
"""import gym# 環境配置
# 定義要使用的強化學習環境名稱
# CartPole-v1是經典的平衡桿控制問題:
# - 狀態空間:4維連續空間(車位置、車速度、桿角度、桿角速度)
# - 動作空間:2維離散空間(向左推車、向右推車)
# - 目標:保持桿子平衡盡可能長的時間
# env_name維度: 標量(字符串)
env_name = 'CartPole-v1'# 創建強化學習環境實例
# gym.make()函數根據環境名稱創建對應的環境對象
# 該環境對象包含了狀態空間、動作空間、獎勵函數等定義
# env維度: gym.Env對象(包含狀態空間[4]和動作空間[2]的環境實例)
# env.observation_space.shape: (4,) - 觀測狀態維度
# env.action_space.n: 2 - 離散動作數量
env = gym.make(env_name)
2. 網絡結構定義
TRPO 屬于 Actor-Critic 算法家族,因此我們需要定義兩個網絡:策略網絡(Actor)和價值網絡(Critic)。
PolicyNet
(策略網絡):輸入是環境狀態,輸出是每個離散動作的選擇概率分布。我們使用Softmax
函數來確保輸出是合法的概率分布。ValueNet
(價值網絡):輸入是環境狀態,輸出是一個標量,代表該狀態的價值估計 V(s)。
"""
TRPO(Trust Region Policy Optimization)智能體實現
包含策略網絡、價值網絡和TRPO算法的完整實現
"""import torch
import torch.nn.functional as F
import numpy as np
import copyclass PolicyNet(torch.nn.Module):"""策略網絡,面向離散動作空間使用神經網絡來學習狀態到動作概率分布的映射,輸出每個動作的選擇概率,用于策略梯度算法。"""def __init__(self, state_dim, hidden_dim, action_dim):"""初始化策略網絡參數:state_dim (int): 狀態空間維度,維度: 標量表示環境狀態向量的長度hidden_dim (int): 隱藏層神經元數量,維度: 標量控制網絡的表達能力action_dim (int): 動作空間維度,維度: 標量表示可選擇的離散動作數量"""super(PolicyNet, self).__init__()# 第一層全連接層:狀態輸入到隱藏層# 權重維度: [hidden_dim, state_dim]# 偏置維度: [hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二層全連接層:隱藏層到動作輸出# 權重維度: [action_dim, hidden_dim] # 偏置維度: [action_dim]self.fc2 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):"""前向傳播計算動作概率分布參數:x (torch.Tensor): 輸入狀態,維度: [batch_size, state_dim]返回:torch.Tensor: 動作概率分布,維度: [batch_size, action_dim]每行是一個概率分布,所有元素和為1"""# 第一層線性變換后應用ReLU激活函數# x維度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二層線性變換后應用Softmax得到概率分布# x維度: [batch_size, hidden_dim] -> [batch_size, action_dim]# dim=1表示在動作維度上進行softmax歸一化return F.softmax(self.fc2(x), dim=1)class ValueNet(torch.nn.Module):"""價值網絡,對狀態進行價值評估學習狀態價值函數V(s),用于計算優勢函數和TD誤差,優勢函數用于計算g是Actor-Critic算法中的Critic部分。"""def __init__(self, state_dim, hidden_dim):"""初始化價值網絡參數:state_dim (int): 狀態空間維度,維度: 標量hidden_dim (int): 隱藏層神經元數量,維度: 標量"""super(ValueNet, self).__init__()# 第一層全連接層:狀態輸入到隱藏層# 權重維度: [hidden_dim, state_dim]# 偏置維度: [hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二層全連接層:隱藏層到價值輸出(單個標量值)# 權重維度: [1, hidden_dim]# 偏置維度: [1]self.fc2 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):"""前向傳播計算狀態價值參數:x (torch.Tensor): 輸入狀態,維度: [batch_size, state_dim]返回:torch.Tensor: 狀態價值估計,維度: [batch_size, 1]每個狀態對應一個價值估計"""# 第一層線性變換后應用ReLU激活函數# x維度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二層線性變換得到價值估計(無激活函數)# x維度: [batch_size, hidden_dim] -> [batch_size, 1]return self.fc2(x)
3. TRPO 智能體實現
這是整個項目的核心。TRPO
類封裝了算法的所有邏輯。
__init__
: 初始化 Actor 和 Critic 網絡、Critic 的優化器(注意,Actor 的參數不通過傳統優化器更新)以及 TRPO 的各項超參數。take_action
: 根據策略網絡輸出的概率分布,采樣一個動作。hessian_matrix_vector_product
: 這是 TRPO 的一個關鍵技巧。為了避免直接計算和求逆復雜的 Hessian 矩陣(KL 散度對策略參數的二階導數矩陣 H),我們只計算它與一個向量v
的乘積Hv
。這可以通過兩次自動微分高效完成。conjugate_gradient
: 共軛梯度法。這是一個迭代算法,用于高效地求解線性方程組Hx = g
,從而找到策略更新的“自然梯度”方向x = H?1g
。compute_surrogate_obj
: 計算策略優化的代理目標函數,即帶重要性采樣修正的優勢函數期望。line_search
: 在共軛梯度法找到的更新方向上,通過線性搜索(回溯法)找到一個既能提升目標函數,又滿足 KL 散度約束的最佳步長。policy_learn
: 整合上述方法,完成一次策略網絡的更新。update
: 算法的整體更新流程。它接收一個回合的經驗數據,計算 TD 誤差、GAE 優勢函數,然后分別更新價值網絡(通過梯度下降)和策略網絡(通過policy_learn
)。
class TRPO:"""TRPO(Trust Region Policy Optimization)算法實現TRPO是一種策略優化算法,通過限制策略更新的步長來保證訓練穩定性,——KL散度用來引出H使用信任域約束和共軛梯度法來求解優化問題。"""def __init__(self, hidden_dim, state_space, action_space, lmbda,kl_constraint, alpha, critic_lr, gamma, device):"""初始化TRPO算法參數:hidden_dim (int): 隱藏層維度,維度: 標量state_space (gym.Space): 狀態空間,維度: shape為[state_dim]action_space (gym.Space): 動作空間,維度: n為動作數量lmbda (float): GAE參數,維度: 標量,取值[0,1]kl_constraint (float): KL散度約束,維度: 標量alpha (float): 線性搜索參數,維度: 標量critic_lr (float): 價值網絡學習率,維度: 標量gamma (float): 折扣因子,維度: 標量,取值[0,1]device (torch.device): 計算設備,維度: 標量"""# 從環境空間中提取維度信息# state_dim維度: 標量,表示狀態向量長度state_dim = state_space.shape[0]# action_dim維度: 標量,表示離散動作數量action_dim = action_space.n# 初始化策略網絡(Actor)# 策略網絡參數不使用傳統優化器,而是通過TRPO算法更新# self.actor維度: PolicyNet對象,參數總數取決于網絡結構self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)# 初始化價值網絡(Critic)# self.critic維度: ValueNet對象self.critic = ValueNet(state_dim, hidden_dim).to(device)# 價值網絡優化器,使用Adam算法# self.critic_optimizer維度: 優化器對象self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr)# 算法超參數# gamma維度: 標量,未來獎勵折扣因子self.gamma = gamma# lmbda維度: 標量,GAE中的偏差-方差權衡參數self.lmbda = lmbda# kl_constraint維度: 標量,KL散度的最大允許值self.kl_constraint = kl_constraint# alpha維度: 標量,線性搜索中的步長縮減因子self.alpha = alpha# device維度: 設備對象,指定計算設備self.device = devicedef take_action(self, state):"""根據當前狀態選擇動作參數:state (np.ndarray): 當前狀態,維度: [state_dim]返回:int: 選擇的動作索引,維度: 標量"""# 將狀態轉換為批量格式并轉換為張量# state維度: [state_dim] -> [1, state_dim]state = np.array([state])# state維度: [1, state_dim],torch.Tensor類型state = torch.tensor(state, dtype=torch.float).to(self.device)# 通過策略網絡計算動作概率分布# probs維度: [1, action_dim],每個動作的選擇概率probs = self.actor(state)# 創建分類分布對象用于采樣# action_dist維度: Categorical分布對象action_dist = torch.distributions.Categorical(probs)# 從概率分布中采樣動作# action維度: [1],包含選擇的動作索引action = action_dist.sample()# 返回標量形式的動作索引# 返回值維度: 標量(整數)return action.item()def hessian_matrix_vector_product(self, states, old_action_dists, vector):"""計算Hessian矩陣與向量的乘積(用于共軛梯度法)在TRPO中,需要計算KL散度Hessian矩陣與向量的乘積,這里使用自動微分的技巧來高效計算。參數:states (torch.Tensor): 狀態批次,維度: [batch_size, state_dim]old_action_dists (torch.distributions.Categorical): 舊策略分布,維度: batch_size個分布vector (torch.Tensor): 要相乘的向量,維度: [param_dim],param_dim為策略網絡參數總數返回:torch.Tensor: Hessian-向量乘積,維度: [param_dim]"""# 計算新策略的動作分布# new_action_dists維度: batch_size個Categorical分布new_action_dists = torch.distributions.Categorical(self.actor(states))# 計算新舊策略之間的平均KL散度# kl維度: 標量,表示策略更新的大小kl = torch.mean(torch.distributions.kl.kl_divergence(old_action_dists,new_action_dists))# 計算KL散度對策略參數的梯度# kl_grad維度: 與策略網絡參數結構相同的梯度元組kl_grad = torch.autograd.grad(kl,self.actor.parameters(),create_graph=True)# 將梯度展平為向量形式# kl_grad_vector維度: [param_dim],策略參數的梯度向量kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])# 計算梯度向量與輸入向量的點積# kl_grad_vector_product維度: 標量kl_grad_vector_product = torch.dot(kl_grad_vector, vector)# 計算二階梯度(Hessian-向量乘積)# grad2維度: 與策略網絡參數結構相同的二階梯度元組grad2 = torch.autograd.grad(kl_grad_vector_product,self.actor.parameters())# 將二階梯度展平為向量形式# grad2_vector維度: [param_dim],即Hv的結果grad2_vector = torch.cat([grad.view(-1) for grad in grad2