日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

rust里mp5a4_Rust源码分析:channel内部mpsc队列

發(fā)布時間:2024/7/23 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rust里mp5a4_Rust源码分析:channel内部mpsc队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

首先,之前的upgrade過程中內(nèi)存的回收要稍微注意下。因為Receiver現(xiàn)在指向shared::Packet之后,那個new_port需要被析構(gòu),也就是調(diào)用drop函數(shù),我們看下drop的實現(xiàn):

implDropforReceiver{fn drop(&mutself){match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>p.drop_port(),Flavor::Stream(refp)=>p.drop_port(),Flavor::Shared(refp)=>p.drop_port(),Flavor::Sync(refp)=>p.drop_port(),}}}

由于之前的swap操作,走Flavor::Oneshot路徑:

pubfn drop_port(&self){matchself.state.swap(DISCONNECTED,Ordering::SeqCst){// An empty channel has nothing to do, and a remotely disconnected// channel also has nothing to do b/c we're about to run the drop// glueDISCONNECTED|EMPTY=>{}// There's data on the channel, so make sure we destroy it promptly.// This is why not using an arc is a little difficult (need the box// to stay valid while we take the data).DATA=>unsafe{(&mut*self.data.get()).take().unwrap();},// We're the only ones that can block on this port_=>unreachable!()}}

同樣是DISCONNECTED替換DISCONNECTED而已,沒有過多操作。

同時不再需要的oneshot::Packet也要被析構(gòu):

implDropforPacket{fn drop(&mutself){assert_eq!(self.state.load(Ordering::SeqCst),DISCONNECTED);}}

只是個DISCONNECTED的檢驗操作。

所以現(xiàn)在Sender/Receiver都存放了Flavor::Shared(Arc<:packet>>),之前的Flavor::Oneshot(Arc<:packet>>>和臨時產(chǎn)生的Sender/Receiver都不存在了。

并發(fā)隊列

所以我們接著關(guān)注內(nèi)在的數(shù)據(jù)結(jié)構(gòu),通過跟蹤以下函數(shù)來分析:Sender::send(&self, t: T)

Receiver::recv(&self)

Receiver::recv_timeout(&self, timeout: Duration)

Sender::send(&self, t: T):

pubfn send(&self,t: T)-> Result>{let(new_inner,ret)=match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>{if!p.sent(){returnp.send(t).map_err(SendError);}else{leta=Arc::new(stream::Packet::new());letrx=Receiver::new(Flavor::Stream(a.clone()));matchp.upgrade(rx){oneshot::UpSuccess=>{letret=a.send(t);(a,ret)}oneshot::UpDisconnected=>(a,Err(t)),oneshot::UpWoke(token)=>{// This send cannot panic because the thread is// asleep (we're looking at it), so the receiver// can't go away.a.send(t).ok().unwrap();token.signal();(a,Ok(()))}}}}Flavor::Stream(refp)=>returnp.send(t).map_err(SendError),Flavor::Shared(refp)=>returnp.send(t).map_err(SendError),Flavor::Sync(..)=>unreachable!(),};unsafe{lettmp=Sender::new(Flavor::Stream(new_inner));mem::swap(self.inner_mut(),tmp.inner_mut());}ret.map_err(SendError)}

事實上,對于我們的case,只有需要關(guān)注一句代碼即可:

Flavor::Shared(refp)=>returnp.send(t).map_err(SendError),

這里的p是Arc<:packet>>的一個引用。我們繼續(xù)看p.send(t):

pubfn send(&self,t: T)-> Result{// See Port::drop for what's going onifself.port_dropped.load(Ordering::SeqCst){returnErr(t)}ifself.cnt.load(Ordering::SeqCst){self.take_to_wake().signal();}nifn{// see the comment in 'try' for a shared channel for why this// window of "not disconnected" is ok.self.cnt.store(DISCONNECTED,Ordering::SeqCst);ifself.sender_drain.fetch_add(1,Ordering::SeqCst)==0{loop{// drain the queue, for info on the thread yield see the// discussion in try_recvloop{matchself.queue.pop(){mpsc::Data(..)=>{}mpsc::Empty=>break,mpsc::Inconsistent=>thread::yield_now(),}}ifself.sender_drain.fetch_sub(1,Ordering::SeqCst)==1{break}}}}// Can't make any assumptions about this case like in the SPSC case._=>{}}Ok(())}

同時,我們再看下shared::Packet的數(shù)據(jù)結(jié)構(gòu)跟初始化信息:

constDISCONNECTED: isize =isize::MIN;constFUDGE: isize =1024;pubstruct Packet{queue: mpsc::Queue,cnt: AtomicIsize,// How many items are on this channelsteals: UnsafeCell,// How many times has a port received without blocking?to_wake: AtomicUsize,// SignalToken for wake upchannels: AtomicUsize,port_dropped: AtomicBool,sender_drain: AtomicIsize,select_lock: Mutex,}pubfn new()-> Packet{Packet{queue: mpsc::Queue::new(),cnt: AtomicIsize::new(0),steals: UnsafeCell::new(0),to_wake: AtomicUsize::new(0),channels: AtomicUsize::new(2),port_dropped: AtomicBool::new(false),sender_drain: AtomicIsize::new(0),select_lock: Mutex::new(()),}}

我們發(fā)現(xiàn):port_dropped用于標記接收端是否已經(jīng)drop。

cnt會計數(shù)當(dāng)前存入多少個數(shù)據(jù)。同時cnt通過跟DISCONNECTED的比較來判斷消費者是否已斷開。

如果send中發(fā)現(xiàn)消費的一方已經(jīng)斷開,則會自己嘗試pop所有的數(shù)據(jù),將他們清理掉。

主要的操作是通過self.queue.push(t)來完成。

那這個self.queue是怎么實現(xiàn)的呢?看下它的代碼,位于文件sync/mpsc/mpsc_queue.rs:

pubstruct Queue{head: AtomicPtr>,tail: UnsafeCell>,}unsafeimplSendforQueue{}unsafeimplSyncforQueue{}implQueue{pubfn new()-> Queue{letstub=unsafe{Node::new(None)};Queue{head: AtomicPtr::new(stub),tail: UnsafeCell::new(stub),}}pubfn push(&self,t: T){unsafe{letn=Node::new(Some(t));letprev=self.head.swap(n,Ordering::AcqRel);(*prev).next.store(n,Ordering::Release);}}pubfn pop(&self)-> PopResult{unsafe{lettail=*self.tail.get();letnext=(*tail).next.load(Ordering::Acquire);if!next.is_null(){*self.tail.get()=next;assert!((*tail).value.is_none());assert!((*next).value.is_some());letret=(*next).value.take().unwrap();let_: Box>=Box::from_raw(tail);returnData(ret);}ifself.head.load(Ordering::Acquire)==tail{Empty}else{Inconsistent}}}............}

事實上,它采用了Non-intrusive MPSC node-based queue的算法,構(gòu)造了一個mpsc的單向鏈表,感興趣的可以通過這個鏈接詳細了解。

這個算法的優(yōu)點是:push:并發(fā)特別快,無等待并且?guī)缀鮾H僅一個swap(XCHG指令)操作,通過不斷地先swap成為head,然后再鏈接prev_head.next = head來構(gòu)造鏈表。

缺點是:non-Linearability:不具備線性一致性,push操作會阻塞pop操作,pop操作中如果發(fā)現(xiàn)head != tail 同時 tail.next還沒來得變?yōu)榉莕ull,那么就觀察到整個隊列處于不一致的狀態(tài),這種情況下這里的實現(xiàn)返回Inconsistent。

同時我們看一下Node的代碼:

struct Node{next: AtomicPtr>,value: Option,}implNode{unsafefn new(v: Option)-> *mutNode{Box::into_raw(boxNode{next: AtomicPtr::new(ptr::null_mut()),value: v,})}}

相對以往不同的是new操作返回的是*mut Node,這里通過Box::into_raw讓使用者自己負責(zé)Node的內(nèi)存釋放。

另一方面,當(dāng)我們Receiver.recv()時假如channel中沒有數(shù)據(jù),那么就需要等待,所以我們再看下相關(guān)的代碼:

pubfn recv(&self)-> Result{loop{letnew_port=match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(oneshot::Disconnected)=>returnErr(RecvError),Err(oneshot::Upgraded(rx))=>rx,Err(oneshot::Empty)=>unreachable!(),}}Flavor::Stream(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(stream::Disconnected)=>returnErr(RecvError),Err(stream::Upgraded(rx))=>rx,Err(stream::Empty)=>unreachable!(),}}Flavor::Shared(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(shared::Disconnected)=>returnErr(RecvError),Err(shared::Empty)=>unreachable!(),}}Flavor::Sync(refp)=>returnp.recv(None).map_err(|_|RecvError),};unsafe{mem::swap(self.inner_mut(),new_port.inner_mut());}}}

只要看:

pubfn recv(&self)-> Result{loop{letnew_port=match*unsafe{self.inner()}{.........Flavor::Shared(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(shared::Disconnected)=>returnErr(RecvError),Err(shared::Empty)=>unreachable!(),}}};...........}}

接著看p.recv(),它的返回值決定了調(diào)用結(jié)果:

pubfn recv(&self,deadline: Option)-> Result{// This code is essentially the exact same as that found in the stream// case (see stream.rs)matchself.try_recv(){Err(Empty)=>{}data=>returndata,}let(wait_token,signal_token)=blocking::tokens();ifself.decrement(signal_token)==Installed{ifletSome(deadline)=deadline{lettimed_out=!wait_token.wait_max_until(deadline);iftimed_out{self.abort_selection(false);}}else{wait_token.wait();}}matchself.try_recv(){data@Ok(..)=>unsafe{*self.steals.get()-=1;data},data=>data,}}

這里的邏輯是,前面的self.try_recv假如返回了數(shù)據(jù),那么直接返回數(shù)據(jù)即可。否則很可能channel為空,所以通過blocking::tokens()為Receiver準備阻塞相關(guān)的數(shù)據(jù),然后通過decrement方法再次判斷是否有數(shù)據(jù),從而進入阻塞狀態(tài),decrement代碼:

fn decrement(&self,token: SignalToken)-> StartResult{unsafe{assert_eq!(self.to_wake.load(Ordering::SeqCst),0);letptr=token.cast_to_usize();self.to_wake.store(ptr,Ordering::SeqCst);letsteals=ptr::replace(self.steals.get(),0);matchself.cnt.fetch_sub(1+steals,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{assert!(n>=0);ifn-steals<=0{returnInstalled}}}self.to_wake.store(0,Ordering::SeqCst);drop(SignalToken::cast_from_usize(ptr));Abort}}

如上所示,將token: SignalToken的指針放入to_wake中,等待將來被喚醒。

所以這里通過self.cnt字段減除1+ steals來判斷隊列是否為空,原因在于這里的計數(shù)方式并不是每次pop一個數(shù)據(jù)就將cnt-1,也許是為了性能考慮,我們將pop的數(shù)據(jù)個數(shù)匯總在了steals字段中,然后等到steals足夠大或者發(fā)現(xiàn)channel為空了才去修改cnt的值。所以這里通過self.cnt - (1+ steals) 與 0 比較來判斷是否已有數(shù)據(jù),如果沒有則返回Installed,否則清理數(shù)據(jù)再返回Abort。

我們先看下Installed之后的邏輯:

ifself.decrement(signal_token)==Installed{ifletSome(deadline)=deadline{lettimed_out=!wait_token.wait_max_until(deadline);iftimed_out{self.abort_selection(false);}}else{wait_token.wait();}}

對于我們的情況它只是調(diào)用 wait_token.wait(),代碼為:

implWaitToken{pubfn wait(self){while!self.inner.woken.load(Ordering::SeqCst){thread::park()}}...........

先檢查woken再調(diào)用park(),注意這里是與之前Send的send操作相匹配的:

pubfn send(&self,t: T)-> Result{.............self.queue.push(t);matchself.cnt.fetch_add(1,Ordering::SeqCst){-1=>{self.take_to_wake().signal();}..........

我們看下相關(guān)的代碼:

fn take_to_wake(&self)-> SignalToken{letptr=self.to_wake.load(Ordering::SeqCst);self.to_wake.store(0,Ordering::SeqCst);assert!(ptr!=0);unsafe{SignalToken::cast_from_usize(ptr)}implSignalToken{pubfn signal(&self)-> bool {letwake=!self.inner.woken.compare_and_swap(false,true,Ordering::SeqCst);ifwake{self.inner.thread.unpark();}wake}....}

先設(shè)置woken再調(diào)用unpark()。如此一來確保等待的Receiver不會永遠睡眠。

我們再看下decrement返回Abort的情況:

pubfn recv(&self,deadline: Option)-> Result{matchself.try_recv(){Err(Empty)=>{}data=>returndata,}let(wait_token,signal_token)=blocking::tokens();ifself.decrement(signal_token)==Installed{.............}matchself.try_recv(){data@Ok(..)=>unsafe{*self.steals.get()-=1;data},data=>data,}}

只是再次調(diào)用self.try_recv()而已,至于這里為什么會有*self.steals.get()-=1的操作,那是要看try_recv操作本身了,它有一個默認steals+1的操作,但是這里的第二個self.try_recv()的計數(shù)已經(jīng)cnt匯總了,所以這個不需要steals+1,我們通過-1來平衡:

pubfn try_recv(&self)-> Result{letret=matchself.queue.pop(){mpsc::Data(t)=>Some(t),mpsc::Empty=>None,mpsc::Inconsistent=>{letdata;loop{thread::yield_now();matchself.queue.pop(){mpsc::Data(t)=>{data=t;break}mpsc::Empty=>panic!("inconsistent => empty"),mpsc::Inconsistent=>{}}}Some(data)}};matchret{Some(data)=>unsafe{if*self.steals.get()>MAX_STEALS{matchself.cnt.swap(0,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{letm=cmp::min(n,*self.steals.get());*self.steals.get()-=m;self.bump(n-m);}}assert!(*self.steals.get()>=0);}*self.steals.get()+=1;Ok(data)},None=>{matchself.cnt.load(Ordering::SeqCst){nifn!=DISCONNECTED=>Err(Empty),_=>{matchself.queue.pop(){mpsc::Data(t)=>Ok(t),mpsc::Empty=>Err(Disconnected),// with no senders, an inconsistency is impossible.mpsc::Inconsistent=>unreachable!(),}}}}}}

從代碼中可以看到,如果pop()取得數(shù)據(jù)則直接返回;如果Empty則返回None,從而讓Receiver可以陷入等待;如果Inconsistent 則說明隊列處于push操作稍慢的不一致狀態(tài),我們的辦法就是通過thread::yield_now(),一直調(diào)用pop()直到返回數(shù)據(jù)或者None。

另外,的確是通過MAX_STEALS 這個字段先匯總steals的值:

matchret{Some(data)=>unsafe{if*self.steals.get()>MAX_STEALS{matchself.cnt.swap(0,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{letm=cmp::min(n,*self.steals.get());*self.steals.get()-=m;self.bump(n-m);}}assert!(*self.steals.get()>=0);}*self.steals.get()+=1;Ok(data)},...............}

假如steals足夠大,大于MAX_STEALS 我們才通過與cnt比較,然后從cnt中減除它。

總結(jié)

以上是生活随笔為你收集整理的rust里mp5a4_Rust源码分析:channel内部mpsc队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。