Python 進階:queue 佇列原始碼分析
起步
queue 模組提供適用於多執行緒程式設計的先進先出(FIFO)資料結構。因為它是執行緒安全的,所以多個執行緒很輕鬆地使用同一個例項。
原始碼分析
先從初始化的函式來看:
class Queue:
def __init__(self, maxsize=0):
# 設定佇列的最大容量
self.maxsize = maxsize
self._init(maxsize)
# 執行緒鎖,互斥變數
self.mutex = threading.Lock()
# 由鎖衍生出三個條件變數
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def _init(self, maxsize):
# 初始化底層資料結構
self.queue = deque()
從這初始化函式能得到哪些資訊呢?首先,佇列是可以設定其容量大小的,並且具體的底層存放元素的它使用了 collections.deque() 雙端列表的資料結構,這使得能很方便的做先進先出操作。這裡還特地抽象為 _init 函式是為了方便其子類進行覆蓋,允許子類使用其他結構來存放元素(比如優先佇列使用了 list)。
然後就是執行緒鎖 self.mutex ,對於底層資料結構 self.queue 的操作都要先獲得這把鎖;再往下是三個條件變數,這三個 Condition 都以 self.mutex 作為引數,也就是說它們共用一把鎖;從這可以知道諸如 with self.mutex 與 with self.not_empty 等都是互斥的。
基於這些鎖而做的一些簡單的操作:
class Queue:
...
def qsize(self):
# 返回佇列中的元素數
with self.mutex:
return self._qsize()
def empty(self):
# 佇列是否為空
with self.mutex:
return not self._qsize()
def full(self):
# 佇列是否已滿
with self.mutex:
return 0 < self.maxsize <= self._qsize()
def _qsize(self):
return len(self.queue)
這個程式碼片段挺好理解的,無需分析。
作為佇列,主要得完成入隊與出隊的操作,首先是入隊:
class Queue:
...
def put(self, item, block=True, timeout=None):
with self.not_full: # 獲取條件變數not_full
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full # 如果 block 是 False,並且佇列已滿,那麼丟擲 Full 異常
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait() # 阻塞直到由剩餘空間
elif timeout < 0: # 不合格的引數值,丟擲ValueError
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout # 計算等待的結束時間
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full # 等待期間一直沒空間,丟擲 Full 異常
self.not_full.wait(remaining)
self._put(item) # 往底層資料結構中加入一個元素
self.unfinished_tasks += 1
self.not_empty.notify()
def _put(self, item):
self.queue.append(item)
儘管只有二十幾行的程式碼,但這裡的邏輯還是比較複雜的。它要處理超時與佇列剩餘空間不足的情況,具體幾種情況如下:
- 如果 block 是 False,忽略timeout引數
- 若此時佇列已滿,則丟擲 Full 異常;
- 若此時佇列未滿,則立即把元素儲存到底層資料結構中;
- 如果 block 是 True
- 若 timeout 是 None 時,那麼put操作可能會阻塞,直到佇列中有空閒的空間(預設);
- 若 timeout 是非負數,則會阻塞相應時間直到佇列中有剩餘空間,在這個期間,如果佇列中一直沒有空間,丟擲 Full 異常;
處理好引數邏輯後,,將元素儲存到底層資料結構中,並遞增unfinished_tasks,同時通知 not_empty ,喚醒在其中等待資料的執行緒。
出隊操作:
class Queue:
...
def get(self, block=True, timeout=None):
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
def _get(self):
return self.queue.popleft()
get() 操作是 put() 相反的操作,程式碼塊也及其相似,get() 是從佇列中移除最先插入的元素並將其返回。
- 如果 block 是 False,忽略timeout引數
- 若此時佇列沒有元素,則丟擲 Empty 異常;
- 若此時佇列由元素,則立即把元素儲存到底層資料結構中;
- 如果 block 是 True
- 若 timeout 是 None 時,那麼get操作可能會阻塞,直到佇列中有元素(預設);
- 若 timeout 是非負數,則會阻塞相應時間直到佇列中有元素,在這個期間,如果佇列中一直沒有元素,則丟擲 Empty 異常;
最後,通過 self.queue.popleft() 將最早放入佇列的元素移除,並通知 not_full ,喚醒在其中等待資料的執行緒。
這裡有個值得注意的地方,在 put() 操作中遞增了 self.unfinished_tasks ,而 get() 中卻沒有遞減,這是為什麼?
這其實是為了留給使用者一個消費元素的時間,get() 僅僅是獲取元素,並不代表消費者執行緒處理的該元素,使用者需要呼叫 task_done() 來通知佇列該任務處理完成了:
class Queue:
...
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0: # 也就是成功呼叫put()的次數小於呼叫task_done()的次數時,會丟擲異常
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all() # 當unfinished為0時,會通知all_tasks_done
self.unfinished_tasks = unfinished
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks: # 如果有未完成的任務,將呼叫wait()方法等待
self.all_tasks_done.wait()
由於 task_done() 使用方呼叫的,當 task_done() 次數大於 put() 次數時會丟擲異常。
task_done() 操作的作用是喚醒正在阻塞的 join() 操作。join() 方法會一直阻塞,直到佇列中所有的元素都被取出,並被處理了(和執行緒的join方法類似)。也就是說 join() 方法必須配合 task_done() 來使用才行。
LIFO 後進先出佇列
LifoQueue使用後進先出順序,與棧結構相似:
class LifoQueue(Queue):
'''Variant of Queue that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _qsize(self):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
這就是 LifoQueue 全部程式碼了,這正是 Queue 設計很棒的一個原因,它將底層的資料操作抽象成四個操作函式,本身來處理執行緒安全的問題,使得其子類只需關注底層的操作。
LifoQueue 底層資料結構改用 list 來存放,通過 self.queue.pop() 就能將 list 中最後一個元素移除,無需重置索引。
PriorityQueue 優先佇列
from heapq import heappush, heappop
class PriorityQueue(Queue):
'''Variant of Queue that retrieves open entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
'''
def _init(self, maxsize):
self.queue = []
def _qsize(self):
return len(self.queue)
def _put(self, item):
heappush(self.queue, item)
def _get(self):
return heappop(self.queue)
優先佇列使用了 heapq 模組的結構,也就是最小堆的結構。優先佇列更為常用,佇列中專案的處理順序需要基於這些專案的特徵,一個簡單的例子:
import queue
class A:
def __init__(self, priority, value):
self.priority = priority
self.value = value
def __lt__(self, other):
return self.priority < other.priority
q = queue.PriorityQueue()
q.put(A(1, 'a'))
q.put(A(0, 'b'))
q.put(A(1, 'c'))
print(q.get().value) # 'b'
使用優先佇列的時候,需要定義 lt 魔術方法,來定義它們之間如何比較大小。若元素的 priority 相同,依然使用先進先出的順序。
以上就是本次分享的所有內容,想要了解更多 python 知識歡迎前往公眾號:Python 程式設計學習圈 ,傳送 “J” 即可免費獲取,每日干貨分享
- 位元組一面:Redis主節點宕機,如何處理?
- 如何使用 Redis 實現 “附近的人” 這個功能?
- 介紹一款能取代 Scrapy 的爬蟲框架 - feapder
- 直觀講解一下 RPC 呼叫和 HTTP 呼叫的區別!
- MySQL 億級資料分頁的優化
- Python 多執行緒小技巧:比 time.sleep 更好用的暫停寫法!
- Python面試官:請說說併發場景鎖怎麼用?
- Python如何非同步傳送日誌到遠端伺服器?
- Python 中的數字到底是什麼?
- 如何建立一個完美的 Python 專案?
- 詳解 Python 的二元算術運算,為什麼說減法只是語法糖?
- Python 為什麼沒有 main 函式?為什麼我不推薦寫 main 函式?
- Bug分析,假刪除導致文章釋出成功卻打不開的問題
- Python 進階:queue 佇列原始碼分析
- Python例項篇:自動操作Excel檔案(既簡單又特別實用)
- 誰說程式設計師不懂浪漫,當代碼遇到文學..
- Python 為什麼沒有 void 關鍵字?
- 程式語言中分號“;”的簡明歷史
- Python 什麼情況下會生成 pyc 檔案?
- 函式和方法的裝飾器