DQN是什么玩意兒?
深度Q網絡(DQN)是深度強化學習領域里一個超厲害的算法。它把Q學習和深度神經網絡巧妙地結合在了一起,專門用來搞定那些狀態空間維度特別高、特別復雜的難題。它展示了用函數近似來學習價值函數的超能力,因為傳統的表格方法在面對狀態空間特別大或者連續不斷的狀態空間時,就會因為太復雜而搞不定。
為啥要用深度學習來搞Q學習呢?
表格Q學習會把每個狀態-動作對都對應一個估計的Q值,存到一個大表格里。這種方法碰到幾個超級麻煩的問題:
- 維度災難:狀態變量的數量或者每個狀態變量可能的值一多,Q表格的大小就會像滾雪球一樣指數級增長,很快就會變得在計算上根本搞不定,存都存不下,更別提更新了(比如用像素表示的游戲狀態、機器人的傳感器讀數)。
- 連續狀態:表格方法根本沒法直接處理連續的狀態變量,還得先離散化,這不僅會丟失信息,而且離散化之后還是會碰到維度問題。
深度神經網絡就像一個超級英雄,它能輕松解決這些問題,因為它是個函數近似器。它不用給每個特定的狀態-動作對都存一個值,而是通過一個神經網絡 (Q(s, a; \theta)) 來學習從狀態(可能還有動作)到Q值的映射,這個映射是由網絡的權重 (\theta) 參數化的。這就讓網絡能夠在相似的狀態之間泛化知識,特別適合處理大狀態空間或者連續狀態空間。通常情況下,網絡會把狀態 (s) 作為輸入,然后輸出這個狀態下所有可能的離散動作對應的估計Q值。
DQN都在哪兒用,怎么用呢?
DQN是一個超級厲害的突破,它讓強化學習能夠解決以前根本搞不定的問題:
- 玩游戲:它最出名的就是用原始像素數據作為輸入,在Atari 2600游戲里玩出了超越人類的水平。
- 機器人控制:從傳感器數據里學習控制策略(雖然很多時候會針對連續動作進行調整)。
- 優化問題:在資源分配或者系統控制這種狀態很復雜的地方也能用。
- 序列決策:在推薦系統或者對話管理這種狀態維度很高的領域里也能大顯身手。
DQN最適合解決以下這類問題:
- 狀態空間很大、維度很高或者連續不斷。
- 動作空間是離散的。
- 沒有環境動態的模型(無模型)。
- 可以離線學習(在學習最優策略的時候,可以按照一個不同的探索策略來行動)。
DQN的數學基礎
回憶一下Q學習
DQN的基礎就是Q學習的更新規則,它旨在逐步改進對動作價值函數 (Q(s, a)) 的估計——從狀態 (s) 開始,采取動作 (a),然后按照最優策略繼續行動,預期能得到的總折扣未來回報。(Q^*(s, a)) 的貝爾曼最優性方程是:
Q ? ( s , a ) = E [ R t + 1 + γ max ? a ′ Q ? ( S t + 1 , a ′ ) ∣ S t = s , A t = a ] Q^*(s, a) = \mathbb{E}[R_{t+1} + \gamma \max_{a'} Q^*(S_{t+1}, a') | S_t=s, A_t=a] Q?(s,a)=E[Rt+1?+γa′max?Q?(St+1?,a′)∣St?=s,At?=a]
表格Q學習的更新規則是這樣近似這個方程的:
Q ( s t , a t ) ← Q ( s t , a t ) + α [ r t + γ max ? a ′ Q ( s t + 1 , a ′ ) ? TD目標 ? Q ( s t , a t ) ] Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ \underbrace{r_t + \gamma \max_{a'} Q(s_{t+1}, a')}_{\text{TD目標}} - Q(s_t, a_t) \right] Q(st?,at?)←Q(st?,at?)+α ?TD目標 rt?+γa′max?Q(st+1?,a′)???Q(st?,at?) ?
函數近似
DQN把Q表格換成了一個神經網絡 (Q(s, a; \theta))。這個網絡通常把狀態 (s) 作為輸入,然后輸出一個向量,包含所有動作對應的Q值,(Q(s, \cdot; \theta))。
DQN的損失函數
訓練Q網絡就是要最小化預測的Q值 (Q(s_t, a_t; \theta)) 和從貝爾曼方程里得到的目標值之間的差距。一個很直接的目標值就是 (r_t + \gamma \max_{a’} Q(s_{t+1}, a’; \theta))。但是,用同一個變化超快的網絡來既做預測又計算目標值,會導致不穩定。DQN引入了兩個超重要的技巧:
-
經驗回放:把代理(agent)經歷的轉移 ((s_t, a_t, r_t, s_{t+1}, \text{done}_t)) 都存到一個回放緩存 (\mathcal{D}) 里。訓練的時候,就從 (\mathcal{D}) 里隨機抽樣一小批轉移。這樣就能打破連續樣本之間的相關性,讓訓練過程更加穩定、數據效率更高,有點像監督學習的感覺。
-
目標網絡:用一個單獨的網絡 (Q(s, a; \theta^-)) 來計算目標Q值,這個網絡的權重 (\theta^-) 是固定的,只會在一定時間間隔后更新(比如,每 (C) 步更新一次),把主Q網絡的權重 (\theta) 復制過來 ((\theta^- \leftarrow \theta))。這就為主網絡提供了一個更穩定的、可以學習的目標。
從回放緩存里抽樣出來的轉移 ((s_t, a_t, r_t, s_{t+1}, d_t)) 的目標值 (Y_t) 是:
Y t = { r t 如果? d t 是真(終止狀態) r t + γ max ? a ′ Q ( s t + 1 , a ′ ; θ ? ) 如果? d t 是假(非終止狀態) Y_t = \begin{cases} r_t & \text{如果 } d_t \text{ 是真(終止狀態)} \\ r_t + \gamma \max_{a'} Q(s_{t+1}, a'; \theta^-) & \text{如果 } d_t \text{ 是假(非終止狀態)} \end{cases} Yt?={rt?rt?+γmaxa′?Q(st+1?,a′;θ?)?如果?dt??是真(終止狀態)如果?dt??是假(非終止狀態)?
損失函數通常是均方誤差(MSE)或者Huber損失(Smooth L1),在小批量樣本上用梯度下降法來最小化這個損失函數:
L ( θ ) = E ( s , a , r , s ′ , d ) ~ D [ ( Y j ? Q ( s j , a j ; θ ) ) 2 ] L(\theta) = \mathbb{E}_{(s, a, r, s', d) \sim \mathcal{D}} \left[ (Y_j - Q(s_j, a_j; \theta))^2 \right] L(θ)=E(s,a,r,s′,d)~D?[(Yj??Q(sj?,aj?;θ))2]
DQN的逐步解釋
- 初始化:回放緩存 (\mathcal{D})(容量 (N)),主Q網絡 (Q(s, a; \theta))(隨機權重 (\theta)),目標網絡 (Q(s, a; \theta-))((\theta- = \theta)),探索參數 (\epsilon)。
- 對于每個劇集:
a. 重置環境,獲取初始狀態 (s_1)。如果需要的話,對狀態進行預處理。
b. 對于每一步 (t):
i. 使用基于 (\epsilon)-貪婪的策略,根據 (Q(s_t, \cdot; \theta)) 來選擇動作 (a_t)。
ii. 執行 (a_t),觀察獎勵 (r_t),下一個狀態 (s_{t+1}),完成標志 (d_t)。對 (s_{t+1}) 進行預處理。
iii. 把 ((s_t, a_t, r_t, s_{t+1}, d_t)) 存到 (\mathcal{D}) 里。
iv. 抽樣小批量:從 (\mathcal{D}) 里隨機抽樣。
v. 計算目標 (Y_j) 使用目標網絡 (Q(s, a; \theta^-))。
vi. 訓練主網絡:對 (L(\theta) = (Y_j - Q(s_j, a_j; \theta))^2) 進行梯度下降步驟。
vii. 更新目標網絡:每 (C) 步,設置 (\theta^- \leftarrow \theta)。
viii. (s_t \leftarrow s_{t+1})。
ix. 如果 (d_t),結束劇集。 - 重復:直到收斂或者達到最大劇集數。
DQN的關鍵組成部分
Q網絡
- 核心函數近似器。學習把狀態映射到動作價值。
- 架構取決于狀態的表示方式(對于向量用多層感知機MLP,對于圖像用卷積神經網絡CNN)。
- 使用非線性激活函數(比如ReLU)。
- 輸出層通常有和離散動作數量一樣多的單元,輸出原始的Q值(最后沒有激活函數)。
經驗回放
- 存儲代理的經歷。
- 打破相關性,允許重復使用數據,提高穩定性和樣本效率。
- 使用數據結構比如
deque
來實現。
目標網絡
- 主Q網絡的一個副本,更新頻率更低。
- 在計算TD誤差的時候提供穩定的、可以學習的目標,防止“移動目標”問題。
- 對于穩定DQN訓練來說至關重要。
探索與利用
- 通常使用 (\epsilon)-貪婪:以概率 (\epsilon) 隨機行動,以概率 (1-\epsilon) 貪婪地(根據Q網絡)行動。
- (\epsilon) 指數衰減:(\epsilon) 隨著時間逐漸減小(比如線性或者指數衰減),從一個較高的初始值逐漸減小到一個較低的最終值。
損失函數(MSE/Huber)
- 測量網絡預測和TD目標之間的差異。
- Huber損失(在PyTorch里叫Smooth L1損失)通常比MSE更受歡迎,因為它對異常值不那么敏感,而異常值在訓練早期可能會因為大的TD誤差而出現。
超參數
- DQN的性能對超參數特別敏感,比如學習率、緩沖區大小、批量大小、目標更新頻率、折扣因子和 (\epsilon) 衰減計劃。通常需要仔細調整。
實際例子:自定義網格世界
因為Gymnasium被禁止使用了,我們就自己創建一個簡單的自定義網格世界環境。
環境描述:
- 網格大小:10x10。
- 狀態:代理的
(row, col)
位置。為了網絡輸入,表示為歸一化的向量[row/10, col/10]
。 - 動作:4個離散動作:0(上),1(下),2(左),3(右)。
- 起始狀態:(0, 0)。
- 目標狀態:(9, 9)。
- 獎勵:
- 到達目標狀態(9, 9)得 +10 分。
- 撞墻(試圖移出網格)扣 -1 分。
- 其他步驟扣 -0.1 分(小成本鼓勵效率)。
- 終止:當代理到達目標或者達到最大步數時,劇集結束。
設置環境
我們得先導入必要的庫,設置好環境,所以咱們現在就動手吧。
# 導入用于數值計算、繪圖和實用功能的必要庫
import numpy as np
import matplotlib.pyplot as plt
import random
import math
from collections import namedtuple, deque
from itertools import count
from typing import List, Tuple, Dict, Optional# 導入 PyTorch 用于構建和訓練神經網絡
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F# 設置設備,如果可用就用 GPU,否則就用 CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用設備:{device}")# 設置隨機種子,以便在每次運行時都能得到可重現的結果
seed = 42
random.seed(seed) # Python 隨機模塊的種子
np.random.seed(seed) # NumPy 的種子
torch.manual_seed(seed) # PyTorch(CPU)的種子
if torch.cuda.is_available():torch.cuda.manual_seed_all(seed) # PyTorch(GPU)的種子# 為 Jupyter Notebook 啟用內聯繪圖
%matplotlib inline
使用設備:cpu
創建自定義環境
第一步就是創建一個自定義的環境,模擬一個簡單的網格世界。這個環境有一個 10x10 的網格,代理可以在四個方向上移動:上、下、左、右。代理從左上角(0, 0)開始,目標是到達右下角(9, 9)。根據代理的動作和到達的狀態,會給予相應的獎勵。獎勵定義如下:
- 到達目標(右下角)得 +10 分
- 撞墻(超出邊界)扣 -1 分
- 每走一步扣 -0.1 分(鼓勵走更短的路徑)
# 自定義網格世界環境
class GridEnvironment:"""一個簡單的 10x10 網格世界環境。狀態:(row, col) 表示為歸一化的向量 [row/10, col/10]。動作:0(上),1(下),2(左),3(右)。獎勵:到達目標得 +10 分,撞墻扣 -1 分,每走一步扣 -0.1 分。"""def __init__(self, rows: int = 10, cols: int = 10) -> None:"""初始化網格世界環境。參數:- rows (int): 網格的行數。- cols (int): 網格的列數。"""self.rows: int = rowsself.cols: int = colsself.start_state: Tuple[int, int] = (0, 0) # 起始位置self.goal_state: Tuple[int, int] = (rows - 1, cols - 1) # 目標位置self.state: Tuple[int, int] = self.start_state # 當前狀態self.state_dim: int = 2 # 狀態由 2 個坐標(row, col)表示self.action_dim: int = 4 # 4 個離散動作:上、下、左、右# 動作映射:將動作索引映射到 (row_delta, col_delta)self.action_map: Dict[int, Tuple[int, int]] = {0: (-1, 0), # 上1: (1, 0), # 下2: (0, -1), # 左3: (0, 1) # 右}def reset(self) -> torch.Tensor:"""重置環境到起始狀態。返回:torch.Tensor:起始狀態作為歸一化的張量。"""self.state = self.start_statereturn self._get_state_tensor(self.state)def _get_state_tensor(self, state_tuple: Tuple[int, int]) -> torch.Tensor:"""將 (row, col) 元組轉換為歸一化的張量,供網絡使用。參數:- state_tuple (Tuple[int, int]): 狀態表示為元組 (row, col)。返回:torch.Tensor:歸一化的狀態作為張量。"""# 將坐標歸一化到 0 和 1 之間normalized_state: List[float] = [state_tuple[0] / (self.rows - 1),state_tuple[1] / (self.cols - 1)]return torch.tensor(normalized_state, dtype=torch.float32, device=device)def step(self, action: int) -> Tuple[torch.Tensor, float, bool]:"""根據給定的動作在環境中執行一步。參數:action (int): 要執行的動作(0:上,1:下,2:左,3:右)。返回:Tuple[torch.Tensor, float, bool]:- next_state_tensor (torch.Tensor): 下一個狀態作為歸一化的張量。- reward (float): 動作的獎勵。- done (bool): 劇集是否結束。"""# 如果已經到達目標狀態,就返回當前狀態if self.state == self.goal_state:return self._get_state_tensor(self.state), 0.0, True# 獲取動作對應的行和列的變化量dr, dc = self.action_map[action]current_row, current_col = self.statenext_row, next_col = current_row + dr, current_col + dc# 默認的步進成本reward: float = -0.1hit_wall: bool = False# 檢查動作是否會導致撞墻(超出邊界)if not (0 <= next_row < self.rows and 0 <= next_col < self.cols):# 保持在相同的狀態,并受到懲罰next_row, next_col = current_row, current_colreward = -1.0hit_wall = True# 更新狀態self.state = (next_row, next_col)next_state_tensor: torch.Tensor = self._get_state_tensor(self.state)# 檢查是否到達目標狀態done: bool = (self.state == self.goal_state)if done:reward = 10.0 # 到達目標的獎勵return next_state_tensor, reward, donedef get_action_space_size(self) -> int:"""返回動作空間的大小。返回:int:可能的動作數量(4)。"""return self.action_dimdef get_state_dimension(self) -> int:"""返回狀態表示的維度。返回:int:狀態的維度(2)。"""return self.state_dim
現在我們已經實現了自定義的網格環境,接下來我們實例化它并驗證它的屬性和功能。
# 用 10x10 的網格實例化自定義網格環境
custom_env = GridEnvironment(rows=10, cols=10)# 獲取動作空間的大小和狀態維度
n_actions_custom = custom_env.get_action_space_size()
n_observations_custom = custom_env.get_state_dimension()# 打印環境的基本信息
print(f"自定義網格環境:")
print(f"大小:{custom_env.rows}x{custom_env.cols}") # 網格大小
print(f"狀態維度:{n_observations_custom}") # 狀態維度(2 表示行和列)
print(f"動作維度:{n_actions_custom}") # 可能的動作數量(4)
print(f"起始狀態:{custom_env.start_state}") # 起始位置
print(f"目標狀態:{custom_env.goal_state}") # 目標位置# 重置環境并打印起始狀態的歸一化狀態張量
print(f"示例狀態張量 (0,0):{custom_env.reset()}")# 執行一個示例動作:向右移動(動作=3)并打印結果
next_s, r, d = custom_env.step(3) # 動作 3 對應向右移動
print(f"動作結果 (向右):next_state={next_s.cpu().numpy()}, reward={r}, done={d}")# 再執行一個示例動作:向上移動(動作=0)并打印結果
# 由于代理已經在最上面一行,所以這個動作會導致撞墻
next_s, r, d = custom_env.step(0) # 動作 0 對應向上移動
print(f"動作結果 (向上):next_state={next_s.cpu().numpy()}, reward={r}, done={d}")
自定義網格環境:
大小:10x10
狀態維度:2
動作維度:4
起始狀態:(0, 0)
目標狀態:(9, 9)
示例狀態張量 (0,0):tensor([0., 0.])
動作結果 (向右):next_state=[0. 0.11111111], reward=-0.1, done=False
動作結果 (向上):next_state=[0. 0.11111111], reward=-1.0, done=False
你可以看到,代理在最上面一行,向上移動會導致撞墻,受到 -1.0 的懲罰。下一個狀態保持不變。
實現DQN算法
現在,咱們來實現核心部分:Q網絡、回放緩存、動作選擇策略、優化步驟以及目標網絡更新機制。
定義Q網絡
我們用 PyTorch 的 nn.Module
定義一個簡單的多層感知機(MLP)。
# 定義 Q 網絡架構
class DQN(nn.Module):"""簡單的多層感知機 Q 網絡"""def __init__(self, n_observations: int, n_actions: int):"""初始化 Q 網絡。參數:- n_observations (int):狀態空間的維度。- n_actions (int):可能的動作數量。"""super(DQN, self).__init__()# 定義網絡層# 簡單的 MLP:輸入 -> 隱藏層1 -> ReLU -> 隱藏層2 -> ReLU -> 輸出self.layer1 = nn.Linear(n_observations, 128) # 輸入層self.layer2 = nn.Linear(128, 128) # 隱藏層self.layer3 = nn.Linear(128, n_actions) # 輸出層(每個動作的 Q 值)def forward(self, x: torch.Tensor) -> torch.Tensor:"""網絡的前向傳播。參數:- x (torch.Tensor):表示狀態(或狀態批次)的輸入張量。返回:- torch.Tensor:表示每個動作 Q 值的輸出張量。"""# 確保輸入是浮點張量if not isinstance(x, torch.Tensor):x = torch.tensor(x, dtype=torch.float32, device=device)elif x.dtype != torch.float32:x = x.to(dtype=torch.float32)# 應用各層并使用 ReLU 激活函數x = F.relu(self.layer1(x))x = F.relu(self.layer2(x))return self.layer3(x) # 輸出層沒有激活函數(原始 Q 值)
定義回放緩存
我們用 collections.deque
實現高效的存儲和 random.sample
進行批量抽樣。一個 namedtuple
有助于組織轉移數據。
# 定義存儲轉移的結構
Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward', 'done'))# 定義回放緩存類
class ReplayMemory(object):"""存儲轉移并允許抽樣批次"""def __init__(self, capacity: int):"""初始化回放緩存。參數:- capacity (int):最多存儲的轉移數量。"""self.memory = deque([], maxlen=capacity)def push(self, *args):"""保存一個轉移。參數:- *args:轉移的各個元素(狀態、動作、下一個狀態、獎勵、完成標志)。"""self.memory.append(Transition(*args))def sample(self, batch_size: int) -> List[Transition]:"""從內存中隨機抽樣一批轉移。參數:- batch_size (int):要抽樣的轉移數量。返回:- List[Transition]:包含抽樣轉移的列表。"""return random.sample(self.memory, batch_size)def __len__(self) -> int:"""返回當前內存的大小"""return len(self.memory)
動作選擇((\epsilon)-貪婪)
這個函數根據當前狀態和 Q 網絡,使用 (\epsilon)-貪婪策略來選擇動作,用于探索。
# 動作選擇(\(\epsilon\)-貪婪 - 修改為單狀態張量輸入)
def select_action_custom(state: torch.Tensor,policy_net: nn.Module,epsilon_start: float,epsilon_end: float,epsilon_decay: int,n_actions: int) -> Tuple[torch.Tensor, float]:"""使用 \(\epsilon\)-貪婪策略為單個狀態張量選擇動作。參數:- state (torch.Tensor):當前狀態作為張量,形狀為 [state_dim]。- policy_net (nn.Module):用于估計 Q 值的 Q 網絡。- epsilon_start (float):初始探索率 \(\epsilon\)。- epsilon_end (float):衰減后 \(\epsilon\) 的最終值。- epsilon_decay (int):\(\epsilon\) 的衰減率(值越大衰減越慢)。- n_actions (int):可能的動作數量。返回:- Tuple[torch.Tensor, float]:- 選中的動作作為張量,形狀為 [1, 1]。- 衰減后的當前 \(\epsilon\) 值。"""global steps_done_custom # 用于跟蹤已執行的步數的全局計數器sample = random.random() # 生成一個隨機數,用于 \(\epsilon\)-貪婪決策# 根據衰減公式計算當前的 \(\epsilon\) 值epsilon_threshold = epsilon_end + (epsilon_start - epsilon_end) * \math.exp(-1. * steps_done_custom / epsilon_decay)steps_done_custom += 1 # 增加步數計數器if sample > epsilon_threshold:# 利用:選擇 Q 值最高的動作with torch.no_grad():# 為狀態張量添加一個批次維度,使其變為 [1, state_dim]state_batch = state.unsqueeze(0)# 選擇 Q 值最高的動作(輸出形狀:[1, n_actions])action = policy_net(state_batch).max(1)[1].view(1, 1) # 重塑為 [1, 1]else:# 探索:選擇一個隨機動作action = torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)return action, epsilon_threshold
優化步驟和選擇動作
優化步驟包括從回放緩存中抽樣一個小批量,使用目標網絡計算目標 Q 值,并通過反向傳播更新主 Q 網絡。動作選擇使用 (\epsilon)-貪婪策略。
def select_action_custom(state: torch.Tensor,policy_net: nn.Module,epsilon_start: float,epsilon_end: float,epsilon_decay: int,n_actions: int
) -> Tuple[torch.Tensor, float]:"""使用 \(\epsilon\)-貪婪策略為單個狀態張量選擇動作。參數:- state (torch.Tensor):當前狀態作為張量,形狀為 [state_dim]。- policy_net (nn.Module):用于估計 Q 值的 Q 網絡。- epsilon_start (float):初始探索率 \(\epsilon\)。- epsilon_end (float):衰減后 \(\epsilon\) 的最終值。- epsilon_decay (int):\(\epsilon\) 的衰減率(值越大衰減越慢)。- n_actions (int):可能的動作數量。返回:- Tuple[torch.Tensor, float]:- 選中的動作作為張量,形狀為 [1, 1]。- 衰減后的當前 \(\epsilon\) 值。"""global steps_done_custom # 用于跟蹤已執行的步數的全局計數器# 生成一個隨機數,用于 \(\epsilon\)-貪婪決策sample: float = random.random()# 根據衰減公式計算當前的 \(\epsilon\) 值epsilon_threshold: float = epsilon_end + (epsilon_start - epsilon_end) * \math.exp(-1.0 * steps_done_custom / epsilon_decay)# 增加步數計數器steps_done_custom += 1if sample > epsilon_threshold:# 利用:選擇 Q 值最高的動作with torch.no_grad():# 為狀態張量添加一個批次維度,使其變為 [1, state_dim]state_batch: torch.Tensor = state.unsqueeze(0)# 選擇 Q 值最高的動作(輸出形狀:[1, n_actions])action: torch.Tensor = policy_net(state_batch).max(1)[1].view(1, 1) # 重塑為 [1, 1]else:# 探索:選擇一個隨機動作action = torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)return action, epsilon_threshold
接下來我們繼續實現優化步驟,這是DQN算法的核心部分,它通過從回放緩存中抽樣一個小批量數據,計算目標Q值,并通過反向傳播更新主Q網絡。
def optimize_model(memory: ReplayMemory,policy_net: nn.Module,target_net: nn.Module,optimizer: optim.Optimizer,batch_size: int,gamma: float,criterion: nn.Module = nn.SmoothL1Loss()) -> Optional[float]:"""對策略網絡執行一步優化。參數:- memory (ReplayMemory):存儲過去轉移的回放緩存。- policy_net (nn.Module):正在優化的主Q網絡。- target_net (nn.Module):用于穩定目標計算的目標Q網絡。- optimizer (optim.Optimizer):用于更新策略網絡的優化器。- batch_size (int):每次優化步驟要抽樣的轉移數量。- gamma (float):未來獎勵的折扣因子。- criterion (nn.Module):使用的損失函數(默認:SmoothL1損失)。返回:- Optional[float]:優化步驟的損失值,如果沒有足夠的樣本則返回None。"""# 確保回放緩存中有足夠的樣本以執行優化if len(memory) < batch_size:return None# 從回放緩存中抽樣一批轉移transitions = memory.sample(batch_size)batch = Transition(*zip(*transitions)) # 將轉移解包為單獨的組件# 標記非終止狀態(不是終端狀態的狀態)non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),device=device, dtype=torch.bool)# 將非終止的下一個狀態堆疊成張量if any(non_final_mask): # 檢查是否有任何非終止狀態non_final_next_states = torch.stack([s for s in batch.next_state if s is not None])# 將當前狀態、動作、獎勵和完成標志堆疊成張量state_batch = torch.stack(batch.state)action_batch = torch.cat(batch.action)reward_batch = torch.cat(batch.reward)done_batch = torch.cat(batch.done)# 計算所采取動作的Q(s_t, a)state_action_values = policy_net(state_batch).gather(1, action_batch)# 使用目標網絡計算下一個狀態的V(s_{t+1})next_state_values = torch.zeros(batch_size, device=device)with torch.no_grad():if any(non_final_mask): # 只計算非終止狀態next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]# 使用貝爾曼方程計算預期的Q值expected_state_action_values = (next_state_values * gamma) + reward_batch# 計算預測和預期Q值之間的損失loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))# 執行反向傳播和優化optimizer.zero_grad() # 清除之前的梯度loss.backward() # 計算梯度torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100) # 限制梯度以防止梯度爆炸optimizer.step() # 更新策略網絡return loss.item() # 返回損失值以便記錄
目標網絡更新
這個函數將主策略網絡的權重復制到目標網絡。我們將使用“硬”更新,每 TARGET_UPDATE
步更新一次。
def update_target_net(policy_net: nn.Module, target_net: nn.Module) -> None:"""將策略網絡的權重復制到目標網絡。參數:- policy_net (nn.Module):主Q網絡,其權重將被復制。- target_net (nn.Module):目標Q網絡,將復制權重。返回:- None"""target_net.load_state_dict(policy_net.state_dict())
運行DQN算法
設置超參數,初始化網絡、優化器和回放緩存,然后運行主訓練循環。
超參數設置
我們需要為自定義網格世界設置環境和超參數。這些參數將定義學習率、折扣因子、批量大小以及其他重要的DQN算法設置。
# 自定義網格世界的超參數
BATCH_SIZE_CUSTOM = 128
GAMMA_CUSTOM = 0.99 # 折扣因子(鼓勵向前看)
EPS_START_CUSTOM = 1.0 # 從完全探索開始
EPS_END_CUSTOM = 0.05 # 探索率最終值為 5%
EPS_DECAY_CUSTOM = 10000 # 較慢的衰減,以滿足可能較大的狀態空間探索需求
TAU_CUSTOM = 0.005 # 用于軟更新的 Tau(備用,這里不使用)
LR_CUSTOM = 5e-4 # 學習率(可能需要調整)
MEMORY_CAPACITY_CUSTOM = 10000
TARGET_UPDATE_FREQ_CUSTOM = 20 # 目標網絡更新頻率
NUM_EPISODES_CUSTOM = 500 # 劇集數量
MAX_STEPS_PER_EPISODE_CUSTOM = 200 # 每個劇集的最大步數(與網格大小相關)
初始化
定義了環境和DQN之后,我們可以初始化策略和目標網絡、優化器和回放緩存。
# 重新實例化自定義 GridEnvironment
custom_env: GridEnvironment = GridEnvironment(rows=10, cols=10)# 獲取動作空間的大小和狀態維度
n_actions_custom: int = custom_env.get_action_space_size() # 可能的動作數量(4)
n_observations_custom: int = custom_env.get_state_dimension() # 狀態空間的維度(2)# 初始化策略網絡(主Q網絡)和目標網絡
policy_net_custom: DQN = DQN(n_observations_custom, n_actions_custom).to(device) # 主Q網絡
target_net_custom: DQN = DQN(n_observations_custom, n_actions_custom).to(device) # 目標Q網絡# 將策略網絡的權重復制到目標網絡,并將其設置為評估模式
target_net_custom.load_state_dict(policy_net_custom.state_dict()) # 同步權重
target_net_custom.eval() # 將目標網絡設置為評估模式# 初始化策略網絡的優化器
optimizer_custom: optim.AdamW = optim.AdamW(policy_net_custom.parameters(), lr=LR_CUSTOM, amsgrad=True)# 初始化回放緩存,指定容量
memory_custom: ReplayMemory = ReplayMemory(MEMORY_CAPACITY_CUSTOM)# 用于繪圖的列表
episode_rewards_custom = []
episode_lengths_custom = []
episode_epsilons_custom = []
episode_losses_custom = []
訓練循環
現在我們已經編寫好了所有代碼,接下來讓我們在自定義網格世界環境中訓練DQN代理。
print("開始在自定義網格世界中訓練DQN...")# 初始化全局計數器,用于 \(\epsilon\) 衰減
steps_done_custom = 0# 訓練循環
for i_episode in range(NUM_EPISODES_CUSTOM):# 重置環境并獲取初始狀態張量state = custom_env.reset()total_reward = 0current_losses = []for t in range(MAX_STEPS_PER_EPISODE_CUSTOM):# 使用 \(\epsilon\)-貪婪策略選擇動作action_tensor, current_epsilon = select_action_custom(state, policy_net_custom, EPS_START_CUSTOM, EPS_END_CUSTOM, EPS_DECAY_CUSTOM, n_actions_custom)action = action_tensor.item()# 在環境中執行動作next_state_tensor, reward, done = custom_env.step(action)total_reward += reward# 為存儲在回放緩存中準備張量reward_tensor = torch.tensor([reward], device=device, dtype=torch.float32)action_tensor_mem = torch.tensor([[action]], device=device, dtype=torch.long)done_tensor = torch.tensor([done], device=device, dtype=torch.bool)# 將轉移存儲在回放緩存中memory_next_state = next_state_tensor if not done else Nonememory_custom.push(state, action_tensor_mem, memory_next_state, reward_tensor, done_tensor)# 轉移到下一個狀態state = next_state_tensor# 對策略網絡執行一步優化loss = optimize_model(memory_custom, policy_net_custom, target_net_custom, optimizer_custom, BATCH_SIZE_CUSTOM, GAMMA_CUSTOM)if loss is not None:current_losses.append(loss)# 如果劇集結束,則跳出循環if done:break# 存儲劇集統計信息episode_rewards_custom.append(total_reward)episode_lengths_custom.append(t + 1)episode_epsilons_custom.append(current_epsilon)episode_losses_custom.append(np.mean(current_losses) if current_losses else 0)# 定期更新目標網絡if i_episode % TARGET_UPDATE_FREQ_CUSTOM == 0:update_target_net(policy_net_custom, target_net_custom)# 每 50 個劇集打印一次進度if (i_episode + 1) % 50 == 0:avg_reward = np.mean(episode_rewards_custom[-50:])avg_length = np.mean(episode_lengths_custom[-50:])avg_loss = np.mean([l for l in episode_losses_custom[-50:] if l > 0])print(f"劇集 {i_episode+1}/{NUM_EPISODES_CUSTOM} | "f"最近 50 個劇集的平均獎勵:{avg_reward:.2f} | "f"平均長度:{avg_length:.2f} | "f"平均損失:{avg_loss:.4f} | "f"\(\epsilon\):{current_epsilon:.3f}")print("自定義網格世界訓練完成。")
開始在自定義網格世界中訓練DQN...
劇集 50/500 | 最近 50 個劇集的平均獎勵:-13.14 | 平均長度:109.86 | 平均損失:0.0330 | \(\epsilon\):0.599
劇集 100/500 | 最近 50 個劇集的平均獎勵:0.68 | 平均長度:60.18 | 平均損失:0.0290 | \(\epsilon\):0.456
劇集 150/500 | 最近 50 個劇集的平均獎勵:0.20 | 平均長度:62.06 | 平均損失:0.0240 | \(\epsilon\):0.348
劇集 200/500 | 最近 50 個劇集的平均獎勵:4.34 | 平均長度:40.32 | 平均損失:0.0115 | \(\epsilon\):0.293
劇集 250/500 | 最近 50 個劇集的平均獎勵:5.62 | 平均長度:32.48 | 平均損失:0.0110 | \(\epsilon\):0.257
劇集 300/500 | 最近 50 個劇集的平均獎勵:5.81 | 平均長度:30.50 | 平均損失:0.0068 | \(\epsilon\):0.228
劇集 350/500 | 最近 50 個劇集的平均獎勵:6.96 | 平均長度:25.32 | 平均損失:0.0144 | \(\epsilon\):0.206
劇集 400/500 | 最近 50 個劇集的平均獎勵:6.56 | 平均長度:25.90 | 平均損失:0.0134 | \(\epsilon\):0.187
劇集 450/500 | 最近 50 個劇集的平均獎勵:6.57 | 平均長度:29.74 | 平均損失:0.0024 | \(\epsilon\):0.168
劇集 500/500 | 最近 50 個劇集的平均獎勵:7.58 | 平均長度:20.84 | 平均損失:0.0010 | \(\epsilon\):0.157
自定義網格世界訓練完成。
可視化學習過程
為自定義網格世界環境繪制結果圖表。
# 為自定義網格世界繪制結果
plt.figure(figsize=(20, 3))# 獎勵
plt.subplot(1, 3, 1)
plt.plot(episode_rewards_custom)
plt.title('DQN 自定義網格:劇集獎勵')
plt.xlabel('劇集')
plt.ylabel('總獎勵')
plt.grid(True)
rewards_ma_custom = np.convolve(episode_rewards_custom, np.ones(50)/50, mode='valid')
if len(rewards_ma_custom) > 0: # 避免繪制空的移動平均值plt.plot(np.arange(len(rewards_ma_custom)) + 49, rewards_ma_custom, label='50-劇集移動平均值', color='orange')
plt.legend()# 長度
plt.subplot(1, 3, 2)
plt.plot(episode_lengths_custom)
plt.title('DQN 自定義網格:劇集長度')
plt.xlabel('劇集')
plt.ylabel('步數')
plt.grid(True)
lengths_ma_custom = np.convolve(episode_lengths_custom, np.ones(50)/50, mode='valid')
if len(lengths_ma_custom) > 0:plt.plot(np.arange(len(lengths_ma_custom)) + 49, lengths_ma_custom, label='50-劇集移動平均值', color='orange')
plt.legend()# \(\epsilon\)
plt.subplot(1, 3, 3)
plt.plot(episode_epsilons_custom)
plt.title('DQN 自定義網格:\(\epsilon\) 衰減')
plt.xlabel('劇集')
plt.ylabel('\(\epsilon\)')
plt.grid(True)plt.tight_layout()
plt.show()
接下來我們繼續分析DQN學習曲線,并且可視化學習到的策略。
分析DQN學習曲線(自定義網格世界)
- 獎勵曲線:獎勵應該隨著時間的推移而增加,可能比CartPole更波動,因為有步進成本,而且可能會暫時被困住。移動平均值應該顯示出向正值學習的趨勢,因為代理更頻繁地到達目標(+10),同時盡量減少步數(-0.1成本)和撞墻(-1)。
- 劇集長度曲線:最初很高,隨著代理學會更直接地到達目標,劇集長度應該會減少。平臺或尖峰可能表明探索導致路徑變長或被困住。收斂到從起點到目標的最小可能步數表明學習良好。
- (\epsilon) 衰減:顯示計劃的探索率隨劇集的減少。
這表明DQN在自定義、手動定義的環境中學習策略,只使用基本庫以及PyTorch進行神經網絡組件。
可視化學習到的策略(可選)
我們可以創建一個策略網格,類似于表格方法,但現在策略是從Q網絡的輸出中得出的。
def plot_dqn_policy_grid(policy_net: nn.Module, env: GridEnvironment, device: torch.device) -> None:"""繪制從DQN得出的貪婪策略。參數:- policy_net (nn.Module):用于得出策略的訓練有素的Q網絡。- env (GridEnvironment):自定義網格環境。- device (torch.device):用于處理張量的設備(CPU/GPU)。返回:- None:顯示策略網格的圖表。"""# 獲取網格環境的維度rows: int = env.rowscols: int = env.cols# 初始化一個空網格來存儲策略符號policy_grid: np.ndarray = np.empty((rows, cols), dtype=str)# 定義每個動作的符號action_symbols: Dict[int, str] = {0: '↑', 1: '↓', 2: '←', 3: '→'}# 創建圖表fig, ax = plt.subplots(figsize=(cols * 0.6, rows * 0.6)) # 根據網格維度調整大小# 遍歷網格中的每個單元格for r in range(rows):for c in range(cols):state_tuple: Tuple[int, int] = (r, c) # 當前狀態作為元組# 如果當前單元格是目標狀態,則標記為'G'if state_tuple == env.goal_state:policy_grid[r, c] = 'G'ax.text(c, r, 'G', ha='center', va='center', color='green', fontsize=12, weight='bold')else:# 將狀態轉換為張量表示state_tensor: torch.Tensor = env._get_state_tensor(state_tuple)# 使用策略網絡確定最佳動作with torch.no_grad():# 為狀態張量添加一個批次維度state_tensor = state_tensor.unsqueeze(0)# 獲取當前狀態的Q值q_values: torch.Tensor = policy_net(state_tensor)# 選擇Q值最高的動作best_action: int = q_values.max(1)[1].item()# 在策略網格中存儲動作符號policy_grid[r, c] = action_symbols[best_action]# 將動作符號添加到圖表中ax.text(c, r, policy_grid[r, c], ha='center', va='center', color='black', fontsize=12)# 設置網格可視化ax.matshow(np.zeros((rows, cols)), cmap='Greys', alpha=0.1) # 背景網格ax.set_xticks(np.arange(-.5, cols, 1), minor=True)ax.set_yticks(np.arange(-.5, rows, 1), minor=True)ax.grid(which='minor', color='black', linestyle='-', linewidth=1) # 次要網格線ax.set_xticks([]) # 移除x軸刻度ax.set_yticks([]) # 移除y軸刻度ax.set_title("DQN 學習到的策略(自定義網格)") # 圖表標題# 顯示圖表plt.show()# 繪制訓練有素的網絡學習到的策略
print("\n繪制從DQN得出的策略:")
plot_dqn_policy_grid(policy_net_custom, custom_env, device)
繪制從DQN得出的策略:
DQN中常見的挑戰及解決方案
挑戰:訓練不穩定/發散
- 解決方案:
- 調整學習率:降低學習率 (
LR
)。 - 增加目標網絡更新頻率:減少目標網絡的更新頻率(增加
TARGET_UPDATE_FREQ
),或者使用軟更新 (TAU
)。 - 梯度裁剪:防止梯度爆炸。
- 增大回放緩存:增加
MEMORY_CAPACITY
。 - 更換優化器/損失函數:嘗試使用 RMSprop 或 Huber 損失。
- 調整學習率:降低學習率 (
挑戰:學習速度慢
- 解決方案:
- 調整超參數:謹慎提高學習率,調整 (\epsilon) 衰減,優化批量大小。
- 調整網絡架構:嘗試不同的層數和神經元數量。
- 使用優先經驗回放:更頻繁地抽樣回放緩存中“重要”的轉移(更復雜的擴展)。
- 使用雙DQN/雙Q網絡:這些擴展可以提高性能和穩定性。
挑戰:Q值過高估計
- 解決方案:實現 雙DQN,在目標計算中將動作選擇和價值估計解耦。
挑戰:回放緩存中的相關性
- 解決方案:確保回放緩存足夠大,并且隨機抽樣。考慮使用優先回放。
總結
深度Q網絡(DQN)算法成功地將Q學習擴展到能夠處理高維狀態空間的領域,通過使用深度神經網絡作為函數近似器。經驗回放和目標網絡等關鍵創新對于在將深度學習與時間差分方法結合時穩定學習過程至關重要。
正如在CartPole環境中所展示的那樣,DQN能夠直接從經驗中學習處理連續狀態空間和離散動作的有效策略。盡管基本的DQN存在一些局限性(例如,過高估計、處理連續動作),但它為許多先進的深度強化學習算法奠定了基礎,如雙DQN、雙Q網絡、Rainbow以及演員-評論家方法,這些算法解決了這些挑戰并能夠處理更復雜的問題。理解DQN是掌握現代強化學習技術的重要一步。