GCD源码分析6 —— dispatch_source篇

前言

相比较而言,GCD中最不引人注目的就是dispatch_source了,它是BSD系统内核惯有功能kqueue的包装,可以一个监视某些类型事件的发生。当这些事件发生时,它自动将一个block放入dispatch queue的执行例程中。如果您对IO多路复用有一定的了解,那么就很容易能理解dispatch_source

kqueue

kqueue是IO多路复用在BSD系统中的一种实现,它的接口主要包括 kqueue()、kevent() 两个系统调用和 struct kevent 结构:

  • 1、kqueue() 生成一个内核事件队列,返回该队列的文件描述符。

    1
    int     kqueue(void);
  • 2、kevent() 提供向内核注册/反注册事件和返回就绪事件或错误事件。

    1
    2
    3
    4
    int     kevent(int kq, 
    const struct kevent *changelist, int nchanges,
    struct kevent *eventlist, int nevents,
    const struct timespec *timeout);
  • 3、struct kevent 就是kevent()操作的最基本的事件结构。

    1
    2
    3
    4
    5
    6
    7
    8
    struct kevent { 
    uintptr_t ident; /* 事件 ID */
    short filter; /* 事件过滤器 */
    u_short flags; /* 行为标识 */
    u_int fflags; /* 过滤器标识值 */
    intptr_t data; /* 过滤器数据 */
    void *udata; /* 应用透传数据 */
    };

在一个 kqueue 中,{ident, filter} 确定一个唯一的事件:

  • 1、ident 事件的 id,一般设置为文件描述符。
  • 2、filter 可以将 kqueue filter 看作事件。内核检测 ident 上注册的 filter 的状态,状态发生了变化,就通知应用程序。kqueue 定义了较多的 filter:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #define EVFILT_READ         (-1)
    #define EVFILT_WRITE (-2)
    #define EVFILT_AIO (-3) /* attached to aio requests */
    #define EVFILT_VNODE (-4) /* attached to vnodes */
    #define EVFILT_PROC (-5) /* attached to struct proc */
    #define EVFILT_SIGNAL (-6) /* attached to struct proc */
    #define EVFILT_TIMER (-7) /* timers */
    #define EVFILT_MACHPORT (-8) /* Mach portsets */
    #define EVFILT_FS (-9) /* Filesystem events */
    #define EVFILT_USER (-10) /* User events */
  • 3、行为标志flags:

    1
    2
    3
    4
    #define EV_ADD              0x0001      /* add event to kq (implies enable) */
    #define EV_DELETE 0x0002 /* delete event from kq */
    #define EV_ENABLE 0x0004 /* enable event */
    #define EV_DISABLE 0x0008 /* disable event (not reported) */

事件类型

在source.h中可以发现dispatch_source可以支持以下事件类型:

1
2
3
4
5
6
7
8
9
10
11
DISPATCH_SOURCE_TYPE_DATA_ADD:      自定义事件
DISPATCH_SOURCE_TYPE_DATA_OR: 自定义事件
DISPATCH_SOURCE_TYPE_MACH_SEND: Mach端口发送事件。
DISPATCH_SOURCE_TYPE_MACH_RECV: Mach端口接收事件。
DISPATCH_SOURCE_TYPE_PROC: 进程相关的事件。
DISPATCH_SOURCE_TYPE_READ: 读文件事件。
DISPATCH_SOURCE_TYPE_WRITE: 写文件事件。
DISPATCH_SOURCE_TYPE_VNODE: 文件属性更改事件。
DISPATCH_SOURCE_TYPE_SIGNAL: 接收信号事件。
DISPATCH_SOURCE_TYPE_TIMER: 定时器事件。
DISPATCH_SOURCE_TYPE_MEMORYPRESSURE:内存压力事件。

使用步骤

由于dispatch_source的使用场合较少,所以很有必要对dispatch_source的使用进行介绍:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1、创建dispatch源,这里使用加法来合并dispatch源数据,最后一个参数是指定dispatch队列
dispatch_source_t source = dispatch_source_create(dispatch_source_type, handler, mask, dispatch_queue);

// 2、设置响应dispatch源事件的block,在dispatch源指定的队列上运行
dispatch_source_set_event_handler(source, ^{
  //可以通过dispatch_source_get_data(source)来得到dispatch源数据
});

// 3、dispatch源创建后处于suspend状态,所以需要启动dispatch源
dispatch_resume(source);

// 4、合并dispatch源数据
dispatch_source_merge_data(source, value);

定时器

在使用定时器时,NSTimer是首先被想到的,但是由于NSTimer会受RunLoop影响,当RunLoop处理的任务很多时,就会导致NSTimer的精度降低,所以在一些对定时器精度要求很高的情况下,我们会考虑CADisplaylink,但是实际上也可以考虑使用GCD定时器。

1
2
3
4
5
6
7
8
9
10
11
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);

dispatch_source_set_timer(self.timer, dispatch_time(DISPATCH_TIME_NOW, 3.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 0);

//设置回调
dispatch_source_set_event_handler(timer, ^{
// 处理逻辑
});
//启动timer
dispatch_resume(self.timer);

监视文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
dispatch_source_t fileMonitor(const char* filename) 
{
int fd = open(filename, O_EVTONLY);
if (fd == -1) return NULL;

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fd, DISPATCH_VNODE_RENAME, queue);

// 保存原文件名
int length = strlen(filename);
char* newString = (char*)malloc(length + 1);
newString = strcpy(newString, filename);
dispatch_set_context(source, newString);

// 设置事件发生的回调
dispatch_source_set_event_handler(source, ^{
// 获取原文件名
const char* oldFilename = (char*)dispatch_get_context(source);

// 文件名变化逻辑处理
...
});

// 设置取消回调
dispatch_source_set_cancel_handler(source, ^{
char* fileStr = (char*)dispatch_get_context(source);
free(fileStr);
close(fd);
});

// 启动
dispatch_resume(source);

return source;
}

常用API源码分析

dispatch_source_s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
struct dispatch_source_s {
DISPATCH_STRUCT_HEADER(dispatch_source_s, dispatch_source_vtable_s);
DISPATCH_QUEUE_HEADER;
// Instruments always copies DISPATCH_QUEUE_MIN_LABEL_SIZE, which is 64,
// so the remainder of the structure must be big enough
union {
char _ds_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
struct {
char dq_label[8];
dispatch_kevent_t ds_dkev;

dispatch_source_handler_function_t ds_handler_func;
void *ds_handler_ctxt;

void *ds_cancel_handler;

unsigned int ds_is_level:1,
ds_is_adder:1,
ds_is_installed:1,
ds_needs_rearm:1,
ds_is_armed:1,
ds_is_legacy:1,
ds_cancel_is_block:1,
ds_handler_is_block:1;

unsigned int ds_atomic_flags;

unsigned long ds_data;
unsigned long ds_pending_data;
unsigned long ds_pending_data_mask;

TAILQ_ENTRY(dispatch_source_s) ds_list;

unsigned long ds_ident_hack;

struct dispatch_timer_source_s ds_timer;
};
};
};

如果大家对dispatch_queue_s的定义还有印象的话,那么可以发现dispatch_source_s的定义就是dispatch_queue_s的覆盖版:

1
2
3
4
5
struct dispatch_queue_s {
DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s);
DISPATCH_QUEUE_HEADER;
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last
};

由于dispatch_source_s的第三项是一个联合体,其中的char _ds_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];dispatch_queue_s中的char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE];就是基本上一回事,所以这样就可以利用dispatch_queue_s的函数对dispatch_source_s进行初始化,之后再针对不同的字段进行覆盖。

dispatch_source_create

按照惯例,还是先从创建说起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
dispatch_source_t 
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t q)
{
// 注意这里和kevent扯上关系了
const struct kevent *proto_kev = &type->ke;

dispatch_source_t ds = NULL;
dispatch_kevent_t dk = NULL;

// 基本校验
...

// 申请dispatch_source_t内存空间
ds = calloc(1ul, sizeof(struct dispatch_source_s));

// 申请dispatch_kevent_s内存空间
dk = calloc(1ul, sizeof(struct dispatch_kevent_s));

// 设置dispatch_kevent_s字段
dk->dk_kevent = *proto_kev;
dk->dk_kevent.ident = handle;
dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
dk->dk_kevent.fflags |= (uint32_t)mask;
dk->dk_kevent.udata = dk;
TAILQ_INIT(&dk->dk_sources);

// 调用队列方法_dispatch_queue_init初始化ds
_dispatch_queue_init((dispatch_queue_t)ds);
strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

// 设置dispatch_source_t特有字段,对上述的_ds_pad的空间进行覆盖
ds->do_vtable = &_dispatch_source_kevent_vtable;//设置针对source的source_kevent_vtable
ds->do_ref_cnt++; // 引用计数
ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;//处于暂停状态,需要手动开启。
ds->do_targetq = q;// 表示事件触发的回调在该队列执行

// Dispatch Source
ds->ds_ident_hack = dk->dk_kevent.ident;
ds->ds_dkev = dk;
ds->ds_pending_data_mask = dk->dk_kevent.fflags;
if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
if (proto_kev->filter != EVFILT_MACHPORT) {
ds->ds_is_level = true;
}
ds->ds_needs_rearm = true;
} else if (!(EV_CLEAR & proto_kev->flags)) {
// we cheat and use EV_CLEAR to mean a "flag thingy"
ds->ds_is_adder = true;
}

// If its a timer source, it needs to be re-armed
if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
ds->ds_needs_rearm = true;
}

// Some sources require special processing
if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
static dispatch_once_t pred;
dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
ds->ds_timer.flags = mask;
}

_dispatch_retain(ds->do_targetq);
return ds;

out_bad:
free(ds);
free(dk);
return NULL;
}

  • 1、dispatch_source_type_s
    dispatch_source_type_s有两个字段,一个kevent,另一个是mask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    struct dispatch_source_type_s {
    struct kevent ke;
    uint64_t mask;
    };

    // 以_dispatch_source_type_timer为例:
    const struct dispatch_source_type_s _dispatch_source_type_timer = {
    .ke = {
    .filter = DISPATCH_EVFILT_TIMER,
    },
    .mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
    };
  • 2、do_vtable

    1
    2
    3
    4
    5
    6
    7
    8
    const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
    .do_type = DISPATCH_SOURCE_KEVENT_TYPE,
    .do_kind = "kevent-source",
    .do_invoke = _dispatch_source_invoke,
    .do_dispose = _dispatch_source_dispose,
    .do_probe = _dispatch_source_probe,
    .do_debug = _dispatch_source_kevent_debug,
    };

dispatch_source_set_event_handler

1
2
3
4
5
6
void dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
handler = _dispatch_Block_copy(handler);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_event_handler2);
}

先对handler进行了copy,然后调用了dispatch_barrier_async_f,这个方法和queue篇中的dispatch_async_f极其相似:把block设置到续体上,然后压入目标队列,同时可以发现,dispatch_barrier_async_f方法中直接把ds当成队列使用了,这也无可厚非,它们极其相似的定义就决定了这是可行的。

1
2
3
4
5
6
7
8
9
10
11
12
void dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

if (!dc) return _dispatch_barrier_async_f_slow(dq, context, func);

dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
dc->dc_func = func;
dc->dc_ctxt = context;

_dispatch_queue_push(dq, dc);
}

再来关注下_dispatch_source_set_event_handler2

1
2
3
4
5
6
7
8
9
10
11
12
13
static void _dispatch_source_set_event_handler2(void *context)
{
struct Block_layout *bl = context;

dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();

if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
Block_release(ds->ds_handler_ctxt);
}
ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
ds->ds_handler_ctxt = bl;
ds->ds_handler_is_block = true;
}

这个方法主要是保存一下上下文,回头使用。

dispatch_source_set_cancel_handler

1
2
3
4
5
6
7
void dispatch_source_set_cancel_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
handler = _dispatch_Block_copy(handler);
dispatch_barrier_async_f((dispatch_queue_t)ds,
handler, _dispatch_source_set_cancel_handler2);
}

设置cancel回调,可以看到source是支持cancel的。

1
2
3
4
5
6
7
8
9
10
11
static void _dispatch_source_set_cancel_handler2(void *context)
{
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
Block_release(ds->ds_cancel_handler);
}
ds->ds_cancel_handler = context;
ds->ds_cancel_is_block = true;
}

dispatch_source_cancel

1
2
3
4
5
6
7
8
9
10
11
12
void dispatch_source_cancel(dispatch_source_t ds)
{
_dispatch_retain(ds);

// 设置取消标志
dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);

// 唤醒source,执行取消操作,解除source注册,最后进行资源释放
_dispatch_wakeup(ds);

_dispatch_release(ds);
}

dispatch_source_get_xxx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
unsigned long dispatch_source_get_mask(dispatch_source_t ds)
{
return ds->ds_pending_data_mask;// mask
}

uintptr_t dispatch_source_get_handle(dispatch_source_t ds)
{
return (int)ds->ds_ident_hack;// 回调函数地址存储在ds_ident_hack字段中
}

unsigned long dispatch_source_get_data(dispatch_source_t ds)
{
return ds->ds_data;
}

dispatch_source_merge_data

Merges data into a dispatch source of type DISPATCH_SOURCE_TYPE_DATA_ADD or DISPATCH_SOURCE_TYPE_DATA_OR and submits its event handler block to its target queue

1
2
3
4
5
6
7
8
9
void dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
struct kevent kev = {
.fflags = (typeof(kev.fflags))val,
.data = val,
};

_dispatch_source_merge_kevent(ds, &kev);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
struct kevent fake;

// 如果source已经被取消或者引用计数为0,直接返回
if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
return;
}

// 发生错误之后的清理工作
if (ke->flags & EV_ERROR) {
if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
fake = *ke;
fake.flags &= ~EV_ERROR;
fake.fflags = NOTE_EXIT;
fake.data = 0;
ke = &fake;
} else {
// log the unexpected error
dispatch_assume_zero(ke->data);
return;
}
}

if (ds->ds_is_level) {
// EV_EOF
dispatch_assert(ke->data >= 0l);
ds->ds_pending_data = ~ke->data;
}
else if (ds->ds_is_adder) {
dispatch_atomic_add(&ds->ds_pending_data, ke->data);
}
else {
dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
}

// EV_DISPATCH and EV_ONESHOT sources 仅仅触发一次
if (ds->ds_needs_rearm) {
ds->ds_is_armed = false;
}

// 唤醒source
_dispatch_wakeup(ds);
}

dispatch_resume

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void dispatch_resume(dispatch_object_t dou)
{
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return;
}
switch (dispatch_atomic_sub(&dou._do->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_INTERVAL) + DISPATCH_OBJECT_SUSPEND_INTERVAL) {
case DISPATCH_OBJECT_SUSPEND_INTERVAL:
_dispatch_wakeup(dou._do);
break;
case 0:
DISPATCH_CLIENT_CRASH("Over-resume of an object");
break;
default:
break;
}
}

由前面的创建过程可以知晓,一般情况下,创建一个source,其do_suspend_cnt都是DISPATCH_OBJECT_SUSPEND_INTERVAL,表示暂停状态,需要调用dispatch_resume手动启动。
很明显,上面的switch分支应该走DISPATCH_OBJECT_SUSPEND_INTERVAL,从而调用_dispatch_wakeup

_dispatch_wakeup在queue篇中也有分析到,它主要和do_invoke有关,而source中的do_invoke指向了_dispatch_source_invoke,这个方法会执行所有的source actions,每一个action都会保证在合适的队列上执行,如果当前的队列与action不对应,那么正确的队列将会返回,然后该action将会在返回的正确队列上执行,本文的核心重点就是这个了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds)
{
// 获取当前线程所在的队列
dispatch_queue_t dq = _dispatch_queue_get_current();

// 如果source还没有安装,则调用_dispatch_kevent_merge进行安装到管理队列上
if (!ds->ds_is_installed) {
// The source needs to be installed on the manager queue.
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}

_dispatch_kevent_merge(ds);
}
else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
// source已经被取消,需要从管理队列中卸载掉,卸载完成后,取消回调需要发送到目标队列
// 可以看到,如果source的ds_dkev不为空,就需要管理队列,否则目标队列就可以了
if (ds->ds_dkev) {
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_release(ds);
return ds->do_targetq;
}
else if (ds->ds_cancel_handler) {
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
}
// cancel回调
_dispatch_source_cancel_callout(ds);
}
else if (ds->ds_pending_data) {
// source有未决的数据,需要通过在目标队列上通过回调传送
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
//
_dispatch_source_latch_and_call(ds);

// 有些source需要进行rearmed,必须切到管理队列上
if (ds->ds_needs_rearm) {
return &_dispatch_mgr_q;
}
}
else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
// 在管理队列上进行rearmed
if (dq != &_dispatch_mgr_q) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
ds->ds_is_armed = true;
}

return NULL;
}

该函数中多次提到了管理队列_dispatch_mgr_q,有些情况必须切到管理队列上执行,比如:

  • ds_is_installed为假;
  • ds_dkev不为空;
  • ds_needs_rearm
    关于_dispatch_mgr_q,后文会详细分析。

_dispatch_kevent_merge

  • Find existing kevents, and merge any new flags if necessary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void _dispatch_kevent_merge(dispatch_source_t ds)
{
static dispatch_once_t pred;
dispatch_kevent_t dk;
typeof(dk->dk_kevent.fflags) new_flags;
bool do_resume = false;

if (ds->ds_is_installed) {
return;
}
ds->ds_is_installed = true;

// 初始化_dispatch_source_init_tail_queue_array
dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array
);

// 寻找是否有相同的事件(dispatch kevent)被观察
dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);

if (dk) {
// 如果存在有相同的事件,则需要检查是否有新的flags需要更新,从这里可以看出,kevent
// 并不依赖于source,而是独立存在的。
new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
free(ds->ds_dkev);
ds->ds_dkev = dk;
do_resume = new_flags;
}
else {
// 如果不存在,则直接将dk插入到事件队列中
dk = ds->ds_dkev;
_dispatch_kevent_insert(dk);
new_flags = dk->dk_kevent.fflags;
do_resume = true;
}

// 将source的ds_list插入到kevent的source队列中
TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

// 如果flags有更新,则需要重新注册kevent事件
if (do_resume) {
dk->dk_kevent.flags |= EV_ADD;
_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
ds->ds_is_armed = true;
}
}
  • 1、_dispatch_source_init_tail_queue_array
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
unsigned int i;
for (i = 0; i < DSL_HASH_SIZE; i++) {
TAILQ_INIT(&_dispatch_sources[i]);
}

TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);

TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define DSL_HASH_SIZE 256u  
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

#define DISPATCH_TIMER_INDEX_WALL 0
#define DISPATCH_TIMER_INDEX_MACH 1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
{
.dk_kevent = {
.ident = DISPATCH_TIMER_INDEX_WALL,
.filter = DISPATCH_EVFILT_TIMER,
.udata = &_dispatch_kevent_timer[0],
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
},
{
.dk_kevent = {
.ident = DISPATCH_TIMER_INDEX_MACH,
.filter = DISPATCH_EVFILT_TIMER,
.udata = &_dispatch_kevent_timer[1],
},
.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
},
};
  • 2、_dispatch_kevent_find
1
2
3
4
5
6
7
8
9
10
11
12
13
static dispatch_kevent_t _dispatch_kevent_find(uintptr_t ident, short filter)
{
uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
dispatch_kevent_t dki;

TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
// ident和filter都相同才算同一个
if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
break;
}
}
return dki;
}
  • 3、_dispatch_kevent_resume
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
switch (dk->dk_kevent.filter) {
case DISPATCH_EVFILT_TIMER:
case DISPATCH_EVFILT_CUSTOM_ADD:
case DISPATCH_EVFILT_CUSTOM_OR:
// these types not registered with kevent
return;
case EVFILT_MACHPORT:
_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
break;
case EVFILT_PROC:
if (dk->dk_kevent.flags & EV_ONESHOT) {
return;
}
// fall through
default:
_dispatch_update_kq(&dk->dk_kevent);
if (dk->dk_kevent.flags & EV_DISPATCH) {
dk->dk_kevent.flags &= ~EV_ADD;
}
break;
}
}

_dispatch_source_cancel_callout

主要执行一些清理工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
ds->ds_pending_data_mask = 0;
ds->ds_pending_data = 0;
ds->ds_data = 0;

#ifdef __BLOCKS__
// block的释放和相关清理
if (ds->ds_handler_is_block) {
Block_release(ds->ds_handler_ctxt);
ds->ds_handler_is_block = false;
ds->ds_handler_func = NULL;
ds->ds_handler_ctxt = NULL;
}
#endif

// 如果ds->ds_cancel_handler为空,直接返回
if (!ds->ds_cancel_handler) {
return;
}

if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
// 如果回调是block
dispatch_block_t b = ds->ds_cancel_handler;
if (ds->ds_atomic_flags & DSF_CANCELED) {
// 执行取消回调
b();
}
Block_release(ds->ds_cancel_handler);
ds->ds_cancel_is_block = false;
#endif
}
else {
// 如果回调是函数
dispatch_function_t f = ds->ds_cancel_handler;
if (ds->ds_atomic_flags & DSF_CANCELED) {
f(ds->do_ctxt);
}
}

// 执行完后,设置ds_cancel_handler为空
ds->ds_cancel_handler = NULL;
}

_dispatch_source_latch_and_call

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void _dispatch_source_latch_and_call(dispatch_source_t ds)
{
unsigned long prev;

if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
return;
}
prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
if (ds->ds_is_level) {
ds->ds_data = ~prev;
} else {
ds->ds_data = prev;
}
if (dispatch_assume(prev)) {
if (ds->ds_handler_func) {
ds->ds_handler_func(ds->ds_handler_ctxt, ds);
}
}
}

管理队列

关于_dispatch_mgr_q,我们在queue篇中提到过,但是一直没有看到它的使用场合,它和source的关系很紧密,这里重点分析一下:

1
2
3
4
5
6
7
8
9
10
11
struct dispatch_queue_s _dispatch_mgr_q = {
.do_vtable = &_dispatch_queue_mgr_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],

.dq_label = "com.apple.libdispatch-manager",
.dq_width = 1,
.dq_serialnum = 2,
};

看看管理队列的函数指针do_vtable的指向:_dispatch_queue_mgr_vtable

1
2
3
4
5
6
7
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_kind = "mgr-queue",
.do_invoke = _dispatch_mgr_invoke,
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_mgr_wakeup,
};

毫无疑问,这里面最重要的就是do_invoke和do_probe了,同时这也是我们探寻source底层的入口了
接下来就需要一些IO多路复用的知识了:

  • 同步阻塞、异步阻塞、同步非阻塞等
  • select、epoll、kqueue等

_dispatch_mgr_wakeup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static bool _dispatch_mgr_wakeup(dispatch_queue_t dq)
{
static const struct kevent kev = {
.ident = 1,
.filter = EVFILT_USER,
#ifdef EV_TRIGGER
.flags = EV_TRIGGER,
#endif
#ifdef NOTE_TRIGGER
.fflags = NOTE_TRIGGER,
#endif
};
_dispatch_update_kq(&kev);

return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void
_dispatch_update_kq(const struct kevent *kev)
{
struct kevent kev_copy = *kev;
kev_copy.flags |= EV_RECEIPT;

// 从kqueue中删除kevent
if (kev_copy.flags & EV_DELETE) {
switch (kev_copy.filter) {
case EVFILT_READ:
// 清除标记
if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
_dispatch_rfd_ptrs[kev_copy.ident] = 0;
return;
}
case EVFILT_WRITE:
// 清除标记
if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
_dispatch_wfd_ptrs[kev_copy.ident] = 0;
return;
}
default:
break;
}
}

// 监视kev_copy中的事件,如果列表里有任何就绪的fd,则把该事件对应的结构体放进
// 第4个参数kev_copy列表里面。该方法调用后会阻塞,直到有事件就绪。
int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);


switch (kev_copy.data) {
case 0:
return;
case EBADF:
break;
default:
// 如果是因为kevent正在读数据或者写数据,导致上面的注册失败,那么需要设置_dispatch_select_workaround为true,然后回头调用select
switch (kev_copy.filter) {
case EVFILT_READ:
_dispatch_select_workaround = true;
FD_SET((int)kev_copy.ident, &_dispatch_rfds);
_dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;//读到的数据
break;
case EVFILT_WRITE:
_dispatch_select_workaround = true;
FD_SET((int)kev_copy.ident, &_dispatch_wfds);
_dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;//写入的数据
break;
default:
_dispatch_source_drain_kevent(&kev_copy);
break;
}
break;
}
}

除了EVFILT_READEVFILT_WRITE两种类型外,其它的事件类型都调用_dispatch_source_drain_kevent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void _dispatch_source_drain_kevent(struct kevent *ke)
{
static dispatch_once_t pred;
dispatch_kevent_t dk = ke->udata;
dispatch_source_t dsi;

// machport事件类型的处理
if (ke->filter == EVFILT_MACHPORT) {
return _dispatch_drain_mach_messages(ke);
}

// flags设置
if (ke->flags & EV_ONESHOT) {
dk->dk_kevent.flags |= EV_ONESHOT;
}

// 遍历dk->dk_sources,对里面的每一个source,执行_dispatch_source_merge_kevent
// 表示事件已经就绪
TAILQ_FORACH(dsi, &dk->dk_sources, ds_list) {
_dispatch_source_merge_kevent(dsi, ke);
}
}

_dispatch_mgr_invoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
static dispatch_queue_t _dispatch_mgr_invoke(dispatch_queue_t dq)
{
static const struct timespec timeout_immediately = { 0, 0 };
struct timespec timeout;
const struct timespec *timeoutp;
struct timeval sel_timeout, *sel_timeoutp;
fd_set tmp_rfds, tmp_wfds;
struct kevent kev[1];
int k_cnt, k_err, i, r;

// 设置线程私有数据TSD
_dispatch_thread_setspecific(dispatch_queue_key, dq);

// 无限循环
for (;;) {
_dispatch_run_timers();

// looking for the first unsuspended timer which has its target
// time set. Given timers are kept in order, if we hit an timer that's
// unset there's no point in continuing down the list
timeoutp = _dispatch_get_next_timer_fire(&timeout);

// 参考_dispatch_update_kq中描述的原因,是一种特殊的处理
if (_dispatch_select_workaround) {
FD_COPY(&_dispatch_rfds, &tmp_rfds);
FD_COPY(&_dispatch_wfds, &tmp_wfds);
if (timeoutp) {
sel_timeout.tv_sec = timeoutp->tv_sec;
sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
sel_timeoutp = &sel_timeout;
} else {
sel_timeoutp = NULL;
}

// 这里调用跨平台的多路复用select
r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);

// 错误处理
if (r == -1) {
if (errno != EBADF) {
dispatch_assume_zero(errno);
continue;
}
for (i = 0; i < FD_SETSIZE; i++) {
if (i == _dispatch_kq) {
continue;
}
if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
continue;
}
r = dup(i);
if (r != -1) {
close(r);
} else {
FD_CLR(i, &_dispatch_rfds);
FD_CLR(i, &_dispatch_wfds);
_dispatch_rfd_ptrs[i] = 0;
_dispatch_wfd_ptrs[i] = 0;
}
}
continue;
}

// 有事件就绪
if (r > 0) {
for (i = 0; i < FD_SETSIZE; i++) {
if (i == _dispatch_kq) {
continue;
}
if (FD_ISSET(i, &tmp_rfds)) {
FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
_dispatch_rfd_ptrs[i] = 0;
_dispatch_mgr_thread2(kev, 1);
}
if (FD_ISSET(i, &tmp_wfds)) {
FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
_dispatch_wfd_ptrs[i] = 0;
_dispatch_mgr_thread2(kev, 1);
}
}
}

timeoutp = &timeout_immediately;
}

// 正常的路径
k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
k_err = errno;

switch (k_cnt) {
case -1:
if (k_err == EBADF) {
DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
}
dispatch_assume_zero(k_err);
continue;
default:
_dispatch_mgr_thread2(kev, (size_t)k_cnt);
// fall through
case 0:
_dispatch_force_cache_cleanup();
continue;
}
}

return NULL;
}

不管是特殊处理的路径,还是正常的路径,二者均调用到了_dispatch_mgr_thread2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
size_t i;

for (i = 0; i < cnt; i++) {
// EVFILT_USER未被source使用
if (kev[i].filter == EVFILT_USER) {
_dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
}
else {
_dispatch_source_drain_kevent(&kev[i]);
}
}
}

1
2
3
4
5
void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
_dispatch_queue_drain(dq);
_dispatch_force_cache_cleanup();
}

总结

本篇的篇幅较长,也比较复杂,而且source本身相对于GCD中的其它API也是最陌生的。本文中涉及到的最多的就是IO多路复用和管理队列,抓住这两个重点,就不难理解dispatch_source的底层原理了。

-------------本文结束 感谢您的阅读-------------

本文标题:GCD源码分析6 —— dispatch_source篇

文章作者:lingyun

发布时间:2018年02月10日 - 00:02

最后更新:2018年02月12日 - 18:02

原始链接:https://tsuijunxi.github.io/2018/02/10/GCD源码分析6 —— dispatch-source篇/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。