线程中task取消_Rust Async: async-task源码分析
async-std是rust異步生態中的基礎運行時庫之一,核心理念是合理的性能 + 用戶友好的api體驗。經過幾個月密集的開發,前些天已經發布1.0穩定版本。因此是時候來一次深入的底層源碼分析。async-std的核心是一個帶工作竊取的多線程Executor,而其本身的實現又依賴于async-task這個關鍵庫,因此本文主要對async-task的源碼進行分析。
當Future提交給Executor執行時,Executor需要在堆上為這個Future分配空間,同時需要給它分配一些狀態信息,比如Future是否可以執行(poll),是否在等待被喚醒,是否已經執行完成等等。我們一般把提交給Executor執行的Future和其連帶的狀態稱為 task。async-task這個庫就是對task進行抽象封裝,以便于Executor的實現,其有幾個創新的特性:
使用方式
async-task只對外暴露了一個函數接口以及對應了兩個返回值類型:
pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) whereF: Future<Output = R> + Send + 'static,R: Send + 'static,S: Fn(Task<T>) + Send + Sync + 'static,T: Send + Sync + 'static,其中,參數future表示要執行的Future,schedule是一個閉包,當task變為可執行狀態時會調用這個函數以調度該task重新執行,tag是附帶在該task上的額外上下文信息,比如task的名字,id等。 返回值Task就是構造好的task對象,JoinHandle實現了Future,用于接收最終執行的結果。
值得注意的是spawn這個函數并不會做類似在后臺進行計算的操作,而僅僅是分配內存,創建一個task出來,因此其實叫create_task反而更為恰當且好理解。
Task提供了如下幾個方法:
// 對該task進行調度pub fn schedule(self);// poll一次內部的Future,如果Future完成了,則會通知JoinHandle取結果。否則task進// 入等待,直到被被下一次喚醒進行重新調度執行。pub fn run(self);// 取消task的執行pub fn cancel(&self);// 返回創建時傳入的tag信息pub fn tag(&self) -> &T;JoinHandle實現了Future trait,同時也提供了如下幾個方法:
// 取消task的執行pub fn cancel(&self);// 返回創建時傳入的tag信息pub fn tag(&self) -> &T;同時,Task和JoinHandle都實現了Send+Sync,所以他們可以出現在不同的線程,并通過tag方法可以同時持有 &T,因此spawn函數對T有Sync的約束。
借助于async_task的抽象,下面的幾十行代碼就實現了一個共享全局任務隊列的多線程Executor:
use std::future::Future; use std::thread;use crossbeam::channel::{unbounded, Sender}; use futures::executor; use once_cell::sync::Lazy;static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {let (sender, receiver) = unbounded::<async_task::Task<()>>();for _ in 0..4 {let recv = receiver.clone();thread::spawn(|| {for task in recv {task.run(); }});}sender });fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> whereF: Future<Output = R> + Send + 'static,R: Send + 'static, {let schedule = |task| QUEUE.send(task).unwrap();let (task, handle) = async_task::spawn(future, schedule, ());task.schedule();handle }fn main() {let handles: Vec<_> = (0..10).map(|i| {spawn(async move {println!("Hello from task {}", i);})}).collect();// Wait for the tasks to finish.for handle in handles {executor::block_on(handle);} }Task的結構圖
通常rust里的并發數據結構會包含底層的實現,一般叫Inner或者RawXXX,包含大量裸指針等unsafe操作,然后再其基礎上進行類型安全包裝,提供上層語義。比如channel,上層暴露出 Sender和 Receiver,其行為不一樣,但內部表示是完全一樣的。async-task也類似,JoinHandle, Task以及調用Future::poll時傳遞的Waker類型內部都共享同一個RawTask結構。由于JoinHandle本身是一個Future,整個并發結構還有第四個角色-在JoinHandle上調用poll的task傳遞的Waker,為避免引起混淆就稱它為Awaiter吧。整個的結構圖大致如下:
整個task在堆上一次分配,內存布局按Header,Tag, Schedule,Future/Output排列。由于Future和Output不同時存在,因此他們共用同一塊內存。
- JoinHandle:只有一個,不訪問Future,可以訪問Output,一旦銷毀就不再生成;
- Task:主要訪問Future,銷毀后可以繼續生成,不過同一時間最多只有一個,這樣可以避免潛在的多個Task對Future進行并發訪問的bug;
- Waker:可以存在多份,主要訪問schedule數據,由于spawn函數的參數要求schedule必須是Send+Sync,因此多個waker并發調用是安全的。
- Header:本身包含三個部分,state是一個原子變量,包含引用計數,task的執行狀態,awaiter鎖等信息;awaiter保存的是JoinHandle所在的task執行時傳遞的Waker,用于當Output生成后通知JoinHandle來取;vtable是一個指向靜態變量的虛表指針。
task中的狀態
所有的并發操作都是通過Header中的state這個原子變量來進行同步協調的。主要有以下幾種flag:
JoinHandle的實現分析
JoinHandle::cancel
為避免并發問題,JoinHandle不接觸Future數據,而由于取消task的執行需要析構Future數據,因此cancel操作通過重新schedule一次,把操作傳遞給Task執行。
impl<R, T> JoinHandle<R, T> {pub fn cancel(&self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// 如果task已經結束或者closed,什么也不做。if state & (COMPLETED | CLOSED) != 0 {break;}let new = if state & (SCHEDULED | RUNNING) == 0 {// 如果不處于scheduled或running狀態,那么下面就需要調用schedule// 函數通知Task,因此要加上SCHEDULED 和增加引用計數(state | SCHEDULED | CLOSED) + REFERENCE} else {// 否則要么task已經schedue過了,過段時間會重新執行,要么當前正在// 運行,因此只需要設置closed狀態,task執行完后會收到close狀態并// 進行處理。state | CLOSED};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 重新schedule以便executor將Future銷毀if state & (SCHEDULED | RUNNING) == 0 {((*header).vtable.schedule)(ptr);}// 如果有awaiter的話,通知相應的的task。if state & AWAITER != 0 {(*header).notify();}break;}Err(s) => state = s,// 失敗重試}}}} }JoinHandle::drop
由于整個task的所有權是由JoinHandle,Task和Waker共享的,因此都需要手動實現drop。Output只會由JoinHandle訪問,因此如果有的話也要一同銷毀。
impl<R, T> Drop for JoinHandle<R, T> {fn drop(&mut self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;let mut output = None;unsafe {// 由于很多時候JoinHandle不用,會在剛創建的時候直接drop掉,因此針對這種情// 況作一個特殊化處理。這樣一個原子操作就完成了。if let Err(mut state) = (*header).state.compare_exchange_weak(SCHEDULED | HANDLE | REFERENCE,SCHEDULED | REFERENCE,Ordering::AcqRel,Ordering::Acquire,) {loop {// 如果task完成了,但是還沒有close掉,說明output還沒有被取走,需// 要在這里取出來進行析構。if state & COMPLETED != 0 && state & CLOSED == 0 {// 標記為closed,這樣就可以安全地讀取output的數據。match (*header).state.compare_exchange_weak(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => { output =Some((((*header).vtable.get_output)(ptr) as *mut R).read());// 更新狀態重新循環state |= CLOSED;}Err(s) => state = s,}} else {// 進到這里說明task要么沒完成,要么已經closed了。let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {// Task和Waker都已經沒了,并且沒closed,根據進else的條// 件可知task沒完成,Future還在,重新schedule一次,讓// executor把Future析構掉。SCHEDULED | CLOSED | REFERENCE} else {// 移除HANDLE flagstate & !HANDLE};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果這是最后一個引用if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 {//并且沒closed,根據進else的條件可知task沒// 完成,重新schedule一次,析構Future((*header).vtable.schedule)(ptr);} else {// task已經完成了,output也已經在上面讀出// 來了,同時也是最后一個引用,需要把task自// 身析構掉。((*header).vtable.destroy)(ptr);}}// 還有其他引用在,資源的釋放由他們負責。break;}Err(s) => state = s,}}}}}// 析構讀取出來的outputdrop(output);} }JoinHandle::poll
檢查Output是否已經可以拿,沒有的話注冊cx.waker()等通知。
impl<R, T> Future for JoinHandle<R, T> {type Output = Option<R>;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// task已經closed了,沒output可拿。if state & CLOSED != 0 {// 大部分可情況下,header里的awaiter就是cx.waker,也有例外,因// 此一并進行通知。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// 如果task還沒完成if state & COMPLETED == 0 {// 那么注冊當前的cx.waker到Header::awaiter里,這樣完成了可以收// 到通知。abort_on_panic(|| {(*header).swap_awaiter(Some(cx.waker().clone()));});// 要是在上面注冊前正好task完成了,那么就收不到通知了,因此注冊后// 需要重新讀取下狀態看看。state = (*header).state.load(Ordering::Acquire);// task已經closed了,沒output可拿,返回None。if state & CLOSED != 0 {// 這里我分析下來是不需要再通知了,提了個pr等作者回應。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// task還沒完成,上面已經注冊了waker,可以直接返回Pending。if state & COMPLETED == 0 {return Poll::Pending;}}// 到這里說明task已經完成了。把它設置為closed狀態,就可以拿output了。match (*header).state.compare_exchange(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 設置closed成功,通知其他的awaiter。由于上面是原子的swap操// 作,且一旦設置為closed,awaiter就不會再變更了,因此可以// 用AWAITER這個flag進行快速判斷。if state & AWAITER != 0 {(*header).notify_unless(cx.waker());}// 讀取出Output并返回。let output = ((*header).vtable.get_output)(ptr) as *mut R;return Poll::Ready(Some(output.read()));}Err(s) => state = s,}}}} }Task的實現分析
Task::schedule
這個函數先通過Task內部保存的指針指向Header,并從Header的vtable字段中拿到schedule函數指針,這個函數最終調用的是用戶調用spawn時傳入的schedule閉包。因此本身很直接。
Task::run
這個函數先通過Task內部保存的指針指向Header,并從Header的vtable字段中拿到run函數指針,其指向RawTask::run,實現如下:
首先根據指針參數強轉為RawTask,并根據Header的vtable拿到RawWakerVTable,構造好Waker和Context,為調用Future::poll做準備。
unsafe fn run(ptr: *const ()) {let raw = Self::from_ptr(ptr);let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr,&(*raw.header).vtable.raw_waker,)));let cx = &mut Context::from_waker(&waker);//... }然后獲取當前的state,循環直到更新state的RUNING成功為止。
let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// 如果task已經closed,那么Future可以直接析構掉,并返回。if state & CLOSED != 0 {if state & AWAITER != 0 {(*raw.header).notify();}Self::drop_future(ptr);// 扣掉當前task的引用計數,因為run函數的參數是self。Self::decrement(ptr);return;}// 移除SCHEDULED狀態,并標記RUNINGmatch (*raw.header).state.compare_exchange_weak(state,(state & !SCHEDULED) | RUNNING,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 更新state到新的狀態,后面的代碼還要復用state。state = (state & !SCHEDULED) | RUNNING;break;}Err(s) => state = s,}}標記為RUNING狀態后,就可以開始正式調用Future::poll了,不過在調用前設置Guard,以便poll函數panic時,可以調用Guard的drop函數保證狀態一致。
let guard = Guard(raw);let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);mem::forget(guard); // 沒panic,移除掉guard.drop的調用。match poll {Poll::Ready(out) => {/// ... }Poll::Pending => {// ... }}如果Future完成了,那么先把Future析構掉,騰出內存把output寫進去。并循環嘗試將RUNING狀態去掉。
match poll {Poll::Ready(out) => {Self::drop_future(ptr);raw.output.write(out);let mut output = None;loop {// JoinHandle已經沒了,那么output沒人取,我們需要析構掉output,并設置為// closed狀態。let new = if state & HANDLE == 0 {(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED} else {(state & !RUNNING & !SCHEDULED) | COMPLETED};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果handle沒了,或者跑的時候closed了,那么需要把output再讀取// 出來析構掉。if state & HANDLE == 0 || state & CLOSED != 0 {output = Some(raw.output.read());}// 通知JoinHandle來取數據。if state & AWAITER != 0 {(*raw.header).notify();}Self::decrement(ptr);break;}Err(s) => state = s,}}drop(output);}Poll::Pending => {// ...}如果沒完成的話,循環嘗試移除RUNING,同時在poll的時候其他線程不能調用shedule函數,而是設置SCHEDULED,所以需要檢查這個flag,如果設置了,則需要代勞。
match poll {Poll::Ready(out) => {/// handle ready case ... }Poll::Pending => {loop {// poll的時候closed了,這里為啥要移除SCHEDULED狀態,暫時不清楚,需要問問// 作者。let new = if state & CLOSED != 0 {state & !RUNNING & !SCHEDULED} else {state & !RUNNING};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {if state & CLOSED != 0 {// 設置closed狀態的那個線程是不能碰Future的,否則和當前線程// 產生內存并發訪問沖突。因此代勞析構操作。Self::drop_future(ptr);Self::decrement(ptr);} else if state & SCHEDULED != 0 {// poll的時候其他線程想schedule這個task,但是不能調用,因此// 當前線程代勞。 chedule函數接收self,類似move語義,因此這里// 不需要decrement。Self::schedule(ptr);} else {Self::decrement(ptr);}break;}Err(s) => state = s,}}} }在poll時如果發生panic,則Guard負責收拾殘局。
fn drop(&mut self) {let raw = self.0;let ptr = raw.header as *const ();unsafe {let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// poll的時候被其他線程closed了,if state & CLOSED != 0 {// 看代碼state一旦處于CLOSED后,schedule不會再運行。這里為啥要移除// SCHEDULED狀態,暫時不清楚,需要問問作者。(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);// 析構FutureRawTask::<F, R, S, T>::drop_future(ptr);RawTask::<F, R, S, T>::decrement(ptr);break;}match (*raw.header).state.compare_exchange_weak(state,(state & !RUNNING & !SCHEDULED) | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {// 析構FutureRawTask::<F, R, S, T>::drop_future(ptr);// 通知awaitertask已經close了.if state & AWAITER != 0 {(*raw.header).notify();}RawTask::<F, R, S, T>::decrement(ptr);break;}Err(s) => state = s,}}} }Waker相關函數的實現
wake函數
wake函數主要功能是設置SCHEDULE狀態,并嘗試調用schedule函數,有兩個重要的細節需要注意:
wake_by_ref
這個函數的功能和wake類似,唯一的區別就是wake的參數是self,有move語義,wakebyref是&self。實現差異不大,就不做具體分析了。
clone_waker
waker的clone實現也比較簡單,直接將Header里的state的引用計數加一即可。
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {let raw = Self::from_ptr(ptr);let raw_waker = &(*raw.header).vtable.raw_waker;let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);if state > isize::max_value() as usize {std::process::abort();}RawWaker::new(ptr, raw_waker) }總結
整個task的設計非常精細,api也非常直觀,難怪一發布就直接上1.0版本。
總結
以上是生活随笔為你收集整理的线程中task取消_Rust Async: async-task源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 中的doit(n)_CoreJ
- 下一篇: JAVA入门级教学之(构造方法)