GCD源碼原理分析

image.png

Machapi

Mach是XNU的核心,被BSD層包裝。XNU由如下幾個組件組成:數組

  • MACH內核bash

    • 進程和線程抽象
  • 虛擬內存管理網絡

  • 任務調度數據結構

  • 進程間通訊和消息傳遞機制閉包

  • BSD架構

    • UNIX進程模型
    • POSIX線程模型
    • UNIX用戶與組
    • 網絡協議棧
    • 文件系統訪問
    • 設備訪問
  • libKernapp

  • I/O Kit異步

Mach的獨特之處在於選擇了經過消息傳遞的方式實現對象與對象之間的通訊。而其餘架構一個對象要訪問另外一個對象須要經過一個你們都知道的接口,而Mach對象不能直接調用另外一個對象,而是必須傳遞消息。async

一條消息就像網絡包同樣,定義爲透明的blob(binary larger object,二進制大對象),經過固定的包頭進行分裝

typedef struct
{
        mach_msg_header_t       header;
        mach_msg_body_t         body;
} mach_msg_base_t;

typedef struct 
{
  mach_msg_bits_t   msgh_bits; // 消息頭標誌位
  mach_msg_size_t   msgh_size; // 大小
  mach_port_t       msgh_remote_port; // 目標(發消息)或源(接消息)
  mach_port_t       msgh_local_port; // 源(發消息)或目標(接消息)
  mach_port_name_t  msgh_voucher_port;
  mach_msg_id_t     msgh_id; // 惟一id
} mach_msg_header_t;
複製代碼

Mach消息的發送和接收都是經過同一個API函數mach_msg()進行的。這個函數在用戶態和內核態都有實現。爲了實現消息的發送和接收,mach_msg()函數調用了一個Mach陷阱(trap)。Mach陷阱就是Mach中和系統調用等同的概念。在用戶態調用mach_msg_trap()會引起陷阱機制,切換到內核態,在內核態中,內核實現的mach_msg()會完成實際的工做。這個函數也將會在下面的源碼分析中遇到。

每個BSD進程都在底層關聯一個Mach任務對象,由於Mach提供的都是很是底層的抽象,提供的API從設計上講很基礎且不完整,因此須要在這之上提供一個更高的層次以實現完整的功能。咱們開發層遇到的進程和線程就是BSD層對Mach的任務和線程的複雜包裝。

進程填充的是線程,而線程是二進制代碼的實際執行單元。用戶態的線程始於對pthread_create的調用。這個函數的又由bsdthread_create系統調用完成,而bsdthread_create又實際上是Mach中的thread_create的複雜包裝,說到底真正的線程建立仍是有Mach層完成。

UNIX中,進程不能被建立出來,都是經過fork()系統調用複製出來的。複製出來的進程都會被要加載的執行程序覆蓋整個內存空間。 接着,瞭解下經常使用的宏和經常使用的數據結構體。

源碼中常見的宏

1. __builtin_expect 這個實際上是個函數,針對編譯器優化的一個函數,後面幾個宏是對這個函數的封裝,因此提早拎出來講一下。寫代碼中咱們常常會遇到條件判斷語句

if(今天是工做日) {
    printf("好好上班");
}else{
    printf("好好睡覺");
}
複製代碼

CPU讀取指令的時候並不是一條一條的來讀,而是多條一塊兒加載進來,好比已經加載了if(今天是工做日) printf(「好好上班」);的指令,這時候條件式若是爲非,也就是非工做日,那麼CPU繼續把printf(「好好睡覺」);這條指令加載進來,這樣就形成了性能浪費的現象。 __builtin_expect的第一個參數是實際值,第二個參數是預測值。使用這個目的是告訴編譯器if條件式是否是有更大的可能被知足。

2. likely和unlikely

解開這個宏後實際上是對__builtin_expect封裝,likely表示更大可能成立,unlikely表示更大可能不成立。

#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
複製代碼

遇到這樣的,if(likely(a == 0))理解成if(a==0)便可,unlikely也是一樣的。

3. fastpath和slowpath

跟上面也是差很少的,fastpath表示更大可能成立,slowpath表示更大可能不成立

#define fastpath(x) ((typeof(x))__builtin_expect(_safe_cast_to_long(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect(_safe_cast_to_long(x), 0l))
複製代碼

這兩個理解起來跟likely和unlikely同樣,只須要關注裏面的條件式是否知足便可。

####4. os_atomic_cmpxchg

其內部就是atomic_compare_exchange_strong_explicit函數,這個函數的做用是:第二個參數與第一個參數值比較,若是相等,第三個參數的值替換第一個參數的值。若是不相等,把第一個參數的值賦值到第二個參數上。

#define os_atomic_cmpxchg(p, e, v, m) \
        ({ _os_atomic_basetypeof(p) _r = (e); \
        atomic_compare_exchange_strong_explicit(_os_atomic_c11_atomic(p), \
        &_r, v, memory_order_##m, memory_order_relaxed); })
複製代碼

5. os_atomic_store2o

將第二個參數,保存到第一個參數

#define os_atomic_store2o(p, f, v, m) os_atomic_store(&(p)->f, (v), m)
#define os_atomic_store(p, v, m) \
      atomic_store_explicit(_os_atomic_c11_atomic(p), v, memory_order_##m)
複製代碼

####6. os_atomic_inc_orig

將1保存到第一個參數中
#define os_atomic_inc_orig(p, m) os_atomic_add_orig((p), 1, m)
#define os_atomic_add_orig(p, v, m) _os_atomic_c11_op_orig((p), (v), m, add, +)
#define _os_atomic_c11_op_orig(p, v, m, o, op) \
        atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \
        memory_order_##m)
複製代碼

####數據結構體

接着,瞭解一些經常使用數據結構體。

1. dispatch_queue_t

typedef struct dispatch_queue_s *dispatch_queue_t;

  咱們看下dispatch_queue_s怎麼定義的。發現其內部有個_DISPATCH_QUEUE_HEADER宏定義。

struct dispatch_queue_s {
    _DISPATCH_QUEUE_HEADER(queue);
    DISPATCH_QUEUE_CACHELINE_PADDING; 
} DISPATCH_ATOMIC64_ALIGN;

  解開_DISPATCH_QUEUE_HEADER後發現又一個DISPATCH_OBJECT_HEADER宏定義,繼續拆解

#define _DISPATCH_QUEUE_HEADER(x) \
    struct os_mpsc_queue_s _as_oq[0]; \
    DISPATCH_OBJECT_HEADER(x); \
    _OS_MPSC_QUEUE_FIELDS(dq, dq_state); \
    uint32_t dq_side_suspend_cnt; \
    dispatch_unfair_lock_s dq_sidelock; \
    union { \
        dispatch_queue_t dq_specific_q; \
        struct dispatch_source_refs_s *ds_refs; \
        struct dispatch_timer_source_refs_s *ds_timer_refs; \
        struct dispatch_mach_recv_refs_s *dm_recv_refs; \
    }; \
    DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
        const uint16_t dq_width, \
        const uint16_t __dq_opaque \
    ); \
    DISPATCH_INTROSPECTION_QUEUE_HEADER

  還有一層宏_DISPATCH_OBJECT_HEADER

#define DISPATCH_OBJECT_HEADER(x) \
    struct dispatch_object_s _as_do[0]; \
    _DISPATCH_OBJECT_HEADER(x)
複製代碼

不熟悉##的做用的同窗,這裏先說明下這個做用就拼接成字符串,好比x爲group的話,下面就會拼接爲dispatch_group這樣的。

#define _DISPATCH_OBJECT_HEADER(x) \
    struct _os_object_s _as_os_obj[0]; \
    OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
    struct dispatch_##x##_s *volatile do_next; \
    struct dispatch_queue_s *do_targetq; \
    void *do_ctxt; \
    void *do_finalizer
複製代碼

來到OS_OBJECT_STRUCT_HEADER以後,咱們須要注意一個成員變量,記住這個成員變量名字叫作do_vtable。再繼續拆解_OS_OBJECT_HEADER發現裏面起就是一個isa指針和引用計數一些信息。

#define OS_OBJECT_STRUCT_HEADER(x) \
    _OS_OBJECT_HEADER(\
    const void *_objc_isa, \
    do_ref_cnt, \
    do_xref_cnt); \
    // 注意這個成員變量,後面將任務Push到隊列就是經過這個變量
    const struct x##_vtable_s *do_vtable

#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
        isa; /* must be pointer-sized */ \
        int volatile ref_cnt; \
        int volatile xref_cnt
複製代碼

####2. dispatch_continuation_t

說到這個結構體,若是沒看過源碼的話,確定對這個結構體很陌生,由於對外的api裏面沒有跟continuation有關的。因此這裏先說下這個結構體就是用來封裝block對象的,保存block的上下文環境和block執行函數等。

typedef struct dispatch_continuation_s {
    struct dispatch_object_s _as_do[0];
    DISPATCH_CONTINUATION_HEADER(continuation);
} *dispatch_continuation_t;
複製代碼

看下里面的宏:DISPATCH_CONTINUATION_HEADER

#define DISPATCH_CONTINUATION_HEADER(x) \
    union { \
        const void *do_vtable; \
        uintptr_t dc_flags; \
    }; \
    union { \
        pthread_priority_t dc_priority; \
        int dc_cache_cnt; \
        uintptr_t dc_pad; \
    }; \
    struct dispatch_##x##_s *volatile do_next; \
    struct voucher_s *dc_voucher; \
    dispatch_function_t dc_func; \
    void *dc_ctxt; \
    void *dc_data; \
    void *dc_other
複製代碼

####3. dispatch_object_t

typedef union {
    struct _os_object_s *_os_obj;
    struct dispatch_object_s *_do;
    struct dispatch_continuation_s *_dc;
    struct dispatch_queue_s *_dq;
    struct dispatch_queue_attr_s *_dqa;
    struct dispatch_group_s *_dg;
    struct dispatch_source_s *_ds;
    struct dispatch_mach_s *_dm;
    struct dispatch_mach_msg_s *_dmsg;
    struct dispatch_source_attr_s *_dsa;
    struct dispatch_semaphore_s *_dsema;
    struct dispatch_data_s *_ddata;
    struct dispatch_io_s *_dchannel;
    struct dispatch_operation_s *_doperation;
    struct dispatch_disk_s *_ddisk;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
複製代碼

4. dispatch_function_t

dispatch_function_t只是一個函數指針

typedef void (*dispatch_function_t)(void *_Nullable);
複製代碼

至此,一些經常使用的宏和數據結構體介紹完畢,接下來,咱們真正的要一塊兒閱讀GCD相關的源碼了。

####建立隊列

首先咱們先從建立隊列講起。咱們已經很熟悉,建立隊列的方法是調用dispatch_queue_create函數。

其內部又調用了_dispatch_queue_create_with_target函數
  DISPATCH_TARGET_QUEUE_DEFAULT這個宏其實就是null

  dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
  {   // attr通常咱們都是傳DISPATCH_QUEUE_SERIAL、DISPATCH_QUEUE_CONCURRENT或者nil
      // 而DISPATCH_QUEUE_SERIAL其實就是null
      return _dispatch_queue_create_with_target(label, attr,
              DISPATCH_TARGET_QUEUE_DEFAULT, true);
  }
複製代碼

_dispatch_queue_create_with_target函數,這裏會建立一個root隊列,並將本身新建的隊列綁定到所對應的root隊列上。

static dispatch_queue_t _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{   // 根據上文代碼註釋裏提到的,做者認爲調用者傳入DISPATCH_QUEUE_SERIAL和nil的概率要大於傳DISPATCH_QUEUE_CONCURRENT。因此這裏設置個默認值。
    // 這裏怎麼理解呢?只要看作if(!dqa)便可
    if (!slowpath(dqa)) {
        // _dispatch_get_default_queue_attr裏面會將dqa的dqa_autorelease_frequency指定爲DISPATCH_AUTORELEASE_FREQUENCY_INHERIT的,inactive也指定爲false。這裏就不展開了,只須要知道賦了哪些值。由於後面會用到。
        dqa = _dispatch_get_default_queue_attr();
    } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    // 取出優先級
    dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);

    // overcommit單純從英文理解表示過量使用的意思,那這裏這個overcommit就是一個標識符,表示是否是就算負荷很高了,但仍是得給我新開一個線程出來給我執行任務。
    _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
    if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
        if (tq->do_targetq) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                    "a non-global target queue");
        }
    }

    // 若是overcommit沒有被指定
    if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
         // 因此對於overcommit,若是是串行的話默認是開啓的,而並行是關閉的
        overcommit = dqa->dqa_concurrent ?
                _dispatch_queue_attr_overcommit_disabled :
                _dispatch_queue_attr_overcommit_enabled;
    }

    // 以前說過初始化隊列默認傳了DISPATCH_TARGET_QUEUE_DEFAULT,也就是null,因此進入if語句。
    if (!tq) {
        // 獲取一個管理本身隊列的root隊列。
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled);
        if (slowpath(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }

    // legacy默認是trueif (legacy) {
        // 以前說過,默認是會給dqa_autorelease_frequency指定爲DISPATCH_AUTORELEASE_FREQUENCY_INHERIT,因此這個判斷式是成立的
        if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
            legacy = false;
        }
    }

    // vtable變量很重要,以後會被賦值到以前說的dispatch_queue_t結構體裏的do_vtable變量上
    const void *vtable;
    dispatch_queue_flags_t dqf = 0;
    
    // legacy變爲falseif (legacy) {
        vtable = DISPATCH_VTABLE(queue);
    } else if (dqa->dqa_concurrent) {
        // 若是建立隊列的時候傳了DISPATCH_QUEUE_CONCURRENT,就是走這裏
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        // 若是建立線程沒有指定爲並行隊列,不管你傳DISPATCH_QUEUE_SERIAL仍是nil,都會建立一個串行隊列。
        vtable = DISPATCH_VTABLE(queue_serial);
    }

    if (label) {
        // 判斷傳進來的字符串是否可變的,若是可變的copy成一份不可變的
        const char *tmp = _dispatch_strdup_if_mutable(label);
        if (tmp != label) {
            dqf |= DQF_LABEL_NEEDS_FREE;
            label = tmp;
        }
    }

    // _dispatch_object_alloc裏面就將vtable賦值給do_vtable變量上了。
    dispatch_queue_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
    // 第三個參數根據是否並行隊列,若是不是則最多開一個線程,若是是則最多開0x1000 - 2個線程,這個數量很驚人了已經,換成十進制就是(4096 - 2)個。
    // dqa_inactive以前說串行是false的
    // DISPATCH_QUEUE_ROLE_INNER 也是0,因此這裏串行隊列的話dqa->dqa_state是0
    _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
    dq->dq_priority = dqa->dqa_qos_and_relpri;
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
#endif
    _dispatch_retain(tq);
    if (qos == QOS_CLASS_UNSPECIFIED) {
        _dispatch_queue_priority_inherit_from_target(dq, tq);
    }
    if (!dqa->dqa_inactive) {
        _dispatch_queue_inherit_wlh_from_target(dq, tq);
    }
    // 自定義的queue的目標隊列是root隊列
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_introspection_queue_create(dq);
}
複製代碼

這個函數裏面仍是有幾個重要的地方拆出來看下,首先是建立一個root隊列_dispatch_get_root_queue函數。取root隊列,通常是從一個裝有12個root隊列數組裏面取。

static inline dispatch_queue_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
    if (unlikely(qos == DISPATCH_QOS_UNSPECIFIED || qos > DISPATCH_QOS_MAX)) {
        DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
    }
    return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
複製代碼

看下這個_dispatch_root_queues數組。咱們能夠看到,每個優先級都有對應的root隊列,每個優先級又分爲是否是能夠過載的隊列。

struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
    ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
        DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
        DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
    [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
        DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
        .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
        .do_ctxt = &_dispatch_root_queue_contexts[ \
                _DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
        .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
        .dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
                DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
                ((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
                DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
        __VA_ARGS__ \
    }
    _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
        .dq_label = "com.apple.root.maintenance-qos",
        .dq_serialnum = 4,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.maintenance-qos.overcommit",
        .dq_serialnum = 5,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
        .dq_label = "com.apple.root.background-qos",
        .dq_serialnum = 6,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.background-qos.overcommit",
        .dq_serialnum = 7,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
        .dq_label = "com.apple.root.utility-qos",
        .dq_serialnum = 8,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.utility-qos.overcommit",
        .dq_serialnum = 9,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
        .dq_label = "com.apple.root.default-qos",
        .dq_serialnum = 10,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
            DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.default-qos.overcommit",
        .dq_serialnum = 11,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
        .dq_label = "com.apple.root.user-initiated-qos",
        .dq_serialnum = 12,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.user-initiated-qos.overcommit",
        .dq_serialnum = 13,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
        .dq_label = "com.apple.root.user-interactive-qos",
        .dq_serialnum = 14,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.user-interactive-qos.overcommit",
        .dq_serialnum = 15,
    ),
};
複製代碼

其中DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),解析到最後是OSdispatch##name##_class這樣的這樣的,對應的實例對象是以下代碼,指定了root隊列各個操做對應的函數。

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,
    .do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
    .do_kind = "global-queue",
    .do_dispose = _dispatch_pthread_root_queue_dispose,
    .do_push = _dispatch_root_queue_push,
    .do_invoke = NULL,
    .do_wakeup = _dispatch_root_queue_wakeup,
    .do_debug = dispatch_queue_debug,
);
複製代碼

其次看下DISPATCH_VTABLE這個宏,這個宏很重要。最後解封也是&OSdispatch##name##_class這樣的。其實就是取dispatch_object_t對象。   以下代碼,這裏再舉個VTABLE的串行對象,裏面有各個狀態該執行的函數:銷燬函、掛起、恢復、push等函數都是在這裏指定的。因此這裏的do_push咱們須要特別留意,後面push block任務到隊列,就是經過調用do_push

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, queue,
    .do_type = DISPATCH_QUEUE_SERIAL_TYPE,
    .do_kind = "serial-queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_suspend = _dispatch_queue_suspend,
    .do_resume = _dispatch_queue_resume,
    .do_finalize_activation = _dispatch_queue_finalize_activation,
    .do_push = _dispatch_queue_push,
    .do_invoke = _dispatch_queue_invoke,
    .do_wakeup = _dispatch_queue_wakeup,
    .do_debug = dispatch_queue_debug,
    .do_set_targetq = _dispatch_queue_set_target_queue,
);

  繼續看下_dispatch_object_alloc和_dispatch_queue_init兩個函數,首先看下_dispatch_object_alloc函數

void * _dispatch_object_alloc(const void *vtable, size_t size)
{
// OS_OBJECT_HAVE_OBJC1爲1的知足式是:
// #if TARGET_OS_MAC && !TARGET_OS_SIMULATOR && defined(__i386__)
// 因此對於iOS並不知足
#if OS_OBJECT_HAVE_OBJC1
    const struct dispatch_object_vtable_s *_vtable = vtable;
    dispatch_object_t dou;
    dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
    dou._do->do_vtable = vtable;
    return dou._do;
#else
    return _os_object_alloc_realized(vtable, size);
#endif
}

inline _os_object_t _os_object_alloc_realized(const void *cls, size_t size)
{
    _os_object_t obj;
    dispatch_assert(size >= sizeof(struct _os_object_s));
    while (!fastpath(obj = calloc(1u, size))) {
        _dispatch_temporary_resource_shortage();
    }
    obj->os_obj_isa = cls;
    return obj;
}

void _dispatch_temporary_resource_shortage(void)
{
    sleep(1);
    asm("");  // prevent tailcall
}

  再看下_dispatch_queue_init函數,這裏也就是作些初始化工做了

static inline void _dispatch_queue_init(dispatch_queue_t dq, dispatch_queue_flags_t dqf,
        uint16_t width, uint64_t initial_state_bits)
{
    uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);

    dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
            DISPATCH_QUEUE_INACTIVE)) == 0);

    if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
        dq_state |= DISPATCH_QUEUE_INACTIVE + DISPATCH_QUEUE_NEEDS_ACTIVATION;
        dq_state |= DLOCK_OWNER_MASK;
        dq->do_ref_cnt += 2; 
    }

    dq_state |= (initial_state_bits & DISPATCH_QUEUE_ROLE_MASK);
    // 指向DISPATCH_OBJECT_LISTLESS是優化編譯器的做用。只是爲了生成更好的指令讓CPU更好的編碼
    dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
    dqf |= DQF_WIDTH(width);
    // dqf 保存進 dq->dq_atomic_flags
    os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
    dq->dq_state = dq_state;
    dq->dq_serialnum =
            os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
}

  最後是_dispatch_introspection_queue_create函數,一個內省函數。

dispatch_queue_t _dispatch_introspection_queue_create(dispatch_queue_t dq)
{
    TAILQ_INIT(&dq->diq_order_top_head);
    TAILQ_INIT(&dq->diq_order_bottom_head);
    _dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
    TAILQ_INSERT_TAIL(&_dispatch_introspection.queues, dq, diq_list);
    _dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);

    DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
    if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
        _dispatch_introspection_queue_create_hook(dq);
    }
    return dq;
}

  至此,一個隊列的建立過程咱們大體瞭解了。大體能夠分爲這麼幾點

    設置隊列優先級
    默認建立的是一個串行隊列
    設置隊列掛載的根隊列。優先級不一樣根隊列也不一樣
    實例化vtable對象,這個對象給不一樣隊列指定了push、wakeup等函數。
複製代碼

0x04 dispatch_sync dispatch_sync直接調用的是dispatch_sync_f

void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    // 很大可能不會走if分支,看作if(_dispatch_block_has_private_data(work))
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_private_data(dq, work, 0);
    }
    dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    // 串行隊列會走到這個if分支
    if (likely(dq->dq_width == 1)) {
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // 全局獲取的並行隊列或者綁定的是非調度線程的隊列會走進這個if分支
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
        return _dispatch_sync_f_slow(dq, ctxt, func, 0);
    }

    _dispatch_introspection_sync_begin(dq);
    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dq, ctxt, func, 0);
    }
    // 自定義並行隊列會來到這個函數
    _dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
複製代碼

先說第一種狀況,串行隊列。

void dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_tid tid = _dispatch_tid_self();

    // 隊列綁定的是非調度線程就會走這裏
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
        return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
    }

    _dispatch_introspection_sync_begin(dq);
    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
    }
    // 通常會走到這裏
    _dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func);
}

static void _dispatch_queue_barrier_sync_invoke_and_complete(dispatch_queue_t dq,
        void *ctxt, dispatch_function_t func)
{
    // 首先會執行這個函數
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    // 若是後面還有別的任務
    if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
        // 內部其實就是喚醒隊列
        return _dispatch_queue_barrier_complete(dq, 0, 0);
    }

    const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
            DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
            DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
            DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
    uint64_t old_state, new_state;

    // 原子鎖。檢查dq->dq_state與old_state是否相等,若是相等把new_state賦值給dq->dq_state,若是不相等,把dq_state賦值給old_state。
    // 串行隊列走到這裏,dq->dq_state與old_state是相等的,會把new_state也就是閉包裏的賦值的值給dq->dq_state
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
        new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
        new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
        new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
        if (unlikely(old_state & fail_unlock_mask)) {
            os_atomic_rmw_loop_give_up({
                return _dispatch_queue_barrier_complete(dq, 0, 0);
            });
        }
    });
    if (_dq_state_is_base_wlh(old_state)) {
        _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
    }
}

static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    // 保護現場 -> 調用函數 -> 恢復現場
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

  而後另外一種狀況,自定義並行隊列會走_dispatch_sync_invoke_and_complete函數。

static void _dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    // 將自定義隊列加入到root隊列裏去
    // dispatch_async也會調用此方法,以前咱們初始化的時候會綁定一個root隊列,這裏就將咱們新建的隊列交給root隊列進行管理
    _dispatch_queue_non_barrier_complete(dq);
}

static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    // 執行任務
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

## dispatch_async

  內部就是兩個函數_dispatch_continuation_init和_dispatch_continuation_async

void dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    // 設置標識位
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_async(dq, dc);
}

  _dispatch_continuation_init函數只是一個初始化,主要就是保存Block上下文,指定block的執行函數

static inline void _dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        pthread_priority_t pp, dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    dc->dc_flags = dc_flags | DISPATCH_OBJ_BLOCK_BIT;
    // block對象賦值到dc_ctxt
    dc->dc_ctxt = _dispatch_Block_copy(work);
    // 設置默認任務優先級
    _dispatch_continuation_priority_set(dc, pp, flags);

    // 大多數狀況不會走這個分支
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    // 這個標識位多眼熟,就是前面入口賦值的,沒的跑了,指定執行函數就是_dispatch_call_block_and_release了
    if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
        dc->dc_func = _dispatch_call_block_and_release;
    } else {
        dc->dc_func = _dispatch_Block_invoke(work);
    }
    _dispatch_continuation_voucher_set(dc, dqu, flags);
}

  _dispatch_call_block_and_release這個函數就是直接執行block了,因此dc->dc_func被調用的話就block會被直接執行了。

void _dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

  上面的初始化過程就是這樣,接着看下_dispatch_continuation_async函數

void _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    // 看看是否是barrier類型的block
    _dispatch_continuation_async2(dq, dc,
            dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}

static inline void _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    // 若是是用barrier插進來的任務或者是串行隊列,直接將任務加入到隊列
    // #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
    //    ({ uint16_t _width = (width); \
    //    _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; }) 
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        return _dispatch_continuation_push(dq, dc);
    }
    return _dispatch_async_f2(dq, dc);
}

// 能夠先看下若是是barrier任務,直接調用_dispatch_continuation_push函數
static void _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    dx_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}

// _dispatch_continuation_async2函數裏面調用_dispatch_async_f2函數
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    // 若是還有任務,slowpath表示很大可能隊尾是沒有任務的。
    // 實際開發中也的確如此,通常狀況下咱們不會dispatch_async以後又立刻跟着一個dispatch_async
    if (slowpath(dq->dq_items_tail)) {
        return _dispatch_continuation_push(dq, dc);
    }

    if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
        return _dispatch_continuation_push(dq, dc);
    }

    // 通常會直接來到這裏,_dispatch_continuation_override_qos函數裏面主要作的是判斷dq有沒有設置的優先級,若是沒有就用block對象的優先級,若是有就用本身的
    return _dispatch_async_f_redirect(dq, dc,
            _dispatch_continuation_override_qos(dq, dc));
}

static void _dispatch_async_f_redirect(dispatch_queue_t dq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    // 這裏會走進if的語句,由於_dispatch_object_is_redirection內部的dx_type(dou._do) == type條件爲否
    if (!slowpath(_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dq, dou);
    }
    // dq換成所綁定的root隊列
    dq = dq->do_targetq;

    // 基本不會走裏面的循環,主要作的就是找到根root隊列
    while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
            break;
        }
        if (!dou._dc->dc_ctxt) {
            dou._dc->dc_ctxt = (void *)
                    (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
        }
        dq = dq->do_targetq;
    }

    // 把裝有block信息的結構體裝進所在隊列對應的root_queue裏面
    dx_push(dq, dou, qos);
}

// dx_push是個宏定義,這裏作的就是將任務push到任務隊列,咱們看到這裏,就知道dx_push就是調用對象的do_push。
#define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)

  _dispatch_async_f_redirect函數裏先看這句dou._dc = _dispatch_async_redirect_wrap(dq, dou);

static inline dispatch_continuation_t _dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();

    dou._do->do_next = NULL;
    // 因此dispatch_async推動的任務的do_vtable成員變量是有值的
    dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
    dc->dc_func = NULL;
    dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
    // 所屬隊列被裝進dou._dc->dc_data裏面了
    dc->dc_data = dq;
    dc->dc_other = dou._do;
    dc->dc_voucher = DISPATCH_NO_VOUCHER;
    dc->dc_priority = DISPATCH_NO_PRIORITY;
    _dispatch_retain(dq); // released in _dispatch_async_redirect_invoke
    return dc;
}

// dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT); 就是下面指定redirect的invoke函數是_dispatch_async_redirect_invoke,後面任務被執行就是經過這個函數
const struct dispatch_continuation_vtable_s _dispatch_continuation_vtables[] = {
    DC_VTABLE_ENTRY(ASYNC_REDIRECT,
        .do_kind = "dc-redirect",
        .do_invoke = _dispatch_async_redirect_invoke),
#if HAVE_MACH
    DC_VTABLE_ENTRY(MACH_SEND_BARRRIER_DRAIN,
        .do_kind = "dc-mach-send-drain",
        .do_invoke = _dispatch_mach_send_barrier_drain_invoke),
    DC_VTABLE_ENTRY(MACH_SEND_BARRIER,
        .do_kind = "dc-mach-send-barrier",
        .do_invoke = _dispatch_mach_barrier_invoke),
    DC_VTABLE_ENTRY(MACH_RECV_BARRIER,
        .do_kind = "dc-mach-recv-barrier",
        .do_invoke = _dispatch_mach_barrier_invoke),
    DC_VTABLE_ENTRY(MACH_ASYNC_REPLY,
        .do_kind = "dc-mach-async-reply",
        .do_invoke = _dispatch_mach_msg_async_reply_invoke),
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
    DC_VTABLE_ENTRY(OVERRIDE_STEALING,
        .do_kind = "dc-override-stealing",
        .do_invoke = _dispatch_queue_override_invoke),
    // 留意這個,後面也會被用到
    DC_VTABLE_ENTRY(OVERRIDE_OWNING,
        .do_kind = "dc-override-owning",
        .do_invoke = _dispatch_queue_override_invoke),
#endif
};

  再看dx_push(dq, dou, qos);這句,其實就是調用_dispatch_root_queue_push函數

void _dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // 通常狀況下,不管自定義仍是非自定義都會走進這個條件式(好比:dispatch_get_global_queue)
    // 裏面主要對比的是qos與root隊列的qos是否一致。基本上都不一致的,若是不一致走進這個if語句
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

static void _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
    dispatch_continuation_t dc = dou._dc;
    // 這個_dispatch_object_is_redirection函數其實就是return _dispatch_object_has_type(dou,DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
    // 因此自定義隊列會走這個if語句,若是是dispatch_get_global_queue不會走if語句
    if (_dispatch_object_is_redirection(dc)) {
        dc->dc_func = (void *)orig_rq;
    } else {
        // dispatch_get_global_queue來到這裏
        dc = _dispatch_continuation_alloc();
        // 至關因而下面的,也就是指定了執行函數爲_dispatch_queue_override_invoke,因此有別於自定義隊列的invoke函數。
        // DC_VTABLE_ENTRY(OVERRIDE_OWNING,
        // .do_kind = "dc-override-owning",
        // .do_invoke = _dispatch_queue_override_invoke),
        dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
        _dispatch_trace_continuation_push(orig_rq, dou);
        dc->dc_ctxt = dc;
        dc->dc_other = orig_rq;
        dc->dc_data = dou._do;
        dc->dc_priority = DISPATCH_NO_PRIORITY;
        dc->dc_voucher = DISPATCH_NO_VOUCHER;
    }
    _dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

static inline void _dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head,
        dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *head = _head._do, *tail = _tail._do;
    // 把任務裝進隊列,大多數不走進if語句。可是第一個任務進來以前仍是知足這個條件式的,會進入這個條件語句去激活隊列來執行裏面的任務,後面再加入的任務由於隊列被激活了,因此也就不太須要再進入這個隊列了,因此相對來講激活隊列只要一次,因此做者認爲大多數狀況下不須要走進這個條件語句
    if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) {
        // 保存隊列頭
        _dispatch_queue_push_update_head(dq, head);
        return _dispatch_global_queue_poke(dq, n, 0);
    }
}
複製代碼

至此,咱們能夠看到,咱們裝入到自定義的任務都被扔到其掛靠的root隊列裏去了,因此咱們咱們本身建立的隊列只是一個代理人身份,真正的管理人是其對應的root隊列,但同時這個隊列也是被管理的。 繼續看_dispatch_global_queue_poke函數

void
_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
{
    return _dispatch_global_queue_poke_slow(dq, n, floor);
}

  繼續看_dispatch_global_queue_poke函數調用了_dispatch_global_queue_poke_slow函數,這裏也很關鍵了,裏面執行_pthread_workqueue_addthreads函數,把任務交給內核分發處理

_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
    dispatch_root_queue_context_t qc = dq->do_ctxt;
    int remaining = n;
    int r = ENOSYS;

    _dispatch_root_queues_init();
    _dispatch_debug_root_queue(dq, __func__);
    if (qc->dgq_kworkqueue != (void*)(~0ul))
    {
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp(dq->dq_priority));
        (void)dispatch_assume_zero(r);
        return;
    }
}

int
_pthread_workqueue_addthreads(int numthreads, pthread_priority_t priority)
{
    int res = 0;

    if (__libdispatch_workerfunction == NULL) {
        return EPERM;
    }

    if ((__pthread_supported_features & PTHREAD_FEATURE_FINEPRIO) == 0) {
        return ENOTSUP;
    }

    res = __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads, (int)priority);
    if (res == -1) {
        res = errno;
    }
    return res;
}
複製代碼

那麼,加入到根隊列的任務是怎麼被運行起來的?在此以前,咱們先模擬一下在GCD內部把程序搞掛掉,這樣咱們就能夠追溯下調用棧關係。

(
    0   CoreFoundation                      0x00000001093fe12b __exceptionPreprocess + 171
    1   libobjc.A.dylib                     0x0000000108a92f41 objc_exception_throw + 48
    2   CoreFoundation                      0x000000010943e0cc _CFThrowFormattedException + 194
    3   CoreFoundation                      0x000000010930c23d -[__NSPlaceholderArray initWithObjects:count:] + 237
    4   CoreFoundation                      0x0000000109312e34 +[NSArray arrayWithObjects:count:] + 52
    5   HotPatch                            0x000000010769df77 __29-[ViewController viewDidLoad]_block_invoke + 87
    6   libdispatch.dylib                   0x000000010c0a62f7 _dispatch_call_block_and_release + 12
    7   libdispatch.dylib                   0x000000010c0a733d _dispatch_client_callout + 8
    8   libdispatch.dylib                   0x000000010c0ad754 _dispatch_continuation_pop + 967
    9   libdispatch.dylib                   0x000000010c0abb85 _dispatch_async_redirect_invoke + 780
    10  libdispatch.dylib                   0x000000010c0b3102 _dispatch_root_queue_drain + 772
    11  libdispatch.dylib                   0x000000010c0b2da0 _dispatch_worker_thread3 + 132
    12  libsystem_pthread.dylib             0x000000010c5f95a2 _pthread_wqthread + 1299
    13  libsystem_pthread.dylib             0x000000010c5f907d 
    start_wqthread + 13
)
複製代碼

很明顯,咱們已經看到加入到隊列的任務的調用關係是:

start_wqthread -> _pthread_wqthread -> _dispatch_worker_thread3 -> _dispatch_root_queue_drain -> _dispatch_async_redirect_invoke -> _dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release
複製代碼

只看調用關係也不知道里面作了什麼,因此仍是上代碼

// 根據優先級取出相應的root隊列,再調用_dispatch_worker_thread4函數
static void _dispatch_worker_thread3(pthread_priority_t pp)
{
    bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
    dispatch_queue_t dq;
    pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
    _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
    dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
    return _dispatch_worker_thread4(dq);
}
// 開始調用_dispatch_root_queue_drain函數,取出任務
static void _dispatch_worker_thread4(void *context)
{
    dispatch_queue_t dq = context;
    dispatch_root_queue_context_t qc = dq->do_ctxt;

    _dispatch_introspection_thread_add();
    int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
    dispatch_assert(pending >= 0);
    _dispatch_root_queue_drain(dq, _dispatch_get_priority());
    _dispatch_voucher_debug("root queue clear", NULL);
    _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
// 循環取出任務
 static void _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
        _dispatch_queue_set_current(dq);
    dispatch_priority_t pri = dq->dq_priority;
    if (!pri) pri = _dispatch_priority_from_pp(pp);
    dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
    _dispatch_adopt_wlh_anon();

    struct dispatch_object_s *item;
    bool reset = false;
    dispatch_invoke_context_s dic = { };
        dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
            DISPATCH_INVOKE_REDIRECTING_DRAIN;
    _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
    _dispatch_perfmon_start();
    while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
        if (reset) _dispatch_wqthread_override_reset();
        _dispatch_continuation_pop_inline(item, &dic, flags, dq);
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
    }

    // overcommit or not. worker thread
    if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
        _dispatch_perfmon_end(perfmon_thread_worker_oc);
    } else {
        _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
    }

    _dispatch_reset_wlh();
    _dispatch_reset_basepri(old_dbp);
    _dispatch_reset_basepri_override();
    _dispatch_queue_set_current(NULL);
}

// 這個函數的做用就是調度出任務的執行函數
static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;

    // 以前說過dispatch_async是有do_vtable成員變量的,因此會走進這個if分支,又invoke方法指定爲_dispatch_async_redirect_invoke,因此執行該函數
    // 相同的,若是是dispatch_get_global_queue也會走這個分支,執行_dispatch_queue_override_invoke方法,這個以前也說過了
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

// 繼續按自定義隊列的步驟走
void _dispatch_async_redirect_invoke(dispatch_continuation_t dc,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
    dispatch_thread_frame_s dtf;
    struct dispatch_continuation_s *other_dc = dc->dc_other;
    dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;

    dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
    dispatch_queue_t dq = dc->dc_data, rq, old_dq;
    dispatch_priority_t old_dbp;

    if (ctxt_flags) {
        flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
        flags |= ctxt_flags;
    }
    old_dq = _dispatch_get_current_queue();
    if (assumed_rq) {
        old_dbp = _dispatch_root_queue_identity_assume(assumed_rq);
        _dispatch_set_basepri(dq->dq_priority);
    } else {
        old_dbp = _dispatch_set_basepri(dq->dq_priority);
    }

    _dispatch_thread_frame_push(&dtf, dq);
    // _dispatch_continuation_pop_forwarded裏面就是執行_dispatch_continuation_pop函數
    _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
            DISPATCH_OBJ_CONSUME_BIT, {
        _dispatch_continuation_pop(other_dc, dic, flags, dq);
    });
    _dispatch_thread_frame_pop(&dtf);
    if (assumed_rq) _dispatch_queue_set_current(old_dq);
    _dispatch_reset_basepri(old_dbp);

    rq = dq->do_targetq;
    while (slowpath(rq->do_targetq) && rq != old_dq) {
        _dispatch_queue_non_barrier_complete(rq);
        rq = rq->do_targetq;
    }

    _dispatch_queue_non_barrier_complete(dq);
    _dispatch_release_tailcall(dq); 
}

// 順便說下,若是按照的是dispatch_get_global_queue會執行_dispatch_queue_override_invoke函數
void _dispatch_queue_override_invoke(dispatch_continuation_t dc,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
    dispatch_queue_t old_rq = _dispatch_queue_get_current();
    dispatch_queue_t assumed_rq = dc->dc_other;
    dispatch_priority_t old_dp;
    voucher_t ov = DISPATCH_NO_VOUCHER;
    dispatch_object_t dou;

    dou._do = dc->dc_data;
    old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
    if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
        flags |= DISPATCH_INVOKE_STEALING;
    } else {
        // balance the fake continuation push in
        // _dispatch_root_queue_push_override
        _dispatch_trace_continuation_pop(assumed_rq, dou._do);
    }
    // 一樣調用_dispatch_continuation_pop函數
    _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
        if (_dispatch_object_has_vtable(dou._do)) {
            dx_invoke(dou._do, dic, flags);
        } else {
            _dispatch_continuation_invoke_inline(dou, ov, flags);
        }
    });
    _dispatch_reset_basepri(old_dp);
    _dispatch_queue_set_current(old_rq);
}
複製代碼

// 迴歸正題,不管是自定義的隊列仍是獲取系統的,最終都會調用這個函數

void _dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, dispatch_queue_t dq)
{
    _dispatch_continuation_pop_inline(dou, dic, flags, dq);
}

static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
        dispatch_invoke_flags_t flags)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;

        _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
        if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        // 後面分析dispatch_group_async的時候會走if這個分支,但此次走的是else分支
        if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) {
            _dispatch_continuation_with_group_invoke(dc);
        } else {
            // 此次走這裏,直接執行block函數
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_introspection_queue_item_complete(dou);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}
複製代碼

至此,任務怎麼被調度執行的已經看明白了。start_wqthread是彙編寫的,直接和內核交互。雖然咱們明確了使用了異步的任務被執行的調用順序,可是想必仍是有這樣的疑問_dispatch_worker_thread3是怎麼跟內核扯上關係的。爲何調用的是_dispatch_worker_thread3,而不是_dispatch_worker_thread或者_dispatch_worker_thread4呢?   在此以前須要說的是,在GCD中一共有2個線程池管理着任務,一個是主線程池,另外一個就是除了主線程任務的線程池。主線程池由序號1的隊列管理,其餘有序號2的隊列進行管理。加上runloop運行的runloop隊列,一共就有16個隊列。

序號  標籤
1   com.apple.main-thread
2   com.apple.libdispatch-manager
3   com.apple.root.libdispatch-manager
4   com.apple.root.maintenance-qos
5   com.apple.root.maintenance-qos.overcommit
6   com.apple.root.background-qos
7   com.apple.root.background-qos.overcommit
8   com.apple.root.utility-qos
9   com.apple.root.utility-qos.overcommit
10  com.apple.root.default-qos
11  com.apple.root.default-qos.overcommit
12  com.apple.root.user-initiated-qos
13  com.apple.root.user-initiated-qos.overcommit
14  com.apple.root.user-interactive-qos
15  com.apple.root.user-interactive-qos.overcommit
複製代碼

看圖的話,就以下圖線程池圖 有那麼多root隊列,因此application啓動的時候就會初始化這些root隊列的_dispatch_root_queues_init函數。

void
_dispatch_root_queues_init(void)
{
    static dispatch_once_t _dispatch_root_queues_pred;
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,
            _dispatch_root_queues_init_once);
}

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
    int wq_supported;
    _dispatch_fork_becomes_unsafe();
    if (!_dispatch_root_queues_init_workq(&wq_supported)) {
        size_t i;
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            bool overcommit = true;
            _dispatch_root_queue_init_pthread_pool(
                    &_dispatch_root_queue_contexts[i], 0, overcommit);
        }
        DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
                "Root queue initialization failed");
    }
}

static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
    int r; (void)r;
    bool result = false;
    *wq_supported = 0;
    bool disable_wq = false; (void)disable_wq;
    bool disable_qos = false;
    bool disable_kevent_wq = false;
    if (!disable_wq && !disable_qos) {
        *wq_supported = _pthread_workqueue_supported();
        if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
            r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
            result = !r;
        } 
    }
    return result;
}
複製代碼

來到這裏,已經看到_pthread_workqueue_init_with_kevent函數就是綁定了_dispatch_worker_thread3函數去作一些GCD的線程任務,看到源代碼_pthread_workqueue_init_with_kevent作了些什麼。

int
_pthread_workqueue_init_with_kevent(pthread_workqueue_function2_t queue_func,
        pthread_workqueue_function_kevent_t kevent_func,
        int offset, int flags)
{
    return _pthread_workqueue_init_with_workloop(queue_func, kevent_func, NULL, offset, flags);
}

int
_pthread_workqueue_init_with_workloop(pthread_workqueue_function2_t queue_func,
        pthread_workqueue_function_kevent_t kevent_func,
        pthread_workqueue_function_workloop_t workloop_func,
        int offset, int flags)
{
    if (flags != 0) {
        return ENOTSUP;
    }

    __workq_newapi = true;
    __libdispatch_offset = offset;

    int rv = pthread_workqueue_setdispatch_with_workloop_np(queue_func, kevent_func, workloop_func);
    return rv;
}

static int
pthread_workqueue_setdispatch_with_workloop_np(pthread_workqueue_function2_t queue_func,
        pthread_workqueue_function_kevent_t kevent_func,
        pthread_workqueue_function_workloop_t workloop_func)
{
    int res = EBUSY;
    if (__libdispatch_workerfunction == NULL) {
        // Check whether the kernel supports new SPIs
        res = __workq_kernreturn(WQOPS_QUEUE_NEWSPISUPP, NULL, __libdispatch_offset, kevent_func != NULL ? 0x01 : 0x00);
        if (res == -1){
            res = ENOTSUP;
        } else {
            __libdispatch_workerfunction = queue_func;
            __libdispatch_keventfunction = kevent_func;
            __libdispatch_workloopfunction = workloop_func;

            // Prepare the kernel for workq action
            (void)__workq_open();
            if (__is_threaded == 0) {
                __is_threaded = 1;
            }
        }
    }
    return res;
}
複製代碼

咱們看到了__libdispatch_workerfunction = queue_func;指定了隊列工做函數。而後咱們往回看以前說的咱們製造了一我的爲crash,追溯棧裏看到_pthread_wqthread這個函數。看下這個函數怎麼啓用_dispatch_worker_thread3

// 實際代碼不少,這裏我精簡了下,拿到了__libdispatch_workerfunction對應的_dispatch_worker_thread3,而後直接執行。
void
_pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr, void *keventlist, int flags, int nkevents)
{
    pthread_workqueue_function_t func = (pthread_workqueue_function_t)__libdispatch_workerfunction;
    int options = overcommit ? WORKQ_ADDTHREADS_OPTION_OVERCOMMIT : 0;
    // 執行函數
    (*func)(thread_class, options, NULL);
    __workq_kernreturn(WQOPS_THREAD_RETURN, NULL, 0, 0);
    _pthread_exit(self, NULL);
}
複製代碼
相關文章
相關標籤/搜索