GCD源码分析4 —— dispatch_group篇

前言

这篇文章主要分析一下dispatch_group的底层实现,主要是包括几个重要的函数:

  • dispatch_group_create
  • dispatch_group_enter
  • dispatch_group_leave
  • dispatch_group_wait
  • dispatch_group_notify
  • dispatch_group_async

相关API解析

dispatch_group_create

关于这个问题实际上在开篇中提过一次,当时主要是为了分析dispatch_group_s的本质,而且得知dispatch_group_t本质上是一个value为LONG_MAX的semaphore,那么可想而知,基本上对dispatch_group_t的分析,信号量肯定脱不了干系

dispatch_group_t
dispatch_group_create(void){
    return (dispatch_group_t)dispatch_semaphore_create(LONG_MAX);
}

关于信号量的底层实现,会有专门的一篇来进行剖析

dispatch_group_enter

void dispatch_group_enter(dispatch_group_t dg){
    dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
    dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
}

可以看到,首先先把dispatch_group_t转换为dispatch_semaphore_t,然后调用dispatch_semaphore_wait

long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout){
        // 如果信号量的值-1之后大于等于0,表示有资源可用
    if (dispatch_atomic_dec(&dsema->dsema_value) >= 0) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}

该函数先把信号量的信号值原子减1,如果大于等于0,则表示有资源可用,直接返回;否则调用_dispatch_semaphore_wait_slow,等到FIFO队列中信号量的到来,直到timeout为止。关于_dispatch_semaphore_wait_slow的具体分析请参考dispatch_semaphore篇

调用栈:

dispatch_group_enter
└──dispatch_semaphore_wait
    └──_dispatch_semaphore_wait_slow

dispatch_group_leave

void dispatch_group_leave(dispatch_group_t dg){
    dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
    dispatch_semaphore_signal(dsema);
    if (dsema->dsema_value == dsema->dsema_orig) {
        _dispatch_group_wake(dsema);
    }
}

表明任务组group中一个block任务已经完成,必要时发出唤醒信号。在dispatch_group_leave中,将group的信号量加1,然后进入判断:如果此时信号值和group信号量的初始值相等,则表明任务完成,进入唤醒流程_dispatch_group_wake.

long _dispatch_group_wake(dispatch_semaphore_t dsema){

    struct dispatch_sema_notify_s *tmp;
    struct dispatch_sema_notify_s *head = dispatch_atomic_xchg(&dsema->dsema_notify_head, NULL);
    long rval = dispatch_atomic_xchg(&dsema->dsema_group_waiters, 0);
    bool do_rel = head;
    long kr;

    // wake any "group" waiter or notify blocks

    if (rval) {
        _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
        do {
            kr = semaphore_signal(dsema->dsema_waiter_port);
            DISPATCH_SEMAPHORE_VERIFY_KR(kr);
        } while (--rval);
    }
    while (head) {
        dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);
        _dispatch_release(head->dsn_queue);
        do {
            tmp = head->dsn_next;
        } while (!tmp && !dispatch_atomic_cmpxchg(&dsema->dsema_notify_tail, head, NULL));
        free(head);
        head = tmp;
    }
    if (do_rel) {
        _dispatch_release(dsema);
    }
    return 0;
}

在_dispatch_group_wake方法中主要做了两件事:

  • 一是通过semaphore_signal唤醒信号量,
  • 二是依次调用dispatch_async_f

调用栈:

dispatch_group_leave
└──_dispatch_group_wake
    └──semaphore_signal
    └──dispatch_async_f

dispatch_group_wait

long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
    dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

    if (dsema->dsema_value == dsema->dsema_orig) {
        return 0;
    }
    if (timeout == 0) {
        return KERN_OPERATION_TIMED_OUT;
    }
    return _dispatch_group_wait_slow(dsema, timeout);
}
DISPATCH_NOINLINE
static long _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    mach_timespec_t _timeout;
    kern_return_t kr;
    uint64_t nsec;
    long orig;

again:
    // check before we cause another signal to be sent by incrementing dsema->dsema_group_waiters
    if (dsema->dsema_value == dsema->dsema_orig) {
        return _dispatch_group_wake(dsema);
    }
    // Mach semaphores appear to sometimes spuriously wake up.  Therefore,
    // we keep a parallel count of the number of times a Mach semaphore is
    // signaled.
    dispatch_atomic_inc(&dsema->dsema_group_waiters);
    // check the values again in case we need to wake any threads
    if (dsema->dsema_value == dsema->dsema_orig) {
        return _dispatch_group_wake(dsema);
    }

    _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);

    // From xnu/osfmk/kern/sync_sema.c:
    // wait_semaphore->count = -1;  /* we don't keep an actual count */
    //
    // The code above does not match the documentation, and that fact is
    // not surprising. The documented semantics are clumsy to use in any
    // practical way. The above hack effectively tricks the rest of the
    // Mach semaphore logic to behave like the libdispatch algorithm.

    switch (timeout) {
    default:
        do {
            nsec = _dispatch_timeout(timeout);
            _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
            _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
            kr = slowpath(semaphore_timedwait(dsema->dsema_waiter_port, _timeout));
        } while (kr == KERN_ABORTED);
        if (kr != KERN_OPERATION_TIMED_OUT) {
            DISPATCH_SEMAPHORE_VERIFY_KR(kr);
            break;
        }
    // Fall through and try to undo the earlier change to dsema->dsema_group_waiters
    case DISPATCH_TIME_NOW:
        while ((orig = dsema->dsema_group_waiters)) {
            if (dispatch_atomic_cmpxchg(&dsema->dsema_group_waiters, orig, orig - 1)) {
                return KERN_OPERATION_TIMED_OUT;
            }
        }
    // Another thread called semaphore_signal().
    // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        do {
            kr = semaphore_wait(dsema->dsema_waiter_port);
        } while (kr == KERN_ABORTED);
        DISPATCH_SEMAPHORE_VERIFY_KR(kr);
        break;
    }

    goto again;
}

调用栈:

dispatch_group_wait
└──_dispatch_group_wait_slow
    └──semaphore_timedwait
    └──semaphore_wait

dispatch_group_notify

void dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db){
    dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db), _dispatch_call_block_and_release);
}
void dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)){
    dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
    struct dispatch_sema_notify_s *dsn, *prev;

    // FIXME -- this should be updated to use the continuation cache
    while (!(dsn = malloc(sizeof(*dsn)))) {
        sleep(1);
    }

    dsn->dsn_next = NULL;
    dsn->dsn_queue = dq;
    dsn->dsn_ctxt = ctxt;
    dsn->dsn_func = func;
    _dispatch_retain(dq);

    prev = dispatch_atomic_xchg(&dsema->dsema_notify_tail, dsn);
    if (fastpath(prev)) {
        prev->dsn_next = dsn;
    } else {
        _dispatch_retain(dg);
        dsema->dsema_notify_head = dsn;
        if (dsema->dsema_value == dsema->dsema_orig) {
            _dispatch_group_wake(dsema);
        }
    }
}

调用栈:

dispatch_group_notify
└──dispatch_group_notify_f
    └──_dispatch_group_wake

dispatch_group_async

先通过如下伪代码直观上理解一下dispatch_group_async的执行流程

/// 伪代码,理解用
void dispatch_group_async(dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block)
{
    dispatch_retain(group);
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        block();
        dispatch_group_leave(group);
        dispatch_release(group);
    });
}

实际源码:

void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db){
    dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), _dispatch_call_block_and_release);
}
#endif

void dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)){

    dispatch_continuation_t dc;

    _dispatch_retain(dg);
    dispatch_group_enter(dg);

    dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT|DISPATCH_OBJ_GROUP_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;
    dc->dc_group = dg;

    _dispatch_queue_push(dq, dc);
}

可以看到,dispatch_group_async是对dispatch_group_async_f的一层封装。

而在dispatch_group_async_f函数中主要执行了以下操作:

  • 调用dispatch_group_enter,进行信号量引用计数管理;
  • 新建了一个dispatch_continuation_t,记录block任务所属的任务组,相关上下文,调用方法等信息;
  • 调用_dispatch_queue_push将dispatch_continuation_t加入队列

大家还记得dispatch_async_f的实现吗?

void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
    if (!dc) {
        return _dispatch_async_f_slow(dq, ctxt, func);
    }
    dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;
    _dispatch_queue_push(dq, dc);
}

可以发现,dispatch_group_async_fdispatch_async_f的实现高度一致,主要的不同在于dispatch_group_async_f在前面调用了dispatch_group_enter方法:

到了这里,大家只看到了伪代码的前半部分,但是后半部分在哪里对应呢?
实际上是对应于_dispatch_continuation_pop里面的操作,注意最后几行代码(这里不会对_dispatch_continuation_pop作详细分析,只是为了了解)

static inline void _dispatch_continuation_pop(dispatch_object_t dou)
{
    dispatch_continuation_t dc = dou._dc;
    dispatch_group_t dg;

    // 首先检测传进来的内容是不是队列,如果是队列,就进入 _dispatch_queue_invoke 处理队列
    if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
        return _dispatch_queue_invoke(dou._dq);
    }

    // Add the item back to the cache before calling the function. This
    // allows the 'hot' continuation to be used for a quick callback.
    //
    // The ccache version is per-thread.
    // Therefore, the object has not been reused yet.
    // This generates better assembly.

    // 否则这个形参就是任务封装的 dispatch_continuation_t 结构体,直接执行任务。
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
        _dispatch_continuation_free(dc);
    }
    if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
        dg = dc->dc_group;
    } else {
        dg = NULL;
    }
    dc->dc_func(dc->dc_ctxt);
    if (dg) {
        dispatch_group_leave(dg);
        _dispatch_release(dg);
    }
}

实际上这个函数,包括里面的_dispatch_queue_push_dispatch_continuation_pop都已经在dispatch_queue篇中分析到了,这里不再重复继续分析下去了。

总结

group篇调用到的很多函数会和queue篇交叉,一些重复的问题可以参考queue篇的分析。

-------------本文结束 感谢您的阅读-------------

本文标题:GCD源码分析4 —— dispatch_group篇

文章作者:lingyun

发布时间:2018年02月06日 - 12:02

最后更新:2018年02月10日 - 12:02

原始链接:https://tsuijunxi.github.io/2018/02/06/GCD源码分析4 —— dispatch-group篇/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。