在上一篇文章中,我們介紹了 Proximal Policy Optimization (PPO) 算法,并使用它解決了 CartPole 問題。本文將深入探討 Deep Deterministic Policy Gradient (DDPG) 算法,這是一種用于連續動作空間的強化學習算法。我們將使用 PyTorch 實現 DDPG 算法,并應用于經典的 Pendulum 問題。
一、DDPG 算法基礎
DDPG 是一種基于 Actor-Critic 框架的算法,專門用于解決連續動作空間的強化學習問題。它結合了深度 Q 網絡(DQN)和策略梯度方法的優點,能夠高效地處理高維狀態和動作空間。
1. DDPG 的核心思想
-
確定性策略:
-
DDPG 使用確定性策略(Deterministic Policy),即給定狀態時,策略網絡直接輸出一個確定的動作,而不是動作的概率分布。
-
-
目標網絡:
-
DDPG 使用目標網絡(Target Network)來穩定訓練過程,類似于 DQN 中的目標網絡。
-
-
經驗回放:
-
DDPG 使用經驗回放緩沖區(Replay Buffer)來存儲和重用過去的經驗,從而提高數據利用率。
-
2. DDPG 的優勢
-
適用于連續動作空間:
-
DDPG 能夠直接輸出連續動作,適用于機器人控制、自動駕駛等任務。
-
-
訓練穩定:
-
通過目標網絡和經驗回放,DDPG 能夠穩定地訓練策略網絡和價值網絡。
-
-
高效采樣:
-
DDPG 可以重復使用舊策略的采樣數據,從而提高數據利用率。
-
3. DDPG 的算法流程
-
使用當前策略采樣一批數據。
-
使用目標網絡計算目標 Q 值。
-
更新 Critic 網絡以最小化 Q 值的誤差。
-
更新 Actor 網絡以最大化 Q 值。
-
更新目標網絡。
-
重復上述過程,直到策略收斂。
二、Pendulum 問題實戰
我們將使用 PyTorch 實現 DDPG 算法,并應用于 Pendulum 問題。目標是控制擺桿使其保持直立。
1. 問題描述
Pendulum 環境的狀態空間包括擺桿的角度和角速度。動作空間是一個連續的扭矩值,范圍在 ?2,2 之間。智能體每保持擺桿直立一步,就會獲得一個負的獎勵,目標是最大化累積獎勵。
2. 實現步驟
-
安裝并導入必要的庫。
-
定義 Actor 網絡和 Critic 網絡。
-
定義 DDPG 訓練過程。
-
測試模型并評估性能。
3. 代碼實現
以下是完整的代碼實現:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
?
# 設置 Matplotlib 支持中文顯示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
?
# 檢查 GPU 是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用設備: {device}")
?
# 環境初始化
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
?
# 隨機種子設置
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
?
?
# 定義 Actor 網絡
class Actor(nn.Module):def __init__(self, state_dim, action_dim, max_action):super(Actor, self).__init__()self.fc1 = nn.Linear(state_dim, 512)self.ln1 = nn.LayerNorm(512) # 層歸一化self.fc2 = nn.Linear(512, 512)self.ln2 = nn.LayerNorm(512)self.fc3 = nn.Linear(512, action_dim)self.max_action = max_action
?def forward(self, x):x = F.relu(self.ln1(self.fc1(x)))x = F.relu(self.ln2(self.fc2(x)))return self.max_action * torch.tanh(self.fc3(x))
?
?
# 定義 Critic 網絡
class Critic(nn.Module):def __init__(self, state_dim, action_dim):super(Critic, self).__init__()self.fc1 = nn.Linear(state_dim + action_dim, 256)self.fc2 = nn.Linear(256, 256)self.fc3 = nn.Linear(256, 1)
?def forward(self, x, u):x = F.relu(self.fc1(torch.cat([x, u], 1)))x = F.relu(self.fc2(x))x = self.fc3(x)return x
?
?
# 添加OU噪聲類
class OUNoise:def __init__(self, action_dim, mu=0, theta=0.15, sigma=0.2):self.mu = mu * np.ones(action_dim)self.theta = thetaself.sigma = sigmaself.reset()
?def reset(self):self.state = np.copy(self.mu)
?def sample(self):dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(len(self.state))self.state += dxreturn self.state
?
?
# 定義 DDPG 算法
class DDPG:def __init__(self, state_dim, action_dim, max_action):self.actor = Actor(state_dim, action_dim, max_action).to(device)self.actor_target = Actor(state_dim, action_dim, max_action).to(device)self.actor_target.load_state_dict(self.actor.state_dict())self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
?self.critic = Critic(state_dim, action_dim).to(device)self.critic_target = Critic(state_dim, action_dim).to(device)self.critic_target.load_state_dict(self.critic.state_dict())self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)self.noise = OUNoise(action_dim, sigma=0.2) # 示例:Ornstein-Uhlenbeck噪聲
?self.max_action = max_actionself.replay_buffer = deque(maxlen=1000000)self.batch_size = 64self.gamma = 0.99self.tau = 0.005self.noise_sigma = 0.5 # 初始噪聲強度self.noise_decay = 0.995
?self.actor_lr_scheduler = optim.lr_scheduler.StepLR(self.actor_optimizer, step_size=100, gamma=0.95)self.critic_lr_scheduler = optim.lr_scheduler.StepLR(self.critic_optimizer, step_size=100, gamma=0.95)
?def select_action(self, state):state = torch.FloatTensor(state).unsqueeze(0).to(device)self.actor.eval()with torch.no_grad():action = self.actor(state).cpu().data.numpy().flatten()self.actor.train()return action
?def train(self):if len(self.replay_buffer) < self.batch_size:return
?# 從經驗回放緩沖區中采樣batch = random.sample(self.replay_buffer, self.batch_size)state = torch.FloatTensor(np.array([transition[0] for transition in batch])).to(device)action = torch.FloatTensor(np.array([transition[1] for transition in batch])).to(device)reward = torch.FloatTensor(np.array([transition[2] for transition in batch])).reshape(-1, 1).to(device)next_state = torch.FloatTensor(np.array([transition[3] for transition in batch])).to(device)done = torch.FloatTensor(np.array([transition[4] for transition in batch])).reshape(-1, 1).to(device)
?# 計算目標 Q 值next_action = self.actor_target(next_state)target_Q = self.critic_target(next_state, next_action)target_Q = reward + (1 - done) * self.gamma * target_Q
?# 更新 Critic 網絡current_Q = self.critic(state, action)critic_loss = F.mse_loss(current_Q, target_Q.detach())self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()
?# 更新 Actor 網絡actor_loss = -self.critic(state, self.actor(state)).mean()self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()
?# 更新目標網絡for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
?def save(self, filename):torch.save(self.actor.state_dict(), filename + "_actor.pth")torch.save(self.critic.state_dict(), filename + "_critic.pth")
?def load(self, filename):self.actor.load_state_dict(torch.load(filename + "_actor.pth"))self.critic.load_state_dict(torch.load(filename + "_critic.pth"))
?
?
# 訓練流程
def train_ddpg(env, agent, episodes=500):rewards_history = []moving_avg = []
?for ep in range(episodes):state,_ = env.reset()episode_reward = 0done = False
?while not done:action = agent.select_action(state)next_state, reward, done, _, _ = env.step(action)agent.replay_buffer.append((state, action, reward, next_state, done))state = next_stateepisode_reward += rewardagent.train()
?rewards_history.append(episode_reward)moving_avg.append(np.mean(rewards_history[-50:]))
?if (ep + 1) % 50 == 0:print(f"Episode: {ep + 1}, Avg Reward: {moving_avg[-1]:.2f}")
?return moving_avg, rewards_history
?
?
# 訓練啟動
ddpg_agent = DDPG(state_dim, action_dim, max_action)
moving_avg, rewards_history = train_ddpg(env, ddpg_agent)
?
# 可視化結果
plt.figure(figsize=(12, 6))
plt.plot(rewards_history, alpha=0.6, label='single round reward')
plt.plot(moving_avg, 'r-', linewidth=2, label='moving average (50 rounds)')
plt.xlabel('episodes')
plt.ylabel('reward')
plt.title('DDPG training performance on Pendulum-v1')
plt.legend()
plt.grid(True)
plt.show()
三、代碼解析
-
Actor 和 Critic 網絡:
-
Actor 網絡輸出連續動作,通過
tanh
函數將動作限制在 ?max_action,max_action 范圍內。 -
Critic 網絡輸出狀態-動作對的 Q 值。
-
-
DDPG 訓練過程:
-
使用當前策略采樣一批數據。
-
使用目標網絡計算目標 Q 值。
-
更新 Critic 網絡以最小化 Q 值的誤差。
-
更新 Actor 網絡以最大化 Q 值。
-
更新目標網絡。
-
-
訓練過程:
-
在訓練過程中,每 50 個 episode 打印一次平均獎勵。
-
訓練結束后,繪制訓練過程中的總獎勵曲線。
-
四、運行結果
運行上述代碼后,你將看到以下輸出:
-
訓練過程中每 50 個 episode 打印一次平均獎勵。
-
訓練結束后,繪制訓練過程中的總獎勵曲線。
五、總結
本文介紹了 DDPG 算法的基本原理,并使用 PyTorch 實現了一個簡單的 DDPG 模型來解決 Pendulum 問題。通過這個例子,我們學習了如何使用 DDPG 算法進行連續動作空間的策略優化。
在下一篇文章中,我們將探討更高級的強化學習算法,如 Twin Delayed DDPG (TD3)。敬請期待!
代碼實例說明:
-
本文代碼可以直接在 Jupyter Notebook 或 Python 腳本中運行。
-
如果你有 GPU,可以將模型和數據移動到 GPU 上運行,例如:
actor = actor.to('cuda')
,state = state.to('cuda')
。
希望這篇文章能幫助你更好地理解 DDPG 算法!如果有任何問題,歡迎在評論區留言討論。