文章目錄
- 隊列
- 雙端隊列 deque
- 底層存儲
- deque接口
- 1. `__init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None`
- 2. `append(self, __x: _T) -> None`
- 3. `appendleft(self, __x: _T) -> None`
- 4. `copy(self) -> Self`
- 5. `count(self, __x: _T) -> int`
- 6. `extend(self, __iterable: Iterable[_T]) -> None`
- 7. `extendleft(self, __iterable: Iterable[_T]) -> None`
- 8. `insert(self, __i: int, __x: _T) -> None`
- 9. `index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int`
- 10. `pop(self) -> _T`
- 11. `popleft(self) -> _T`
- 12. `remove(self, __value: _T) -> None`
- 13. `rotate(self, __n: int = 1) -> None`
- 線程安全隊列Queue
- 屬性
- 方法
- `__init__(self, maxsize: int = 0) -> None`
- `_init(self, maxsize: int) -> None`
- `empty(self) -> bool`
- `full(self) -> bool`
- `get(self, block: bool = True, timeout: float | None = None) -> _T`
- `get_nowait(self) -> _T`
- `_get(self) -> _T`
- `put(self, item: _T, block: bool = True, timeout: float | None = None) -> None`
- `put_nowait(self, item: _T) -> None`
- `_put(self, item: _T) -> None`
- `join(self) -> None`
- `qsize(self) -> int`
- `_qsize(self) -> int`
- `task_done(self) -> None`
- 線程安全優先級隊列
- 線程安全——棧
- 線程安全的簡單的隊列——SimpleQueue
隊列
在python有以下幾種隊列
- deque 雙端隊列
- Queue 隊列 FIFO
- LifoQueue 棧 LIFO
- PriorityQueue 優先隊列
- SimpleQueue 簡單隊列 FIFO
除了deque
,其他都有一個共同的特點——線程安全
雙端隊列 deque
由于python
中的隊列是基于雙端隊列來實現的,所以我們先查看deque
的實現,如果你通過pycharm去追溯deque的源代碼,你會發現里面只有類的定義并沒有具體的實現,其代碼如下:
class deque(MutableSequence[_T], Generic[_T]):@propertydef maxlen(self) -> int | None: ...@overloaddef __init__(self, *, maxlen: int | None = None) -> None: ...@overloaddef __init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None: ...def append(self, __x: _T) -> None: ...def appendleft(self, __x: _T) -> None: ...def copy(self) -> Self: ...def count(self, __x: _T) -> int: ...def extend(self, __iterable: Iterable[_T]) -> None: ...def extendleft(self, __iterable: Iterable[_T]) -> None: ...def insert(self, __i: int, __x: _T) -> None: ...def index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int: ...def pop(self) -> _T: ... # type: ignore[override]def popleft(self) -> _T: ...def remove(self, __value: _T) -> None: ...def rotate(self, __n: int = 1) -> None: ...def __copy__(self) -> Self: ...def __len__(self) -> int: ...# These methods of deque don't take slices, unlike MutableSequence, hence the type: ignoresdef __getitem__(self, __key: SupportsIndex) -> _T: ... # type: ignore[override]def __setitem__(self, __key: SupportsIndex, __value: _T) -> None: ... # type: ignore[override]def __delitem__(self, __key: SupportsIndex) -> None: ... # type: ignore[override]def __contains__(self, __key: object) -> bool: ...def __reduce__(self) -> tuple[type[Self], tuple[()], None, Iterator[_T]]: ...def __iadd__(self, __value: Iterable[_T]) -> Self: ...def __add__(self, __value: Self) -> Self: ...def __mul__(self, __value: int) -> Self: ...def __imul__(self, __value: int) -> Self: ...def __lt__(self, __value: deque[_T]) -> bool: ...def __le__(self, __value: deque[_T]) -> bool: ...def __gt__(self, __value: deque[_T]) -> bool: ...def __ge__(self, __value: deque[_T]) -> bool: ...def __eq__(self, __value: object) -> bool: ...if sys.version_info >= (3, 9):def __class_getitem__(cls, __item: Any) -> GenericAlias: ...
底層存儲
這說明其底層并不是由python
寫的,而是由效率更高的c
語言的結構體實現,讓我們大致瀏覽一下python
的deque
的底層實現:
在python
的gihub
倉庫中,我門可以從cpython
(cpython/Modules/_collectionsmodule.c
)中找到deque
的蹤跡
#define BLOCKLEN 64
#define CENTER ((BLOCKLEN - 1) / 2)
#define MAXFREEBLOCKS 16typedef struct BLOCK {struct BLOCK *leftlink;PyObject *data[BLOCKLEN];struct BLOCK *rightlink;
} block;typedef struct {PyObject_VAR_HEADblock *leftblock;block *rightblock;Py_ssize_t leftindex; /* 0 <= leftindex < BLOCKLEN */Py_ssize_t rightindex; /* 0 <= rightindex < BLOCKLEN */size_t state; /* incremented whenever the indices move */Py_ssize_t maxlen; /* maxlen is -1 for unbounded deques */Py_ssize_t numfreeblocks;block *freeblocks[MAXFREEBLOCKS];PyObject *weakreflist;
} dequeobject;
如果學過數據結構的都知道,雙端隊列可以由動態數組或雙向列表實現
- 如果使用數組實現雙端隊列,那么當從頭結點刪除數據時那么需要將后面的數據都要向前移動一格,在大批量數據的情況,從頭結點刪除的效率就很低,復雜度為
O(n)
- 如果使用雙向鏈表實現雙端隊列,盡管能夠解決動態數組從頭結點效率低下的問題,但是算向鏈表會占用額外的空間、且隨機訪問的復雜度為 O ( n ) O(n) O(n),此外它具有鏈表的通病:緩存性能差(非連續存儲空間,無法將整個存儲塊加載到內存,在內存中訪問是跳躍式訪問,不是連續訪問,性能會差很多)
所以python
底層選擇了折中的方式,即使用鏈表保證在隊頭插入元素的高效性,也使用數組提高連續訪問的高效能力,從上面的代碼看,python
的雙端隊列是一個雙向鏈表塊dequeobject
,每個塊block
(BLOCK
)是一個64
大小的數組data
,這樣將其性能進行了折中。當然其插入刪除的操作也變的更復雜。
deque接口
這些方法和構造函數都是Python標準庫中collections.deque
類的接口,用于操作雙端隊列。以下是對每個方法的詳細介紹和用法示例:
1. __init__(self, iterable: Iterable[_T], maxlen: int | None = None) -> None
構造函數,用于初始化一個雙端隊列對象。
參數:
iterable
: 可選,初始化隊列的可迭代對象。如果提供,該對象的元素會被按順序添加到隊列中。maxlen
: 可選,隊列的最大長度。如果設置了最大長度,當隊列滿時,添加新元素會導致舊元素被丟棄。
示例:
>>> from collections import deque
>>> deque()
deque([])
>>> deque([1,2,3])
deque([1, 2, 3])
>>> deque([1,2,3], maxlen=5)
deque([1, 2, 3], maxlen=5)
2. append(self, __x: _T) -> None
在隊列的右端添加一個元素。
參數:
__x
: 要添加的元素。
示例:
>>> dq = deque([1, 2, 3])
>>> dq.append(4)
>>> dq
deque([1, 2, 3, 4])
3. appendleft(self, __x: _T) -> None
在隊列的左端添加一個元素。
參數:
__x
: 要添加的元素。
示例:
>>> dq
deque([1, 2, 3, 4])
>>> dq.appendleft(0)
>>> dq
deque([0, 1, 2, 3, 4])
4. copy(self) -> Self
返回一個雙端隊列的淺拷貝。
示例:
>>> dq.copy()
deque([0, 1, 2, 3, 4])
>>> dq.copy() == dq #判斷相等的方法是元素相同
True
>>> dq_copy = dq.copy()
>>> dq_copy.append(5)
>>> dq_copy
deque([0, 1, 2, 3, 4, 5])
>>> dq
deque([0, 1, 2, 3, 4])
5. count(self, __x: _T) -> int
返回隊列中指定元素的出現次數。
參數:
__x
: 要計數的元素。
示例:
>>> dq
deque([0, 1, 2, 3, 4])
>>> dq.count(1)
1
6. extend(self, __iterable: Iterable[_T]) -> None
在隊列的右端擴展一個可迭代對象的所有元素。
參數:
__iterable
: 包含要添加元素的可迭代對象。
示例:
>>> dq.extend([3,4,5]) #接收可迭代類型
>>> dq
deque([0, 1, 2, 3, 4, 3, 4, 5])
7. extendleft(self, __iterable: Iterable[_T]) -> None
在隊列的左端擴展一個可迭代對象的所有元素。注意,元素會按相反的順序添加。
參數:
__iterable
: 包含要添加元素的可迭代對象。
示例:
>>> dq
deque([0, 1, 2, 3, 4, 3, 4, 5])
>>> dq.extendleft((0,1,2)) # 并不是直接拼接在left,而是反向后拼接的
>>> dq
deque([2, 1, 0, 0, 1, 2, 3, 4, 3, 4, 5])
8. insert(self, __i: int, __x: _T) -> None
在指定位置插入一個元素。
參數:
__i
: 要插入的位置索引。如果索引超出范圍,元素會被插入到隊列的兩端。__x
: 要插入的元素。
示例:
>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5])
>>> dq.insert(20,100) # 超過的索引直接插在末尾
>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 100])
>>> dq.insert(-1,200) #支持負數索引
>>> dq
deque([2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 200, 100])
>>> dq.insert(-20,50)
>>> dq
deque([50, 2, 1, 100, 0, 0, 1, 2, 3, 4, 3, 4, 5, 200, 100])
9. index(self, __x: _T, __start: int = 0, __stop: int = ...) -> int
返回指定值首次出現的索引。如果沒有找到該值,將引發ValueError
。
參數:
__x
: 要查找的元素。__start
: 可選,搜索的起始位置。__stop
: 可選,搜索的結束位置。
示例:
>>> dq.index(100)
3
>>> dq.index(100,start=5) # 非關鍵字參數
Traceback (most recent call last):File "<stdin>", line 1, in <module>
TypeError: index() takes no keyword arguments
>>> dq.index(100,5)
14
10. pop(self) -> _T
移除并返回隊列右端的元素。如果隊列為空,將引發IndexError
。
示例:
>>> dq = deque([1, 2, 3])
>>> dq.pop()
3
>>> dq.pop()
2
>>> dq.pop()
1
>>> dq.pop()
Traceback (most recent call last):File "<stdin>", line 1, in <module>
IndexError: pop from an empty deque
>>>
11. popleft(self) -> _T
移除并返回隊列左端的元素。如果隊列為空,將引發IndexError
。
示例:
>>> dq = deque([1, 2, 3])
>>> dq.popleft()
1
>>> dq.popleft()
2
>>> dq.popleft()
3
>>> dq.popleft()
Traceback (most recent call last):File "<stdin>", line 1, in <module>
IndexError: pop from an empty deque
>>>
12. remove(self, __value: _T) -> None
移除隊列中第一個匹配的元素。如果沒有找到該值,將引發ValueError
。
參數:
__value
: 要移除的元素。
示例:
>>> dq = deque([1, 2, 3,4,3])
>>> dq.remove(3)
>>> dq
deque([1, 2, 4, 3])
>>> dq.remove(0)
Traceback (most recent call last):File "<stdin>", line 1, in <module>
ValueError: deque.remove(x): x not in deque
13. rotate(self, __n: int = 1) -> None
將隊列旋轉 n
步。如果 n
為正數,元素將從右端移到左端。如果 n
為負數,元素將從左端移到右端。
參數:
__n
: 旋轉的步數。
示例:
>>> dq = deque([1, 2, 3, 4])
>>> dq.rotate(2) #向右旋轉
>>> dq
deque([3, 4, 1, 2])
>>> dq.rotate(-1) #向左旋轉
>>> dq
deque([4, 1, 2, 3])
這些方法和接口提供了豐富的操作來管理和操作雙端隊列,適用于需要在兩端進行頻繁插入和刪除操作的數據結構。
線程安全隊列Queue
Queue的底層是基于deque實現的,因為Queue的功能是deque中的部分,只要對其進行限制其雙端性遵循FIFO就可以使用。下面是Queue的官方代碼
class Queue:'''Create a queue object with a given maximum size.If maxsize is <= 0, the queue size is infinite.'''def __init__(self, maxsize=0):self.maxsize = maxsizeself._init(maxsize)# mutex must be held whenever the queue is mutating. All methods# that acquire mutex must release it before returning. mutex# is shared between the three conditions, so acquiring and# releasing the conditions also acquires and releases mutex.self.mutex = threading.Lock()# Notify not_empty whenever an item is added to the queue; a# thread waiting to get is notified then.self.not_empty = threading.Condition(self.mutex)# Notify not_full whenever an item is removed from the queue;# a thread waiting to put is notified then.self.not_full = threading.Condition(self.mutex)# Notify all_tasks_done whenever the number of unfinished tasks# drops to zero; thread waiting to join() is notified to resumeself.all_tasks_done = threading.Condition(self.mutex)self.unfinished_tasks = 0def task_done(self):'''Indicate that a formerly enqueued task is complete.Used by Queue consumer threads. For each get() used to fetch a task,a subsequent call to task_done() tells the queue that the processingon the task is complete.If a join() is currently blocking, it will resume when all itemshave been processed (meaning that a task_done() call was receivedfor every item that had been put() into the queue).Raises a ValueError if called more times than there were itemsplaced in the queue.'''with self.all_tasks_done:unfinished = self.unfinished_tasks - 1if unfinished <= 0:if unfinished < 0:raise ValueError('task_done() called too many times')self.all_tasks_done.notify_all()self.unfinished_tasks = unfinisheddef join(self):'''Blocks until all items in the Queue have been gotten and processed.The count of unfinished tasks goes up whenever an item is added to thequeue. The count goes down whenever a consumer thread calls task_done()to indicate the item was retrieved and all work on it is complete.When the count of unfinished tasks drops to zero, join() unblocks.'''with self.all_tasks_done:while self.unfinished_tasks:self.all_tasks_done.wait()def qsize(self):'''Return the approximate size of the queue (not reliable!).'''with self.mutex:return self._qsize()def empty(self):'''Return True if the queue is empty, False otherwise (not reliable!).This method is likely to be removed at some point. Use qsize() == 0as a direct substitute, but be aware that either approach risks a racecondition where a queue can grow before the result of empty() orqsize() can be used.To create code that needs to wait for all queued tasks to becompleted, the preferred technique is to use the join() method.'''with self.mutex:return not self._qsize()def full(self):'''Return True if the queue is full, False otherwise (not reliable!).This method is likely to be removed at some point. Use qsize() >= nas a direct substitute, but be aware that either approach risks a racecondition where a queue can shrink before the result of full() orqsize() can be used.'''with self.mutex:return 0 < self.maxsize <= self._qsize()def put(self, item, block=True, timeout=None):'''Put an item into the queue.If optional args 'block' is true and 'timeout' is None (the default),block if necessary until a free slot is available. If 'timeout' isa non-negative number, it blocks at most 'timeout' seconds and raisesthe Full exception if no free slot was available within that time.Otherwise ('block' is false), put an item on the queue if a free slotis immediately available, else raise the Full exception ('timeout'is ignored in that case).'''with self.not_full:if self.maxsize > 0:if not block:if self._qsize() >= self.maxsize:raise Fullelif timeout is None:while self._qsize() >= self.maxsize:self.not_full.wait()elif timeout < 0:raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeoutwhile self._qsize() >= self.maxsize:remaining = endtime - time()if remaining <= 0.0:raise Fullself.not_full.wait(remaining)self._put(item)self.unfinished_tasks += 1self.not_empty.notify()def get(self, block=True, timeout=None):'''Remove and return an item from the queue.If optional args 'block' is true and 'timeout' is None (the default),block if necessary until an item is available. If 'timeout' isa non-negative number, it blocks at most 'timeout' seconds and raisesthe Empty exception if no item was available within that time.Otherwise ('block' is false), return an item if one is immediatelyavailable, else raise the Empty exception ('timeout' is ignoredin that case).'''with self.not_empty:if not block:if not self._qsize():raise Emptyelif timeout is None:while not self._qsize():self.not_empty.wait()elif timeout < 0:raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeoutwhile not self._qsize():remaining = endtime - time()if remaining <= 0.0:raise Emptyself.not_empty.wait(remaining)item = self._get()self.not_full.notify()return itemdef put_nowait(self, item):'''Put an item into the queue without blocking.Only enqueue the item if a free slot is immediately available.Otherwise raise the Full exception.'''return self.put(item, block=False)def get_nowait(self):'''Remove and return an item from the queue without blocking.Only get an item if one is immediately available. Otherwiseraise the Empty exception.'''return self.get(block=False)# Override these methods to implement other queue organizations# (e.g. stack or priority queue).# These will only be called with appropriate locks held# Initialize the queue representationdef _init(self, maxsize):self.queue = deque()def _qsize(self):return len(self.queue)# Put a new item in the queuedef _put(self, item):self.queue.append(item)# Get an item from the queuedef _get(self):return self.queue.popleft()__class_getitem__ = classmethod(types.GenericAlias)
其重要體現隊列的代碼很少,大多數代碼都在保證線程安全,如下:
def _init(self, maxsize):self.queue = deque()def _qsize(self):return len(self.queue)# Put a new item in the queuedef _put(self, item):self.queue.append(item)# Get an item from the queuedef _get(self):return self.queue.popleft()
從上面可以看出,Queue
的核心操作get、put
都是基于deque
實現的
其Queue類的定義如下:
class Queue(Generic[_T]):maxsize: intmutex: Lock # undocumentednot_empty: Condition # undocumentednot_full: Condition # undocumentedall_tasks_done: Condition # undocumentedunfinished_tasks: int # undocumented# Despite the fact that `queue` has `deque` type,# we treat it as `Any` to allow different implementations in subtypes.queue: Any # undocumenteddef __init__(self, maxsize: int = 0) -> None: ...def _init(self, maxsize: int) -> None: ...def empty(self) -> bool: ...def full(self) -> bool: ...def get(self, block: bool = True, timeout: float | None = None) -> _T: ...def get_nowait(self) -> _T: ...def _get(self) -> _T: ...def put(self, item: _T, block: bool = True, timeout: float | None = None) -> None: ...def put_nowait(self, item: _T) -> None: ...def _put(self, item: _T) -> None: ...def join(self) -> None: ...def qsize(self) -> int: ...def _qsize(self) -> int: ...def task_done(self) -> None: ...if sys.version_info >= (3, 9):def __class_getitem__(cls, item: Any) -> GenericAlias: ...
這些函數和屬性是Python標準庫中的queue.Queue
類的接口。queue.Queue
提供了一個線程安全的FIFO(First In, First Out)隊列,常用于多線程環境下的任務管理。以下是每個函數和屬性的詳細介紹及其用法:
屬性
-
maxsize: int
隊列的最大大小。如果maxsize
為 0 或負數,則隊列大小沒有限制。 -
mutex: Lock
一個鎖對象,用于同步隊列的訪問,確保線程安全。 -
not_empty: Condition
一個條件變量,用于在隊列非空時進行通知。 -
not_full: Condition
一個條件變量,用于在隊列未滿時進行通知。 -
all_tasks_done: Condition
一個條件變量,用于在所有任務完成時進行通知。 -
unfinished_tasks: int
未完成任務的計數。 -
queue: Any
實際存儲隊列元素的容器,類型可以是任意的。該屬性未公開文檔。在_init
中將其初始化。
方法
__init__(self, maxsize: int = 0) -> None
構造函數,用于初始化一個隊列對象。
參數:
maxsize
: 隊列的最大大小。如果為 0 或負數,則隊列大小沒有限制。
示例:
from queue import Queueq = Queue(maxsize=10)
_init(self, maxsize: int) -> None
初始化隊列的內部方法。通常由構造函數調用,不直接使用。
參數:
maxsize
: 隊列的最大大小。
empty(self) -> bool
如果隊列為空,返回 True
。
示例:
>>> from queue import Queue
>>> q = Queue()
>>> q.empty()
True
full(self) -> bool
如果隊列已滿,返回 True
。
示例:
>>> q.put(1)
>>> q.put(2)
>>> q.full()
True
>>> q.put(3) # 滿了之后添加元素會一直等待
get(self, block: bool = True, timeout: float | None = None) -> _T
從隊列中移除并返回一個元素。
參數:
block
: 如果為True
,在隊列為空時會阻塞,直到有元素可用。timeout
: 阻塞的最大時間(以秒為單位)。如果為None
,將一直阻塞直到有元素可用。
示例:
>>> from queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q
<queue.Queue object at 0x0000020DD481AD30>
>>> list(q) # Queue是不可迭代類型
Traceback (most recent call last):File "<stdin>", line 1, in <module>
TypeError: 'Queue' object is not iterable
>>> print(q)
<queue.Queue object at 0x0000020DD481AD30>
>>> q.get() # FIFO
1
>>> q.get() # LILO
2
get_nowait(self) -> _T
非阻塞地從隊列中移除并返回一個元素。如果隊列為空,拋出 queue.Empty
異常。
示例:
>>> q = Queue()
>>> q.put(1)
>>> q.get_nowait()
1
>>> q.get_nowait() # 不等待會報錯 如果使用q.get()會一直等待,直到右元素入隊
Traceback (most recent call last):File "<stdin>", line 1, in <module>File "D:\Python\Python36\lib\queue.py", line 192, in get_nowaitreturn self.get(block=False)File "D:\Python\Python36\lib\queue.py", line 161, in getraise Empty
queue.Empty
_get(self) -> _T
內部方法,用于從隊列中獲取一個元素。通常由 get
方法調用,不直接使用。
put(self, item: _T, block: bool = True, timeout: float | None = None) -> None
將一個元素放入隊列。
參數:
item
: 要放入隊列的元素。block
: 如果為True
,在隊列已滿時會阻塞,直到有空間可用。timeout
: 阻塞的最大時間(以秒為單位)。如果為None
,將一直阻塞直到有空間可用。
示例:
>>> q = Queue(2)
>>> q.put(1)
>>> q.put(2)
>>> q.put(3,True,3) # 3秒之后報錯
Traceback (most recent call last):File "<stdin>", line 1, in <module>File "D:\Python\Python36\lib\queue.py", line 141, in putraise Full
queue.Full
put_nowait(self, item: _T) -> None
非阻塞地將一個元素放入隊列。如果隊列已滿,拋出 queue.Full
異常。
示例:
>>> q = Queue(2)
>>> q.put(1)
>>> q.put(2)
>>> q.put_nowait(3) # 立即報錯
Traceback (most recent call last):File "<stdin>", line 1, in <module>File "D:\Python\Python36\lib\queue.py", line 184, in put_nowaitreturn self.put(item, block=False)File "D:\Python\Python36\lib\queue.py", line 130, in putraise Full
queue.Full
_put(self, item: _T) -> None
內部方法,用于將一個元素放入隊列。通常由 put
方法調用,不直接使用。
join(self) -> None
阻塞主線程,直到所有的隊列項被處理。每次從 get
中獲取一個項目后,需要調用 task_done
來表明該項目已經完成處理。
示例:
import threading
import time
from queue import Queuedef worker(q):while True:item = q.get()if item is None:breakprint(f'Processing {item}')time.sleep(1)q.task_done()
q = Queue()
thread = threading.Thread(target=worker, args=(q,))
thread.start()
for item in range(5):q.put(item)
q.join() # 阻塞,直到所有任務完成
q.put(None) # 停止工作線程
thread.join()
qsize(self) -> int
返回隊列中的元素數量。
示例:
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.qsize()
2
>>> len(q) #并沒有實現__len__
Traceback (most recent call last):File "<stdin>", line 1, in <module>
TypeError: object of type 'Queue' has no len()
_qsize(self) -> int
內部方法,用于返回隊列中的元素數量。通常由 qsize
方法調用,不直接使用。
task_done(self) -> None
表明一個隊列中的任務已經完成。在每次從 get
中獲取一個項目后調用。
示例:
>>> q = Queue()
>>> q.put(1)
>>> q.get()
1
>>> q.task_done()
線程安全優先級隊列
優先級隊列繼承了Queue
對象,其主要是繼承了它的線程安全性,其底層存儲于deque
沒有任何關系了,就是數組,插入和彈出都換成了堆的上慮和下慮操作,如果你不知道堆是什么,請看我的另一篇博文圖解堆隊列算法,這里就不介紹了
class PriorityQueue(Queue):'''Variant of Queue that retrieves open entries in priority order (lowest first).Entries are typically tuples of the form: (priority number, data).'''def _init(self, maxsize):self.queue = []def _qsize(self):return len(self.queue)def _put(self, item):heappush(self.queue, item)def _get(self):return heappop(self.queue)
那么PriorityQueue
的調用方法是繼承了Queue
,方法使用時一模一樣的。
線程安全——棧
LifoQueue
的線程安全也是套殼Queue
的,只是將初始化換成數組,get
和put
換成我們使用的普通的棧的形式,就是直接拿數組作為棧append
為入棧,pop
為出棧。
class LifoQueue(Queue):'''Variant of Queue that retrieves most recently added entries first.'''def _init(self, maxsize):self.queue = []def _qsize(self):return len(self.queue)def _put(self, item):self.queue.append(item)def _get(self):return self.queue.pop()
那么LifoQueue
的調用方法是繼承了Queue
,方法使用時一模一樣的。
線程安全的簡單的隊列——SimpleQueue
SimpleQueue
是Queue
的簡化版,他沒有了大小限制,但是也是線程安全的,剩余的put
、get
、put_nowait
、get_nowait
、empty
、qsize
操作都相同。
class _PySimpleQueue:'''Simple, unbounded FIFO queue.This pure Python implementation is not reentrant.'''# Note: while this pure Python version provides fairness# (by using a threading.Semaphore which is itself fair, being based# on threading.Condition), fairness is not part of the API contract.# This allows the C version to use a different implementation.def __init__(self):self._queue = deque()self._count = threading.Semaphore(0)def put(self, item, block=True, timeout=None):'''Put the item on the queue.The optional 'block' and 'timeout' arguments are ignored, as this methodnever blocks. They are provided for compatibility with the Queue class.'''self._queue.append(item)self._count.release()def get(self, block=True, timeout=None):'''Remove and return an item from the queue.If optional args 'block' is true and 'timeout' is None (the default),block if necessary until an item is available. If 'timeout' isa non-negative number, it blocks at most 'timeout' seconds and raisesthe Empty exception if no item was available within that time.Otherwise ('block' is false), return an item if one is immediatelyavailable, else raise the Empty exception ('timeout' is ignoredin that case).'''if timeout is not None and timeout < 0:raise ValueError("'timeout' must be a non-negative number")if not self._count.acquire(block, timeout):raise Emptyreturn self._queue.popleft()def put_nowait(self, item):'''Put an item into the queue without blocking.This is exactly equivalent to `put(item, block=False)` and is only providedfor compatibility with the Queue class.'''return self.put(item, block=False)def get_nowait(self):'''Remove and return an item from the queue without blocking.Only get an item if one is immediately available. Otherwiseraise the Empty exception.'''return self.get(block=False)def empty(self):'''Return True if the queue is empty, False otherwise (not reliable!).'''return len(self._queue) == 0def qsize(self):'''Return the approximate size of the queue (not reliable!).'''return len(self._queue)__class_getitem__ = classmethod(types.GenericAlias)if SimpleQueue is None:SimpleQueue = _PySimpleQueue