日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入理解 GCD

發布時間:2023/12/20 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入理解 GCD 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
深入理解 GCD

前言

首先提出一些問題:

  • dispatch_async?函數如何實現,分發到主隊列和全局隊列有什么區別,一定會新建線程執行任務么?
  • dispatch_sync?函數如何實現,為什么說 GCD 死鎖是隊列導致的而不是線程,死鎖不是操作系統的概念么?
  • 信號量是如何實現的,有哪些使用場景?
  • dispatch_group?的等待與通知、dispatch_once?如何實現?
  • dispatch_source?用來做定時器如何實現,有什么優點和用途?
  • dispatch_suspend?和?dispatch_resume?如何實現,隊列的的暫停和計時器的暫停有區別么?
  • 以上問題基本都是對 GCD 常用 API 的追問與思考,深入理解這些問題有助于更好地使用 GCD,比如以下代碼的執行結果是什么?

    - (void)viewDidLoad {[super viewDidLoad];dispatch_queue_t queue = dispatch_queue_create("com.bestswifter.queue", nil);dispatch_sync(queue, ^{NSLog(@"current thread = %@", [NSThread currentThread]);dispatch_sync(dispatch_get_main_queue(), ^{NSLog(@"current thread = %@", [NSThread currentThread]);});}); }

    以下內容為個人的學習總結,僅供參考,不一定適合新手入門。最好的學習方法還是自己下載一份源碼并仔細閱讀學習。

    文章主要分析了常見 API 的實現原理,因水平所限,不可避免的有理解錯誤的地方,歡迎指出。如果對具體分析不感興趣,可以直接跳到文章末尾的“總結”部分。

    知識儲備

    閱讀?GCD 源碼之前,需要了解一些相關知識,這樣才能在讀到源碼時不至于一臉懵逼,進而影響理解。

    DISPATCH_DECL

    GCD 中對變量的定義大多遵循如下格式:

    #define DISPATCH_DECL(name) typedef struct name##_s *name##_t

    比如說非常常見的?DISPATCH_DECL(dispatch_queue);,它的展開形式是:

    typedef struct dispatch_queue_s *dispatch_queue_t; 這行代碼定義了一個?dispatch_queue_t?類型的指針,指向一個?dispatch_queue_s?類型的結構體。

    TSD

    TSD(Thread-Specific Data) 表示線程私有數據。在 C++ 中,全局變量可以被所有線程訪問,局部變量只有函數內部可以訪問。而 TSD 的作用就是能夠在同一個線程的不同函數中被訪問。在不同線程中,雖然名字相同,但是獲取到的數據隨線程不同而不同。

    通常,我們可以利用 POSIX 庫提供的 API 來實現 TSD:

    int pthread_key_create(pthread_key_t *key, void (*destr_function) (void *)) 這個函數用來創建一個 key,在線程退出時會將 key 對應的數據傳入?destr_function?函數中進行清理。

    我們分別使用 get/set 方法來訪問/修改 key 對應的數據:

    int pthread_setspecific(pthread_key_t key, const void *pointer) void * pthread_getspecific(pthread_key_t key)

    在 GCD 中定義了六個 key,根據名字大概能猜出各自的含義:

    pthread_key_t dispatch_queue_key; pthread_key_t dispatch_sema4_key; pthread_key_t dispatch_cache_key; pthread_key_t dispatch_io_key; pthread_key_t dispatch_apply_key; pthread_key_t dispatch_bcounter_key;

    fastpath && slowpath

    這是定義在?internal.h?中的兩個宏:

    為了理解所謂的快路徑和慢路徑,我們需要先學習一點計算機基礎知識。比如這段非常簡單的代碼:

    #define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l)) #define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l)) if (x)return 1; else return 39;

    由于計算機并非一次只讀取一條指令,而是讀取多條指令,所以在讀到 if 語句時也會把?return 1?讀取進來。如果 x 為 0,那么會重新讀取?return 39,重讀指令相對來說比較耗時。

    如過 x 有非常大的概率是 0,那么?return 1?這條指令每次不可避免的會被讀取,并且實際上幾乎沒有機會執行, 造成了不必要的指令重讀。當然,最簡單的優化就是:

    if (!x) return 39; else return 1;

    然而對程序員來說,每次都做這樣的判斷非常燒腦,而且容易出錯。于是 GCC 提供了一個內置函數?__builtin_expect:

    long __builtin_expect (long EXP, long C) 它的返回值就是整個函數的返回值,參數 C 代表預計的值,表示程序員知道 EXP 的值很可能就是 C。比如上文中的例子可以這樣寫: if (__builtin_expect(x, 0)) return 1; elsereturn 39;

    雖然寫法邏輯不變,但是編譯器會把匯編代碼優化成?if(!x)?的形式。

    因此,在蘋果定義的兩個宏中,fastpath(x)?依然返回 x,只是告訴編譯器 x 的值一般不為 0,從而編譯器可以進行優化。同理,slowpath(x)?表示 x 的值很可能為 0,希望編譯器進行優化。

    dispatch_queue_t

    以?dispatch_queue_create?的源碼為例:

    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) {// 省略 label 相關的操作 dispatch_queue_t dq;dq = _dispatch_alloc(DISPATCH_VTABLE(queue),sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);_dispatch_queue_init(dq);if (fastpath(!attr)) {return dq;}if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {dq->dq_width = UINT32_MAX;dq->do_targetq = _dispatch_get_root_queue(0, false);} else {dispatch_debug_assert(!attr, "Invalid attribute");}return dq; }

    我們知道創建隊列時, attr 屬性有三個值可選,nil、DISPATCH_QUEUE_SERIAL(實際上就是 nil) 或?DISPATCH_QUEUE_CONCURRENT。第一個 if 判斷中,蘋果認為串行隊列,或者?NULL參數更常見,因此?!attr?的值很有可能不為 0,這與上文的結論一致。

    第二個判斷中,參數幾乎有只可能是?DISPATCH_QUEUE_CONCURRENT,因此?attr == DISPATCH_QUEUE_CONCURRENT?這個判斷機會不會為 0,依然與?fastpath?的作用一致。

    _dispatch_get_root_queue?會獲取一個全局隊列,它有兩個參數,分別表示優先級和是否支持 overcommit。一共有四個優先級,LOW、DEFAULT、HIGH?和?BACKGROUND,因此共有 8 個全局隊列。帶有 overcommit 的隊列表示每當有任務提交時,系統都會新開一個線程處理,這樣就不會造成某個線程過載(overcommit)。

    這 8 個全局隊列的序列號是 4-11,序列號為 1 的隊列是主隊列,2 是 manager 隊列,用來管理 GCD 內部的任務(比如下文介紹的定時器),3 這個序列號暫時沒有使用。隊列 的?dq_width?被設置為?UINT32_MAX,表示這些隊列不限制并發數。

    作為對比,在?_dispatch_queue_init?中,并發數限制為 1,也就是串行隊列的默認設置:

    static inline void _dispatch_queue_init(dispatch_queue_t dq) { dq->do_next = DISPATCH_OBJECT_LISTLESS;dq->do_targetq = _dispatch_get_root_queue(0, true);dq->dq_running = 0;dq->dq_width = 1; }

    注意這行代碼:?dq->do_targetq = _dispatch_get_root_queue(0, true);,它涉及到 GCD 隊列與 block 的一個重要模型,target_queue。向任何隊列中提交的 block,都會被放到它的目標隊列中執行,而普通串行隊列的目標隊列就是一個支持 overcommit 的全局隊列,全局隊列的底層則是一個線程池。

    借用?objc 的文章?中的圖片來表示:


    線程池與目標隊列

    dispatch_async

    直接上函數實現:

    dispatch_async(dispatch_queue_t queue, dispatch_block_t block) {dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release); }

    隊列其實就是一個用來提交 block 的對象,當 block 提交到隊列中后,將按照 “先入先出(FIFO)” 的順序進行處理。系統在 GCD 的底層會維護一個線程池,用來執行這些 block。

    block 參數的類型是?dispatch_block_t,它是一個沒有參數,沒有返回值的 block:

    typedef void (^dispatch_block_t)(void);

    dispatch_async?的函數很簡單,它將 block 復制了一份,然后調用另一個函數?dispatch_async_f:

    dispatch_async_f(dispatch_queue_t queue, void *context, dispatch_function_t work); work 參數是一個函數,在實際調用時,會把第二參數 context 作為參數傳入,以?_dispatch_call_block_and_release?為例: void _dispatch_call_block_and_release(void *block) {void (^b)(void) = block;b();Block_release(b); }

    省略各種分支后的?dispatch_async_f?函數實現如下:

    void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {dispatch_continuation_t dc;if (dq->dq_width == 1) {return dispatch_barrier_async_f(dq, ctxt, func);}dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;dc->dc_func = func;dc->dc_ctxt = ctxt;if (dq->do_targetq) {return _dispatch_async_f2(dq, dc);}_dispatch_queue_push(dq, dc); }

    可見如果是串行隊列 (dq_width = 1),會調用?dispatch_barrier_async_f?函數處理,這個后文會有介紹。如果有?do_targetq?則進行轉發,否則調用?_dispatch_queue_push?入隊。

    這里的?dispatch_continuation_t?其實是對 block 的封裝,然后調用?_dispatch_queue_push?這個宏將封裝好的 block 放入隊列中。

    把這個宏展開,然后依次分析調用棧,選擇一條主干調用線,結果如下:

    _dispatch_queue_push └──_dispatch_trace_queue_push└──_dispatch_queue_push└──_dispatch_queue_push_slow└──_dispatch_queue_push_list_slow2└──_dispatch_wakeup└──dx_probe

    隊列中保存了一個鏈表,我們首先將新的 block 添加到鏈表尾部,然后調用?dx_probe宏,它依賴于 vtable 數據結構,GCD 中的大部分對象,比如隊列等,都具有這個數據結構。它定義了對象在不同操作下該執行的方法,比如在這里的?probe?操作下,實際上會執行?_dispatch_queue_wakeup_global?方法,調用棧如下

    _dispatch_queue_wakeup_global └──_dispatch_queue_wakeup_global2└──_dispatch_queue_wakeup_global_slow

    在?_dispatch_queue_wakeup_global_slow?我們見到了熟悉的老朋友,pthread 線程:

    static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) {// 如果線程池已滿,則直接調用 _dispatch_worker_thread // 否則創建線程池 pthread_t pthr;while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {if (r != EAGAIN) {(void)dispatch_assume_zero(r);}sleep(1);}r = pthread_detach(pthr);(void)dispatch_assume_zero(r); }

    由此可見這里確實使用了線程池。創建線程后會執行?_dispatch_worker_thread?回調:

    _dispatch_worker_thread └──_dispatch_worker_thread4└──_dispatch_continuation_pop

    在 pop 函數中,我們拿到了最早加入的任務,然后執行:

    static inline void _dispatch_continuation_pop(dispatch_object_t dou) {// ..._dispatch_client_callout(dc->dc_ctxt, dc->dc_func);if (dg) {dispatch_group_leave(dg);_dispatch_release(dg);} }

    dispatch_async?的實現比較復雜,主要是因為其中的數據結構較多,分支流程控制比較復雜。但思路其實很簡單,用鏈表保存所有提交的 block,然后在底層線程池中,依次取出 block 并執行。

    如果熟悉了相關數據結構和調用流程,接下來研究 GCD 的其他 API 就比較輕松了。

    dispatch_sync

    同步方法的實現相對來說和異步類似,而且更簡單,調用棧如下:

    dispatch_sync └──dispatch_sync_f└──_dispatch_sync_f2└──_dispatch_sync_f_slow static void _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();struct dispatch_sync_slow_s {DISPATCH_CONTINUATION_HEADER(sync_slow);} dss = {.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,.dc_ctxt = (void*)sema,};_dispatch_queue_push(dq, (void *)&dss);_dispatch_thread_semaphore_wait(sema);_dispatch_put_thread_semaphore(sema);// ... }

    這里利用了線程專屬信號量,保證了每次只有一個 block 被執行。

    這條調用棧有多個分支,如果向當前串行隊列提交任務就會走到上述分支,導致死鎖。如果是向其它串行隊列提交 block,則會利用原子性操作來實現,因此不會有死鎖問題。

    dispatch_semaphore

    關于信號量的 API 不多,主要是三個,create、wait?和?signal。

    信號量在初始化時要指定 value,隨后內部將這個 value 存儲起來。實際操作時會存兩個 value,一個是當前的 value,一個是記錄初始 value。

    信號的?wait?和?signal?是互逆的兩個操作。如果 value 大于 0,前者將 value 減一,此時如果 value 小于零就一直等待。

    初始 value 必須大于等于 0,如果為 0 并隨后調用?wait?方法,線程將被阻塞直到別的線程調用了?signal?方法。

    dispatch_semaphore_wait

    首先從這個函數的源碼看起:

    long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {long value = dispatch_atomic_dec2o(dsema, dsema_value);dispatch_atomic_acquire_barrier();if (fastpath(value >= 0)) {return 0;}return _dispatch_semaphore_wait_slow(dsema, timeout); }

    第一行的?dispatch_atomic_dec2o?是一個宏,會調用 GCC 內置的函數?__sync_sub_and_fetch,實現減法的原子性操作。因此這一行的意思是將 dsema 的值減一,并把新的值賦給 value。

    如果減一后的 value 大于等于 0 就立刻返回,沒有任何操作,否則進入等待狀態。

    _dispatch_semaphore_wait_slow?函數針對不同的 timeout 參數,分了三種情況考慮:

    case DISPATCH_TIME_NOW:while ((orig = dsema->dsema_value) < 0) {if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {return KERN_OPERATION_TIMED_OUT;}}

    這種情況下會立刻判斷?dsema->dsema_value?與?orig?是否相等。如果 while 判斷成立,內部的 if 判斷一定也成立,此時會將 value 加一(也就是變為 0) 并返回。加一的原因是為了抵消?wait?函數一開始的減一操作。此時函數調用方會得到返回值?KERN_OPERATION_TIMED_OUT,表示由于等待時間超時而返回。

    實際上 while 判斷一定會成立,因為如果 value 大于等于 0,在上一個函數?dispatch_semaphore_wait?中就已經返回了。

    第二種情況是?DISPATCH_TIME_FOREVER?這個 case:

    case DISPATCH_TIME_FOREVER:do {kr = semaphore_wait(dsema->dsema_port);} while (kr == KERN_ABORTED);break;

    進入 do-while 循環后會調用系統的?semaphore_wait?方法,KERN_ABORTED?表示調用者被一個與信號量系統無關的原因喚醒。因此一旦發生這種情況,還是要繼續等待,直到收到?signal?調用。

    在其他情況下(default 分支),我們指定一個超時時間,這和?DISPATCH_TIME_FOREVER?的處理比較類似,不同的是我們調用了內核提供的?semaphore_timedwait?方法可以指定超時時間。

    整個函數的框架如下:

    static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { again:while ((orig = dsema->dsema_sent_ksignals)) {if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,orig - 1)) {return 0;}}switch (timeout) {default: /* semaphore_timedwait */case DISPATCH_TIME_NOW: /* KERN_OPERATION_TIMED_OUT */case DISPATCH_TIME_FOREVER: /* semaphore_wait */}goto again; }

    可見信號量被喚醒后,會回到最開始的地方,進入 while 循環。這個判斷條件一般都會成立,極端情況下由于內核存在 bug,導致?orig?和?dsema_sent_ksignals?不相等,也就是收到虛假?signal?信號時會忽略。

    進入 while 循環后,if 判斷一定成立,因此返回 0,正如文檔所說,返回 0 表示成功,否則表示超時。

    dispatch_semaphore_signal

    這個函數的實現相對來說比較簡單,因為它不需要阻塞,只用喚醒。簡化版源碼如下:

    long dispatch_semaphore_signal(dispatch_semaphore_t dsema) {long value = dispatch_atomic_inc2o(dsema, dsema_value);if (fastpath(value > 0)) {return 0;}return _dispatch_semaphore_signal_slow(dsema); }

    首先會調用原子方法讓 value 加一,如果大于零就立刻返回 0,否則返回?_dispatch_semaphore_signal_slow:

    long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) {(void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals);_dispatch_semaphore_create_port(&dsema->dsema_port);kern_return_t kr = semaphore_signal(dsema->dsema_port);return 1; }

    它的作用僅僅是調用內核的?semaphore_signal?函數喚醒信號量,然后返回 1。這也符合文檔中的描述:“如果喚醒了線程,返回非 0,否則返回 0”。

    dispatch_group

    有了上面的鋪墊,group 是一個非常容易理解的概念,我們先看看如何創建 group:

    dispatch_group_t dispatch_group_create(void) {dispatch_group_t dg = _dispatch_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_semaphore_s));_dispatch_semaphore_init(LONG_MAX, dg);return dg; }

    沒錯,group 就是一個 value 為?LONG_MAX?的信號量。

    dispatch_group_async

    它僅僅是?dispatch_group_async_f?的封裝:

    void dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {dispatch_continuation_t dc;dispatch_group_enter(dg);dc = _dispatch_continuation_alloc();dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);dc->dc_func = func;dc->dc_ctxt = ctxt;dc->dc_data = dg;_dispatch_queue_push(dq, dc); }

    這個函數和?dispatch_async_f?的實現高度一致,主要的不同在于調用了?dispatch_group_enter?方法:

    這個方法也沒做什么,就是調用?wait?方法讓信號量的 value 減一而已。

    void dispatch_group_enter(dispatch_group_t dg) {dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;(void)dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER); }

    dispatch_group_wait

    這個方法用于等待 group 中所有任務執行完成,可以理解為信號量 wait 的封裝:

    long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) {dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;if (dsema->dsema_value == dsema->dsema_orig) {return 0;}if (timeout == 0) {return KERN_OPERATION_TIMED_OUT;}return _dispatch_group_wait_slow(dsema, timeout); }

    如果當前 value 和原始 value 相同,表明任務已經全部完成,直接返回 0,如果 timeout 為 0 也會立刻返回,否則調用?_dispatch_group_wait_slow。這個方法等等待部分和?_dispatch_semaphore_signal_slow?幾乎一致,區別在于等待結束后它不是 return,而是調用?_dispatch_group_wake?去喚醒這個 group。

    static long _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { again:_dispatch_group_wake(dsema);switch (timeout) {/* 三種情況分類 */} goto again; }

    這里我們暫時跳過?_dispatch_group_wake,后面會有詳細分析。只要知道這個函數在 group 中所有事件執行完后會被調用即可。

    dispatch_group_notify

    老習慣,這個函數僅僅是封裝了?dispatch_group_notify_f:

    void dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)) {dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;struct dispatch_sema_notify_s *dsn, *prev;dsn->dsn_queue = dq;dsn->dsn_ctxt = ctxt;dsn->dsn_func = func;prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn);if (fastpath(prev)) {prev->dsn_next = dsn;} else {/* ... */} }

    這種結構的代碼我們已經遇到多次了,它其實就是在鏈表的尾部續上新的元素。所以 notify 方法并沒有做過多的處理,只是是用鏈表把所有回調通知保存起來,等待調用。

    dispatch_group_leave

    在介紹?dispatch_async?函數時,我們看到任務在被執行時,還會調用?dispatch_group_leave?函數:

    void dispatch_group_leave(dispatch_group_t dg) {dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;long value = dispatch_atomic_inc2o(dsema, dsema_value);if (slowpath(value == dsema->dsema_orig)) {(void)_dispatch_group_wake(dsema);} }

    當 group 的 value 變為初始值時,表示所有任務都已執行完,開始調用?_dispatch_group_wake?處理回調。

    _dispatch_group_wake

    static long _dispatch_group_wake(dispatch_semaphore_t dsema) {struct dispatch_sema_notify_s *next, *head, *tail = NULL;long rval;head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL);if (head) {tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL);}rval = dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0);if (rval) {_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);do {kern_return_t kr = semaphore_signal(dsema->dsema_waiter_port);} while (--rval);}
    if (head) { // async group notify blocks do { dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func); next = fastpath(head->dsn_next); if (!next && head != tail) { while (!(next = fastpath(head->dsn_next))) { _dispatch_hardware_pause(); } } free(head); } while ((head = next)); } return 0; } ?

    這個函數主要分為兩部分,首先循環調用?semaphore_signal?告知喚醒當初等待 group 的信號量,因此?dispatch_group_wait?函數得以返回。

    然后獲取鏈表,依次調用?dispatch_async_f?異步執行在 notify 函數中注冊的回調。

    dispatch_once

    dispatch_once?僅僅是一個包裝,內部直接調用了?dispatch_once_f:

    void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {struct _dispatch_once_waiter_s * volatile *vval = (struct _dispatch_once_waiter_s**)val;struct _dispatch_once_waiter_s dow = { NULL, 0 };struct _dispatch_once_waiter_s *tail, *tmp;_dispatch_thread_semaphore_t sema;if (dispatch_atomic_cmpxchg(vval, NULL, &dow)) {_dispatch_client_callout(ctxt, func);tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE);tail = &dow;while (tail != tmp) {while (!tmp->dow_next) {_dispatch_hardware_pause();}
    sema = tmp->dow_sema; tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next; _dispatch_thread_semaphore_signal(sema); } } else { dow.dow_sema = _dispatch_get_thread_semaphore(); for (;;) { tmp = *vval; if (tmp == DISPATCH_ONCE_DONE) { break; } dispatch_atomic_store_barrier(); if (dispatch_atomic_cmpxchg(vval, tmp, &dow)) { dow.dow_next = tmp; _dispatch_thread_semaphore_wait(dow.dow_sema); } } _dispatch_put_thread_semaphore(dow.dow_sema); } } ? 這段代碼比較長,我們考慮三個場景:
  • 第一次調用: 此時外部傳進來的 onceToken 還是空指針,所以 vval 為 NULL,if 判斷成立。首先執行 block,然后讓將 vval 的值設為?DISPATCH_ONCE_DONE?表示任務已經完成,同時用 tmp 保存先前的 vval。此時,dow 也為空,因此 while 判斷不成立,代碼執行結束。
  • 同一線程第二次調用: 由于 vval 已經變成了?DISPATCH_ONCE_DONE,因此 if 判斷不成立,進入 else 分支的 for 循環。由于 tmp 就是?DISPATCH_ONCE_DONE,所以循環退出,沒有做任何事。
  • 多個線程同時調用: 由于 if 判斷中是一個原子性操作,所以必然只有一個線程能進入 if 分支,其他的進入 else 分支。由于其他線程在調用函數時,vval 還不是?DISPATCH_ONCE_DONE,所以進入到 for 循環的后半部分。這里構造了一個鏈表,鏈表的每個節點上都調用了信號量的?wait?方法并阻塞,而在 if 分支中,則會依次遍歷所有的節點并調用?signal?方法,喚醒所有等待中的信號量。
  • dispatch_barrier_async

    它調用了?dispatch_barrier_async_f?函數,實現原理也和?dispatch_async_f?類似:

    void dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {dispatch_continuation_t dc;dc = fastpath(_dispatch_continuation_alloc_cacheonly());dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);dc->dc_func = func;dc->dc_ctxt = ctxt;_dispatch_queue_push(dq, dc); }

    區別在于?do_vtable?被設置了兩個標志位,多了一個?DISPATCH_OBJ_BARRIER_BIT?標記。這個標記在從隊列中取出任務時被用到:

    static _dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_queue_t dq) {while (dq->dq_items_tail) {/* ... */if (!DISPATCH_OBJ_IS_VTABLE(dc) && (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {if (dq->dq_running > 1) {goto out;}} else {_dispatch_continuation_redirect(dq, dc);continue;}} out: /* 不完整的 drain,需要清理現場 */return sema; // 返回空的信號量 } 這里原來是一個循環,會拿出所有的任務,依次調用?_dispatch_continuation_redirect,最終并行處理。一旦遇到?DISPATCH_OBJ_BARRIER_BIT這個標記,就會終止循環。

    在?out?標簽后面,返回了一個空的信號量,隨后方法的調用者會把它單獨放入隊列,等待下一次執行:

    void _dispatch_queue_invoke(dispatch_queue_t dq) {_dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq);if (sema) {_dispatch_thread_semaphore_signal(sema);} else if (tq) {return _dispatch_queue_push(tq, dq);} }

    因此 barrier 方法能等待此前所有任務執行完以后執行?_dispatch_queue_push,同時保證自己執行完以后才執行后續的操作。

    dispatch_source

    source 是一種資源,類似于 生產者/消費者模式中的生產者,而隊列則是消費者。當有新的資源(source) 產生時,他們被放到對應的隊列上被執行(消費)。

    dispatch_source?最常見的用途之一就是用來實現定時器,舉一個小例子:

    dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue); dispatch_source_set_timer(timer, dispatch_walltime(NULL, 0), 10*NSEC_PER_SEC, 1*NSEC_PER_SEC); //每10秒觸發timer,誤差1秒 dispatch_source_set_event_handler(timer, ^{// 定時器觸發時執行的 block }); dispatch_resume(timer);

    使用 GCD Timer 的好處在于不依賴 runloop,因此任何線程都可以使用。由于使用了 block,不會忘記避免循環引用。此外,定時器可以自由控制精度,隨時修改間隔時間等。

    dispatch_source_create

    下面從底層源碼的角度來研究這幾行代碼的作用。首先是?dispatch_source_create?函數,它和之前見到的 create 函數都差不多,對 dispatch_source_t 對象做了一些初始化工作:

    dispatch_source_t ds = NULL; ds = _dispatch_alloc(DISPATCH_VTABLE(source), sizeof(struct dispatch_source_s)); _dispatch_queue_init((dispatch_queue_t)ds); ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL; ds->do_targetq = &_dispatch_mgr_q; dispatch_set_target_queue(ds, q); return ds;

    這里涉及到兩個隊列,其中 q 是用戶指定的隊列,表示事件觸發的回調在哪個隊列執行。而?_dispatch_mgr_q?則表示由哪個隊列來管理這個 source,mgr 是 manager 的縮寫,也是上文提到的序列號為 2 的內部隊列。

    dispatch_source_set_timer

    在這個函數中,首先會有參數處理,過濾掉不符合要求的參數。隨后創建了?dispatch_set_timer_params?類型的指針 params:

    struct dispatch_set_timer_params {dispatch_source_t ds;uintptr_t ident;struct dispatch_timer_source_s values; };

    這個 params 負責綁定定時器對象與他的參數(存儲在?valus?屬性中),最后調用:

    dispatch_barrier_async_f((dispatch_queue_t)ds, params, _dispatch_source_set_timer2);

    這里是把 source 當做隊列來使用,因此實際上是調用了?_dispatch_source_set_timer2(params)?方法:

    static void _dispatch_source_set_timer2(void *context) {// Called on the source queuestruct dispatch_set_timer_params *params = context;dispatch_suspend(params->ds);dispatch_barrier_async_f(&_dispatch_mgr_q, params,_dispatch_source_set_timer3); }

    這里首先暫停了隊列,避免了修改的過程中定時器被觸發。然后在 manager 隊列上執行?_dispatch_source_set_timer3(params):

    static void _dispatch_source_set_timer3(void *context) {struct dispatch_set_timer_params *params = context;dispatch_source_t ds = params->ds;// ... _dispatch_timer_list_update(ds);dispatch_resume(ds); }

    _dispatch_timer_list_update?函數的作用是根據下一次觸發時間將 timer 排序。

    接下來,當初分發到 manager 隊列的 block 將要被執行,走到?_dispatch_mgr_invoke?函數,其中有如下代碼:

    timeoutp = _dispatch_get_next_timer_fire(&timeout); r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);

    可見 GCD 的定時器是由系統的 select 方法實現的。

    當內層的 manager 隊列被喚醒后,還會進一步喚醒外層的隊列(當初用戶指定的那個),并在隊列上執行 timer 觸發時的 block。

    dispatch_resume/suspend

    GCD 對象的暫停和恢復由?do_suspend_cnt?決定,暫停時通過原子操作將改屬性的值加 2,對應的在恢復時通過原子操作將該屬性減二。

    它有兩個默認值:

    #define DISPATCH_OBJECT_SUSPEND_LOCK 1u #define DISPATCH_OBJECT_SUSPEND_INTERVAL 2u

    在喚醒隊列時有如下代碼:

    void _dispatch_queue_invoke(dispatch_queue_t dq) {if (!dispatch_atomic_sub2o(dq, do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK)) {if (dq->dq_running == 0) {_dispatch_wakeup(dq); // verify that the queue is idle }} }

    可見能夠喚醒隊列的前提是?dq->do_suspend_cnt - 1 = 0,也就是要求?do_suspend_cnt的值就是?DISPATCH_OBJECT_SUSPEND_LOCK。

    觀察 8 個全局隊列和主隊列的定義就會發現,他們的?do_suspend_cnt?值確實為?DISPATCH_OBJECT_SUSPEND_LOCK,因此默認處于啟動狀態。

    而?dispatch_source?的 create 方法中,do_suspend_cnt?的初始值為?DISPATCH_OBJECT_SUSPEND_INTERVAL,因此默認處于暫停狀態,需要手動開啟。

    dispatch_after

    dispatch_after?其實依賴于定時器的實現,函數內部調用了?dispatch_after_f:

    void dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, dispatch_function_t func) {uint64_t delta;struct _dispatch_after_time_s *datc = NULL;dispatch_source_t ds;// 如果延遲為 0,直接調用 dispatch_asyncdelta = _dispatch_timeout(when);if (delta == 0) {return dispatch_async_f(queue, ctxt, func);}ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);dispatch_assert(ds);datc = malloc(sizeof(*datc));dispatch_assert(datc);datc->datc_ctxt = ctxt;datc->datc_func = func;datc->ds = ds; dispatch_set_context(ds, datc); dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback); dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0); dispatch_resume(ds); }

    首先將延遲執行的 block 封裝在?_dispatch_after_time_s?這個結構體中,并且作為上下文,與 timer 綁定,然后啟動 timer。

    到時以后,執行?_dispatch_after_timer_callback?回調,并取出上下文中的 block:

    static void _dispatch_after_timer_callback(void *ctxt) {struct _dispatch_after_time_s *datc = ctxt;_dispatch_client_callout(datc->datc_ctxt, datc->datc_func);// 清理工作 }

    總結

    本文主要整理了 GCD 中常見的 API 以及底層的實現原理。對于隊列來說,需要理解它的數據結構,轉發機制,以及底層的線程池模型。

    dispatch_async?會把任務添加到隊列的一個鏈表中,添加完后會喚醒隊列,根據 vtable 中的函數指針,調用 wakeup 方法。在 wakeup 方法中,從線程池里取出工作線程(如果沒有就新建),然后在工作線程中取出鏈表頭部指向的 block 并執行。

    dispatch_sync?的實現略簡單一些,它不涉及線程池(因此一般都在當前線程執行),而是利用與線程綁定的信號量來實現串行。

    分發到不同隊列時,代碼進入的分支也不一樣,比如?dispatch_async?到主隊列的任務由 runloop 處理,而分發到其他隊列的任務由線程池處理。

    在當前串行隊列中執行?dispatch_sync?時,由于?dq_running?屬性(表示在運行的任務數量) 為 1,所以以下判斷成立:

    if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {return _dispatch_barrier_sync_f_slow(dq, ctxt, func); }

    在?_dispatch_barrier_sync_f_slow?函數中使用了線程對應的信號量并且調用?wait?方法,從而導致線程死鎖。

    如果向其它隊列同步提交 block,最終進入?_dispatch_barrier_sync_f_invoke,它只是保證了 block 執行的原子性,但沒有使用線程對應的信號量。

    對于信號量來說,它主要使用?signal?和?wait?這兩個接口,底層分別調用了內核提供的方法。在調用?signal?方法后,先將 value 減一,如果大于零立刻返回,否則陷入等待。signal?方法將信號量加一,如果 value 大于零立刻返回,否則說明喚醒了某一個等待線程,此時由系統決定哪個線程的等待方法可以返回。

    dispatch_group?的本質就是一個 value 非常大的信號量,等待 group 完成實際上就是等待 value 恢復初始值。而 notify 的作用是將所有注冊的回調組裝成一個鏈表,在?dispatch_async?完成時判斷 value 是不是恢復初始值,如果是則調用?dispatch_async?異步執行所有注冊的回調。

    dispatch_once?通過一個靜態變量來標記 block 是否已被執行,同時使用信號量確保只有一個線程能執行,執行完 block 后會喚醒其他所有等待的線程。

    dispatch_barrier_async?改變了 block 的?vtable?標記位,當它將要被取出執行時,會等待前面的 block 都執行完,然后在下一次循環中被執行。

    dispatch_source?可以用來實現定時器。所有的 source 會被提交到用戶指定的隊列,然后提交到 manager 隊列中,按照觸發時間排好序。隨后找到最近觸發的定時器,調用內核的?select?方法等待。等待結束后,依次喚醒 manager 隊列和用戶指定隊列,最終觸發一開始設置的回調 block。

    GCD 中的對象用?do_suspend_cnt?來表示是否暫停。隊列默認處于啟動狀態,而?dispatch_source?需要手動啟動。

    dispatch_after?函數依賴于?dispatch_source?定時器,它只是注冊了一個定時器,然后在回調函數中執行 block。

    總結

    以上是生活随笔為你收集整理的深入理解 GCD的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。