今天看的論文是這篇
主要提出了傳統優先級經驗回放(PER)在復雜交通場景中效率低下,使用二叉樹存儲樣本,導致大規模樣本時計算復雜度高。而且不丟棄樣本,造成存儲空間浪費。
雙重經驗池:
為了解決以上問題,文章提出了雙重經驗池。分為普通經驗池(隨機存儲交互數據?(s_t, a_t, r, s_{t+1})
,用于基礎訓練。)
和優先經驗池(僅存儲??高價值樣本??(需滿足:獎勵≥歷史平均獎勵且>獎勵中位數))。
并執行異優訓練:優先池以一定概率啟動訓練(如10%),避免過擬合并加速收斂。
動態訓練周期??
為了解決固定訓練周期在交通流變化時效率低(如車輛少時過度訓練)。提出了動態訓練周期
動態生成每輪訓練周期(epoch),公式如下:(這里的訓練周期動態我是第一次見)
動態系數
這樣設計能夠使早期車少時側重即時獎勵變化(ω1?權重高),后期車多時側重歷史表現(ω2?權重高)。確實妙啊。
還有加入了之前別的論文也研究過的壓力獎勵:
獎勵函數設計:
定義??車道壓力??:Pi?=Nin??Nout?(入站車輛數 - 出站車輛數)。
??獎勵??:ri?=?Pi?,總獎勵?R(st?,at?)=∑ri?。
這樣能夠協調車量通過最小化壓力路口實現縮短車輛平均通行時間,提升路口吞吐量??。?
論文總結:DERLight通過??雙經驗回放??提升采樣效率,結合??動態周期訓練??適應環境變化,以??壓力驅動的獎勵函數??優化交通流。實驗證明其在降低通行時間、提升吞吐量和加速收斂方面顯著優于主流算法(如CoLight、PressLight),且具備跨領域應用潛力。
個人看法:
我覺得這個壓力獎勵還有很大的研究空間。這個動態周期確實我之前居然沒想到過(太菜了),作者給我提供了一個新的思路,核心雙重經驗池也是很大的思路。(能不能多重)(動態經驗池)。
主要代碼大概
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import mathclass DQN(nn.Module):"""DQN網絡結構 (評估網絡和目標網絡)"""def __init__(self, state_size, action_size):super(DQN, self).__init__()self.fc1 = nn.Linear(state_size, 64)self.fc2 = nn.Linear(64, 64)self.fc3 = nn.Linear(64, action_size)self.relu = nn.ReLU()def forward(self, state):x = self.relu(self.fc1(state))x = self.relu(self.fc2(x))return self.fc3(x)class DERLight:"""DERLight交通燈控制算法實現"""def __init__(self, state_size, action_size):# 算法參數設置self.state_size = state_sizeself.action_size = action_sizeself.memory_capacity = 10000 # 經驗池容量self.batch_size = 32self.gamma = 0.8 # 折扣因子self.epsilon = 0.8 # 初始探索率self.epsilon_min = 0.2self.epsilon_decay = 0.995self.target_update_freq = 5 # 目標網絡更新頻率self.priority_prob = 0.1 # 優先經驗池啟動概率# 創建雙經驗池self.memory_D = deque(maxlen=self.memory_capacity) # 普通經驗池self.memory_D_prime = deque(maxlen=self.memory_capacity) # 優先經驗池# 創建DQN網絡self.eval_net = DQN(state_size, action_size)self.target_net = DQN(state_size, action_size)self.target_net.load_state_dict(self.eval_net.state_dict())self.optimizer = optim.Adam(self.eval_net.parameters(), lr=0.001)self.loss_func = nn.MSELoss()# 獎勵相關參數self.rewards = [] # 獎勵歷史self.epoch = 1000 # 初始訓練周期self.r_median = None # 獎勵中位數self.r_average = 0 # 獎勵平均值def compute_pressure(self, state):"""計算車道壓力 (核心公式1)"""# 狀態向量包含各車道的車輛數 [in_N, out_N, in_S, out_S, in_E, out_E, in_W, out_W]pressures = []for i in range(0, len(state), 2):n_in = state[i]n_out = state[i+1]pressures.append(n_in - n_out)return pressuresdef compute_reward(self, state):"""計算壓力獎勵 (核心公式2)"""pressures = self.compute_pressure(state)total_reward = -sum(pressures) # 總獎勵為壓力總和的負值return total_rewarddef dynamic_epoch(self, r_t, r_t_minus1):"""動態訓練周期計算 (核心公式6-8)"""# 更新歷史獎勵記錄self.rewards.append(r_t)self.r_average = np.mean(self.rewards)self.r_median = np.median(self.rewards) if self.rewards else 0# 模擬時間 (0-60分鐘)T = len(self.rewards) % 60# 計算動態系數 (公式7-8)w1 = -math.atan(30 - T) * (r_t_minus1 - r_t)w2 = -math.atan(T - 30) * self.r_average# 更新訓練周期 (公式6)epoch_update = w1 + w2self.epoch = max(100, min(2000, self.epoch + int(epoch_update)))return self.epochdef store_experience(self, state, action, reward, next_state, done):"""存儲經驗到雙經驗池"""# 存儲到普通經驗池Dself.memory_D.append((state, action, reward, next_state, done))# 判斷是否為優先經驗 (條件: 獎勵≥平均獎勵且>中位數)if (reward >= self.r_average) and (reward > self.r_median):self.memory_D_prime.append((state, action, reward, next_state, done))def sample_experience(self, memory):"""從經驗池采樣"""if len(memory) < self.batch_size:return Nonebatch = random.sample(memory, self.batch_size)states, actions, rewards, next_states, dones = zip(*batch)return (torch.FloatTensor(states),torch.LongTensor(actions),torch.FloatTensor(rewards),torch.FloatTensor(next_states),torch.FloatTensor(dones))def choose_action(self, state):"""ε-貪婪策略選擇動作"""if np.random.rand() <= self.epsilon:return random.randrange(self.action_size)else:state = torch.FloatTensor(state).unsqueeze(0)q_values = self.eval_net(state)return torch.argmax(q_values).item()def update_network(self, sample, is_priority=False):"""更新網絡參數"""if sample is None:returnstates, actions, rewards, next_states, dones = sample# 計算當前Q值q_eval = self.eval_net(states).gather(1, actions.unsqueeze(1))# 計算目標Q值q_next = self.target_net(next_states).detach()q_target = rewards + (1 - dones) * self.gamma * q_next.max(1)[0].view(-1, 1)# 計算損失并更新網絡loss = self.loss_func(q_eval, q_target)self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 動態調整探索率if self.epsilon > self.epsilon_min:self.epsilon *= self.epsilon_decaydef train(self, env, episodes=1000):"""DERLight訓練過程"""for episode in range(episodes):state = env.reset()total_reward = 0r_t_minus1 = 0 # 上一時間步的獎勵while True:# 選擇并執行動作action = self.choose_action(state)next_state, done = env.step(action)# 計算壓力獎勵reward = self.compute_reward(state)total_reward += reward# 存儲經驗self.store_experience(state, action, reward, next_state, done)# 從普通經驗池D采樣并訓練sample_D = self.sample_experience(self.memory_D)self.update_network(sample_D)# 以一定概率從優先經驗池D'采樣并訓練if np.random.rand() < self.priority_prob and self.memory_D_prime:# 計算動態訓練周期self.dynamic_epoch(reward, r_t_minus1)# 使用動態周期進行多次訓練for _ in range(min(5, self.epoch // 100)):sample_D_prime = self.sample_experience(self.memory_D_prime)self.update_network(sample_D_prime, is_priority=True)# 更新目標網絡if episode % self.target_update_freq == 0:self.target_net.load_state_dict(self.eval_net.state_dict())# 更新狀態和獎勵state = next_stater_t_minus1 = rewardif done:print(f"Episode: {episode}, Total Reward: {total_reward:.2f}, Epsilon: {self.epsilon:.3f}, Epoch: {self.epoch}")breakclass TrafficSimulationEnv:"""簡化的交通模擬環境 (用于演示)"""def __init__(self, state_size=8):self.state_size = state_sizeself.max_steps = 100def reset(self):self.step_count = 0# 隨機生成初始狀態: [in_N, out_N, in_S, out_S, in_E, out_E, in_W, out_W]self.state = np.random.randint(0, 20, size=self.state_size)return self.statedef step(self, action):# 簡化狀態轉移邏輯self.step_count += 1# 根據動作更新車流# 實際實現應使用更復雜的交通流模型next_state = self.state.copy()# 減少進入車輛 (模擬車輛離開)for i in [0, 2, 4, 6]: # 入口車道next_state[i] = max(0, next_state[i] - np.random.randint(1, 4))# 增加新車輛 (概率性)for i in range(len(next_state)):if np.random.rand() < 0.3:next_state[i] += np.random.randint(1, 3)# 檢查結束條件done = self.step_count >= self.max_stepsself.state = next_statereturn next_state, doneif __name__ == "__main__":# 初始化環境和算法env = TrafficSimulationEnv(state_size=8)derlight = DERLight(state_size=8, action_size=4) # 4個相位動作# 開始訓練derlight.train(env, episodes=1000)