3. Value-Function-Based Deep Reinforcement Learning Algorithms
1) Deep Q-Network (DQN)
Core idea
- Experience replay: the agent's transitions (state, action, reward, next state) are stored in a replay buffer, and mini-batches are drawn from it at random during training. This breaks the correlation between consecutive samples and improves both sample efficiency and training stability.
- Target network: a separate, slowly updated target network computes the target Q-values, so the targets do not oscillate with every update of the main network; this stabilizes training.

The target Q-value is
$$y_t = r_t + \gamma \max_{a'} Q_{\text{target}}(s_{t+1}, a'; \theta^-)$$
- $r_t$: immediate reward received at time step $t$.
- $\gamma$: discount factor ($0 \le \gamma \le 1$), balancing immediate against future rewards.
- $\max_{a'} Q_{\text{target}}(s_{t+1}, a'; \theta^-)$: the maximum Q-value over all possible actions $a'$ in the next state $s_{t+1}$, computed by the target network (parameters $\theta^-$); an estimate of future return.

The loss function is
$$L(\theta) = \frac{1}{N}\sum_{i=1}^{N}\bigl(y_i - Q_{\text{current}}(s_i, a_i; \theta)\bigr)^2$$
- $N$: mini-batch size, i.e. the number of samples drawn from the replay buffer.
- $y_i$: target Q-value of the $i$-th sample, computed by the target network.
- $Q_{\text{current}}(s_i, a_i; \theta)$: the current network's (parameters $\theta$) Q-value prediction for state $s_i$ and action $a_i$.
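For simplicity, the full implementation below bootstraps from the same network; a separate target network would enter the update as in the following minimal sketch (the `q_net`/`target_net` linear models are stand-ins, not part of the implementation below):

```python
import torch
import torch.nn as nn

q_net = nn.Linear(4, 2)                          # stand-in Q-network: 4-dim state, 2 actions
target_net = nn.Linear(4, 2)
target_net.load_state_dict(q_net.state_dict())   # start from identical parameters

def dqn_target(reward, next_state, done, gamma=0.99):
    # y = r + (1 - done) * gamma * max_a' Q_target(s', a'; theta^-)
    with torch.no_grad():
        next_q = target_net(next_state).max(dim=-1).values
    return reward + (1.0 - done) * gamma * next_q

y = dqn_target(torch.tensor([1.0]), torch.randn(1, 4), torch.tensor([0.0]))
# Every C environment steps, hard-update the target network:
# target_net.load_state_dict(q_net.state_dict())
```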
Implementation code
#! /usr/bin/env python
import torch
import torch.nn as nn
import time
import random
import numpy as np
import gym
from PIL import Image
import matplotlib.pyplot as plt

# DQN deep model used to estimate the Q function for Atari environments
class DQN(nn.Module):
    def __init__(self, img_size, num_actions):
        super().__init__()
        # Shape of the input image (c, h, w)
        self.img_size = img_size
        self.num_actions = num_actions
        # For the Atari environment the input is (4, 84, 84)
        self.featnet = nn.Sequential(
            nn.Conv2d(img_size[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Value head: outputs the value of every action from the features
        self.vnet = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions)
        )

    def _feat_size(self):
        # No gradients are tracked inside this block
        with torch.no_grad():
            # x has shape (1, c, h, w): batch size 1, then channels, height, width
            x = torch.randn(1, *self.img_size)
            # view reshapes the tensor; -1 lets PyTorch infer that dimension from the total size
            x = self.featnet(x).view(1, -1)
            return x.size(1)  # size of the second dimension = flattened feature length

    def forward(self, x):
        bs = x.size(0)
        # Extract features
        feat = self.featnet(x).view(bs, -1)
        # Values of all possible actions
        values = self.vnet(feat)
        return values

    def act(self, x, epsilon=0.0):
        # ε-greedy policy
        if random.random() > epsilon:
            with torch.no_grad():
                values = self.forward(x)
            return values.argmax(-1).squeeze().item()
        else:
            return random.randint(0, self.num_actions - 1)

from collections import deque
class ExpReplayBuffer(object):
    def __init__(self, buffer_size):
        super().__init__()
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, bs):
        # zip(*...) unpacks the sampled tuples and regroups them field by field
        state, action, reward, next_state, done = \
            zip(*random.sample(self.buffer, bs))
        return np.stack(state, 0), np.stack(action, 0), \
            np.stack(reward, 0), np.stack(next_state, 0), \
            np.stack(done, 0).astype(np.float32)

    def __len__(self):
        return len(self.buffer)


class EnvWrapper(object):
    def __init__(self, env, num_frames):
        super().__init__()
        self.env_ = env
        self.num_frames = num_frames
        self.frame = deque(maxlen=num_frames)

    def _preprocess(self, img):
        # Preprocess the observation
        img = Image.fromarray(img)    # img is assumed to be a NumPy array
        img = img.convert("L")        # convert to grayscale
        img = img.resize((84, 84))    # resize to 84x84
        return np.array(img) / 256.0  # scale to [0, 1)

    def reset(self):
        obs = self.env_.reset()  # reset the environment
        if isinstance(obs, tuple):
            obs = obs[0]  # newer gym versions return (obs, info)
        self.frame = []  # clear previous frames
        for _ in range(self.num_frames):
            processed_frame = self._preprocess(obs)
            self.frame.append(processed_frame)
        return np.stack(self.frame, axis=0)  # return the stacked frames

    def step(self, action):
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, _ = self.env_.step(action)
        done = terminated or truncated
        processed_frame = self._preprocess(obs)
        self.frame.pop(0)                   # drop the oldest frame
        self.frame.append(processed_frame)  # append the newest frame
        return np.stack(self.frame, 0), np.sign(reward), done, {}

    @property
    def env(self):
        return self.env_


def train(buffer, model, optimizer):
    # Sample a batch from the replay buffer
    state, action, reward, next_state, done = buffer.sample(BATCH_SIZE)
    state = torch.tensor(state, dtype=torch.float32)             # .cuda()
    reward = torch.tensor(reward, dtype=torch.float32)           # .cuda()
    action = torch.tensor(action, dtype=torch.long)              # .cuda()
    next_state = torch.tensor(next_state, dtype=torch.float32)   # .cuda()
    done = torch.tensor(done, dtype=torch.float32)
    # Target value from the next state
    with torch.no_grad():
        # max returns both the maximum values and their indices over the last dimension
        target, _ = model(next_state).max(dim=-1)
        target = reward + (1 - done) * GAMMA * target
    # Prediction for the current state:
    # model(state) runs the forward pass and returns the value of every action;
    # gather then picks the value of the action that was actually taken.
    # unsqueeze(-1) reshapes action from (batch_size,) to (batch_size, 1) as gather requires,
    # and squeeze removes the size-1 dimension afterwards.
    predict = model(state).gather(1, action.unsqueeze(-1)).squeeze()
    loss = (predict - target).pow(2).mean()
    # Optimize the loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


GAMMA = 0.99
EPSILON_MIN = 0.01
EPSILON_MAX = 1.00
NFRAMES = 4
BATCH_SIZE = 32
NSTEPS = 400000
NBUFFER = 10000
env = gym.make('PongDeterministic-v4', render_mode='human')
env = EnvWrapper(env, NFRAMES)
state = env.reset()
buffer = ExpReplayBuffer(NBUFFER)
dqn = DQN((4, 84, 84), env.env.action_space.n)
# dqn.cuda()
optimizer = torch.optim.Adam(dqn.parameters(), 1e-4)
all_rewards = []
all_losses = []
episode_reward = 0
all_steps1 = []
all_steps2 = []
# Exponential decay of ε, so the fraction of exploration shrinks as training proceeds
eps = lambda t: EPSILON_MIN + (EPSILON_MAX - EPSILON_MIN)*np.exp(-t/30000)
time_start = time.time()
for nstep in range(NSTEPS):
    print(nstep)
    p = eps(nstep)
    state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # .cuda()
    action = dqn.act(state_t, p)
    next_state, reward, done, _ = env.step(action)
    buffer.push(state, action, reward, next_state, done)
    state = next_state
    episode_reward += reward
    if done:
        state = env.reset()
        all_rewards.append(episode_reward)
        all_steps1.append(nstep)
        episode_reward = 0
    if len(buffer) >= 1000:
        loss = train(buffer, dqn, optimizer)
        all_losses.append(loss)
        all_steps2.append(nstep)
time_end = time.time()
print("DQN cost time:" + str(time_end - time_start))# 繪制獎勵圖
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(all_steps1, all_rewards)
plt.title('Episode Rewards')
plt.xlabel('Step')
plt.ylabel('Reward')

# Plot the training loss
plt.subplot(1, 2, 2)
plt.plot(all_steps2, all_losses)
plt.title('Training Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.tight_layout()
plt.show()
2) Double DQN
Core idea
Double DQN is an improvement on DQN that addresses the overestimation of Q-values. It keeps two Q-networks: an online network used to select actions and a target network used to evaluate them. When updating the Q-values, the online network chooses the action for the next state and the target network evaluates that action's value. Decoupling selection from evaluation reduces the overestimation of Q-values and improves stability and accuracy (a minimal sketch of this target computation follows the formulas below).
Formulas:
Target Q-value:
$$y_t = r_t + \gamma\, Q_{\text{target}}\!\bigl(s_{t+1}, \arg\max_{a'} Q_{\text{online}}(s_{t+1}, a'; \theta);\ \theta^-\bigr)$$
Loss function:
$$L(\theta) = \frac{1}{N}\sum_{i=1}^{N}\bigl(y_i - Q_{\text{online}}(s_i, a_i; \theta)\bigr)^2$$
Advantages: effectively reduces Q-value overestimation, improving performance and stability; outperforms vanilla DQN in some complex environments.
Disadvantages: maintaining two Q-networks increases model complexity and computation; it remains fairly sensitive to hyperparameter tuning.
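The decoupling of action selection and evaluation comes down to a single line in the target computation. A minimal sketch, assuming two hypothetical Q-networks `online_net` and `target_net` (the full Atari implementation follows):

```python
import torch
import torch.nn as nn

online_net = nn.Linear(4, 3)   # stand-in Q-networks: 4-dim state, 3 actions
target_net = nn.Linear(4, 3)

def double_dqn_target(reward, next_state, done, gamma=0.99):
    with torch.no_grad():
        # Select the greedy action with the online network ...
        next_action = online_net(next_state).argmax(dim=-1, keepdim=True)
        # ... but evaluate its value with the target network
        next_q = target_net(next_state).gather(1, next_action).squeeze(-1)
    return reward + (1.0 - done) * gamma * next_q

y = double_dqn_target(torch.ones(5), torch.randn(5, 4), torch.zeros(5))
```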
Implementation code
#! /usr/bin/env python
import torch
import torch.nn as nn
import time
import random
import numpy as np
import gym
from PIL import Image
import matplotlib.pyplot as plt

# DQN deep model used to estimate the Q function for Atari environments
class DQN(nn.Module):
    def __init__(self, img_size, num_actions):
        super().__init__()
        # Shape of the input image (c, h, w)
        self.img_size = img_size
        self.num_actions = num_actions
        # For the Atari environment the input is (4, 84, 84)
        self.featnet = nn.Sequential(
            nn.Conv2d(img_size[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Value head: outputs the value of every action from the features
        self.vnet = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions)
        )

    def _feat_size(self):
        with torch.no_grad():
            x = torch.randn(1, *self.img_size)
            x = self.featnet(x).view(1, -1)
            return x.size(1)

    def forward(self, x):
        bs = x.size(0)
        # Extract features
        feat = self.featnet(x).view(bs, -1)
        # Values of all possible actions
        values = self.vnet(feat)
        return values

    def act(self, x, epsilon=0.0):
        # ε-greedy policy
        if random.random() > epsilon:
            with torch.no_grad():
                values = self.forward(x)
            return values.argmax(-1).squeeze().item()
        else:
            return random.randint(0, self.num_actions - 1)

from collections import deque
class ExpReplayBuffer(object):
    def __init__(self, buffer_size):
        super().__init__()
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, bs):
        state, action, reward, next_state, done = \
            zip(*random.sample(self.buffer, bs))
        return np.stack(state, 0), np.stack(action, 0), \
            np.stack(reward, 0), np.stack(next_state, 0), \
            np.stack(done, 0).astype(np.float32)

    def __len__(self):
        return len(self.buffer)


class EnvWrapper(object):
    def __init__(self, env, num_frames):
        super().__init__()
        self.env_ = env
        self.num_frames = num_frames
        self.frame = deque(maxlen=num_frames)

    def _preprocess(self, img):
        # Preprocess the observation
        img = Image.fromarray(img)
        img = img.convert("L")
        img = img.resize((84, 84))
        return np.array(img) / 256.0

    def reset(self):
        obs = self.env_.reset()  # reset the environment
        if isinstance(obs, tuple):
            obs = obs[0]  # newer gym versions return (obs, info)
        self.frame = []  # clear previous frames
        for _ in range(self.num_frames):
            processed_frame = self._preprocess(obs)
            self.frame.append(processed_frame)
        return np.stack(self.frame, axis=0)  # return the stacked frames

    def step(self, action):
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, _ = self.env_.step(action)
        done = terminated or truncated
        processed_frame = self._preprocess(obs)
        self.frame.pop(0)                   # drop the oldest frame
        self.frame.append(processed_frame)  # append the newest frame
        return np.stack(self.frame, 0), np.sign(reward), done, {}

    @property
    def env(self):
        return self.env_


def train(buffer, model1, model2, optimizer):
    # Sample a batch from the replay buffer
    state, action, reward, next_state, done = buffer.sample(BATCH_SIZE)
    state = torch.tensor(state, dtype=torch.float32)
    reward = torch.tensor(reward, dtype=torch.float32)
    action = torch.tensor(action, dtype=torch.long)
    next_state = torch.tensor(next_state, dtype=torch.float32)
    done = torch.tensor(done, dtype=torch.float32)
    with torch.no_grad():
        # Use Q1 (the online network) to pick the highest-value action
        # (argmax over the last dimension)
        next_action = model1(next_state).argmax(-1)
        # Use Q2 (the target network) to evaluate that action;
        # squeeze() removes the extra dimension so target is 1-D
        target = model2(next_state) \
            .gather(1, next_action.unsqueeze(-1)).squeeze()
        target = reward + (1 - done) * GAMMA * target
    # Prediction for the current state
    predict = model1(state).gather(1, action.unsqueeze(-1)).squeeze()
    loss = (predict - target).pow(2).mean()
    # Optimize the loss
    optimizer.zero_grad()   # reset gradients to zero
    loss.backward()         # compute gradients for the current batch
    optimizer.step()        # update parameters with the optimizer (e.g. Adam) and its learning rate
    return loss.item()      # return the batch loss for monitoring


GAMMA = 0.99
EPSILON_MIN = 0.01
EPSILON_MAX = 1.00
NFRAMES = 4
BATCH_SIZE = 32
NSTEPS = 400000
NBUFFER = 10000
env = gym.make('PongDeterministic-v4')
env = EnvWrapper(env, NFRAMES)
state = env.reset()
buffer = ExpReplayBuffer(NBUFFER)
# Build two identical neural networks
dqn1 = DQN((4, 84, 84), env.env.action_space.n)
dqn2 = DQN((4, 84, 84), env.env.action_space.n)
dqn2.load_state_dict(dqn1.state_dict())
# dqn1.cuda()
# dqn2.cuda()
optimizer = torch.optim.Adam(dqn1.parameters(), 1e-4)
all_rewards = []
all_losses = []
episode_reward = 0
all_steps1 = []
all_steps2 = []
eps = lambda t: EPSILON_MIN + (EPSILON_MAX - EPSILON_MIN)*np.exp(-t/30000)
time_start = time.time()
for nstep in range(NSTEPS):
    print(nstep)
    p = eps(nstep)
    state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action = dqn1.act(state_t, p)
    next_state, reward, done, _ = env.step(action)
    buffer.push(state, action, reward, next_state, done)
    state = next_state
    episode_reward += reward
    if done:
        state = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0
        all_steps1.append(nstep)
    if len(buffer) >= 1000:
        loss = train(buffer, dqn1, dqn2, optimizer)
        all_losses.append(loss)
        all_steps2.append(nstep)
        # Update the parameters of Q2 (the target network)
        if (nstep + 1) % 100 == 0:
            dqn2.load_state_dict(dqn1.state_dict())
time_end = time.time()
print("double DQN cost time:" + str(time_end - time_start))# 繪制獎勵圖
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(all_steps1, all_rewards)
plt.title('Episode Rewards')
plt.xlabel('Step')
plt.ylabel('Reward')

# Plot the training loss
plt.subplot(1, 2, 2)
plt.plot(all_steps2, all_losses)
plt.title('Training Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.tight_layout()
plt.show()
3) Prioritized Experience Replay
Core idea
- Priority computation: each sample's priority is based on the absolute value of its TD error, or on its rank.
- Sampling strategy: samples are drawn with proportional or rank-based priorities, and importance sampling is used to correct the resulting bias.

The TD error is
$$\delta_t = r_t + \gamma \max_{a'} Q_{\text{target}}(s_{t+1}, a'; \theta^-) - Q_{\text{current}}(s_t, a_t; \theta)$$
- $r_t$: immediate reward at the current time step.
- $\gamma$: discount factor ($0 \le \gamma \le 1$), balancing immediate against future returns.
- $Q_{\text{target}}(s_{t+1}, a'; \theta^-)$: the target network's Q-value prediction for the next state $s_{t+1}$ and action $a'$.
- $Q_{\text{current}}(s_t, a_t; \theta)$: the current network's Q-value prediction for state $s_t$ and action $a_t$.

Proportional priority: $p_i = |\delta_i| + \epsilon$
- $\epsilon$: a small positive constant (e.g. $10^{-6}$) that keeps the priority from being exactly 0.
- Characteristics: directly tied to the TD error, but sensitive to noise (outliers can dominate the priorities).

Rank-based priority: $p_i = \dfrac{1}{\text{rank}(i)}$
- $\text{rank}(i)$: the position of sample $i$ when all samples are sorted by $|\delta_i|$ (the sample with the largest TD error has rank 1).
- Characteristics: insensitive to outliers and therefore more robust, but requires an extra sorting step.

Sampling probability: $P(i) = \dfrac{p_i^{\alpha}}{\sum_k p_k^{\alpha}}$
- $\alpha$: hyperparameter controlling how strongly prioritization is applied ($\alpha \ge 0$):
- $\alpha = 0$: degenerates to uniform sampling;
- $\alpha = 1$: sampling fully proportional to priority.

Importance-sampling weight: $w_i = \bigl(N \cdot P(i)\bigr)^{-\beta}$
- $N$: capacity of the replay buffer.
- $\beta$: hyperparameter controlling how strongly the bias is corrected (usually annealed towards 1):
- early in training: $\beta = 0.4$ (weak correction);
- late in training: $\beta = 1.0$ (full correction).
- Normalization: for training stability the weights are normalized, $w_i \leftarrow w_i / \max_j w_j$.

Weighted loss: $L(\theta) = \dfrac{1}{N}\sum_i w_i\bigl(y_i - Q(s_i, a_i; \theta)\bigr)^2$
- $y_i$: target Q-value (computed e.g. with the Double DQN formula).
- Purpose: the weights $w_i$ keep high-priority samples from being over-learned. A minimal sampling sketch follows below.
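A minimal NumPy sketch of proportional prioritized sampling with importance-sampling weights, following the formulas above; the function and array names are illustrative, and the implementation below uses a binary heap rather than a flat array:

```python
import numpy as np

rng = np.random.default_rng(0)

def per_sample(td_errors, batch_size, alpha=0.6, beta=0.4, eps=1e-6):
    # p_i = (|delta_i| + eps)^alpha, P(i) = p_i / sum_k p_k
    priorities = (np.abs(td_errors) + eps) ** alpha
    probs = priorities / priorities.sum()
    idx = rng.choice(len(td_errors), size=batch_size, p=probs)
    # w_i = (N * P(i))^(-beta), normalized by max_j w_j for stability
    weights = (len(td_errors) * probs[idx]) ** (-beta)
    weights /= weights.max()
    return idx, weights.astype(np.float32)

# Samples with larger TD error are drawn more often and receive smaller weights
idx, w = per_sample(np.array([0.1, 2.0, 0.5, 1.0]), batch_size=3)
```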
Implementation code
#! /usr/bin/env python
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np
import gym
from PIL import Image

# DQN deep model used to estimate the Q function for Atari environments
class DQN(nn.Module):
    def __init__(self, img_size, num_actions):
        super().__init__()
        # Shape of the input image (c, h, w)
        self.img_size = img_size
        self.num_actions = num_actions
        # For the Atari environment the input is (4, 84, 84)
        self.featnet = nn.Sequential(
            nn.Conv2d(img_size[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Value head: outputs the value of every action from the features
        self.vnet = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions)
        )

    def _feat_size(self):
        with torch.no_grad():
            x = torch.randn(1, *self.img_size)
            x = self.featnet(x).view(1, -1)
            return x.size(1)

    def forward(self, x):
        bs = x.size(0)
        # Extract features
        feat = self.featnet(x).view(bs, -1)
        # Values of all possible actions
        values = self.vnet(feat)
        return values

    def act(self, x, epsilon=0.0):
        # ε-greedy policy
        if random.random() > epsilon:
            with torch.no_grad():
                values = self.forward(x)
            return values.argmax(-1).squeeze().item()
        else:
            return random.randint(0, self.num_actions - 1)

from collections import deque
from heapq import heappush, heappushpop, heapify, nlargest
from operator import itemgetter


class Sample(tuple):
    def __lt__(self, x):
        # Compare samples by their first element (the stored TD error)
        return self[0] < x[0]


class PrioritizedExpReplayBuffer(object):
    def __init__(self, buffer_size, alpha):
        super().__init__()
        self.alpha = alpha
        self.buffer_size = buffer_size
        self.buffer = []

    def heapify(self):
        heapify(self.buffer)

    def push(self, state, action, reward, next_state, done):
        # Initial TD error of a new sample: 1.0 if the buffer is empty,
        # otherwise the largest TD error currently stored
        td = 1.0 if not self.buffer else \
            nlargest(1, self.buffer, key=itemgetter(0))[0][0]
        # Insert the sample into the priority queue
        if len(self.buffer) < self.buffer_size:
            heappush(self.buffer,
                     Sample((td, state, action, reward, next_state, done)))
        else:
            heappushpop(self.buffer,
                        Sample((td, state, action, reward, next_state, done)))

    def set_td_value(self, index, value):
        # Update the TD errors of the stored samples:
        # idx_t is the position of the sample in self.buffer,
        # idx_s is the position of its new TD value in `value`
        for idx_s, idx_t in enumerate(index):
            self.buffer[idx_t] = Sample((value[idx_s], *self.buffer[idx_t][1:]))

    def sample(self, bs, beta=1.0):
        # Compute sampling probabilities and importance weights, then normalize
        with torch.no_grad():
            weights = torch.tensor([val[0] for val in self.buffer])
            weights = weights.abs().pow(self.alpha)
            weights = weights / weights.sum()
            prob = weights.cpu().numpy()
            weights = (len(weights) * weights).pow(-beta)
            weights = weights / weights.max()
            weights = weights.cpu().numpy()
        # Draw bs indices according to the priorities
        index = random.choices(range(len(weights)), weights=prob, k=bs)
        # Gather the training samples for the drawn indices
        # (zip(*...) regroups the sampled tuples field by field)
        _, state, action, reward, next_state, done = \
            zip(*[self.buffer[i] for i in index])
        weights = [weights[i] for i in index]
        return np.stack(weights, 0).astype(np.float32), index, \
            np.stack(state, 0), np.stack(action, 0), \
            np.stack(reward, 0), np.stack(next_state, 0), \
            np.stack(done, 0).astype(np.float32)

    def __len__(self):
        return len(self.buffer)


class EnvWrapper(object):
    def __init__(self, env, num_frames):
        super().__init__()
        self.env_ = env
        self.num_frames = num_frames
        self.frame = deque(maxlen=num_frames)

    def _preprocess(self, img):
        # Preprocess the observation
        img = Image.fromarray(img)
        img = img.convert("L")
        img = img.resize((84, 84))
        return np.array(img) / 256.0

    def reset(self):
        obs = self.env_.reset()  # reset the environment
        if isinstance(obs, tuple):
            obs = obs[0]  # newer gym versions return (obs, info)
        self.frame = []  # clear previous frames
        for _ in range(self.num_frames):
            processed_frame = self._preprocess(obs)
            self.frame.append(processed_frame)
        return np.stack(self.frame, axis=0)  # return the stacked frames

    def step(self, action):
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, _ = self.env_.step(action)
        done = terminated or truncated
        processed_frame = self._preprocess(obs)
        self.frame.pop(0)                   # drop the oldest frame
        self.frame.append(processed_frame)  # append the newest frame
        return np.stack(self.frame, 0), np.sign(reward), done, {}

    @property
    def env(self):
        return self.env_


def train(buffer, model1, model2, optimizer):
    # Sample a batch (with weights and indices) from the prioritized replay buffer
    weights, index, state, action, reward, next_state, done = buffer.sample(BATCH_SIZE, BETA)
    state = torch.tensor(state, dtype=torch.float32)
    reward = torch.tensor(reward, dtype=torch.float32)
    action = torch.tensor(action, dtype=torch.long)
    next_state = torch.tensor(next_state, dtype=torch.float32)
    done = torch.tensor(done, dtype=torch.float32)
    weights = torch.tensor(weights, dtype=torch.float32)
    # Target value from the next state
    with torch.no_grad():
        # Use Q1 to pick the highest-value action ...
        next_action = model1(next_state).argmax(-1)
        # ... and Q2 to evaluate it
        target = model2(next_state) \
            .gather(1, next_action.unsqueeze(-1)).squeeze()
        target = reward + (1 - done) * GAMMA * target
    # Prediction for the current state
    predict = model1(state).gather(1, action.unsqueeze(-1)).squeeze()
    # Temporal-difference error used to refresh the priorities
    with torch.no_grad():
        td = (predict - target).squeeze().abs().cpu().numpy() + 1e-6
    buffer.set_td_value(index, td)
    # Importance-weighted loss
    loss = (weights * (predict - target).pow(2)).mean()
    # Optimize the loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


GAMMA = 0.99
EPSILON_MIN = 0.01
EPSILON_MAX = 1.00
NFRAMES = 4
BATCH_SIZE = 32
NSTEPS = 4000000
NBUFFER = 20000
ALPHA = 0.4
BETA = 0.6
env = gym.make('PongDeterministic-v4')
env = EnvWrapper(env, NFRAMES)
state = env.reset()
buffer = PrioritizedExpReplayBuffer(NBUFFER, ALPHA)
# Build two identical neural networks
dqn1 = DQN((4, 84, 84), env.env.action_space.n)
dqn2 = DQN((4, 84, 84), env.env.action_space.n)
dqn2.load_state_dict(dqn1.state_dict())
# dqn1.cuda()
# dqn2.cuda()
optimizer = torch.optim.Adam(dqn1.parameters(), 1e-4)
all_rewards = []
all_losses = []
episode_reward = 0
eps = lambda t: EPSILON_MIN + (EPSILON_MAX - EPSILON_MIN)*np.exp(-t/30000)

for nstep in range(NSTEPS):
    p = eps(nstep)
    state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action = dqn1.act(state_t, p)
    next_state, reward, done, _ = env.step(action)
    buffer.push(state, action, reward, next_state, done)
    state = next_state
    episode_reward += reward
    if done:
        state = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0
    if len(buffer) >= 10000:
        loss = train(buffer, dqn1, dqn2, optimizer)
        # Update the parameters of Q2 (the target network)
        if (nstep + 1) % 1000 == 0:
            dqn2.load_state_dict(dqn1.state_dict())
        # Periodically rebuild the binary heap
        if (nstep + 1) % 100000 == 0:
            buffer.heapify()
4) Dueling DQN
Core idea

The Q-value is decomposed into a state value and an action advantage:
$$Q(s, a; \theta, \theta_v, \theta_a) = V(s; \theta_v) + \Bigl(A(s, a; \theta_a) - \frac{1}{|\mathcal{A}|}\sum_{a'} A(s, a'; \theta_a)\Bigr)$$
- $V(s; \theta_v)$: state-value function, measuring how good state $s$ is overall (independent of the action).
- $A(s, a; \theta_a)$: advantage function, measuring how good action $a$ is relative to the other actions in state $s$.
- $\frac{1}{|\mathcal{A}|}\sum_{a'} A(s, a'; \theta_a)$: mean of the advantages, subtracted to make the decomposition identifiable (it removes the redundancy so that $V(s)$ and $A(s, a)$ are uniquely determined).

- Mean form: more stable; it avoids numerical problems caused by very large advantage values.
- Max form: can make the optimal action stand out more, but training is more sensitive. (Both forms are sketched below.)
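A minimal sketch of the two aggregation forms, with `value` and `advantage` standing in for the outputs of the value and advantage heads (the full implementation below uses the mean form):

```python
import torch

def dueling_q(value, advantage, mode="mean"):
    # value: (B, 1) state value V(s); advantage: (B, A) advantages A(s, a)
    if mode == "mean":
        # Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')
        return value + advantage - advantage.mean(dim=-1, keepdim=True)
    else:
        # Q(s, a) = V(s) + A(s, a) - max_a' A(s, a')
        return value + advantage - advantage.max(dim=-1, keepdim=True).values

q = dueling_q(torch.zeros(2, 1), torch.randn(2, 6))  # batch of 2 states, 6 actions
```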
Implementation code
#! /usr/bin/env python
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np
import gym
from PIL import Image

# Dueling DQN deep model used to estimate the Q function for Atari environments
class DDQN(nn.Module):
    def __init__(self, img_size, num_actions):
        super().__init__()
        # Shape of the input image (c, h, w)
        self.img_size = img_size
        self.num_actions = num_actions
        # For the Atari environment the input is (4, 84, 84)
        self.featnet = nn.Sequential(
            nn.Conv2d(img_size[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Advantage head: outputs the advantage of every action from the features
        self.adv_net = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions)
        )
        # Value head: outputs the value of the current state from the features
        self.val_net = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )

    def _feat_size(self):
        with torch.no_grad():
            x = torch.randn(1, *self.img_size)
            x = self.featnet(x).view(1, -1)
            return x.size(1)

    def forward(self, x):
        bs = x.size(0)
        # Extract features
        feat = self.featnet(x).view(bs, -1)
        # Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')
        values = self.val_net(feat) + self.adv_net(feat) - \
            self.adv_net(feat).mean(-1, keepdim=True)
        return values

    def act(self, x, epsilon=0.0):
        # ε-greedy policy
        if random.random() > epsilon:
            with torch.no_grad():
                values = self.forward(x)
            return values.argmax(-1).squeeze().item()
        else:
            return random.randint(0, self.num_actions - 1)

from collections import deque
class ExpReplayBuffer(object):
    def __init__(self, buffer_size):
        super().__init__()
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, bs):
        state, action, reward, next_state, done = \
            zip(*random.sample(self.buffer, bs))
        return np.stack(state, 0), np.stack(action, 0), \
            np.stack(reward, 0), np.stack(next_state, 0), \
            np.stack(done, 0).astype(np.float32)

    def __len__(self):
        return len(self.buffer)


class EnvWrapper(object):
    def __init__(self, env, num_frames):
        super().__init__()
        self.env_ = env
        self.num_frames = num_frames
        self.frame = deque(maxlen=num_frames)

    def _preprocess(self, img):
        # Preprocess the observation
        img = Image.fromarray(img)
        img = img.convert("L")
        img = img.resize((84, 84))
        return np.array(img) / 256.0

    def reset(self):
        obs = self.env_.reset()  # reset the environment
        if isinstance(obs, tuple):
            obs = obs[0]  # newer gym versions return (obs, info)
        self.frame = []  # clear previous frames
        for _ in range(self.num_frames):
            processed_frame = self._preprocess(obs)
            self.frame.append(processed_frame)
        return np.stack(self.frame, axis=0)  # return the stacked frames

    def step(self, action):
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, _ = self.env_.step(action)
        done = terminated or truncated
        processed_frame = self._preprocess(obs)
        self.frame.pop(0)                   # drop the oldest frame
        self.frame.append(processed_frame)  # append the newest frame
        return np.stack(self.frame, 0), np.sign(reward), done, {}

    @property
    def env(self):
        return self.env_


def train(buffer, model1, model2, optimizer):
    # Sample a batch from the replay buffer
    state, action, reward, next_state, done = buffer.sample(BATCH_SIZE)
    state = torch.tensor(state, dtype=torch.float32)             # .cuda()
    reward = torch.tensor(reward, dtype=torch.float32)           # .cuda()
    action = torch.tensor(action, dtype=torch.long)              # .cuda()
    next_state = torch.tensor(next_state, dtype=torch.float32)   # .cuda()
    done = torch.tensor(done, dtype=torch.float32)               # .cuda()
    # Target value from the next state, taken directly from Q2 (the target network)
    with torch.no_grad():
        target, _ = model2(next_state).max(-1)
        target = reward + (1 - done) * GAMMA * target
    # Prediction for the current state
    predict = model1(state).gather(1, action.unsqueeze(-1)).squeeze()
    loss = (predict - target).pow(2).mean()
    # Optimize the loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


GAMMA = 0.99
EPSILON_MIN = 0.01
EPSILON_MAX = 1.00
NFRAMES = 4
BATCH_SIZE = 32
NSTEPS = 4000000
NBUFFER = 100000
env = gym.make('PongDeterministic-v4')
env = EnvWrapper(env, NFRAMES)
state = env.reset()
buffer = ExpReplayBuffer(NBUFFER)
dqn1 = DDQN((4, 84, 84), env.env.action_space.n)
dqn2 = DDQN((4, 84, 84), env.env.action_space.n)
dqn2.load_state_dict(dqn1.state_dict())
optimizer = torch.optim.Adam(dqn1.parameters(), 1e-4)
all_rewards = []
all_losses = []
episode_reward = 0
eps = lambda t: EPSILON_MIN + (EPSILON_MAX - EPSILON_MIN)*np.exp(-t/30000)

for nstep in range(NSTEPS):
    p = eps(nstep)
    state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # .cuda()
    action = dqn1.act(state_t, p)
    next_state, reward, done, _ = env.step(action)
    buffer.push(state, action, reward, next_state, done)
    state = next_state
    episode_reward += reward
    if done:
        state = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0
    if len(buffer) >= 10000:
        loss = train(buffer, dqn1, dqn2, optimizer)
        # Update the parameters of Q2 (the target network)
        if (nstep + 1) % 1000 == 0:
            dqn2.load_state_dict(dqn1.state_dict())
5) Distributional DQN
Core idea
A conventional DQN estimates the expectation of the Q-value, whereas a distributional DQN models the probability distribution of the return directly. The Q-value is represented as a discrete probability distribution, and the network learns the probability of each return level. This captures the agent's uncertainty about future rewards more completely and supports more robust decisions.
Formulas:
C51 is the canonical distributional DQN algorithm. Its core idea is to model the action-value function as a discrete distribution rather than a single expected value, which describes the uncertainty of future returns more precisely. C51 discretizes the range of Q-values into $N$ atoms; each atom represents a possible Q-value and is assigned a probability, and together these probabilities form the Q-value distribution. Let $V_{\min}$ and $V_{\max}$ be the minimum and maximum Q-values. The interval $[V_{\min}, V_{\max}]$ is divided uniformly into $N$ atoms, and the value of the $i$-th atom is
$$z_i = V_{\min} + (i - 1)\,\Delta z,\qquad \Delta z = \frac{V_{\max} - V_{\min}}{N - 1},\qquad i = 1,\dots,N.$$
Computing the target distribution
C51 needs a target distribution. Consider a sample $(s, a, r, s')$, where $s$ is the current state, $a$ the action taken, $r$ the reward received, and $s'$ the next state.
1. Select the optimal action in the next state
First, choose the optimal action $a'$ in the next state $s'$ according to the Q-values implied by the current network's distribution. The expected Q-value is the probability-weighted sum over the atoms:
$$Q(s', a') = \sum_{i=1}^{N} z_i\, p_i(s', a')$$
The optimal action $a^*$ is
$$a^* = \arg\max_{a'} Q(s', a')$$
2. Compute the support of the target distribution
By the Bellman equation, the support points of the target distribution are
$$\hat{\mathcal{T}}z_i = r + \gamma z_i, \qquad \text{clipped to } [V_{\min}, V_{\max}],$$
where $\gamma$ is the discount factor. Each shifted atom is then projected back onto the fixed support $\{z_i\}$ by splitting its probability mass between the two neighbouring atoms.
Loss function
C51 minimizes the cross-entropy between the predicted distribution and the projected target distribution. For a sample $(s, a, r, s')$ the loss is
$$L(\theta) = -\sum_{i=1}^{N} m_i \log p_i(s, a; \theta),$$
where $m_i$ is the probability that the projected target distribution assigns to atom $z_i$. In practice, a mini-batch is drawn from the replay buffer, the average loss over the batch is computed, and the network parameters are updated by gradient descent. A minimal single-transition sketch of the projection and loss follows.
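A minimal sketch of the categorical projection and cross-entropy loss for a single transition, with illustrative names; the batched implementation below performs the same projection with scatter_add_:

```python
import torch

def project_distribution(next_probs, reward, gamma, z, vmin, vmax):
    # next_probs: probabilities over the N atoms for the greedy next action
    n_atoms = z.numel()
    dz = (vmax - vmin) / (n_atoms - 1)
    # Bellman-shifted support, clipped to [vmin, vmax]
    tz = (reward + gamma * z).clamp(vmin, vmax)
    b = (tz - vmin) / dz                         # fractional atom indices
    lower, upper = b.floor().long(), b.ceil().long()
    lw = upper.float() - b                       # mass assigned to the lower neighbour
    uw = b - lower.float()                       # mass assigned to the upper neighbour
    # If the shifted value lands exactly on an atom, give it the full mass
    lw = torch.where(lower == upper, torch.ones_like(lw), lw)
    target = torch.zeros_like(next_probs)
    target.index_add_(0, lower, next_probs * lw)
    target.index_add_(0, upper, next_probs * uw)
    return target

z = torch.linspace(-10.0, 10.0, 51)               # fixed support of 51 atoms
next_probs = torch.full((51,), 1.0 / 51)          # toy uniform next-state distribution
m = project_distribution(next_probs, reward=1.0, gamma=0.99, z=z, vmin=-10.0, vmax=10.0)
logits = torch.zeros(51, requires_grad=True)      # predicted logits for the taken action
loss = -(m * logits.log_softmax(-1)).sum()        # cross-entropy loss
```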
Advantages: handles reward uncertainty better and performs well in environments with stochastic rewards; the richer information helps the agent understand the environment's dynamics.
Disadvantages: higher computational cost, since it works with probability distributions instead of single values; it also needs more data to estimate the Q-value distribution accurately.
Implementation code
#! /usr/bin/env python
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np
import gym
from PIL import Image


# Categorical (C51) DQN model used to estimate the Q-value distribution for Atari
class CDQN(nn.Module):
    def __init__(self, img_size, num_actions, vmin, vmax, num_cats):
        super().__init__()
        # Shape of the input image (c, h, w)
        self.img_size = img_size
        self.num_actions = num_actions
        self.num_cats = num_cats  # number of atoms between vmin and vmax
        self.vmax = vmax          # largest value the model can represent
        self.vmin = vmin          # smallest value the model can represent
        # Discrete support of values between vmin and vmax, registered as a
        # buffer named "vrange" so it is saved and moved together with the model
        self.register_buffer(
            "vrange",
            torch.linspace(self.vmin, self.vmax, num_cats).view(1, 1, -1)
        )  # num_cats evenly spaced points between self.vmin and self.vmax
        # Spacing between two neighbouring atoms
        self.register_buffer(
            "dv",
            torch.tensor((vmax - vmin) / (num_cats - 1))
        )
        # For the Atari environment the input is (4, 84, 84)
        self.featnet = nn.Sequential(
            nn.Conv2d(img_size[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.category_net = nn.Sequential(
            nn.Linear(self._feat_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions * self.num_cats),
        )

    def _feat_size(self):
        with torch.no_grad():
            x = torch.randn(1, *self.img_size)
            x = self.featnet(x).view(1, -1)
            return x.size(1)

    def forward(self, x):
        bs = x.size(0)
        # Extract features
        feat = self.featnet(x).view(bs, -1)
        # Logits of the value distribution for every action
        logits = self.category_net(feat) \
            .view(-1, self.num_actions, self.num_cats)
        return logits

    def qval(self, x):
        probs = self.forward(x).softmax(-1)
        return (probs * self.vrange).sum(-1)

    def act(self, x, epsilon=0.0):
        # ε-greedy policy
        if random.random() > epsilon:
            with torch.no_grad():
                qval = self.qval(x)
            return qval.argmax(-1).squeeze().item()
        else:
            return random.randint(0, self.num_actions - 1)

from collections import deque
class ExpReplayBuffer(object):
    def __init__(self, buffer_size):
        super().__init__()
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, bs):
        state, action, reward, next_state, done = \
            zip(*random.sample(self.buffer, bs))
        return np.stack(state, 0), np.stack(action, 0), \
            np.stack(reward, 0), np.stack(next_state, 0), \
            np.stack(done, 0).astype(np.float32)

    def __len__(self):
        return len(self.buffer)


class EnvWrapper(object):
    def __init__(self, env, num_frames):
        super().__init__()
        self.env_ = env
        self.num_frames = num_frames
        self.frame = deque(maxlen=num_frames)

    def _preprocess(self, img):
        # Preprocess the observation
        img = Image.fromarray(img)
        img = img.convert("L")
        img = img.resize((84, 84))
        return np.array(img) / 256.0

    def reset(self):
        obs = self.env_.reset()  # reset the environment
        if isinstance(obs, tuple):
            obs = obs[0]  # newer gym versions return (obs, info)
        self.frame = []  # clear previous frames
        for _ in range(self.num_frames):
            processed_frame = self._preprocess(obs)
            self.frame.append(processed_frame)
        return np.stack(self.frame, axis=0)  # return the stacked frames

    def step(self, action):
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, _ = self.env_.step(action)
        done = terminated or truncated
        processed_frame = self._preprocess(obs)
        self.frame.pop(0)                   # drop the oldest frame
        self.frame.append(processed_frame)  # append the newest frame
        return np.stack(self.frame, 0), np.sign(reward), done, {}

    @property
    def env(self):
        return self.env_


def train(buffer, model, optimizer):
    # Sample a batch from the replay buffer
    state, action, reward, next_state, done = buffer.sample(BATCH_SIZE)
    state = torch.tensor(state, dtype=torch.float32)
    reward = torch.tensor(reward, dtype=torch.float32)
    action = torch.tensor(action, dtype=torch.long)
    next_state = torch.tensor(next_state, dtype=torch.float32)
    done = torch.tensor(done, dtype=torch.float32)
    idx = torch.arange(BATCH_SIZE)
    # Target distribution for the next state
    with torch.no_grad():
        prob = model(next_state).softmax(-1)         # turn logits into probabilities
        value_dist = prob * model.vrange              # weight each atom by its probability
        next_action = value_dist.sum(-1).argmax(-1)   # greedy action w.r.t. expected value
        prob = prob[idx, next_action[idx], :]         # distribution of the greedy action
        # Bellman-shift the support and map it to fractional atom indices
        value = reward.unsqueeze(-1) + \
            (1 - done).unsqueeze(-1) * GAMMA * model.vrange.squeeze(0)
        value = (value.clamp(VMIN, VMAX) - VMIN) / DV
        lf, uf = value.floor(), value.ceil()   # lower and upper neighbouring atom indices
        ll, ul = lf.long(), uf.long()
        target = torch.zeros_like(value)
        # scatter_add_ accumulates values into target at the given indices:
        # this projects the shifted distribution back onto the fixed support
        target.scatter_add_(1, ll, prob * (uf - value))
        target.scatter_add_(1, ul, prob * (value - lf))
    # Predicted logits for the current state and the taken action
    predict = model(state)[idx, action[idx], :]
    # Cross-entropy between the target and predicted distributions
    loss = -(target * predict.log_softmax(-1)).mean()
    # Optimize the loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


GAMMA = 0.99
EPSILON_MIN = 0.01
EPSILON_MAX = 1.00
NFRAMES = 4
BATCH_SIZE = 32
NSTEPS = 4000000
NBUFFER = 100000
VMIN = -10
VMAX = 10
NCATS = 51
DV = (VMAX - VMIN)/(NCATS - 1)
env = gym.make('PongDeterministic-v4')
env = EnvWrapper(env, NFRAMES)
state = env.reset()
buffer = ExpReplayBuffer(NBUFFER)
dqn = CDQN((4, 84, 84), env.env.action_space.n, VMIN, VMAX, NCATS)
# dqn.cuda()
optimizer = torch.optim.Adam(dqn.parameters(), 1e-4)
all_rewards = []
all_losses = []
episode_reward = 0
eps = lambda t: EPSILON_MIN + (EPSILON_MAX - EPSILON_MIN)*np.exp(-t/30000)

for nstep in range(NSTEPS):
    p = eps(nstep)
    state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action = dqn.act(state_t, p)
    next_state, reward, done, _ = env.step(action)
    buffer.push(state, action, reward, next_state, done)
    state = next_state
    episode_reward += reward
    if done:
        state = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0
    if len(buffer) >= 10000:
        loss = train(buffer, dqn, optimizer)
6) Rainbow
Core idea
Rainbow combines several improvements to DQN, including Double DQN, prioritized experience replay, Dueling DQN, distributional DQN, and others (the original paper also adds multi-step returns and noisy networks). By combining the strengths of these techniques it learns the value function more effectively and improves both performance and stability.
Advantages: strong performance across a wide range of environments and fast convergence to good policies; combining several techniques makes it adaptable to different kinds of tasks.
Disadvantages: the combination of many techniques makes the model more complex and harder to train and tune; it requires more computation and time to run.
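Rainbow has no single short formula, but the following hedged sketch shows how three of the ingredients fit into one training step: the Double DQN target, a dueling architecture, and the importance-sampling weights from a prioritized buffer. The `DuelingQNet`, `rainbow_like_step`, and `batch` layout are illustrative assumptions; the distributional head, multi-step returns, and noisy networks of the full algorithm are omitted:

```python
import torch
import torch.nn as nn

class DuelingQNet(nn.Module):
    """Toy dueling network for a vector observation (stand-in for the conv nets above)."""
    def __init__(self, obs_dim, num_actions):
        super().__init__()
        self.feat = nn.Sequential(nn.Linear(obs_dim, 64), nn.ReLU())
        self.val = nn.Linear(64, 1)
        self.adv = nn.Linear(64, num_actions)

    def forward(self, x):
        h = self.feat(x)
        adv = self.adv(h)
        return self.val(h) + adv - adv.mean(dim=-1, keepdim=True)

def rainbow_like_step(online_net, target_net, optimizer, batch, gamma=0.99):
    # batch: (weights, state, action, reward, next_state, done) from a prioritized buffer
    weights, state, action, reward, next_state, done = batch
    with torch.no_grad():
        # Double DQN: select with the online network, evaluate with the target network
        next_action = online_net(next_state).argmax(-1, keepdim=True)
        next_q = target_net(next_state).gather(1, next_action).squeeze(-1)
        target = reward + (1 - done) * gamma * next_q
    predict = online_net(state).gather(1, action.unsqueeze(-1)).squeeze(-1)
    td_error = (predict - target).abs().detach()          # new priorities for the buffer
    loss = (weights * (predict - target).pow(2)).mean()   # importance-sampling weighted loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item(), td_error
```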