GCD源码分析3 —— dispatch_queue篇

前言

GCD的队列是GCD源码分析系列中的重点

队列的定义

dispatch_queue_s是一个结构体,定义如下:

// Core queue structure: common object header + queue-specific header +
// a variable-length label stored inline at the end of the allocation.
struct dispatch_queue_s {
    DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s);
    DISPATCH_QUEUE_HEADER;
    char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE];   // must be last
};

可以看到结构体里面包含两个宏和一个dq_label

DISPATCH_STRUCT_HEADER

#define DISPATCH_STRUCT_HEADER(x, y)        \
    const struct y *do_vtable;      \   // vtable holding the operation functions of this dispatch_object_s
    struct x *volatile do_next;     \   // next pointer of the intrusive linked list
    unsigned int do_ref_cnt;        \   // internal reference count
    unsigned int do_xref_cnt;       \   // external reference count
    unsigned int do_suspend_cnt;        \   // suspend count, used as a pause flag; e.g. for a delayed task, the timer clears it when the deadline fires and wakes the queue
    struct dispatch_queue_s *do_targetq;\   // target queue: the queue this struct x runs on
    void *do_ctxt;                      \   // context: the parameter we want to pass along
    void *do_finalizer

在GCD中,很多结构体的定义基本上都会调用DISPATCH_STRUCT_HEADER,并把结构体的名字作为第一个宏参数x传递进去,而第二个宏参数y通常是dispatch_queue_vtable_s
struct x *volatile do_next,x参数仅仅会影响do_next的指针类型,可以理解为将来要进行链表操作。很显然,在队列的定义中,这行展开为:

struct dispatch_queue_s *volatile do_next;

而第二个参数y也只是会影响到do_vtable指针的类型,这里展开为:

const struct dispatch_queue_vtable_s *do_vtable;

dispatch_queue_vtable_s这个结构体内包含了这个dispatch_object_s或者其子类的操作函数,而且针对这些操作函数,定义了相对简短的宏,方便调用

unsigned long const do_type;    \                           // 数据的具体类型
const char *const do_kind; \                                // 数据的类型描述字符串
size_t (*const do_debug)(struct x *, char *, size_t);   \   // 用来获取调试时需要的变量信息
struct dispatch_queue_s *(*const do_invoke)(struct x *);\   // 唤醒队列的方法,全局队列和主队列此项为NULL
bool (*const do_probe)(struct x *); \                       // 用于检测传入对象中的一些值是否满足条件
void (*const do_dispose)(struct x *)                        // 销毁队列的方法,通常内部会调用 这个对象的 finalizer 函数

DISPATCH_QUEUE_HEADER

// Queue-specific fields appended after the common object header
#define DISPATCH_QUEUE_HEADER \
    uint32_t dq_running; /* count of items currently running — TODO confirm exact semantics */ \
    uint32_t dq_width; /* concurrency width: 1 = serial, UINT32_MAX = concurrent */ \
    struct dispatch_object_s *dq_items_tail; /* tail of the enqueued item list */ \
    struct dispatch_object_s *volatile dq_items_head; /* head of the enqueued item list */ \
    unsigned long dq_serialnum; /* unique serial number of the queue */ \
    void *dq_finalizer_ctxt; /* context passed to the finalizer */ \
    dispatch_queue_finalizer_function_t dq_finalizer_func

dq_label

dq_label代表队列的名字,且最长的长度不能超过DISPATCH_QUEUE_MIN_LABEL_SIZE = 64

队列的获取

GCD队列的获取通常有以下几种方式:

1、主队列:dispatch_get_main_queue

#define dispatch_get_main_queue() (&_dispatch_main_q)

可以看到dispatch_get_main_queue实际上是一个宏,它返回的是结构体_dispatch_main_q的地址

// Statically allocated main queue returned by dispatch_get_main_queue()
struct dispatch_queue_s _dispatch_main_q = {
    .do_vtable = &_dispatch_queue_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,    // global object: never released
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],    // index 3: default-overcommit root queue

    .dq_label = "com.apple.main-thread",
    .dq_running = 1,
    .dq_width = 1,          // serial
    .dq_serialnum = 1,      // serial number 1 is reserved for the main queue
};

do_vtable

看看主队列的函数指针do_vtable的指向:_dispatch_queue_vtable

// vtable used by the main queue (and by custom queues via _dispatch_queue_init)
static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
    .do_type = DISPATCH_QUEUE_TYPE,
    .do_kind = "queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_invoke = (void *)dummy_function_r0,     // placeholder no-op
    .do_probe = (void *)dummy_function_r0,      // placeholder no-op
    .do_debug = dispatch_queue_debug,
};

do_ref_cnt && do_xref_cnt

这两个值和GCD对象的内存管理有关,只有两个值同时为0,GCD对象才能被释放,主队列的这两个成员的值都为DISPATCH_OBJECT_GLOBAL_REFCNT:

#define DISPATCH_OBJECT_GLOBAL_REFCNT   (~0u)

// (excerpt) Objects marked with the global refcount are immune to retain
void _dispatch_retain(dispatch_object_t dou)
{
    if (dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
        return; // global object
    }
    ...
}

void dispatch_release(dispatch_object_t dou)
{
    typeof(dou._do->do_xref_cnt) oldval;

    if (dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
        return;
    }
    ...

从_dispatch_retain和dispatch_release函数中可以看出,主队列的生命周期是伴随着应用的,不会受retain和release的影响

do_targetq

目标队列,通常非全局队列(例如mgr_queue),需要压入到globalQueue中来处理,因此需要指明target_queue

#define DISPATCH_ROOT_QUEUE_COUNT 6
.do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],
.do_targetq = &_dispatch_root_queues[3],

即为”com.apple.root.default-overcommit-priority”这个全局队列

管理队列:_dispatch_mgr_q

注:这个队列是GCD内部使用的,不对外公开

// Internal manager queue (not part of the public API)
struct dispatch_queue_s _dispatch_mgr_q = {
    .do_vtable = &_dispatch_queue_mgr_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,    // global object: never released
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],    // index 5: high-overcommit root queue

    .dq_label = "com.apple.libdispatch-manager",
    .dq_width = 1,          // serial
    .dq_serialnum = 2,      // serial number 2 is reserved for the manager queue
};

do_vtable

看看管理队列的函数指针do_vtable的指向:_dispatch_queue_mgr_vtable

// vtable of the internal manager queue
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
    .do_type = DISPATCH_QUEUE_MGR_TYPE,
    .do_kind = "mgr-queue",
    .do_invoke = _dispatch_mgr_invoke,
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_mgr_wakeup,
};

do_ref_cnt && do_xref_cnt

可以参考主队列

do_targetq

即为”com.apple.root.high-overcommit-priority”这个全局队列

全局队列:dispatch_get_global_queue

// Public global-queue priorities in this libdispatch version (three levels)
enum {
    DISPATCH_QUEUE_PRIORITY_HIGH = 2,
    DISPATCH_QUEUE_PRIORITY_DEFAULT = 0,
    DISPATCH_QUEUE_PRIORITY_LOW = -2,
};

// Returns one of the predefined root queues for the given priority and flags
dispatch_queue_t dispatch_get_global_queue(long priority, unsigned long flags)
{
    // Any flag other than DISPATCH_QUEUE_OVERCOMMIT is rejected
    if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
        return NULL;
    }
    return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

// Maps (priority, overcommit) to an index in _dispatch_root_queues:
// even indices are the plain queues, odd indices their overcommit variants.
static inline dispatch_queue_t _dispatch_get_root_queue(long priority, bool overcommit)
{
    if (overcommit) switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[1];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[3];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[5];
    }
    switch (priority) {
    case DISPATCH_QUEUE_PRIORITY_LOW:
        return &_dispatch_root_queues[0];
    case DISPATCH_QUEUE_PRIORITY_DEFAULT:
        return &_dispatch_root_queues[2];
    case DISPATCH_QUEUE_PRIORITY_HIGH:
        return &_dispatch_root_queues[4];
    default:
        return NULL;    // unknown priority
    }
}

我们当前分析的libdispatch定义了6个全局队列(最新的版本有8个全局队列)

// The six root (global) queues: {low, default, high} x {plain, overcommit}.
// Serial numbers 4-9; each carries a per-queue context (thread pool info).
static struct dispatch_queue_s _dispatch_root_queues[] = {
    {   // [0]: low priority
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[0],

        .dq_label = "com.apple.root.low-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 4,
    },
    {   // [1]: low priority, overcommit
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[1],

        .dq_label = "com.apple.root.low-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 5,
    },
    {   // [2]: default priority
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[2],

        .dq_label = "com.apple.root.default-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 6,
    },
    {   // [3]: default priority, overcommit (target of the main queue)
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[3],

        .dq_label = "com.apple.root.default-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 7,
    },
    {   // [4]: high priority
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[4],

        .dq_label = "com.apple.root.high-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 8,
    },
    {   // [5]: high priority, overcommit (target of the manager queue)
        .do_vtable = &_dispatch_queue_root_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
        .do_ctxt = &_dispatch_root_queue_contexts[5],

        .dq_label = "com.apple.root.high-overcommit-priority",
        .dq_running = 2,
        .dq_width = UINT32_MAX,
        .dq_serialnum = 9,
    },
};

do_vtable

看看全局队列的函数指针do_vtable的指向:_dispatch_queue_root_vtable

// vtable shared by all root (global) queues; do_probe is the wake-up hook
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
    .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
    .do_kind = "global-queue",
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_queue_wakeup_global,
};

do_ref_cnt && do_xref_cnt

可以参考主队列

do_ctxt

注意全局队列有一个do_ctxt,它是上下文,是我们要传递的参数

#define MAX_THREAD_COUNT 255

// One context per root queue: a mediator semaphore (thread keep-alive)
// and the remaining thread-pool capacity (MAX_THREAD_COUNT each).
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[0],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[1],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[2],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[3],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[4],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
    {
        .dgq_thread_mediator = &_dispatch_thread_mediator[5],
        .dgq_thread_pool_size = MAX_THREAD_COUNT,
    },
};

其中的_dispatch_thread_mediator定义如下:

// One statically allocated (never-released) semaphore per root queue,
// used to park and wake idle worker threads.
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
    {
        .do_vtable = &_dispatch_semaphore_vtable,
        .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
        .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    },
};

自定义队列

// Creates a custom queue with the given label and (legacy) attributes.
// The queue is serial by default; see _dispatch_queue_init for the defaults.
dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    dispatch_queue_t dq;
    size_t label_len;

    // Normalize the label (NULL becomes the empty string)
    if (!label) {
        label = "";
    }

    label_len = strlen(label);
    if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
        // Round short labels up so the allocation below is at least sizeof(struct dispatch_queue_s)
        label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
    }

    // Allocate the queue: base struct minus the fixed label array, plus room for the label + NUL
    dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
    if (slowpath(!dq)) {
        return dq;
    }
    // Apply the default properties of a custom queue
    _dispatch_queue_init(dq);

    // Copy the queue label
    strcpy(dq->dq_label, label);

#ifndef DISPATCH_NO_LEGACY
    if (slowpath(attr)) {
        // Retarget to the root queue selected by priority and the overcommit flag.
        // An overcommit queue spawns a new thread whenever a task is submitted,
        // so no single thread is overloaded.
        dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);

        dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
        dq->dq_finalizer_func = attr->finalizer_func;

        // Special handling for Blocks
#ifdef __BLOCKS__
        if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
            // If finalizer_ctxt is a Block, it must be retained (copied to the heap)
            dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
            if (!(dq->dq_finalizer_ctxt)) {
                goto out_bad;
            }
        }
#endif
    }
#endif

    return dq;

out_bad:
    free(dq);
    return NULL;
}

_dispatch_queue_init

来看下内联函数_dispatch_queue_init的实现

// Applies the default settings for a newly created custom queue
inline void _dispatch_queue_init(dispatch_queue_t dq)
{
    dq->do_vtable = &_dispatch_queue_vtable;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;     // not on any list yet
    dq->do_ref_cnt = 1;
    dq->do_xref_cnt = 1;
    // Default target: the default-priority overcommit root queue
    dq->do_targetq = _dispatch_get_root_queue(0, true);
    dq->dq_running = 0;
    dq->dq_width = 1;       // serial by default
    // Atomically claim the next serial number (subtract 1 to get the pre-increment value)
    dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}

通过前面的代码可以发现,全局队列的并发数dq_width均为UINT32_MAX,而这里_dispatch_queue_init中的dq_width为1,说明这是一个串行队列的默认设置。
另外dq->do_targetq = _dispatch_get_root_queue(0, true),它涉及到GCD队列与block 的一个重要模型,target_queue。向任何队列中提交的 block,都会被放到它的目标队列中执行,而普通串行队列的目标队列就是一个支持 overcommit 的全局队列,全局队列的底层则是一个线程池。
引用苹果的一张经典图

其他的属性基本上可以参考前面的信息,这里着重理解下dq_serialnum:

static unsigned long _dispatch_queue_serial_numbers = 10;
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9 - global queues

可以看到,0被跳过,3未使用,1用于主队列,2用于管理队列,4~9用于全局队列。自定义队列从10开始,每次自定义一个队列时,都会先原子操作_dispatch_queue_serial_numbers变量,然后减1,这样保证了每个自定义队列的dq_serialnum的唯一性。

dispatch_queue_attr_t特殊处理

如果在自定义队列时,传递了attr参数,那么表示支持overcommit,带有overcommit 的队列表示每当有任务提交时,系统都会新开一个线程处理,这样就不会造成某个线程过载。
同时如果finalizer_func == _dispatch_call_block_and_release2需要对dq_finalizer_ctxt进行retain

// Trampoline: invoke the block with a context argument, then release the block
void _dispatch_call_block_and_release2(void *block, void *ctxt)
{
    void (^b)(void*) = block;
    b(ctxt);
    Block_release(b);
}

常用API解析

dispatch_async

// Asynchronously submits a block: copy it to the heap (so it survives until it
// runs), then forward to dispatch_async_f with an invoke-and-release trampoline.
void dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
    dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}

dispatch_async主要将参数进行了处理,然后去调用dispatch_async_f

  • 1、_dispatch_Block_copy在堆上创建传入block的拷贝,或者增加引用计数,这样就保证了block在执行之前不会被销毁
  • 2、_dispatch_call_block_and_release的定义如下,顾名思义,调用block,然后将block销毁
// Trampoline: invoke the block, then release it
void _dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

dispatch_async_f

接下来分析一下dispatch_async_f的实现

// Asynchronously submits a function + context to a queue.
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    // Fast path: grab a cached continuation from this thread's TLS
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
    if (!dc) {
        // Cache miss: allocate from the heap on the slow path
        return _dispatch_async_f_slow(dq, ctxt, func);
    }
    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;     // tag the continuation as async
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;
    _dispatch_queue_push(dq, dc);   // enqueue the continuation
}
  • 1、我们首先来看下_dispatch_continuation_alloc_cacheonly,它的目的就是从线程的TLS(线程的私有存储,线程都是有自己的私有存储的,这些私有存储不会被其他线程所使用)中提取出一个 dispatch_continuation_t 结构
// Pops a cached continuation from this thread's TLS; returns NULL on a cache miss
static inline dispatch_continuation_t _dispatch_continuation_alloc_cacheonly(void)
{
    dispatch_continuation_t dc = fastpath(_dispatch_thread_getspecific(dispatch_cache_key));
    if (dc) {
        // Advance the cache head to the next free continuation
        _dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
    }
    return dc;
}
  • 2、如果线程中的TLS不存在 dispatch_continuation_t 结构的数据,则走_dispatch_async_f_slow() 函数。
  • 3、如果dc不为空,设置其do_vtable为DISPATCH_OBJ_ASYNC_BIT(主要用于区分类型),把传入的block传给dc的dc_ctxt作为上下文,最后将dc的dc_func设置为_dispatch_call_block_and_release,最后调用_dispatch_queue_push进行入队操作
    DISPATCH_OBJ_ASYNC_BIT是一个宏定义,是为了区分async、group和barrier。
#define DISPATCH_OBJ_ASYNC_BIT  0x1
#define DISPATCH_OBJ_BARRIER_BIT    0x2
#define DISPATCH_OBJ_GROUP_BIT  0x4

继续往下分析_dispatch_async_f_slow

// Slow path of dispatch_async_f: allocate the continuation from the heap
DISPATCH_NOINLINE static void _dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());

    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = context;

    // Push the continuation dc onto the queue dq
    _dispatch_queue_push(dq, dc);
}
// Heap allocator for continuations: initializes the cache zone once,
// then retries (rather than fails) until the allocation succeeds.
dispatch_continuation_t _dispatch_continuation_alloc_from_heap(void)
{
    static dispatch_once_t pred;
    dispatch_continuation_t dc;
    dispatch_once_f(&pred, NULL, _dispatch_ccache_init);

    while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
        sleep(1);   // back off and retry on allocation failure
    }
    return dc;
}

从堆上获取dispatch_continuation_t之后,设置dc的成员,跟前面一致。之后同样走到了_dispatch_queue_push函数

_dispatch_queue_push

继续往下分析_dispatch_queue_push

#define _dispatch_queue_push(x, y) _dispatch_queue_push_list((x), (y), (y))

_dispatch_queue_push是一个宏,实际上是调用了_dispatch_queue_push_list

_dispatch_queue_push_list

// Atomically appends the [head, tail] chain to the queue's item list.
static inline void
_dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head, dispatch_object_t _tail)
{
    struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;
    tail->do_next = NULL;
    // Swap the tail pointer; prev is the old tail (NULL if the queue was empty)
    prev = fastpath(dispatch_atomic_xchg(&dq->dq_items_tail, tail));
    if (prev) {
        // Queue was non-empty: link the new items after the old tail
        prev->do_next = head;
    } else {
        // Queue was empty (and possibly asleep): set the head and wake it
        _dispatch_queue_push_list_slow(dq, head);
    }
}

_dispatch_queue_push_list 函数是 inline函数,说明这个函数会调用很频繁,inline 通常用在内核中,效率很高,但生成的二进制文件会变大,典型的空间换时间。

  • 1、 如果队列不为空,那么直接将该dc放到队尾,并重定向dq->dq_items_tail,因为队列前面还有任务,所以此时把dc插入到队尾就OK了
    这里解释一下为什么会重定向:

    • 根据dispatch_atomic_xchg的定义“将p设为n并返回p操作之前的值”,dispatch_atomic_xchg(&dq->dq_items_tail, tail)这行代码的含义等同于:dq->dq_items_tail = tail,重定向了队尾指针
    • prev是原先的队尾元素,prev->do_next = head则把tail结点放到了队尾。
  • 2、如果队列为空,则调用_dispatch_queue_push_list_slow:

    // Called when the queue was empty: install the new head and wake the queue
    void _dispatch_queue_push_list_slow(dispatch_queue_t dq, struct dispatch_object_s *obj)
    {
      _dispatch_retain(dq);
      dq->dq_items_head = obj;
      // The queue was idle with nothing running, so it must be woken explicitly
      _dispatch_wakeup(dq);
      _dispatch_release(dq);
    }
    

    _dispatch_queue_push_list_slow直接将dq->dq_items_head设置为dc,然后调用_dispatch_wakeup唤醒这个队列。这里直接执行_dispatch_wakeup的原因是此时队列为空,没有任务在执行,处于休眠状态,所以需要唤醒。

_dispatch_wakeup

接下来分析一下如何唤醒一个队列:这里的dou指队列

// Wakes up a queue (here dou refers to the queue).
dispatch_queue_t _dispatch_wakeup(dispatch_object_t dou)
{
    dispatch_queue_t tq;

    if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
        return NULL;    // suspended queues are not woken
    }

    // For global queues dx_probe points to _dispatch_queue_wakeup_global; that is
    // the wake-up path. If probing fails and the tail pointer is empty, return NULL.
    if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
        return NULL;
    }

    if (!_dispatch_trylock(dou._do)) {
#if DISPATCH_COCOA_COMPAT
        if (dou._dq == &_dispatch_main_q) {
            // The main queue is woken via _dispatch_queue_wakeup_main()
            _dispatch_queue_wakeup_main();
        }
#endif
        return NULL;
    }

    // Neither a global queue nor the main queue: push this queue onto its target
    // queue (do_targetq), re-entering the _dispatch_queue_push logic
    _dispatch_retain(dou._do);
    tq = dou._do->do_targetq;
    _dispatch_queue_push(tq, dou._do);
    return tq;  // libdispatch doesn't need this, but the Instrument DTrace probe does
}
  • 1、如果是主队列,则直接调用_dispatch_queue_wakeup_main
// Wakes the main thread so it drains the main queue.
void _dispatch_queue_wakeup_main(void)
{
    kern_return_t kr;

    // dispatch_once_f guarantees the main-queue port is initialized exactly once
    dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);

    // Wake the main thread (the core logic lives here, but it is not open source)
    kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

    switch (kr) {
    case MACH_SEND_TIMEOUT:
    case MACH_SEND_TIMED_OUT:
    case MACH_SEND_INVALID_DEST:
        break;      // these send failures are tolerated
    default:
        dispatch_assume_zero(kr);
        break;
    }

    _dispatch_safe_fork = false;
}
  • 2、如果是全局队列,会进入到全局队列的dx_probe指向的函数_dispatch_queue_wakeup_global中:
    在这里面我们就真正的接触到pthread
    // Wakes a root (global) queue: asks the kernel workqueue for a worker, or
    // signals an idle cached thread, or creates a new pthread from the pool.
    bool _dispatch_queue_wakeup_global(dispatch_queue_t dq)
    {
        static dispatch_once_t pred;
        struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
        pthread_workitem_handle_t wh;
        unsigned int gen_cnt;
        pthread_t pthr;
        int r, t_count;

        if (!dq->dq_items_tail) {
            return false;   // nothing enqueued, nothing to wake
        }

        _dispatch_safe_fork = false;

        dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

        // One-time check, initialization and configuration of the root queues
        dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

        // If this root queue has a kernel workqueue, request a worker thread from it
        if (qc->dgq_kworkqueue) {
            if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
                _dispatch_debug("requesting new worker thread");

                r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
                dispatch_assume_zero(r);
            } else {
                _dispatch_debug("work thread request still pending on global queue: %p", dq);
            }
            goto out;
        }

        // Signal the mediator semaphore: wakes an idle cached worker thread if
        // one exists (the thread keep-alive mechanism)
        if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
            goto out;
        }

        // Check the remaining thread-pool capacity; if there is room, take one slot
        do {
            t_count = qc->dgq_thread_pool_size;
            if (!t_count) {
                _dispatch_debug("The thread pool is full: %p", dq);
                goto out;
            }
        } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));

        // Create a thread with the pthread library; its entry point is _dispatch_worker_thread
        while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                dispatch_assume_zero(r);
            }
            sleep(1);
        }

        // Detach the thread so its resources are reclaimed automatically on exit
        r = pthread_detach(pthr);
        dispatch_assume_zero(r);

    out:
        return false;
    }

  • 1、如果队列上下文中的dgq_kworkqueue存在,则调用pthread_workqueue_additem_np函数,该函数使用workq_kernreturn系统调用,通知workqueue增加应当执行的项目。根据该通知,XNU内核基于系统状态判断是否要生成线程,如果是overcommit优先级的队列,workqueue则始终生成线程,之后线程执行_dispatch_worker_thread2函数。

  • 2、反之,如果dgq_kworkqueue不存在,则调用pthread_create函数直接启动一个线程,执行_dispatch_worker_thread函数,但是这个函数中仍然调用到了_dispatch_worker_thread2,和第1条殊途同归。

_dispatch_worker_thread
// Entry point of a worker pthread created by _dispatch_queue_wakeup_global.
void *_dispatch_worker_thread(void *context)
{
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    sigset_t mask;
    int r;

    // workaround tweaks the kernel workqueue does for us
    r = sigfillset(&mask);
    dispatch_assume_zero(r);
    r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
    dispatch_assume_zero(r);

    do {
        // Drain the queue, then linger on the semaphore so this thread can be reused
        _dispatch_worker_thread2(context);
        // we use 65 seconds in case there are any timers that run once a minute
    } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);

    // Timed out with no new work: return this slot to the thread pool and exit
    dispatch_atomic_inc(&qc->dgq_thread_pool_size);
    if (dq->dq_items_tail) {
        // Work arrived while we were exiting: request a wake-up again
        _dispatch_queue_wakeup_global(dq);
    }

    return NULL;
}

函数前面主要是设置新线程的信号掩码,真正的任务调度在_dispatch_worker_thread2里面,而我们也可以看到,这个任务调度结束后,这个线程在等待一个信号量,而等待的信号量就是前面dispatch_queue_wakeup_global里面的信号量,为什么要这样做?这样做的原因是不要频繁开启新线程,如果有一个新线程完成所有任务了,这个线程就要结束了,但这里并不是这样,而是等待一个信号量,大约等待65秒,如果65秒内接收到新的信号量(表示有新的任务),这个线程就会去继续执行加进来的任务,而不是重新开启新线程,65秒后没接收到信号量,则退出这个线程,销毁这个线程

_dispatch_worker_thread2

前面提到的两种方式实际上最终都调用到了_dispatch_worker_thread2函数,可见核心的执行逻辑都在这里,需要格外关注:

void _dispatch_worker_thread2(void *context)
{
    struct dispatch_object_s *item;
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

    if (_dispatch_thread_getspecific(dispatch_queue_key)) {
        DISPATCH_CRASH("Premature thread recycling");
    }

    // 把dq设置为刚启动的这个线程的TSD
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    qc->dgq_pending = 0;


    // _dispatch_queue_concurrent_drain_one用来取出队列的一个内容
    while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
        // 用来对取出的内容进行处理(如果是任务,则执行任务)
        _dispatch_continuation_pop(item);
    }

    _dispatch_thread_setspecific(dispatch_queue_key, NULL);

    _dispatch_force_cache_cleanup();
}

这个函数里面进行任务的调度,两个函数很重要,

  • 1、一个是_dispatch_queue_concurrent_drain_one,用来取出队列的一个内容;
  • 2、另一个是_dispatch_continuation_pop函数,用来对取出的内容进行处理;

_dispatch_queue_concurrent_drain_one

先来分析下_dispatch_queue_concurrent_drain_one

// Pops one item from a global (concurrent) queue; returns NULL if there is none.
struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
    struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

    // The mediator value acts both as a "lock" and a signal
    head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);

    if (slowpath(head == NULL)) {
        // The queue is empty: restore the head pointer and return NULL
        dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
        _dispatch_debug("no work on global work queue");
        return NULL;
    }


    if (slowpath(head == mediator)) {
        // This thread lost the race for ownership of the queue, which means
        // libdispatch is operating inefficiently: there are too many threads in
        // the pool. Post a pending thread request and exit this thread; the
        // kernel will create a new one when the load subsides.

        _dispatch_queue_wakeup_global(dq);
        return NULL;
    }

    // Save head->do_next before returning; if next is NULL, this may be the last item
    next = fastpath(head->do_next);

    if (slowpath(!next)) {
        dq->dq_items_head = NULL;

        if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
            // Both head and tail pointers are now empty
            goto out;
        }

        // An enqueue must be in flight; it will finish soon, so spin briefly
        while (!(next = head->do_next)) {
            _dispatch_hardware_pause();
        }
    }

    // More items remain: advance the head and keep the dispatch going
    dq->dq_items_head = next;
    _dispatch_queue_wakeup_global(dq);
out:
    // Return the popped head item
    return head;
}

_dispatch_continuation_pop

接下来分析一下_dispatch_continuation_pop

// Processes one popped item: either a (sub)queue or a continuation task.
static inline void _dispatch_continuation_pop(dispatch_object_t dou)
{
    dispatch_continuation_t dc = dou._dc;
    dispatch_group_t dg;

    // If the item is itself a queue, hand it to _dispatch_queue_invoke
    if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
        return _dispatch_queue_invoke(dou._dq);
    }

    // Add the item back to the cache before calling the function. This
    // allows the 'hot' continuation to be used for a quick callback.
    //
    // The ccache version is per-thread.
    // Therefore, the object has not been reused yet.
    // This generates better assembly.


    if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
        _dispatch_continuation_free(dc);
    }

    // Check whether this continuation belongs to a group
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
        dg = dc->dc_group;
    } else {
        dg = NULL;
    }

    // Otherwise this is a task wrapped in a dispatch_continuation_t: run it directly
    dc->dc_func(dc->dc_ctxt);

    // For a group, dispatch_group_leave must be called to release the semaphore
    if (dg) {
        dispatch_group_leave(dg);
        _dispatch_release(dg);
    }
}

从上面的函数中可以发现,压入队列的不仅是续体任务,还有可能是队列。如果是队列,直接执行了_dispatch_queue_invoke,否则执行dc->dc_func(dc->dc_ctxt)

接下来分析一下_dispatch_queue_invoke的执行过程,即pop出来的队列是如何被执行的

// Runs a queue that was itself popped from its target queue's item list.
DISPATCH_NOINLINE void _dispatch_queue_invoke(dispatch_queue_t dq)
{
    dispatch_queue_t tq = dq->do_targetq;

    if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
        _dispatch_queue_drain(dq);
        if (tq == dq->do_targetq) {
            tq = dx_invoke(dq);     // direct invoke returns NULL (see note below)
        } else {
            tq = dq->do_targetq;    // target changed while draining
        }

        // Decrement dq_running: the tasks were either executed directly
        // or pushed onto the target queue
        dispatch_atomic_dec(&dq->dq_running);

        // If tq is non-NULL (target changed), re-enqueue the queue there
        if (tq) {
            return _dispatch_queue_push(tq, dq);
        }
    }

    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
        // The queue is idle and needs to be woken up
        if (dq->dq_running == 0) {
            _dispatch_wakeup(dq); 
        }
    }

    // Release the queue
    _dispatch_release(dq);  // added when the queue is put on the list
}

如果是直接触发,即直接调用dx_invoke,那么会返回NULL

流程整理

dispatch_async 的实现比较复杂,主要是因为其中的数据结构较多,分支流程控制比较复杂。但思路其实很简单,用链表保存所有提交的 block,然后在底层线程池中,依次取出 block 并执行,具体的函数调用流程如下图:

dispatch_async
└──_dispatch_async_f_slow
    └──_dispatch_queue_push
        └──_dispatch_queue_push_list
            └──_dispatch_queue_push_list_slow
                └──_dispatch_wakeup
                    └──_dispatch_queue_wakeup_main
                        └──_dispatch_send_wakeup_main_thread
                    └──_dispatch_queue_wakeup_global
                        └──pthread_workqueue_additem_np
                            └──_dispatch_worker_thread2
                        └──pthread_create
                            └──_dispatch_worker_thread
                                └──_dispatch_worker_thread2
                                    └──_dispatch_queue_concurrent_drain_one
                                    └──_dispatch_continuation_pop
                                        └──_dispatch_queue_invoke(queue)
                                        └──dc->dc_func(dc->dc_ctxt)(continuation);

dispatch_sync

说完了dispatch_async,再来看下dispatch_sync

// Synchronously runs a block on a queue.
void dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
    if (slowpath(dq == &_dispatch_main_q)) {
        // The main queue takes a dedicated slow path
        return _dispatch_sync_slow(dq, work);
    }
#endif
    // Extract the block's invoke pointer and forward to the function-based variant
    struct Block_basic *bb = (void *)work;
    dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
  • 1、 如果是主队列,则调用_dispatch_sync_slow,可以看到,这个方法最终还是调用了dispatch_sync_f
// Main-queue slow path: still funnels into dispatch_sync_f
static void _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
    struct Block_basic *bb = (void *)work;
    dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
  • 2、否则,调用dispatch_sync_f:
// Function-based synchronous dispatch.
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    typeof(dq->dq_running) prev_cnt;
    dispatch_queue_t old_dq;

    if (dq->dq_width == 1) {
        // Pushing a synchronous task onto a serial queue
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // Pushing a synchronous task onto a concurrent queue
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){    
        // The queue holds other tasks or is suspended: enter _dispatch_sync_f_slow,
        // wait (via a semaphore) for those tasks to finish, then run this one
        _dispatch_sync_f_slow(dq);
    }
    else{            
        prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;

        if (slowpath(prev_cnt & 1)) {

            if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
                // The queue is empty with nothing running: wake it up
                _dispatch_wakeup(dq);
            }            
            // The queue is empty but a task is still executing
            _dispatch_sync_f_slow(dq);
        }
    }

    // Point the thread's current-queue TSD at dq, run the task, then restore it
    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
        _dispatch_wakeup(dq);
    }
}
  • 3、如果是向一个串行队列压入同步任务,则调用dispatch_barrier_sync_f:
// Synchronous barrier dispatch — also the serial-queue path of dispatch_sync_f.
void dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    // Save the current thread's TSD for key dispatch_queue_key
    dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    // 3) the queue is not weird
    if (slowpath(dq->dq_items_tail)
            || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
            || slowpath(!_dispatch_queue_trylock(dq))) {
        return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
    }

    // Core logic: point the thread's dispatch_queue_key at dq, run the block,
    // then restore the previous old_dq
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
    _dispatch_queue_unlock(dq);
}
  • 4、如果向一个并发队列中压入同步任务,如果队列不为空,或者挂起,或者有正在执行的任务,则调用_dispatch_sync_f_slow,进行信号等待,否则直接调用_dispatch_wakeup唤醒队列执行任务
// Slow path for sync on a busy/suspended queue: enqueue a marker continuation
// and block on a per-thread semaphore until the queue reaches it.
static void _dispatch_sync_f_slow(dispatch_queue_t dq)
{
    // the global root queues do not need strict ordering
    if (dq->do_targetq == NULL) {
        dispatch_atomic_add(&dq->dq_running, 2);
        return;
    }

    struct dispatch_sync_slow_s {
        DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
    } dss = {
        .do_vtable = NULL,
        .dc_func = _dispatch_sync_f_slow2,
        .dc_ctxt = _dispatch_get_thread_semaphore(),    // semaphore travels as the context
    };

    // XXX FIXME -- concurrent queues can be come serial again
    _dispatch_queue_push(dq, (void *)&dss);

    // Wait on the semaphore to guarantee synchronization
    dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
    _dispatch_put_thread_semaphore(dss.dc_ctxt);
}

流程整理

dispatch_sync同步方法的实现相对来说更简单,只需要将任务压入相应的队列,并用信号量做等待,具体调用栈如下:

dispatch_sync
└──_dispatch_sync_slow
    └──dispatch_sync_f
        └──dispatch_barrier_sync_f(串行队列压入同步任务)
        └──_dispatch_sync_f_slow(并发队列中压进一个同步任务)

总结

队列的内容比较多,而且比较复杂,由于本人能力有限,难免有些地方理解不够到位,写的不够清晰,请多多指教。

-------------本文结束 感谢您的阅读-------------

本文标题:GCD源码分析3 —— dispatch_queue篇

文章作者:lingyun

发布时间:2018年02月03日 - 00:02

最后更新:2018年02月10日 - 12:02

原始链接:https://tsuijunxi.github.io/2018/02/03/GCD源码分析3 —— dispatch-queue篇/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。