Python Asyncio排程原理

語言: CN / TW / HK

持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第2天,點選檢視活動詳情

前記

在文章《Python的可等待物件在Asyncio的作用》中介紹了Python的可等待物件作用,特別是Task物件在啟動的時候可以自我驅動,但是一個Task物件只能驅動一條執行鏈,如果要多條鏈執行(併發),還是需要EventLoop來安排驅動,接下來將通過Python.Asyncio庫的原始碼來了解EventLoop是如何運作的。

1.基本介紹

Python.Asyncio是一個大而全的庫,它包括很多功能,而跟核心排程相關的邏輯除了三種可等待物件外,還有其它一些功能,它們分別位於runners.pybase_event.pyevent.py三個檔案中。

runners.py檔案有一個主要的類--Runner,它的主要職責是做好進入協程模式的事件迴圈等到初始化工作,以及在退出協程模式時清理還在記憶體的協程,生成器等物件。

協程模式只是為了能方便理解,對於計算機而言,並沒有這樣區分

event.py檔案除了存放著EventLoop物件的介面以及獲取和設定EventLoop的函式外,還有兩個EventLoop可排程的物件,分別為HandlerTimerHandler,它們可以認為是EvnetLoop呼叫其它物件的容器,用於連線待排程物件和事件迴圈的關係,不過它們的實現非常簡單,對於Handler,它的原始碼如下: ```Python

已經移除了一些不想關的程式碼

class Handle: def init(self, callback, args, loop, context=None): # 初始化上下文,確保執行的時候能找到Handle所在的上下文 if context is None: context = contextvars.copy_context() self._context = context self._loop = loop self._callback = callback self._args = args self._cancelled = False

def cancel(self):
    # 設定當前Handle為取消狀態
    if not self._cancelled:
        self._cancelled = True
        self._callback = None
        self._args = None

def cancelled(self):
    return self._cancelled

def _run(self):
    # 用於執行真正的函式,且通過context.run方法來確保在自己的上下文內執行。
    try:
        # 保持在自己持有的上下文中執行對應的回撥
        self._context.run(self._callback, *self._args)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        cb = format_helpers._format_callback_source(
            self._callback, self._args)
        msg = f'Exception in callback {cb}'
        context = {
            'message': msg,
            'exception': exc,
            'handle': self,
        }
        self._loop.call_exception_handler(context)

通過原始碼可以發現,`Handle`功能十分簡單,提供了可以被取消以及可以在自己所處的上下文執行的功能,而`TimerHandle`繼承於`Handle`比`Handle`多了一些和時間以及排序相關的引數,原始碼如下:Python class TimerHandle(Handle): def init(self, when, callback, args, loop, context=None): super().init(callback, args, loop, context) self._when = when self._scheduled = False

def __hash__(self):
    return hash(self._when)

def __lt__(self, other):
    if isinstance(other, TimerHandle):
        return self._when < other._when
    return NotImplemented

def __le__(self, other):
    if isinstance(other, TimerHandle):
        return self._when < other._when or self.__eq__(other)
    return NotImplemented

def __gt__(self, other):
    if isinstance(other, TimerHandle):
        return self._when > other._when
    return NotImplemented

def __ge__(self, other):
    if isinstance(other, TimerHandle):
        return self._when > other._when or self.__eq__(other)
    return NotImplemented

def __eq__(self, other):
    if isinstance(other, TimerHandle):
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)
    return NotImplemented

def cancel(self):
    if not self._cancelled:
        # 用於通知事件迴圈當前Handle已經退出了
        self._loop._timer_handle_cancelled(self)
    super().cancel()

def when(self):
    return self._when

通過程式碼可以發現,這兩個物件十分簡單,而我們在使用`Python.Asyncio`時並不會直接使用到這兩個物件,而是通過`loop.call_xxx`系列方法來把呼叫封裝成`Handle`物件,然後等待`EventLoop`執行。 所以`loop.call_xxx`系列方法可以認為是`EventLoop`的註冊操作,基本上所有非IO的非同步操作都需要通過`loop.call_xxx`方法來把自己的呼叫註冊到`EventLoop`中,比如`Task`物件就在初始化後通過呼叫`loop.call_soon`方法來註冊到`EventLoop`中,`loop.call_sonn`的實現很簡單,它的原始碼如下:Python class BaseEventLoop: ...

def call_soon(self, callback, *args, context=None):
    # 檢查是否事件迴圈是否關閉,如果是則直接丟擲異常
    self._check_closed()
    handle = self._call_soon(callback, args, context)
    return handle

def _call_soon(self, callback, args, context): # 把呼叫封裝成一個handle,這樣方便被事件迴圈呼叫 handle = events.Handle(callback, args, self, context) # 新增一個handle到_ready,等待被呼叫 self._ready.append(handle) return handle `` 可以看到call_soon真正相關的程式碼只有10幾行,它負責把一個呼叫封裝成一個Handle,並新增到self._reday`中,從而實現把呼叫註冊到事件迴圈之中。

loop.call_xxx系列函式除了loop.call_soon系列函式外,還有另外兩個方法--loop.call_atloop.call_later,它們類似於loop.call_soon,不過多了一個時間引數,來告訴EventLoop在什麼時間後才可以呼叫,同時通過loop.call_atloop.call_later註冊的呼叫會通過Python的堆排序模組headpq註冊到self._scheduled變數中,具體程式碼如下: ```Python class BaseEventLoop: ...

def call_later(self, delay, callback, *args, context=None):
    if delay is None:
        raise TypeError('delay must not be None')
    timer = self.call_at(self.time() + delay, callback, *args, context=context)
    return timer

def call_at(self, when, callback, *args, context=None):
    if when is None:
        raise TypeError("when cannot be None")
    self._check_closed()
    # 建立一個timer handle,然後新增到事件迴圈的_scheduled中,等待被呼叫
    timer = events.TimerHandle(when, callback, args, self, context)
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer

```

2.EventLoop的排程實現

在文章《Python的可等待物件在Asyncio的作用》中已經分析到了runner會通過loop.run_until_complete來呼叫mainTask從而開啟EventLoop的排程,所以在分析EventLoop的排程時,應該先從loop.run_until_complete入手,對應的原始碼如下: ```Python class BaseEventLoop: def run_until_complete(self, future): ...

    new_task = not futures.isfuture(future)
    # 把coroutine轉換成task,這樣事件迴圈就可以排程了,事件迴圈的最小排程單位為task
    # 需要注意的是此時事件迴圈並沒註冊到全域性變數中,所以需要顯示的傳進去,
    # 同時Task物件註冊的時候,已經通過loop.call_soon把自己註冊到事件迴圈中,等待排程
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    # 當該task完成時,意味著當前事件迴圈失去了排程物件,無法繼續排程,所以需要關閉當前事件迴圈,程式會由協程模式返回到執行緒模式
    future.add_done_callback(_run_until_complete_cb)
    try:
        # 事件迴圈開始執行
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()

def run_forever(self):
    # 進行一些初始化工作
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    old_agen_hooks = sys.get_asyncgen_hooks()
    # 通過asyncgen鉤子來自動關閉asyncgen函式,這樣可以提醒使用者生成器還未關閉
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        # 設定當前在執行的事件迴圈到全域性變數中,這樣就可以在任一階段獲取到當前的事件迴圈了
        events._set_running_loop(self)
        while True:
            # 正真執行任務的邏輯
            self._run_once()
            if self._stopping:
                break
    finally:
        # 關閉迴圈, 並且清理一些資源
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)

這段原始碼並不複雜,它的主要邏輯是通過把`Corotinue`轉為一個`Task`物件,然後通過`Task`物件初始化時呼叫`loop.call_sonn`方法把自己註冊到`EventLoop`中,最後再通過`loop.run_forever`中的迴圈程式碼一直執行著,直到`_stopping`被標記為`True`:Python while True: # 正真執行任務的邏輯 self._run_once() if self._stopping: break 可以看出,這段程式碼是確保事件迴圈能一直執行著,自動迴圈結束,而真正排程的核心是`_run_once`函式,它的原始碼如下:Python class BaseEventLoop: ... def _run_once(self): # self._scheduled是一個列表,它只存放TimerHandle sched_count = len(self._scheduled) ############################### # 第一階段,整理self._scheduled # ############################### if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # 當待排程的任務數量超過100且待取消的任務佔總任務的50%時,才進入這個邏輯 # 把需要取消的任務移除 new_scheduled = [] for handle in self._scheduled: if handle._cancelled: # 設定handle的_cancelled為True,並且把handle從_scheduled中移除 handle._scheduled = False else: new_scheduled.append(handle)

        # 重新排列堆
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # 需要取消的handle不多,則只會走這個邏輯,這裡會把堆頂的handle彈出,並標記為不可排程,但不會訪問整個堆
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    #################################
    # 第二階段,計算超時值以及等待事件IO #
    #################################
    timeout = None
    # 當有準備排程的handle或者是正在關閉時,不等待,方便儘快的排程
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        # 如果堆有資料時,通過堆頂的handle計算最短的超時時間,但是最多不能超過MAXIMUM_SELECT_TIMEOUT,以免超過系統限制
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    # 事件迴圈等待事件,直到有事件或者超時
    event_list = self._selector.select(timeout)

    ##################################################
    # 第三階段,把滿足條件的TimeHandle放入到self._ready中 #
    ##################################################
    # 獲取得到的事件的回撥,然後裝填到_ready
    self._process_events(event_list)

    # 把一些在self._scheduled且滿足排程條件的handle放到_ready中,比如TimerHandle。
    # end_time為當前時間+一個時間單位,猜測是能多處理一些這段時間內產生的事件
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    ################################################################################
    # 第四階段,遍歷所有準備排程的handle,並且通過handle的context來執行handle對應的callback #
    ################################################################################
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        # 如果handle已經被取消,則不呼叫
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    # 執行太久的回撥,記錄下來,這些需要開發者自己優化
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

`` 通過原始碼分析,可以很明確的知道排程邏輯中第一步是先規整self._scheduled,在規整的過程是使用堆排序來進行的,因為堆排序在排程的場景下效率是非常高的,不過這段規整程式碼分成兩種,我猜測是當需要取消的數量過多時直接遍歷的效率會更高。 在規整self._scheduled後,就進入第二步,該步驟開始等待系統事件迴圈返回對應的事件,如果self._ready中有資料,就不做等待了,需要馬上到下一步驟,以便能趕緊安排排程。 在得到系統事件迴圈得到的事件後,就進入到了第三步,該步驟會通過self._process_events方法處理對應的事件,並把事件對應的回撥存放到了self._ready中,最後再遍歷self._ready中的所有Handle並逐一執行(執行時可以認為EventLoop`把控制權返回給對應的呼叫邏輯),至此一個完整的排程邏輯就結束了,並進入下一個排程邏輯。

3.網路IO事件的處理

注:由於系統事件迴圈的限制,所以檔案IO一般還是使用多執行緒來執行,具體見:https://github.com/python/asyncio/wiki/ThirdParty#filesystem

在分析EventLoop排程實現的時候忽略了self._process_events的具體實現邏輯,因為_process_events方法所在asyncio.base_event.py檔案中的BaseEventLoop類並未有具體實現的,因為網路IO相關的需要系統的事件迴圈來幫忙處理,所以與系統事件迴圈相關的邏輯都在asyncio.selector_events.py中的BaseSelectorEventLoop類中。BaseSelectorEventLoop類封裝了selector模組與系統事件迴圈互動,使呼叫者不需要去考慮sock的建立以及sock產生的檔案描述符的監聽與登出等操作,下面以BaseSelectorEventLoop中自帶的pipe為例子,分析BaseSelectorEventLoop是如何進行網路IO事件處理的。

在分析之前,先看一個例子,程式碼如下: ```Python import asyncio import threading

def task(): print("task")

def run_loop_inside_thread(loop): loop.run_forever()

loop = asyncio.get_event_loop() threading.Thread(target=run_loop_inside_thread, args=(loop,)).start() loop.call_soon(task) 如果直接執行這個例子,它並不會輸出`task`(不過在IDE使用DEBUG模式下執行緒啟動會慢一點,所以會輸出的),因為在呼叫`loop.run_forever`後`EventLoop`會一直卡在這段邏輯中:Python event_list = self._selector.select(timeout) 所以呼叫`loop.call_soon`並不會使`EventLoop`馬上安排排程,而如果把`call_soon`換成`call_soon_threadsafe`則可以正常輸出,這是因為`call_soon_threadsafe`中多了一個`self._write_to_self`的呼叫,它的原始碼如下:Python class BaseEventLoop: ...

def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe."""
    self._check_closed()
    handle = self._call_soon(callback, args, context)
    self._write_to_self()
    return handle

由於這個呼叫是涉及到IO相關的,所以需要到`BaseSelectorEventLoop`類檢視,接下來以pipe相關的網路IO操作來分析`EventLoop`是如何處理IO事件的(只演示reader物件,writer物件操作與reader類似),對應的原始碼如下:Python class BaseSelectorEventLoop(base_events.BaseEventLoop):

#######
# 建立 #
#######
def __init__(self, selector=None):
    super().__init__()

    if selector is None:
        # 獲取最優的selector
        selector = selectors.DefaultSelector()
    self._selector = selector
    # 建立pipe
    self._make_self_pipe()
    self._transports = weakref.WeakValueDictionary()

def _make_self_pipe(self):
    # 建立Pipe對應的sock 
    self._ssock, self._csock = socket.socketpair()
    # 設定sock為非阻塞
    self._ssock.setblocking(False)
    self._csock.setblocking(False)
    self._internal_fds += 1
    # 阻塞服務端sock讀事件對應的回撥
    self._add_reader(self._ssock.fileno(), self._read_from_self)

def _add_reader(self, fd, callback, *args):
    # 檢查事件迴圈是否關閉
    self._check_closed()
    # 封裝回調為handle物件
    handle = events.Handle(callback, args, self, None)
    try:
        key = self._selector.get_key(fd)
    except KeyError:
        # 如果沒有註冊到系統的事件迴圈,則註冊
        self._selector.register(fd, selectors.EVENT_READ,
                                (handle, None))
    else:
        # 如果已經註冊過,則更新
        mask, (reader, writer) = key.events, key.data
        self._selector.modify(fd, mask | selectors.EVENT_READ,
                              (handle, writer))
        if reader is not None:
            reader.cancel()
    return handle

def _read_from_self(self):
    # 負責消費sock資料
    while True:
        try:
            data = self._ssock.recv(4096)
            if not data:
                break
            self._process_self_data(data)
        except InterruptedError:
            continue
        except BlockingIOError:
            break

#######
# 刪除 #
#######
def _close_self_pipe(self):
    # 登出Pipe對應的描述符 
    self._remove_reader(self._ssock.fileno())
    # 關閉sock
    self._ssock.close()
    self._ssock = None
    self._csock.close()
    self._csock = None
    self._internal_fds -= 1

def _remove_reader(self, fd):
    # 如果事件迴圈已經關閉了,就不用操作了
    if self.is_closed():
        return False
    try:
        # 查詢檔案描述符是否在selector中
        key = self._selector.get_key(fd)
    except KeyError:
        # 不存在則返回
        return False
    else:
        # 存在則進入移除的工作
        mask, (reader, writer) = key.events, key.data
        # 通過事件掩碼判斷是否有其它事件
        mask &= ~selectors.EVENT_READ
        if not mask:
            # 移除已經註冊到selector的檔案描述符
            self._selector.unregister(fd)
        else:
            # 移除已經註冊到selector的檔案描述符,並註冊新的事件
            self._selector.modify(fd, mask, (None, writer))

        # 如果reader不為空,則取消reader
        if reader is not None:
            reader.cancel()
            return True
        else:
            return False

通過原始碼中的建立部分可以看到,`EventLoop`在啟動的時候會建立一對建立通訊的sock,並設定為非阻塞,然後把對應的回撥封裝成一個`Handle`物件並註冊到系統事件迴圈中(刪除則進行對應的反向操作),之後系統事件迴圈就會一直監聽對應的事件,也就是`EventLoop`的執行邏輯會阻塞在下面的呼叫中,等待事件響應:Python event_list = self._selector.select(timeout) 這時如果執行`loop.call_soon_threadsafe`,那麼會通過`write_to_self`寫入一點資訊:Python def _write_to_self(self): csock = self._csock if csock is None: return

    try:
        csock.send(b'\0')
    except OSError:
        if self._debug:
            logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

由於`csock`被寫入了資料,那麼它對應的`ssock`就會收到一個讀事件,系統事件迴圈在收到這個事件通知後就會把資料返回,然後`EventLoop`就會獲得到對應的資料,並交給`process_events`方法進行處理,它的相關程式碼如下:Python class BaseSelectorEventLoop: def _process_events(self, event_list): for key, mask in event_list: # 從回撥事件中獲取到對應的資料,key.data在註冊時是一個元祖,所以這裡要對元祖進行解包 fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: # 得到reader handle,如果是被標記為取消,就移除對應的檔案描述符 if reader._cancelled: self._remove_reader(fileobj) else: # 如果沒被標記為取消,則安排到self._ready中 self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: # 對於寫物件,也是同樣的道理。 if writer._cancelled: self._remove_writer(fileobj) else: self._add_callback(writer)

def _add_callback(self, handle):
    # 把回撥的handle新增到_ready中
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if handle._cancelled:
        return
    assert not isinstance(handle, events.TimerHandle)
    self._ready.append(handle)

def _remove_reader(self, fd):
    # 如果事件迴圈已經關閉了,就不用操作了
    if self.is_closed():
        return False
    try:
        # 查詢檔案描述符是否在selector中
        key = self._selector.get_key(fd)
    except KeyError:
        # 不存在則返回
        return False
    else:
        # 存在則進入移除的工作
        mask, (reader, writer) = key.events, key.data
        mask &= ~selectors.EVENT_READ
        if not mask:
            # 移除已經註冊到selector的檔案描述符
            self._selector.unregister(fd)
        else:
            self._selector.modify(fd, mask, (None, writer))

        if reader is not None:
            reader.cancel()
            return True
        else:
            return False

`` 從程式碼中可以看出_process_events會對事件對應的檔案描述符進行處理,並從事件回撥中獲取到對應的Handle物件新增到self._ready中,由EventLoop在接下來遍歷self._ready`並執行。

可以看到網路IO事件的處理並不複雜,因為系統事件迴圈已經為我們做了很多工作了,但是使用者所有與網路IO相關的操作都需要有一個類似的操作,這樣是非常的繁瑣的,幸好asyncio庫已經為我們做了封裝,我們只要呼叫就可以了,方便了很多。