Python 标准库 18.5 - asyncio
2019獨角獸企業重金招聘Python工程師標準>>>
Event Loop
event loop 對象包含兩個部分:event 和 loop。event 負責 I/O 事件通知而 loop 負責循環處理 I/O 通知并在就緒時調用回調。這里 event 的含義與 select 中的 event mask 類似。
BaseEventLoop 類實現了基本的 loop 部分,而類似于 BaseSelectorEventLoop 這樣的類實現了基于 selector 的 event 部分。
event loop 內部維護著兩個容器:_ready 和 _scheduled。類型分別是 deque 和 list。_ready 代表已經可以執行,_scheduled 代表計劃執行。_scheduled 中的 handle 是可以 cancel 的。
一次 loop 的基本流程可以參見 _run_once() 方法,其說明文檔如下:
This calls all currently ready callbacks, polls for I/O, schedules the resulting callbacks, and finally schedules 'call_later' callbacks.流程為:
將 _scheduled 中已 canceled 的 handle 去掉
檢查 _ready 和 _scheduled 以確定一個用于 _selector.select() 的 timeout 值
timeout = Noneif self._ready:timeout = 0elif self._scheduled:# Compute the desired timeout.when = self._scheduled[0]._whentimeout = max(0, when - self.time())通過 _selector.select() 獲得一個 event_list 并 _process_events() 之
_process_events 即為將 得到的 events(handle)添加到 _ready 中
順序檢查 _scheduled 將其中 .when() 到期的 handle 挪到 _ready 中
順序執行 _ready 中的 handle (handle._run())
故 eventloop 計劃異步任務的基本方法就是將延時任務添加到 _scheduled 中,以及將即時任務添加到 _ready 中。延時任務的來源有 await future、loop.create_task() 等(最簡單的方法應該是直接實例化 Future 實例,但這種做法除非在測試,一般不必用于真實業務中)。即時任務的來源基本有三種:call_soon() 的調用、_shceduled 到期,和 selector.select() 的返回。在 IO 處理中一般主要依賴第三種機制。
callback
callback 類型是普通的函數(不能是 coroutine)。
可以使用的方法有 call_soon 和 call_at。(call_later 是通過 call_at 實現的)
調用 call_soon 會將一個 Handle 壓入 _ready
調用 call_at 會將一個 TimerHandle 壓入 _scheduled
create_task
Task 用于處理 coroutine。底層機制上實際仍然依賴 callback。
I/O
EventLoop 對 IO 的支持依賴 selector,因此基本可以想象一下它的實現邏輯,下面以 BaseSelectorEventLoop 類的 add_reader 方法為例
def add_reader(self, fd, callback, *args):"""Add a reader callback."""self._check_closed()handle = events.Handle(callback, args, self)try:key = self._selector.get_key(fd)except KeyError:self._selector.register(fd, selectors.EVENT_READ,(handle, None))else:mask, (reader, writer) = key.events, key.dataself._selector.modify(fd, mask | selectors.EVENT_READ,(handle, writer))if reader is not None:reader.cancel()當你調用 loop.create_connection 或 asyncio.open_connection 這類操作 IO 的方法時, add_reader 就會被調用。他會把你要使用的 fd 注冊到 selector 里面,注冊的 data 參數是一個 Handle 對象。如調用 asyncio.open_connection 時,這個 callback 就是 _SelectorSocketTransport._read_ready 方法。這樣當 selector 發現這個 fd 就緒的時候,_read_ready就會被調用。
Handle
Handle 是對一個回調的封裝,是調用 call 類方法返回的一個對象,標識一個任務。擁有 cancel() 和 _run() 方法。
Task & Coroutine
Coroutine
使用 async def 定義一個 coroutine function,調用 coroutine function 可以得到一個 coroutine object。下文中統稱 coroutine,具體含義的辯解依賴上下文。
區別兩種對象的一個顯式方法是使用 asyncio.iscoroutine() 和 asyncio.iscoroutinefunction() 函數。
coroutine 的執行依賴 event loop。
較早期版本(3.5 以前)定義 coroutine 的方法是使用 @asyncio.coroutine 裝飾器,await 也需要用 yield from 替代。asyncio 的 sleep() 函數定義如下:
@coroutine def sleep(delay, result=None, *, loop=None):"""Coroutine that completes after a given time (in seconds)."""future = futures.Future(loop=loop)h = future._loop.call_later(delay, future._set_result_unless_cancelled, result)try:return (yield from future)finally:h.cancel()Future
class asyncio.Future(*, loop=None) 是對一個可調用對象的異步執行控制或者說代理的封裝。因此具有如下方法:
- cancel()
- cancelled()
- done()
- result()
- exception()
- add_done_callback(fn)
- remove_done_callback(fn)
- set_result(result)
- set_exception(exception)
注意 Future 并不包含可執行對象的本體,他只保存狀態、結果、額外的回調函數這些東西。這也是上面稱之為代理的原因。因為實際的調用過程是在 event loop 里發生的,event loop 負責在異步執行完成后向 future 對象寫入 result 或 exception。這是異步任務的基本邏輯。
future 實例有三種狀態:
- PENDING
- CANCELLED
- FINISHED
初始狀態為 PENDING,當調用 cancel() 方法時會立即進入 CANCELLED 狀態并 schedule callbacks。當被調用 set_result()時會進入 FINISHED 狀態,并 schedule callbacks。當然兩種情況下傳入 callback 的參數會不同。
schedule callbacks 依然依賴 event loop 來執行:
def _schedule_callbacks(self):"""Internal: Ask the event loop to call all callbacks.The callbacks are scheduled to be called as soon as possible. Alsoclears the callback list."""callbacks = self._callbacks[:]if not callbacks:returnself._callbacks[:] = []for callback in callbacks:self._loop.call_soon(callback, self)Task
class asyncio.Task(coro, *, loop=None) 是 Future 的子類。因為 Future 沒有保存其相關可執行對象的信息,我們 schedule the execution of a coroutine 這件事一般是通過 Task 對象來做的。
create_task 通過調用 _loop.call_soon(self._step) 來 schedule coroutine,而 Task 的核心是其 _step() 方法,_step(self, value=None, exc=None) 的核心代碼是: (更多異常處理代碼沒有貼進來)
try:if exc is not None:result = coro.throw(exc)else:result = coro.send(value)except StopIteration as exc:self.set_result(exc.value)else:if isinstance(result, futures.Future):# Yielded Future must come from Future.__iter__().if result._blocking:result._blocking = Falseresult.add_done_callback(self._wakeup)self._fut_waiter = resultelif result is None:# Bare yield relinquishes control for one event loop iteration.self._loop.call_soon(self._step)result is None 的情況發生在 coroutine 中嵌套 await 時,否則 coroutine return 的值會以 StopIeration 的 value 屬性的形式拋出。這便是當捕獲到此異常時會調用 set_result 的原因。調用此方法意味著 coroutine(Future) 的結束。
因為 Coroutine 中可以 await future,所以 Task 提供了一種機制用于當 future 完成時喚醒父級協程。即為當 await future 時,task 對象會將此 future 保存到 _fut_waiter 對象中,并為其添加一個名為 _wake_up() 的回調。
Transport & Protocol
Transport 是 asyncio 提供的一個抽象了通信接口的類。操作 transport 時可以不再關心具體的通信對象是 socket 還是 pipe。它的基本繼承結構是這樣的:
BaseTransportget_extra_infocloseReadTransport(BaseTransport)pause_readingresume_readingWriteTransport(BaseTransport)set_write_buffer_limitsget_write_buffer_sizewritewritelineswrite_eofcan_write_eofabortTransport(ReadTransport, Writetransport)Protocol 是當創建連接時與 Transport 一起被創建的一個業務層對象。一般用戶需要子類化一個 Protocol 并覆蓋他的一些方法以實現設計的功能。Protocol 的繼承結構是這樣的
BaseProtocolconnection_madeconnection_lostpause_writingresume_writingProtocol(BaseProtocol)data_receivedeof_receivedProtocol 默認定義了以上的事件回調,由 event_loop 負責在事件發生時調用。protocol 的生命周期中這些回調的順序為:
start -> CM [-> DR*] [-> ER?] -> CL -> end* CM: connection_made() * DR: data_received() * ER: eof_received() * CL: connection_lost()其中 connection_made 會傳入對應的 Transport 對象用于標識遠端連接,可以使用這個對象返回消息。因此一個簡單的 echo server 的 Protocol 至少應該是這樣的:
class EchoProtocol(Protocol):def connection_made(self, transport):self.transport = transportdef data_received(self, data):self.transport.write(data)event_loop
Protocol 的實例化與回調調用都依賴 EventLoop,具體的注冊機制為調用 loop 的 create_connection 和 create_server 協程方法。分別用于創建客戶端與服務器連接。
@coroutine def create_connection(self, protocol_factory, host=None, port=None, *,ssl=None, family=0, proto=0, flags=0, sock=None,local_addr=None, server_hostname=None):"""Connect to a TCP server.Create a streaming transport connection to a given Internet host andport: socket family AF_INET or socket.AF_INET6 depending on host (orfamily if specified), socket type SOCK_STREAM. protocol_factory must bea callable returning a protocol instance.This method is a coroutine which will try to establish the connectionin the background. When successful, the coroutine returns a(transport, protocol) pair."""...return transport, protocol@coroutine def create_server(self, protocol_factory, host=None, port=None,*,family=socket.AF_UNSPEC,flags=socket.AI_PASSIVE,sock=None,backlog=100,ssl=None,reuse_address=None):"""Create a TCP server bound to host and port.Return a Server object which can be used to stop the service.This method is a coroutine."""...return server其中 host 和 port 并不是必需的,可以用一個 socket 實例代替。
Streams
Stream 是對 Transport + Protocol 模式的又一層封裝(而非取代)。當使用 asyncio.open_connection 創建客戶端時,得到的是一對 StreamReader 和 StreamWriter 對象。
StreamReader 的核心方法是 _wakeup_waiter 和 feed_data。當 Protocol(StreamProtocol) 的 data_received(data) 被調用時,data 會被直接轉發給 feed_data,然后 feed_data 會把 data 放到 self.buffer 里并調用 _wakeup_waiter,這里的 waiter 是之前如 data = await reader.read(n) 這樣的語句生成的 Future。
_wakeup_waiter 的代碼其實基本上只是調用了一下 self._waiter.set_result(None)。這里之所以使用 None 只是為了將 waiter 的狀態從 _PENDING 改為 _FINISHED,真正的數據獲取(read(n))是從 buffer 里取的。
Stream 之于 Protocol 的優勢在于對 I/O 事件的處理方式,從被動的 def data_received(data) 轉變為主動的 data = await reader.read(n)。將 asyncio 的編程模式從回調式變成了一種類似同步編程的方式。
StreamReaderProtocol
這是用來替代 StreamReader 成為 Protocol 的一個沒什么特別作用只是在轉發方法調用的 helper class,其之所以存在是因為要避免 StreamReader 直接成為 Protocol 的子類。否則用戶就可以直接通過 StreamReader 訪問到 Protocol 的方法,而這種情形與 StreamReader 的設計目標相悖。
因此這個類的存在一般可以忽視。
舉個栗子
下面通過一個最簡單的 socket read 操作解釋一下 event_loop 的調度方式:
import asyncioloop = asyncio.get_event_loop()async def foo(loop):reader, writer = await asyncio.open_connection('127.0.0.1', 1234, loop=loop)for i in range(2):data = await reader.read(100)print(data.decode())loop.run_until_complete(loop.create_task(foo()))上例嘗試創建一個 socket 連接 1234 端口,并讀取兩次數據。內部流程基本是這樣的:
例子中使用了 for 循環來實現同一 socket 的多次讀操作,是因為 foo() 這個 task 一旦執行完畢,由 open_connection 創建的 reader 對象就會被關閉。
可見 Stream 的核心思路便是一旦需要讀數據就使用 _wait_for_data 將自身掛起,并等待 selector 將之喚醒。同時異步編程的基本思路便是:以 Handle 為單位,不斷把下一個 handle 交給 loop 的下一次 _run_once 執行。而 coroutine 又在異步之上增加了掛起/喚醒功能,使得任務流調度更加靈活和接近同步。
轉載于:https://my.oschina.net/lionets/blog/499803
總結
以上是生活随笔為你收集整理的Python 标准库 18.5 - asyncio的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android保存文件到手机内存
- 下一篇: Supporting Python 3(