3. Actor-Critic Methods
3.1 The REINFORCE Algorithm (Monte Carlo Policy Gradient) and Monte Carlo Variance
Section 1 introduced DQN.

In the previous section on policy-based methods, our goal was to optimize the policy directly, without using a value function. More precisely, REINFORCE belongs to a subclass of policy-based methods called policy-gradient methods, which optimize the policy directly by estimating the weights of the optimal policy through gradient ascent.

We saw that REINFORCE works reasonably well. However, because we use Monte Carlo sampling to estimate the return (we use the entire episode to compute it), the policy-gradient estimate has significant variance.

Remember that the policy-gradient estimate is the direction of steepest increase in return; in other words, it tells us how to update the policy weights so that actions leading to good returns are more likely to be taken. The Monte Carlo variance slows training down, because we need many samples to mitigate it.

Why is the variance so large? Because individual trajectory samples differ greatly from one another, the gradient updates are noisy and unstable; only when enough samples are averaged does the variance shrink.
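For reference, a standard way to write the REINFORCE estimate for one sampled trajectory (just restating the above, not a new result) is:

$$
\nabla_\theta J(\theta) \;\approx\; \sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t,
\qquad
G_t = \sum_{k=t}^{T-1} \gamma^{\,k-t}\, r_{k+1}
$$

Because $G_t$ is computed from a single sampled episode, it fluctuates heavily from trajectory to trajectory, which is exactly the Monte Carlo variance described above.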
3.2 A2C (Advantage Actor-Critic): Main Steps
- Actor: our policy takes the state and outputs an action.
- Critic: takes the state and the action and computes the value of taking that action in that state, i.e. the Q value.
- The environment returns S_{t+1} and R_{t+1}.
- Actor update: the policy-gradient update; in the advantage actor-critic variant, Q is replaced by the advantage function A(s, a) (see the equations right after this list).
- Critic update: note that the TD-error computation determines the direction of the update.
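In equation form (a standard A2C formulation; $\theta$ are the actor parameters, $w$ the critic parameters, and $\alpha_\theta$, $\alpha_w$ their learning rates are my notation, not taken from the original figure), the critic's TD error doubles as a one-step estimate of the advantage, and the two updates are:

$$
\delta_t = R_{t+1} + \gamma V_w(S_{t+1}) - V_w(S_t) \;\approx\; A(S_t, A_t)
$$

$$
\theta \leftarrow \theta + \alpha_\theta \, \nabla_\theta \log \pi_\theta(A_t \mid S_t)\, A(S_t, A_t),
\qquad
w \leftarrow w + \alpha_w \, \delta_t \, \nabla_w V_w(S_t)
$$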
3.3 The A2C Loss
As the code below shows, A2C splits the policy-gradient method of the previous section into two heads and two loss functions, which makes the updates slower but more stable. Plain policy gradient directly maximizes log-probability times the Monte Carlo return; the A2C actor instead maximizes log-probability times the critic's predicted value (advantage), while the critic network simultaneously learns the state value.
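Written as one objective (my own summary of the reference code below; the 0.5 and 0.01 weights are simply the coefficients used in that snippet):

$$
\mathcal{L}_{\text{A2C}} = \underbrace{-\log \pi_\theta(a \mid s)\,\hat{A}(s,a)}_{\text{policy (actor) loss}}
\;+\; 0.5\,\underbrace{\big(V_w(s) - y\big)^2}_{\text{value (critic) loss}}
\;-\; 0.01\,\underbrace{\mathcal{H}\big[\pi_\theta(\cdot \mid s)\big]}_{\text{entropy bonus}},
\qquad y = r + \gamma\, V_w(s')\,(1 - \text{done})
$$

with the advantage $\hat{A}(s,a) = y - V_w(s)$ treated as a constant (detached) in the policy term.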
Reference code logic:
def compute_loss(agent, states, actions, rewards, next_states, dones, gamma=0.99):
    # Policy probabilities and state values
    action_probs, state_values = agent(states)     # (B, action_dim), (B, 1)
    _, next_state_values = agent(next_states)      # (B, 1)

    # Advantage from the one-step TD target
    # (rewards and dones are assumed to arrive with shape (B, 1) so the broadcast stays element-wise)
    td_targets = rewards + gamma * next_state_values * (1 - dones)   # (B, 1)
    advantages = (td_targets - state_values).detach()  # (B, 1), detached so the policy loss does not update the critic

    # Policy loss
    dist = Categorical(action_probs)
    log_probs = dist.log_prob(actions)                               # (B,)
    policy_loss = -(log_probs * advantages.squeeze()).mean()         # scalar

    # Value loss
    value_loss = F.mse_loss(state_values, td_targets.detach())       # scalar

    # Entropy regularization
    entropy = -torch.sum(action_probs * torch.log(action_probs), dim=-1).mean()  # scalar

    # Total loss
    total_loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
    return total_loss, policy_loss, value_loss, entropy
3.4 A2C Core Code
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical


class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(ActorCritic, self).__init__()
        # Shared feature-extraction layer
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        # Actor head - outputs action probabilities
        self.fc_actor = nn.Linear(hidden_dim, action_dim)
        # Critic head - outputs the state value
        self.fc_critic = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        # Action probabilities (batch_size, action_dim)
        action_probs = F.softmax(self.fc_actor(x), dim=-1)
        # State value (batch_size, 1)
        state_values = self.fc_critic(x)
        return action_probs, state_values


def compute_returns(rewards, gamma=0.99):
    """Compute discounted returns.

    Args:
        rewards: reward sequence [r1, r2, ..., rT]
        gamma: discount factor
    Returns:
        returns: discounted-return sequence [R1, R2, ..., RT]
    """
    R = 0
    returns = []
    for r in reversed(rewards):
        R = r + gamma * R
        returns.insert(0, R)
    return returns


def a2c_update(agent, optimizer, states, actions, rewards, next_states, dones, gamma=0.99):
    """One A2C update step.

    Args:
        agent: ActorCritic network
        optimizer: optimizer
        states: state sequence (T, state_dim)
        actions: action sequence (T,)
        rewards: reward sequence (T,)
        next_states: next-state sequence (T, state_dim)
        dones: terminal-flag sequence (T,)
        gamma: discount factor
    """
    # Convert to tensors
    states = torch.FloatTensor(states)                    # (T, state_dim)
    actions = torch.LongTensor(actions)                   # (T,)
    rewards = torch.FloatTensor(rewards).unsqueeze(1)     # (T, 1), so it broadcasts element-wise with the value tensors
    next_states = torch.FloatTensor(next_states)          # (T, state_dim)
    dones = torch.FloatTensor(dones).unsqueeze(1)         # (T, 1)

    # Action probabilities and state values for current and next states
    action_probs, state_values = agent(states)            # (T, action_dim), (T, 1)
    _, next_state_values = agent(next_states)             # (T, 1)

    # TD targets
    td_targets = rewards + gamma * next_state_values * (1 - dones)   # (T, 1)

    # Advantage estimates, detached so the policy loss does not backpropagate into the critic
    advantages = (td_targets - state_values).detach()     # (T, 1)

    # Policy-gradient loss
    dist = Categorical(action_probs)
    policy_loss = -dist.log_prob(actions) * advantages.squeeze()   # (T,)
    policy_loss = policy_loss.mean()

    # Value-function loss
    value_loss = F.mse_loss(state_values, td_targets.detach())

    # Entropy regularization
    entropy_loss = -torch.sum(action_probs * torch.log(action_probs), dim=-1).mean()  # scalar

    # Total loss (0.5 is the value-loss coefficient)
    loss = policy_loss + 0.5 * value_loss - 0.01 * entropy_loss

    # Backpropagation
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return policy_loss.item(), value_loss.item(), entropy_loss.item()
Training script:
import gym
import numpy as np
from collections import deque
import matplotlib.pyplot as plt

# Create the environment
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0] # 4
action_dim = env.action_space.n             # 2

# Initialize the A2C agent
agent = ActorCritic(state_dim, action_dim)
optimizer = optim.Adam(agent.parameters(), lr=0.001)

# Training hyperparameters
num_episodes = 1000
max_steps = 1000
gamma = 0.99

# Training loop
episode_rewards = []
for episode in range(num_episodes):
    state = env.reset()
    episode_reward = 0
    episode_states = []
    episode_actions = []
    step_rewards = []          # per-step rewards of this episode (separate from the per-episode totals in episode_rewards)
    episode_next_states = []
    episode_dones = []

    for step in range(max_steps):
        # Select an action
        state_tensor = torch.FloatTensor(state).unsqueeze(0)   # (1, state_dim)
        action_probs, _ = agent(state_tensor)                  # (1, action_dim)
        dist = Categorical(action_probs)
        action = dist.sample().item()                          # scalar

        # Step the environment (classic gym API: 4 return values)
        next_state, reward, done, _ = env.step(action)

        # Store the transition
        episode_states.append(state)
        episode_actions.append(action)
        step_rewards.append(reward)
        episode_next_states.append(next_state)
        episode_dones.append(done)

        state = next_state
        episode_reward += reward
        if done:
            break

    # Update the networks
    policy_loss, value_loss, _ = a2c_update(
        agent, optimizer,
        episode_states, episode_actions, step_rewards,
        episode_next_states, episode_dones, gamma)

    # Record the episode reward
    episode_rewards.append(episode_reward)

    # Print training progress
    if (episode + 1) % 10 == 0:
        avg_reward = np.mean(episode_rewards[-10:])
        print(f"Episode {episode+1}, Avg Reward: {avg_reward:.1f}, "
              f"Policy Loss: {policy_loss:.3f}, Value Loss: {value_loss:.3f}")

# Plot the training curve
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('A2C Training Performance on CartPole')
plt.show()

# Test the trained agent
def test_agent(agent, env, num_episodes=10):
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False
        while not done:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                action_probs, _ = agent(state_tensor)
            action = torch.argmax(action_probs).item()
            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            state = next_state
        print(f"Test Episode {episode+1}: Total Reward = {total_reward}")

test_agent(agent, env)
The same logic can also be wrapped in a class:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from collections import deque
import gym


class A2CNetwork(nn.Module):
    """A2C network: shared feature-extraction layers, an actor head, and a critic head."""

    def __init__(self, input_dim, action_dim, hidden_dim=256):
        super(A2CNetwork, self).__init__()
        # Shared feature-extraction layers
        # input_dim: state-space dimension, e.g. 4 for a 4-dimensional state
        # hidden_dim: hidden-layer width
        self.shared_layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),    # (batch_size, input_dim) -> (batch_size, hidden_dim)
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),   # (batch_size, hidden_dim) -> (batch_size, hidden_dim)
            nn.ReLU()
        )
        # Actor head: outputs the action probability distribution
        # action_dim: action-space dimension, e.g. 2 for two discrete actions
        self.actor_head = nn.Linear(hidden_dim, action_dim)   # (batch_size, hidden_dim) -> (batch_size, action_dim)
        # Critic head: outputs the state value
        self.critic_head = nn.Linear(hidden_dim, 1)           # (batch_size, hidden_dim) -> (batch_size, 1)

    def forward(self, state):
        """Forward pass.

        Args:
            state: state tensor, shape (batch_size, input_dim)
        Returns:
            action_probs: action probability distribution, shape (batch_size, action_dim)
            state_value: state value, shape (batch_size, 1)
        """
        # Feature extraction
        features = self.shared_layers(state)              # (batch_size, hidden_dim)
        # Actor output: action probabilities
        action_logits = self.actor_head(features)         # (batch_size, action_dim)
        action_probs = F.softmax(action_logits, dim=-1)   # (batch_size, action_dim)
        # Critic output: state value
        state_value = self.critic_head(features)          # (batch_size, 1)
        return action_probs, state_value


class A2CAgent:
    """A2C agent implementation."""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 entropy_coef=0.01, value_coef=0.5):
        """Initialize the A2C agent.

        Args:
            state_dim: state-space dimension
            action_dim: action-space dimension
            lr: learning rate
            gamma: discount factor
            entropy_coef: entropy-regularization coefficient
            value_coef: value-loss weight
        """
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef
        # Build the network
        self.network = A2CNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        # Trajectory storage
        self.reset_trajectory()

    def reset_trajectory(self):
        """Reset trajectory storage."""
        self.states = []     # states, each of shape (state_dim,)
        self.actions = []    # actions, scalars
        self.rewards = []    # rewards, scalars
        self.log_probs = []  # log probabilities, scalar tensors
        self.values = []     # state values, scalar tensors

    def select_action(self, state):
        """Sample an action from the current policy.

        Args:
            state: current state, shape (state_dim,)
        Returns:
            action: chosen action (int)
            log_prob: log probability of the action (scalar tensor)
            value: state value (scalar tensor)
        """
        # Convert to a tensor and add a batch dimension
        state_tensor = torch.FloatTensor(state).unsqueeze(0)   # (1, state_dim)
        # Forward pass
        action_probs, state_value = self.network(state_tensor)
        # action_probs: (1, action_dim), state_value: (1, 1)
        # Build the distribution and sample an action
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()             # (1,)
        log_prob = dist.log_prob(action)   # (1,)
        return action.item(), log_prob.squeeze(), state_value.squeeze()

    def store_transition(self, state, action, reward, log_prob, value):
        """Store one transition.

        Args:
            state: state, shape (state_dim,)
            action: action, scalar
            reward: reward, scalar
            log_prob: log probability, scalar tensor
            value: state value, scalar tensor
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.log_probs.append(log_prob)
        self.values.append(value)

    def compute_returns_and_advantages(self, next_value=0):
        """Compute returns and advantages.

        Args:
            next_value: value of the state following the stored trajectory (0 if terminal)
        Returns:
            returns: return sequence, shape (trajectory_length,)
            advantages: advantage sequence, shape (trajectory_length,)
        """
        trajectory_length = len(self.rewards)
        # Discounted returns, computed backwards
        returns = torch.zeros(trajectory_length)
        R = next_value   # bootstrap from the state after the trajectory
        for t in reversed(range(trajectory_length)):
            R = self.rewards[t] + self.gamma * R   # R = r_t + gamma * R_{t+1}
            returns[t] = R
        # Stack the stored values into a tensor
        values = torch.stack(self.values)          # (trajectory_length,)
        # Advantage: A(s, a) = R - V(s)
        advantages = returns - values              # (trajectory_length,)
        return returns, advantages

    def update(self, next_value=0):
        """Update the network parameters.

        Args:
            next_value: value of the state following the stored trajectory
        """
        if len(self.rewards) == 0:
            return
        # Returns and advantages
        returns, advantages = self.compute_returns_and_advantages(next_value)
        # Convert stored data to tensors
        states = torch.FloatTensor(np.array(self.states))   # (trajectory_length, state_dim)
        actions = torch.LongTensor(self.actions)             # (trajectory_length,)
        log_probs = torch.stack(self.log_probs)               # (trajectory_length,) stored log-probs; re-computed below
        # Re-evaluate probabilities and values under the current policy
        action_probs, state_values = self.network(states)
        # action_probs: (trajectory_length, action_dim), state_values: (trajectory_length, 1)
        state_values = state_values.squeeze()                  # (trajectory_length,)
        # Log probabilities and entropy under the current policy
        dist = torch.distributions.Categorical(action_probs)
        new_log_probs = dist.log_prob(actions)                 # (trajectory_length,)
        entropy = dist.entropy().mean()                         # scalar, policy entropy
        # Losses
        # Actor loss: policy-gradient loss
        actor_loss = -(new_log_probs * advantages.detach()).mean()   # scalar
        # Critic loss: value-function loss
        critic_loss = F.mse_loss(state_values, returns.detach())     # scalar
        # Total loss
        total_loss = actor_loss + self.value_coef * critic_loss - self.entropy_coef * entropy
        # Backpropagation and optimization
        self.optimizer.zero_grad()
        total_loss.backward()
        # Gradient clipping to avoid exploding gradients
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), max_norm=0.5)
        self.optimizer.step()
        # Reset the trajectory buffer
        self.reset_trajectory()
        return {
            'actor_loss': actor_loss.item(),
            'critic_loss': critic_loss.item(),
            'entropy': entropy.item(),
            'total_loss': total_loss.item()
        }


import gym
import matplotlib.pyplot as plt
from collections import deque


def train_a2c(env_name='CartPole-v1', num_episodes=1000, max_steps=500, update_freq=5):
    """Train an A2C agent.

    Args:
        env_name: environment name
        num_episodes: number of training episodes
        max_steps: maximum steps per episode
        update_freq: update frequency (update every this many steps)
    """
    # Create the environment
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]   # state dimension, 4 for CartPole
    action_dim = env.action_space.n              # action dimension, 2 for CartPole
    print(f"State dim: {state_dim}, Action dim: {action_dim}")

    # Create the agent
    agent = A2CAgent(state_dim, action_dim)

    # Training records
    episode_rewards = []                 # total reward per episode
    recent_rewards = deque(maxlen=100)   # rewards of the most recent 100 episodes

    for episode in range(num_episodes):
        state = env.reset()              # shape (state_dim,)
        episode_reward = 0
        step_count = 0

        for step in range(max_steps):
            # Select an action
            action, log_prob, value = agent.select_action(state)

            # Step the environment (classic gym API)
            next_state, reward, done, _ = env.step(action)
            # next_state: (state_dim,), reward: scalar, done: bool

            # Store the transition
            agent.store_transition(state, action, reward, log_prob, value)

            episode_reward += reward
            step_count += 1
            state = next_state

            # Update periodically or at the end of the episode
            if step_count % update_freq == 0 or done:
                # Bootstrap value of the next state (0 if the episode ended)
                if done:
                    next_value = 0
                else:
                    with torch.no_grad():
                        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
                        _, next_value = agent.network(next_state_tensor)
                        next_value = next_value.squeeze().item()

                # Update the network
                loss_info = agent.update(next_value)

                if loss_info and episode % 100 == 0:
                    print(f"Episode {episode}, Step {step}: "
                          f"Actor Loss: {loss_info['actor_loss']:.4f}, "
                          f"Critic Loss: {loss_info['critic_loss']:.4f}, "
                          f"Entropy: {loss_info['entropy']:.4f}")

            if done:
                break

        # Record the reward
        episode_rewards.append(episode_reward)
        recent_rewards.append(episode_reward)

        # Print progress
        if episode % 100 == 0:
            avg_reward = np.mean(recent_rewards)
            print(f"Episode {episode}, Average Reward: {avg_reward:.2f}, "
                  f"Current Reward: {episode_reward:.2f}")

    env.close()
    return agent, episode_rewards


def test_agent(agent, env_name='CartPole-v1', num_episodes=10, render=True):
    """Test a trained agent.

    Args:
        agent: trained A2C agent
        env_name: environment name
        num_episodes: number of test episodes
        render: whether to render
    """
    env = gym.make(env_name)
    test_rewards = []

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False

        while not done:
            if render:
                env.render()
            # Select an action (no need to store the trajectory at test time)
            action, _, _ = agent.select_action(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward

        test_rewards.append(episode_reward)
        print(f"Test Episode {episode + 1}: Reward = {episode_reward}")

    env.close()
    avg_test_reward = np.mean(test_rewards)
    print(f"\nAverage test reward: {avg_test_reward:.2f}")
    return test_rewards


def plot_training_results(episode_rewards):
    """Plot training results.

    Args:
        episode_rewards: list of per-episode rewards
    """
    plt.figure(figsize=(12, 4))

    # Raw reward curve
    plt.subplot(1, 2, 1)
    plt.plot(episode_rewards, alpha=0.6)
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.grid(True)

    # Moving-average reward curve
    plt.subplot(1, 2, 2)
    window_size = 100
    if len(episode_rewards) >= window_size:
        moving_avg = []
        for i in range(window_size - 1, len(episode_rewards)):
            moving_avg.append(np.mean(episode_rewards[i - window_size + 1:i + 1]))
        plt.plot(range(window_size - 1, len(episode_rewards)), moving_avg)
        plt.title(f'Moving Average Rewards (window={window_size})')
        plt.xlabel('Episode')
        plt.ylabel('Average Reward')
        plt.grid(True)

    plt.tight_layout()
    plt.show()


# Main entry point
if __name__ == "__main__":
    print("Training the A2C agent...")

    # Train the agent
    agent, rewards = train_a2c(
        env_name='CartPole-v1',
        num_episodes=1000,
        max_steps=500,
        update_freq=5)

    print("\nTraining finished! Starting tests...")

    # Test the agent
    test_rewards = test_agent(agent, num_episodes=5, render=False)

    # Plot the results
    plot_training_results(rewards)

    print("\nTraining and testing complete!")
For more on variance and bias, see: Making Sense of the Bias / Variance Trade-off in (Deep) Reinforcement Learning
https://blog.mlreview.com/making-sense-of-the-bias-variance-trade-off-in-deep-reinforcement-learning-79cf1e83d565
In the RL setting, variance refers to value estimates that are noisy but accurate on average, whereas bias refers to value estimates that are stable but inaccurate.