優先級隊列
你可能比較奇怪,隊列不是早就講了嘛。這里之所以放到這里講優先級隊列,是因為雖然名字有隊列,
但其實是使用堆來實現的。上一章講完了堆,這一章我們就趁熱打鐵來實現一個優先級隊列。
實現優先級隊列
優先級隊列(Priority Queue) 顧名思義,就是入隊的時候可以給一個優先級,通常是個數字或者時間戳等,
當出隊的時候我們希望按照給定的優先級出隊,我們按照 TDD(測試驅動開發) 的方式先來寫測試代碼:
def test_priority_queue():size = 5pq = PriorityQueue(size)pq.push(5, 'purple') # priority, valuepq.push(0, 'white')pq.push(3, 'orange')pq.push(1, 'black')res = []while not pq.is_empty():res.append(pq.pop())assert res == ['purple', 'orange', 'black', 'white']
上邊就是期望的行為,寫完測試代碼后我們來編寫優先級隊列的代碼,按照出隊的時候最大優先級先出的順序:
class PriorityQueue(object):def __init__(self, maxsize):self.maxsize = maxsizeself._maxheap = MaxHeap(maxsize)def push(self, priority, value):# 注意這里把這個 tuple push 進去,python 比較 tuple 從第一個開始比較# 這樣就很巧妙地實現了按照優先級排序entry = (priority, value) # 入隊的時候會根據 priority 維持堆的特性self._maxheap.add(entry)def pop(self, with_priority=False):entry = self._maxheap.extract()if with_priority:return entryelse:return entry[1]def is_empty(self):return len(self._maxheap) == 0
源碼
# -*- coding:utf-8 -*-# 第二章拷貝的 Array 代碼class Array(object):def __init__(self, size=32):self._size = sizeself._items = [None] * sizedef __getitem__(self, index):return self._items[index]def __setitem__(self, index, value):self._items[index] = valuedef __len__(self):return self._sizedef clear(self, value=None):for i in range(len(self._items)):self._items[i] = valuedef __iter__(self):for item in self._items:yield item#####################################################
# heap 實現
#####################################################class MaxHeap(object):"""Heaps:完全二叉樹,最大堆的非葉子節點的值都比孩子大,最小堆的非葉子結點的值都比孩子小Heap包含兩個屬性,order property 和 shape property(a complete binary tree),在插入一個新節點的時候,始終要保持這兩個屬性插入操作:保持堆屬性和完全二叉樹屬性, sift-up 操作維持堆屬性extract操作:只獲取根節點數據,并把樹最底層最右節點copy到根節點后,sift-down操作維持堆屬性用數組實現heap,從根節點開始,從上往下從左到右給每個節點編號,則根據完全二叉樹的性質,給定一個節點i, 其父親和孩子節點的編號分別是:parent = (i-1) // 2left = 2 * i + 1rgiht = 2 * i + 2使用數組實現堆一方面效率更高,節省樹節點的內存占用,一方面還可以避免復雜的指針操作,減少調試難度。"""def __init__(self, maxsize=None):self.maxsize = maxsizeself._elements = Array(maxsize)self._count = 0def __len__(self):return self._countdef add(self, value):if self._count >= self.maxsize:raise Exception('full')self._elements[self._count] = valueself._count += 1self._siftup(self._count-1) # 維持堆的特性def _siftup(self, ndx):if ndx > 0:parent = int((ndx-1)/2)if self._elements[ndx] > self._elements[parent]: # 如果插入的值大于 parent,一直交換self._elements[ndx], self._elements[parent] = self._elements[parent], self._elements[ndx]self._siftup(parent) # 遞歸def extract(self):if self._count <= 0:raise Exception('empty')value = self._elements[0] # 保存 root 值self._count -= 1self._elements[0] = self._elements[self._count] # 最右下的節點放到root后siftDownself._siftdown(0) # 維持堆特性return valuedef _siftdown(self, ndx):left = 2 * ndx + 1right = 2 * ndx + 2# determine which node contains the larger valuelargest = ndxif (left < self._count and # 有左孩子self._elements[left] >= self._elements[largest] andself._elements[left] >= self._elements[right]): # 原書這個地方沒寫實際上找的未必是largestlargest = leftelif right < self._count and self._elements[right] >= self._elements[largest]:largest = rightif largest != ndx:self._elements[ndx], self._elements[largest] = self._elements[largest], self._elements[ndx]self._siftdown(largest)class PriorityQueue(object):def __init__(self, maxsize):self.maxsize = maxsizeself._maxheap = MaxHeap(maxsize)def push(self, priority, value):entry = (priority, value) # 注意這里把這個 tuple push進去,python 比較 tuple 從第一個開始比較self._maxheap.add(entry)def pop(self, with_priority=False):entry = self._maxheap.extract()if with_priority:return entryelse:return entry[1]def is_empty(self):return len(self._maxheap) == 0def test_priority_queue():size = 5pq = PriorityQueue(size)pq.push(5, 'purple')pq.push(0, 'white')pq.push(3, 'orange')pq.push(1, 'black')res = []while not pq.is_empty():res.append(pq.pop())assert res == ['purple', 'orange', 'black', 'white']def test_buildin_PriorityQueue(): # python3"""測試內置的 PriorityQueuehttps://pythonguides.com/priority-queue-in-python/"""from queue import PriorityQueueq = PriorityQueue()q.put((10, 'Red balls'))q.put((8, 'Pink balls'))q.put((5, 'White balls'))q.put((4, 'Green balls'))while not q.empty():item = q.get()print(item)def test_buildin_heapq_as_PriorityQueue():"""測試使用 heapq 實現優先級隊列,保存一個 tuple 比較元素(tuple第一個元素是優先級)"""import heapqs_roll = []heapq.heappush(s_roll, (4, "Tom"))heapq.heappush(s_roll, (1, "Aruhi"))heapq.heappush(s_roll, (3, "Dyson"))heapq.heappush(s_roll, (2, "Bob"))while s_roll:deque_r = heapq.heappop(s_roll)print(deque_r)# python3 沒有了 __cmp__ 魔法函數 https://stackoverflow.com/questions/8276983/why-cant-i-use-the-method-cmp-in-python-3-as-for-python-2
class Item:def __init__(self, key, weight):self.key, self.weight = key, weightdef __lt__(self, other): # 看其來 heapq 實現只用了 小于 比較,這里定義了就可以 push 一個 item 類return self.weight < other.weightdef __eq__(self, other):return self.weight == other.weightdef __str__(self):return '{}:{}'.format(self.key,self.weight)def test_heap_item():"""測試使用 Item 類實現優先級隊列,因為 heapq 內置使用的是小于運算法,重寫魔術 < 比較方法即可實現"""import heapqpq = []heapq.heappush(pq, Item('c', 3))heapq.heappush(pq, Item('a', 1))heapq.heappush(pq, Item('b', 2))while pq:print(heapq.heappop(pq))
練習題
- 請你實現按照小優先級先出隊的順序的優先級隊列