文章目錄
- 前言
- PPO 算法簡介
- 從 TRPO 到 PPO
- PPO 的兩種形式:懲罰與截斷
- 代碼實踐:PPO 解決離散動作空間問題 (CartPole)
- 環境與工具函數
- 定義策略與價值網絡
- PPO 智能體核心實現
- 訓練與結果
- 代碼實踐:PPO 解決連續動作空間問題 (Pendulum)
- 環境準備
- 適用于連續動作的網絡
- PPO 智能體 (連續版)
- 訓練與結果
- 總結
前言
歡迎來到深度強化學習(DRL)的世界!在眾多 DRL 算法中,Proximal Policy Optimization (PPO) 無疑是最受歡迎和廣泛應用的算法之一。它由 OpenAI 在 2017 年提出,以其出色的性能、相對簡單的實現和穩定的訓練過程而著稱,成為了許多研究和應用的基準算法。
本篇博客旨在通過一個完整的 PyTorch 實現,帶您從代碼層面深入理解 PPO 算法。我們將不僅僅是看公式,更是要“動手”,一步步構建、訓練和分析 PPO 智能體。為了全面掌握其應用,我們將分別在經典的離散動作空間(CartPole-v1)和連續動作空間(Pendulum-v1)兩個環境中進行實踐。
無論您是 DRL 的初學者,還是希望鞏固 PPO 知識的實踐者,相信通過這篇代碼驅動的教程,您都能對 PPO 有一個更具體、更深刻的認識。
完整代碼:下載鏈接
PPO 算法簡介
在深入代碼之前,我們先快速回顧一下 PPO 的核心思想。
從 TRPO 到 PPO
PPO 的思想源于 TRPO(Trust Region Policy Optimization)。TRPO 旨在通過限制每次策略更新的步長,確保更新后的策略不會與舊策略偏離太遠,從而保證學習的穩定性。它的優化目標如下:
TRPO 通過一個 KL 散度的約束來限制策略更新的區域,但這個約束的計算過程非常復雜,涉及泰勒展開、共軛梯度、線性搜索等,導致其實現難度大,運算量也非常可觀。
PPO 的出現正是為了解決這個問題。它繼承了 TRPO 的核心思想,即在更新策略時不要“步子邁得太大”,但采用了更簡單、更易于實現的方法。
PPO 的兩種形式:懲罰與截斷
PPO 主要有兩種形式:PPO-Penalty 和 PPO-Clip。
-
PPO-Penalty (懲罰)
它將 TRPO 的 KL 散度約束作為一個懲罰項直接放入目標函數中,變成一個無約束的優化問題,并通過一個動態調整的系數β
來控制懲罰的力度。 -
PPO-Clip (截斷)
這是更常用的一種形式,也是我們代碼將要實現的版本。它直接在目標函數中進行截斷(clip),以保證新的參數和舊的參數的差距不會太大。其核心思想在于
clip
函數。我們定義一個比率r(θ)
為新策略與舊策略輸出同一動作的概率之比。- 當優勢函數 A > 0 時(即當前動作優于平均水平),我們希望增大這個動作的概率,但
r(θ)
的上限被截斷在1+ε
,防止策略更新過于激進。 - 當優勢函數 A < 0 時(即當前動作劣于平均水平),我們希望減小這個動作的概率,但
r(θ)
的下限被截斷在1-ε
,同樣是為了限制更新幅度。
下圖直觀地展示了 PPO-Clip 的目標函數
L^Clip
與概率比r(θ)
的關系: - 當優勢函數 A > 0 時(即當前動作優于平均水平),我們希望增大這個動作的概率,但
大量的實驗表明,PPO-Clip 的性能通常比 PPO-Penalty 更好且更穩定。因此,我們的代碼實踐將專注于 PPO-Clip 的實現。
理論鋪墊結束,讓我們開始編碼吧!
代碼實踐:PPO 解決離散動作空間問題 (CartPole)
我們將從經典的 CartPole-v1 環境開始,它要求智能體通過向左或向右施加力來保持桿子豎直不倒,是一個典型的離散動作空間問題(動作:0-向左,1-向右)。
環境與工具函數
首先,我們定義一些通用的工具函數并初始化環境。這里的核心是 compute_advantage
函數,它實現了廣義優勢估計(GAE),這是一種在偏差和方差之間取得平衡的優勢函數計算方法,對于穩定策略梯度算法的訓練至關重要。
PPO離散動作.ipynb
"""
強化學習工具函數集
包含廣義優勢估計(GAE)和數據平滑處理功能
"""import torch
import numpy as npdef compute_advantage(gamma, lmbda, td_delta):"""計算廣義優勢估計(Generalized Advantage Estimation,GAE)GAE是一種在強化學習中用于減少策略梯度方差的技術,通過對時序差分誤差進行指數加權平均來估計優勢函數,平衡偏差和方差的權衡。參數:gamma (float): 折扣因子,維度: 標量取值范圍[0,1],決定未來獎勵的重要性lmbda (float): GAE參數,維度: 標量 取值范圍[0,1],控制偏差-方差權衡lmbda=0時為TD(0)單步時間差分,lmbda=1時為蒙特卡洛方法用采樣到的獎勵-狀態價值估計td_delta (torch.Tensor): 時序差分誤差序列,維度: [時間步數]包含每個時間步的TD誤差值返回:torch.Tensor: 廣義優勢估計值,維度: [時間步數]與輸入td_delta維度相同的優勢函數估計數學公式:A_t^GAE(γ,λ) = Σ_{l=0}^∞ (γλ)^l * δ_{t+l}其中 δ_t = r_t + γV(s_{t+1}) - V(s_t) 是TD誤差"""# 將PyTorch張量轉換為NumPy數組進行計算# td_delta維度: [時間步數] -> [時間步數]td_delta = td_delta.detach().numpy() # 因為A用來求g的,需要梯度,防止梯度向下傳播# 初始化優勢值列表,用于存儲每個時間步的優勢估計# advantage_list維度: 最終為[時間步數]advantage_list = []# 初始化當前優勢值,從序列末尾開始反向計算# advantage維度: 標量advantage = 0.0# 從時間序列末尾開始反向遍歷TD誤差# 反向計算是因為GAE需要利用未來的信息# delta維度: 標量(td_delta中的單個元素)for delta in td_delta[::-1]: # [::-1]實現序列反轉# GAE遞歸公式:A_t = δ_t + γλA_{t+1}# gamma * lmbda * advantage: 來自未來時間步的衰減優勢值# delta: 當前時間步的TD誤差# advantage維度: 標量advantage = gamma * lmbda * advantage + delta# 將計算得到的優勢值添加到列表中# advantage_list維度: 逐步增長到[時間步數]advantage_list.append(advantage)# 由于是反向計算,需要將結果列表反轉回正確的時間順序# advantage_list維度: [時間步數](時間順序已恢復)advantage_list.reverse()# 將NumPy列表轉換回PyTorch張量并返回# 返回值維度: [時間步數]return torch.tensor(advantage_list, dtype=torch.float)def moving_average(data, window_size):"""計算移動平均值,用于平滑獎勵曲線該函數通過滑動窗口的方式對時間序列數據進行平滑處理,可以有效減少數據中的噪聲,使曲線更加平滑美觀。常用于強化學習中對訓練過程的獎勵曲線進行可視化優化。參數:data (list): 原始數據序列,維度: [num_episodes]包含需要平滑處理的數值數據(如每輪訓練的獎勵值)window_size (int): 移動窗口大小,維度: 標量決定了平滑程度,窗口越大平滑效果越明顯但也會導致更多的數據點丟失返回:list: 移動平均后的數據,維度: [len(data) - window_size + 1]返回的數據長度會比原數據少 window_size - 1 個元素這是因為需要足夠的數據點來計算第一個移動平均值示例:>>> data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 維度: [10]>>> smoothed = moving_average(data, 3) # window_size = 3>>> print(smoothed) # 輸出: [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 維度: [8]"""# 邊界檢查:如果數據長度小于窗口大小,直接返回原數據# 這種情況下無法計算移動平均值# data維度: [num_episodes], window_size維度: 標量if len(data) < window_size:return data# 初始化移動平均值列表# moving_avg維度: 最終為[len(data) - window_size + 1]moving_avg = []# 遍歷數據,計算每個窗口的移動平均值# i的取值范圍: 0 到 len(data) - window_size# 循環次數: len(data) - window_size + 1# 每次循環處理一個滑動窗口位置for i in range(len(data) - window_size + 1):# 提取當前窗口內的數據切片# window_data維度: [window_size]# 包含從索引i開始的連續window_size個元素# 例如:當i=0, window_size=3時,提取data[0:3]window_data = data[i:i + window_size]# 計算當前窗口內數據的算術平均值# np.mean(window_data)維度: 標量# 將平均值添加到結果列表中moving_avg.append(np.mean(window_data))# 返回移動平均后的數據列表# moving_avg維度: [len(data) - window_size + 1]return moving_avg``````python
"""
強化學習環境初始化模塊
用于創建和配置OpenAI Gym環境
"""import gym# 環境配置
# 定義要使用的強化學習環境名稱
# CartPole-v1是經典的平衡桿控制問題:
# - 狀態空間:4維連續空間(車位置、車速度、桿角度、桿角速度)
# - 動作空間:2維離散空間(向左推車、向右推車)
# - 目標:保持桿子平衡盡可能長的時間
# env_name維度: 標量(字符串)
env_name = 'CartPole-v1'# 創建強化學習環境實例
# gym.make()函數根據環境名稱創建對應的環境對象
# 該環境對象包含了狀態空間、動作空間、獎勵函數等定義
# env維度: gym.Env對象(包含狀態空間[4]和動作空間[2]的環境實例)
# env.observation_space.shape: (4,) - 觀測狀態維度
# env.action_space.n: 2 - 離散動作數量
env = gym.make(env_name)
定義策略與價值網絡
PPO 是一種 Actor-Critic 架構的算法。我們需要定義兩個網絡:
- 策略網絡 (PolicyNet):作為 Actor,輸入狀態,輸出一個動作的概率分布。
- 價值網絡 (ValueNet):作為 Critic,輸入狀態,輸出該狀態的價值估計 V(s)。
"""
PPO(Proximal Policy Optimization)算法實現
包含策略網絡、價值網絡和PPO智能體的完整定義
"""import torch
import torch.nn.functional as F
import numpy as npclass PolicyNet(torch.nn.Module):"""策略網絡(Actor Network)用于輸出動作概率分布,指導智能體如何選擇動作"""def __init__(self, state_dim, hidden_dim, action_dim):"""初始化策略網絡參數:state_dim (int): 狀態空間維度,維度: 標量對于CartPole-v1環境,state_dim=4hidden_dim (int): 隱藏層神經元數量,維度: 標量控制網絡的表達能力action_dim (int): 動作空間維度,維度: 標量對于CartPole-v1環境,action_dim=2"""super(PolicyNet, self).__init__()# 第一層全連接層:狀態輸入 -> 隱藏層# 輸入維度: [batch_size, state_dim] -> 輸出維度: [batch_size, hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二層全連接層:隱藏層 -> 動作概率# 輸入維度: [batch_size, hidden_dim] -> 輸出維度: [batch_size, action_dim]self.fc2 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):"""前向傳播過程參數:x (torch.Tensor): 輸入狀態,維度: [batch_size, state_dim]返回:torch.Tensor: 動作概率分布,維度: [batch_size, action_dim]每行為一個狀態對應的動作概率分布,概率和為1"""# 第一層 + ReLU激活函數# x維度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二層 + Softmax激活函數,輸出概率分布# x維度: [batch_size, hidden_dim] -> [batch_size, action_dim]# dim=1表示在第1維(動作維度)上進行softmax,確保每行概率和為1return F.softmax(self.fc2(x), dim=1)class ValueNet(torch.nn.Module):"""價值網絡(Critic Network)用于估計狀態價值函數V(s),評估當前狀態的好壞"""def __init__(self, state_dim, hidden_dim):"""初始化價值網絡參數:state_dim (int): 狀態空間維度,維度: 標量對于CartPole-v1環境,state_dim=4hidden_dim (int): 隱藏層神經元數量,維度: 標量控制網絡的表達能力"""super(ValueNet, self).__init__()# 第一層全連接層:狀態輸入 -> 隱藏層# 輸入維度: [batch_size, state_dim] -> 輸出維度: [batch_size, hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二層全連接層:隱藏層 -> 狀態價值(標量)# 輸入維度: [batch_size, hidden_dim] -> 輸出維度: [batch_size, 1]self.fc2 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):"""前向傳播過程參數:x (torch.Tensor): 輸入狀態,維度: [batch_size, state_dim]返回:torch.Tensor: 狀態價值估計,維度: [batch_size, 1]每行為一個狀態對應的價值估計"""# 第一層 + ReLU激活函數# x維度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二層,輸出狀態價值(無激活函數,可以輸出負值)# x維度: [batch_size, hidden_dim] -> [batch_size, 1]return self.fc2(x)
PPO 智能體核心實現
這是我們 PPO 算法的核心。PPO
類封裝了 Actor 和 Critic,并實現了 take_action
(動作選擇)和 update
(網絡更新)兩個關鍵方法。請特別關注 update
函數,它完整地實現了 PPO-Clip 的目標函數計算和參數更新邏輯。
class PPO:"""PPO(Proximal Policy Optimization)算法實現采用截斷方式防止策略更新過大,確保訓練穩定性"""def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,lmbda, epochs, eps, gamma, device):"""初始化PPO智能體參數:state_dim (int): 狀態空間維度,維度: 標量hidden_dim (int): 隱藏層神經元數量,維度: 標量action_dim (int): 動作空間維度,維度: 標量actor_lr (float): Actor網絡學習率,維度: 標量critic_lr (float): Critic網絡學習率,維度: 標量lmbda (float): GAE參數λ,維度: 標量,取值范圍[0,1]epochs (int): 每次更新的訓練輪數,維度: 標量eps (float): PPO截斷參數ε,維度: 標量,通常取0.1-0.3gamma (float): 折扣因子γ,維度: 標量,取值范圍[0,1]device (torch.device): 計算設備(CPU或GPU),維度: 標量"""# 初始化Actor網絡(策略網絡)# 網絡參數維度:fc1權重[state_dim, hidden_dim], fc2權重[hidden_dim, action_dim]self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)#