Prioritized experience replay: the idea
Prioritized experience replay was introduced in the ICLR 2016 paper "Prioritized Experience Replay".
The authors argue that sampling transitions from the replay buffer according to a priority, rather than uniformly at random, makes learning more efficient and lets the model converge faster. The underlying idea is that an RL agent can learn more effectively from some transitions than from others, or, put another way, "train more on the data that surprises you".
How is the priority defined? The authors use the magnitude of a transition's TD error $\delta$. A newly generated transition, whose TD error is not yet known, is assigned the maximum priority so that it is sampled at least once. The probability of sampling transition $i$ is defined as
$$P(i) = \frac{p_i^{\alpha}}{\sum_k p_k^{\alpha}}$$
Here $p_i > 0$ is the priority of the $i$-th transition in the replay buffer, and $\alpha$ controls how strongly the priority is emphasized; with $\alpha = 0$, sampling degenerates to the uniform sampling of the basic DQN.
The paper gives the following two choices for $p_i$, both monotonic in the TD error $\delta$ (a small sketch of both follows the list):
- Proportional prioritization: $p_i = |\delta_i| + \epsilon$, where $\epsilon$ is a small positive constant that prevents a transition with zero TD error from never being revisited. (Most implementations use this variant.)
- Rank-based prioritization: $p_i = \frac{1}{\mathrm{rank}(i)}$, where $\mathrm{rank}(i)$ is the rank of the transition when the replay buffer is sorted by $|\delta_i|$; $P$ then becomes a power-law distribution with exponent $\alpha$.
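As a quick illustration (a minimal sketch, not code from the paper), the two priority schemes and the resulting sampling probabilities could be computed as follows; `td_errors` is assumed to be an array of TD errors for the buffered transitions:

```python
import numpy as np

def proportional_priorities(td_errors, epsilon=1e-5):
    """p_i = |delta_i| + epsilon (proportional variant)."""
    return np.abs(td_errors) + epsilon

def rank_based_priorities(td_errors):
    """p_i = 1 / rank(i), where rank 1 belongs to the largest |delta_i|."""
    order = np.argsort(-np.abs(td_errors))           # indices sorted by descending |delta|
    ranks = np.empty_like(order)
    ranks[order] = np.arange(1, len(td_errors) + 1)  # rank 1 = largest TD error
    return 1.0 / ranks

def sampling_probabilities(priorities, alpha=0.6):
    """P(i) = p_i^alpha / sum_k p_k^alpha."""
    scaled = priorities ** alpha
    return scaled / scaled.sum()
```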
Prioritized sampling changes the data distribution and therefore introduces bias. To correct for this bias, importance-sampling (IS) weights are used:
$$w_i = \left( \frac{1}{N} \cdot \frac{1}{P(i)} \right)^{\beta}$$
When $\beta = 1$, these weights fully compensate for the bias introduced by the non-uniform sampling probabilities. For convergence reasons, the authors anneal $\beta$ from an initial value between 0 and 1 up to 1 over the course of training. In the Q-learning update the TD error is multiplied by these weights, i.e. $w_i \delta_i$ is used instead of $\delta_i$. For stability, the weights are also normalized by $1/\max_i w_i$.
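A minimal sketch of how the weights enter the update, assuming `probs` holds the sampling probabilities $P(i)$ of the drawn batch and `td_errors` the corresponding TD errors (the weighted squared TD error below stands in for whatever loss the Q-network actually uses):

```python
import numpy as np

def is_weights(probs, buffer_size, beta):
    """w_i = (1/N * 1/P(i))^beta, normalized by max_i w_i for stability."""
    w = (buffer_size * probs) ** (-beta)
    return w / w.max()

def weighted_td_loss(td_errors, weights):
    """Mean of the IS-weighted squared TD errors (a stand-in for the DQN loss)."""
    return np.mean(weights * td_errors ** 2)
```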
Taking Double DQN as an example, Algorithm 1 in the paper spells out the full procedure with prioritized experience replay.
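As a rough, PyTorch-flavoured sketch of that procedure (hypothetical `q_net`, `target_net`, `buffer`, and `optimizer` names; the buffer follows the `sample`/`update_priorities` interface implemented in the next section, and this is not the paper's pseudocode), one Double DQN update with prioritized replay might look like this: sample by priority, weight the loss by the IS weights, then write the new $|\delta_i|$ back as priorities.

```python
import numpy as np
import torch

def per_double_dqn_step(q_net, target_net, buffer, optimizer, batch_size=32, gamma=0.99):
    """One schematic Double DQN update with prioritized experience replay."""
    # 1. Sample a prioritized batch together with buffer indices and IS weights.
    samples, indices, weights = buffer.sample(batch_size)
    states, actions, rewards, next_states, dones = (
        torch.as_tensor(np.asarray(x), dtype=torch.float32) for x in zip(*samples)
    )
    weights = torch.as_tensor(weights, dtype=torch.float32)

    # 2. Double DQN target: the online net picks the action, the target net evaluates it.
    with torch.no_grad():
        next_actions = q_net(next_states).argmax(dim=1, keepdim=True)
        next_q = target_net(next_states).gather(1, next_actions).squeeze(1)
        targets = rewards + gamma * (1.0 - dones) * next_q

    # 3. IS-weighted loss: use w_i * delta_i instead of the plain delta_i in the update.
    q_values = q_net(states).gather(1, actions.long().unsqueeze(1)).squeeze(1)
    td_errors = targets - q_values
    loss = (weights * td_errors ** 2).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # 4. Write the new |delta_i| back into the buffer as priorities.
    buffer.update_priorities(indices, td_errors.abs().detach().cpu().numpy())
```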
Prioritized experience replay: implementation
A straightforward implementation of a prioritized replay buffer is given below (adapted from this code):
```python
import numpy as np


class PrioReplayBufferNaive:
    def __init__(self, buf_size, prob_alpha=0.6, epsilon=1e-5,
                 beta=0.4, beta_increment_per_sampling=0.001):
        self.prob_alpha = prob_alpha
        self.capacity = buf_size
        self.pos = 0
        self.buffer = []
        self.priorities = np.zeros((buf_size, ), dtype=np.float32)
        self.beta = beta
        self.beta_increment_per_sampling = beta_increment_per_sampling
        self.epsilon = epsilon

    def __len__(self):
        return len(self.buffer)

    def size(self):
        # number of samples currently stored in the buffer
        return len(self.buffer)

    def add(self, sample):
        # new samples get the maximum priority so they are sampled at least once
        max_prio = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append(sample)
        else:
            self.buffer[self.pos] = sample
        self.priorities[self.pos] = max_prio
        self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size):
        if len(self.buffer) == self.capacity:
            prios = self.priorities
        else:
            prios = self.priorities[:self.pos]
        # P(i) = p_i^alpha / sum_k p_k^alpha
        probs = np.array(prios, dtype=np.float32) ** self.prob_alpha
        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), batch_size, p=probs, replace=True)
        samples = [self.buffer[idx] for idx in indices]
        total = len(self.buffer)
        # anneal beta towards 1 and compute normalized IS weights
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        weights = (total * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        return samples, indices, np.array(weights, dtype=np.float32)

    def update_priorities(self, batch_indices, batch_priorities):
        '''Update the priorities of the sampled transitions.'''
        for idx, prio in zip(batch_indices, batch_priorities):
            self.priorities[idx] = prio + self.epsilon
```
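A quick usage sketch for the buffer above (dummy transitions and placeholder TD errors, just to show the interface):

```python
import numpy as np

buffer = PrioReplayBufferNaive(buf_size=10000)

# Store transitions as (s, a, r, s', done) tuples; new ones get the max priority.
for _ in range(1000):
    s, a, r, s_next, done = np.random.rand(4), 0, 1.0, np.random.rand(4), False
    buffer.add((s, a, r, s_next, done))

# Sample a batch together with its indices and normalized IS weights.
batch, indices, weights = buffer.sample(batch_size=32)

# After computing TD errors for the batch, write |delta| back as priorities.
td_errors = np.random.rand(32)  # placeholder for the real TD errors
buffer.update_priorities(indices, np.abs(td_errors))
```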
This direct implementation samples inefficiently when the buffer is large. The authors therefore store the priorities in a "sum tree": each internal node holds the sum of its children, the priorities sit in the leaves, and the root holds the total priority $p_{total}$, so both updates and sampling cost $O(\log N)$. To sample a batch of size $k$, the range $[0, p_{total}]$ is split into $k$ equal segments, a value is drawn uniformly within each segment, and the corresponding transition is retrieved from the tree. A Python implementation is given below (source):
```python
import random

import numpy as np


class SumTree:
    """Binary tree in which each parent node's value is the sum of its children."""
    write = 0

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)
        self.data = np.zeros(capacity, dtype=object)
        self.n_entries = 0

    # update to the root node
    def _propagate(self, idx, change):
        parent = (idx - 1) // 2
        self.tree[parent] += change
        if parent != 0:
            self._propagate(parent, change)

    # find sample on leaf node
    def _retrieve(self, idx, s):
        left = 2 * idx + 1
        right = left + 1
        if left >= len(self.tree):
            return idx
        if s <= self.tree[left]:
            return self._retrieve(left, s)
        else:
            return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]

    # store priority and sample
    def add(self, p, data):
        idx = self.write + self.capacity - 1
        self.data[self.write] = data
        self.update(idx, p)
        self.write += 1
        if self.write >= self.capacity:
            self.write = 0
        if self.n_entries < self.capacity:
            self.n_entries += 1

    # update priority
    def update(self, idx, p):
        change = p - self.tree[idx]
        self.tree[idx] = p
        self._propagate(idx, change)

    # get priority and sample
    def get(self, s):
        idx = self._retrieve(0, s)
        dataIdx = idx - self.capacity + 1
        return (idx, self.tree[idx], self.data[dataIdx])


class PrioReplayBuffer:  # stored as ( s, a, r, s_ ) in SumTree
    epsilon = 0.01
    alpha = 0.6
    beta = 0.4
    beta_increment_per_sampling = 0.001

    def __init__(self, capacity):
        self.tree = SumTree(capacity)
        self.capacity = capacity

    def _get_priority(self, error):
        # proportional priority: (|delta| + epsilon)^alpha
        return (np.abs(error) + self.epsilon) ** self.alpha

    def add(self, error, sample):
        p = self._get_priority(error)
        self.tree.add(p, sample)

    def sample(self, n):
        batch = []
        idxs = []
        segment = self.tree.total() / n
        priorities = []
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        # stratified sampling: draw one value uniformly from each of the n segments
        for i in range(n):
            a = segment * i
            b = segment * (i + 1)
            s = random.uniform(a, b)
            (idx, p, data) = self.tree.get(s)
            priorities.append(p)
            batch.append(data)
            idxs.append(idx)
        sampling_probabilities = np.array(priorities) / self.tree.total()
        is_weight = np.power(self.tree.n_entries * sampling_probabilities, -self.beta)
        is_weight /= is_weight.max()
        return batch, idxs, is_weight

    def update(self, idx, error):
        '''Update one sample's priority; call in a loop to update a whole batch.'''
        p = self._get_priority(error)
        self.tree.update(idx, p)
```
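Unlike the naive buffer, this version expects a TD-error estimate already when a transition is added (`add(error, sample)`), and priorities are updated one index at a time, so the caller loops over the batch. A small usage sketch with dummy data:

```python
import numpy as np

per_buffer = PrioReplayBuffer(capacity=10000)

# Adding requires an initial TD-error estimate
# (often the current |delta|, or a large value so the sample is drawn soon).
transition = (np.random.rand(4), 0, 1.0, np.random.rand(4), False)  # dummy (s, a, r, s', done)
for _ in range(100):
    per_buffer.add(error=1.0, sample=transition)

batch, idxs, is_weights = per_buffer.sample(32)

# Priorities are updated one index at a time, hence the explicit loop.
new_td_errors = np.random.rand(32)  # placeholder for the real TD errors
for idx, err in zip(idxs, new_td_errors):
    per_buffer.update(idx, err)
```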
References
- Schaul, Tom, John Quan, Ioannis Antonoglou, and David Silver. 2015. "Prioritized Experience Replay." arXiv preprint arXiv:1511.05952, November 2015.
- sum_tree implementation code
- Related blogs: 1 (with the corresponding code), 2, 3