GCD源码分析4 —— dispatch_group篇

前言

这篇文章主要分析一下dispatch_group的底层实现,主要是包括几个重要的函数:

  • dispatch_group_create
  • dispatch_group_enter
  • dispatch_group_leave
  • dispatch_group_wait
  • dispatch_group_notify
  • dispatch_group_async

相关API解析

dispatch_group_create

关于这个问题实际上在开篇中提过一次,当时主要是为了分析dispatch_group_s的本质,而且得知dispatch_group_t本质上是一个value为LONG_MAX的semaphore,那么可想而知,基本上对dispatch_group_t的分析,信号量肯定脱不了干系

1
2
3
4
dispatch_group_t
dispatch_group_create(void){
return (dispatch_group_t)dispatch_semaphore_create(LONG_MAX);
}

关于信号量的底层实现,会有专门的一篇来进行剖析

dispatch_group_enter

1
2
3
4
void dispatch_group_enter(dispatch_group_t dg){
dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
}

可以看到,首先先把dispatch_group_t转换为dispatch_semaphore_t,然后调用dispatch_semaphore_wait

1
2
3
4
5
6
7
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout){
// 如果信号量的值-1之后大于等于0,表示有资源可用
if (dispatch_atomic_dec(&dsema->dsema_value) >= 0) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}

该函数先把信号量的信号值原子减1,如果大于等于0,则表示有资源可用,直接返回;否则调用_dispatch_semaphore_wait_slow,等到FIFO队列中信号量的到来,直到timeout为止。关于_dispatch_semaphore_wait_slow的具体分析请参考dispatch_semaphore篇

调用栈:

1
2
3
dispatch_group_enter
└──dispatch_semaphore_wait
└──_dispatch_semaphore_wait_slow

dispatch_group_leave

1
2
3
4
5
6
7
void dispatch_group_leave(dispatch_group_t dg){
dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
dispatch_semaphore_signal(dsema);
if (dsema->dsema_value == dsema->dsema_orig) {
_dispatch_group_wake(dsema);
}
}

表明任务组group中一个block任务已经完成,必要时发出唤醒信号。在dispatch_group_leave中,将group的信号量加1,然后进入判断:如果此时信号值和group信号量的初始值相等,则表明任务完成,进入唤醒流程_dispatch_group_wake.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
long _dispatch_group_wake(dispatch_semaphore_t dsema){

struct dispatch_sema_notify_s *tmp;
struct dispatch_sema_notify_s *head = dispatch_atomic_xchg(&dsema->dsema_notify_head, NULL);
long rval = dispatch_atomic_xchg(&dsema->dsema_group_waiters, 0);
bool do_rel = head;
long kr;

// wake any "group" waiter or notify blocks

if (rval) {
_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
do {
kr = semaphore_signal(dsema->dsema_waiter_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
} while (--rval);
}
while (head) {
dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);
_dispatch_release(head->dsn_queue);
do {
tmp = head->dsn_next;
} while (!tmp && !dispatch_atomic_cmpxchg(&dsema->dsema_notify_tail, head, NULL));
free(head);
head = tmp;
}
if (do_rel) {
_dispatch_release(dsema);
}
return 0;
}

在_dispatch_group_wake方法中主要做了两件事:

  • 一是通过semaphore_signal唤醒信号量,
  • 二是依次调用dispatch_async_f

调用栈:

1
2
3
4
dispatch_group_leave
└──_dispatch_group_wake
└──semaphore_signal
└──dispatch_async_f

dispatch_group_wait

1
2
3
4
5
6
7
8
9
10
11
12
long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

if (dsema->dsema_value == dsema->dsema_orig) {
return 0;
}
if (timeout == 0) {
return KERN_OPERATION_TIMED_OUT;
}
return _dispatch_group_wait_slow(dsema, timeout);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
DISPATCH_NOINLINE
static long _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
mach_timespec_t _timeout;
kern_return_t kr;
uint64_t nsec;
long orig;

again:
// check before we cause another signal to be sent by incrementing dsema->dsema_group_waiters
if (dsema->dsema_value == dsema->dsema_orig) {
return _dispatch_group_wake(dsema);
}
// Mach semaphores appear to sometimes spuriously wake up. Therefore,
// we keep a parallel count of the number of times a Mach semaphore is
// signaled.
dispatch_atomic_inc(&dsema->dsema_group_waiters);
// check the values again in case we need to wake any threads
if (dsema->dsema_value == dsema->dsema_orig) {
return _dispatch_group_wake(dsema);
}

_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);

// From xnu/osfmk/kern/sync_sema.c:
// wait_semaphore->count = -1; /* we don't keep an actual count */
//
// The code above does not match the documentation, and that fact is
// not surprising. The documented semantics are clumsy to use in any
// practical way. The above hack effectively tricks the rest of the
// Mach semaphore logic to behave like the libdispatch algorithm.

switch (timeout) {
default:
do {
nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
kr = slowpath(semaphore_timedwait(dsema->dsema_waiter_port, _timeout));
} while (kr == KERN_ABORTED);
if (kr != KERN_OPERATION_TIMED_OUT) {
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
break;
}
// Fall through and try to undo the earlier change to dsema->dsema_group_waiters
case DISPATCH_TIME_NOW:
while ((orig = dsema->dsema_group_waiters)) {
if (dispatch_atomic_cmpxchg(&dsema->dsema_group_waiters, orig, orig - 1)) {
return KERN_OPERATION_TIMED_OUT;
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
do {
kr = semaphore_wait(dsema->dsema_waiter_port);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
break;
}

goto again;
}

调用栈:

1
2
3
4
dispatch_group_wait
└──_dispatch_group_wait_slow
└──semaphore_timedwait
└──semaphore_wait

dispatch_group_notify

1
2
3
void dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db){
dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db), _dispatch_call_block_and_release);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)){
dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
struct dispatch_sema_notify_s *dsn, *prev;

// FIXME -- this should be updated to use the continuation cache
while (!(dsn = malloc(sizeof(*dsn)))) {
sleep(1);
}

dsn->dsn_next = NULL;
dsn->dsn_queue = dq;
dsn->dsn_ctxt = ctxt;
dsn->dsn_func = func;
_dispatch_retain(dq);

prev = dispatch_atomic_xchg(&dsema->dsema_notify_tail, dsn);
if (fastpath(prev)) {
prev->dsn_next = dsn;
} else {
_dispatch_retain(dg);
dsema->dsema_notify_head = dsn;
if (dsema->dsema_value == dsema->dsema_orig) {
_dispatch_group_wake(dsema);
}
}
}

调用栈:

1
2
3
dispatch_group_notify
└──dispatch_group_notify_f
└──_dispatch_group_wake

dispatch_group_async

先通过如下伪代码直观上理解一下dispatch_group_async的执行流程

1
2
3
4
5
6
7
8
9
10
11
/// 伪代码,理解用
void dispatch_group_async(dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block)
{
dispatch_retain(group);
dispatch_group_enter(group);
dispatch_async(queue, ^{
block();
dispatch_group_leave(group);
dispatch_release(group);
});
}

实际源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db){
dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), _dispatch_call_block_and_release);
}
#endif

void dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)){

dispatch_continuation_t dc;

_dispatch_retain(dg);
dispatch_group_enter(dg);

dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();

dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT|DISPATCH_OBJ_GROUP_BIT);
dc->dc_func = func;
dc->dc_ctxt = ctxt;
dc->dc_group = dg;

_dispatch_queue_push(dq, dc);
}

可以看到,dispatch_group_async是对dispatch_group_async_f的一层封装。

而在dispatch_group_async_f函数中主要执行了以下操作:

  • 调用dispatch_group_enter,进行信号量引用计数管理;
  • 新建了一个dispatch_continuation_t,记录block任务所属的任务组,相关上下文,调用方法等信息;
  • 调用_dispatch_queue_push将dispatch_continuation_t加入队列

大家还记得dispatch_async_f的实现吗?

1
2
3
4
5
6
7
8
9
10
11
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;
_dispatch_queue_push(dq, dc);
}

可以发现,dispatch_group_async_fdispatch_async_f的实现高度一致,主要的不同在于dispatch_group_async_f在前面调用了dispatch_group_enter方法:

到了这里,大家只看到了伪代码的前半部分,但是后半部分在哪里对应呢?
实际上是对应于_dispatch_continuation_pop里面的操作,注意最后几行代码(这里不会对_dispatch_continuation_pop作详细分析,只是为了了解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static inline void _dispatch_continuation_pop(dispatch_object_t dou)
{
dispatch_continuation_t dc = dou._dc;
dispatch_group_t dg;

// 首先检测传进来的内容是不是队列,如果是队列,就进入 _dispatch_queue_invoke 处理队列
if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
return _dispatch_queue_invoke(dou._dq);
}

// Add the item back to the cache before calling the function. This
// allows the 'hot' continuation to be used for a quick callback.
//
// The ccache version is per-thread.
// Therefore, the object has not been reused yet.
// This generates better assembly.

// 否则这个形参就是任务封装的 dispatch_continuation_t 结构体,直接执行任务。
if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
_dispatch_continuation_free(dc);
}
if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
dg = dc->dc_group;
} else {
dg = NULL;
}
dc->dc_func(dc->dc_ctxt);
if (dg) {
dispatch_group_leave(dg);
_dispatch_release(dg);
}
}

实际上这个函数,包括里面的_dispatch_queue_push_dispatch_continuation_pop都已经在dispatch_queue篇中分析到了,这里不再重复继续分析下去了。

总结

group篇调用到的很多函数会和queue篇交叉,一些重复的问题可以参考queue篇的分析。

-------------本文结束 感谢您的阅读-------------

本文标题:GCD源码分析4 —— dispatch_group篇

文章作者:lingyun

发布时间:2018年02月06日 - 12:02

最后更新:2018年02月10日 - 12:02

原始链接:https://tsuijunxi.github.io/2018/02/06/GCD源码分析4 —— dispatch-group篇/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。