前言
相比较而言,GCD中最不引人注目的就是dispatch_source
了,它是BSD系统内核惯有功能kqueue的包装,可以一个监视某些类型事件的发生。当这些事件发生时,它自动将一个block放入dispatch queue
的执行例程中。如果您对IO多路复用有一定的了解,那么就很容易能理解dispatch_source
。
kqueue
kqueue是IO多路复用在BSD系统中的一种实现,它的接口主要包括 kqueue()、kevent() 两个系统调用和 struct kevent 结构:
1、kqueue() 生成一个内核事件队列,返回该队列的文件描述符。
1 int kqueue(void);2、kevent() 提供向内核注册/反注册事件和返回就绪事件或错误事件。
1
2
3
4 int kevent(int kq,
const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);3、struct kevent 就是kevent()操作的最基本的事件结构。
1
2
3
4
5
6
7
8 struct kevent {
uintptr_t ident; /* 事件 ID */
short filter; /* 事件过滤器 */
u_short flags; /* 行为标识 */
u_int fflags; /* 过滤器标识值 */
intptr_t data; /* 过滤器数据 */
void *udata; /* 应用透传数据 */
};
在一个 kqueue 中,{ident, filter} 确定一个唯一的事件:
- 1、ident 事件的 id,一般设置为文件描述符。
2、filter 可以将 kqueue filter 看作事件。内核检测 ident 上注册的 filter 的状态,状态发生了变化,就通知应用程序。kqueue 定义了较多的 filter:
1
2
3
4
5
6
7
8
9
103、行为标志flags:
1
2
3
4
事件类型
在source.h中可以发现dispatch_source
可以支持以下事件类型:1
2
3
4
5
6
7
8
9
10
11DISPATCH_SOURCE_TYPE_DATA_ADD: 自定义事件
DISPATCH_SOURCE_TYPE_DATA_OR: 自定义事件
DISPATCH_SOURCE_TYPE_MACH_SEND: Mach端口发送事件。
DISPATCH_SOURCE_TYPE_MACH_RECV: Mach端口接收事件。
DISPATCH_SOURCE_TYPE_PROC: 进程相关的事件。
DISPATCH_SOURCE_TYPE_READ: 读文件事件。
DISPATCH_SOURCE_TYPE_WRITE: 写文件事件。
DISPATCH_SOURCE_TYPE_VNODE: 文件属性更改事件。
DISPATCH_SOURCE_TYPE_SIGNAL: 接收信号事件。
DISPATCH_SOURCE_TYPE_TIMER: 定时器事件。
DISPATCH_SOURCE_TYPE_MEMORYPRESSURE:内存压力事件。
使用步骤
由于dispatch_source
的使用场合较少,所以很有必要对dispatch_source
的使用进行介绍:1
2
3
4
5
6
7
8
9
10
11
12
13// 1、创建dispatch源,这里使用加法来合并dispatch源数据,最后一个参数是指定dispatch队列
dispatch_source_t source = dispatch_source_create(dispatch_source_type, handler, mask, dispatch_queue);
// 2、设置响应dispatch源事件的block,在dispatch源指定的队列上运行
dispatch_source_set_event_handler(source, ^{
//可以通过dispatch_source_get_data(source)来得到dispatch源数据
});
// 3、dispatch源创建后处于suspend状态,所以需要启动dispatch源
dispatch_resume(source);
// 4、合并dispatch源数据
dispatch_source_merge_data(source, value);
定时器
在使用定时器时,NSTimer是首先被想到的,但是由于NSTimer会受RunLoop影响,当RunLoop处理的任务很多时,就会导致NSTimer的精度降低,所以在一些对定时器精度要求很高的情况下,我们会考虑CADisplaylink,但是实际上也可以考虑使用GCD定时器。1
2
3
4
5
6
7
8
9
10
11dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
dispatch_source_set_timer(self.timer, dispatch_time(DISPATCH_TIME_NOW, 3.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 0);
//设置回调
dispatch_source_set_event_handler(timer, ^{
// 处理逻辑
});
//启动timer
dispatch_resume(self.timer);
监视文件
1 | dispatch_source_t fileMonitor(const char* filename) |
常用API源码分析
dispatch_source_s
1 | struct dispatch_source_s { |
如果大家对dispatch_queue_s
的定义还有印象的话,那么可以发现dispatch_source_s
的定义就是dispatch_queue_s
的覆盖版:1
2
3
4
5struct dispatch_queue_s {
DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s);
DISPATCH_QUEUE_HEADER;
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last
};
由于dispatch_source_s
的第三项是一个联合体,其中的char _ds_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
和dispatch_queue_s
中的char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE];
就是基本上一回事,所以这样就可以利用dispatch_queue_s
的函数对dispatch_source_s
进行初始化,之后再针对不同的字段进行覆盖。
dispatch_source_create
按照惯例,还是先从创建说起:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t q)
{
// 注意这里和kevent扯上关系了
const struct kevent *proto_kev = &type->ke;
dispatch_source_t ds = NULL;
dispatch_kevent_t dk = NULL;
// 基本校验
...
// 申请dispatch_source_t内存空间
ds = calloc(1ul, sizeof(struct dispatch_source_s));
// 申请dispatch_kevent_s内存空间
dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
// 设置dispatch_kevent_s字段
dk->dk_kevent = *proto_kev;
dk->dk_kevent.ident = handle;
dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
dk->dk_kevent.fflags |= (uint32_t)mask;
dk->dk_kevent.udata = dk;
TAILQ_INIT(&dk->dk_sources);
// 调用队列方法_dispatch_queue_init初始化ds
_dispatch_queue_init((dispatch_queue_t)ds);
strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
// 设置dispatch_source_t特有字段,对上述的_ds_pad的空间进行覆盖
ds->do_vtable = &_dispatch_source_kevent_vtable;//设置针对source的source_kevent_vtable
ds->do_ref_cnt++; // 引用计数
ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;//处于暂停状态,需要手动开启。
ds->do_targetq = q;// 表示事件触发的回调在该队列执行
// Dispatch Source
ds->ds_ident_hack = dk->dk_kevent.ident;
ds->ds_dkev = dk;
ds->ds_pending_data_mask = dk->dk_kevent.fflags;
if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
if (proto_kev->filter != EVFILT_MACHPORT) {
ds->ds_is_level = true;
}
ds->ds_needs_rearm = true;
} else if (!(EV_CLEAR & proto_kev->flags)) {
// we cheat and use EV_CLEAR to mean a "flag thingy"
ds->ds_is_adder = true;
}
// If its a timer source, it needs to be re-armed
if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
ds->ds_needs_rearm = true;
}
// Some sources require special processing
if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
static dispatch_once_t pred;
dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
ds->ds_timer.flags = mask;
}
_dispatch_retain(ds->do_targetq);
return ds;
out_bad:
free(ds);
free(dk);
return NULL;
}
1、dispatch_source_type_s
dispatch_source_type_s
有两个字段,一个kevent,另一个是mask
1
2
3
4
5
6
7
8
9
10
11
12 struct dispatch_source_type_s {
struct kevent ke;
uint64_t mask;
};
// 以_dispatch_source_type_timer为例:
const struct dispatch_source_type_s _dispatch_source_type_timer = {
.ke = {
.filter = DISPATCH_EVFILT_TIMER,
},
.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};2、do_vtable
1
2
3
4
5
6
7
8 const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_kind = "kevent-source",
.do_invoke = _dispatch_source_invoke,
.do_dispose = _dispatch_source_dispose,
.do_probe = _dispatch_source_probe,
.do_debug = _dispatch_source_kevent_debug,
};
dispatch_source_set_event_handler
1 | void dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler) |
先对handler进行了copy,然后调用了dispatch_barrier_async_f
,这个方法和queue篇中的dispatch_async_f
极其相似:把block设置到续体上,然后压入目标队列,同时可以发现,dispatch_barrier_async_f
方法中直接把ds当成队列使用了,这也无可厚非,它们极其相似的定义就决定了这是可行的。1
2
3
4
5
6
7
8
9
10
11
12void dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) return _dispatch_barrier_async_f_slow(dq, context, func);
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
dc->dc_func = func;
dc->dc_ctxt = context;
_dispatch_queue_push(dq, dc);
}
再来关注下_dispatch_source_set_event_handler2
1
2
3
4
5
6
7
8
9
10
11
12
13static void _dispatch_source_set_event_handler2(void *context)
{
struct Block_layout *bl = context;
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
Block_release(ds->ds_handler_ctxt);
}
ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
ds->ds_handler_ctxt = bl;
ds->ds_handler_is_block = true;
}
这个方法主要是保存一下上下文,回头使用。
dispatch_source_set_cancel_handler
1 | void dispatch_source_set_cancel_handler(dispatch_source_t ds, |
设置cancel回调,可以看到source是支持cancel的。1
2
3
4
5
6
7
8
9
10
11static void _dispatch_source_set_cancel_handler2(void *context)
{
dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
Block_release(ds->ds_cancel_handler);
}
ds->ds_cancel_handler = context;
ds->ds_cancel_is_block = true;
}
dispatch_source_cancel
1 | void dispatch_source_cancel(dispatch_source_t ds) |
dispatch_source_get_xxx
1 | unsigned long dispatch_source_get_mask(dispatch_source_t ds) |
dispatch_source_merge_data
Merges data into a dispatch source of type DISPATCH_SOURCE_TYPE_DATA_ADD or DISPATCH_SOURCE_TYPE_DATA_OR and submits its event handler block to its target queue
1 | void dispatch_source_merge_data(dispatch_source_t ds, unsigned long val) |
1 | void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke) |
dispatch_resume
1 | void dispatch_resume(dispatch_object_t dou) |
由前面的创建过程可以知晓,一般情况下,创建一个source,其do_suspend_cnt
都是DISPATCH_OBJECT_SUSPEND_INTERVAL
,表示暂停状态,需要调用dispatch_resume
手动启动。
很明显,上面的switch分支应该走DISPATCH_OBJECT_SUSPEND_INTERVAL
,从而调用_dispatch_wakeup
。
_dispatch_wakeup
在queue篇中也有分析到,它主要和do_invoke
有关,而source中的do_invoke
指向了_dispatch_source_invoke
,这个方法会执行所有的source actions,每一个action都会保证在合适的队列上执行,如果当前的队列与action不对应,那么正确的队列将会返回,然后该action将会在返回的正确队列上执行,本文的核心重点就是这个了。
1 | dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds) |
该函数中多次提到了管理队列_dispatch_mgr_q
,有些情况必须切到管理队列上执行,比如:
- ds_is_installed为假;
- ds_dkev不为空;
- ds_needs_rearm
关于_dispatch_mgr_q
,后文会详细分析。
_dispatch_kevent_merge
- Find existing kevents, and merge any new flags if necessary
1 | void _dispatch_kevent_merge(dispatch_source_t ds) |
- 1、_dispatch_source_init_tail_queue_array
1 | static void |
1 | #define DSL_HASH_SIZE 256u |
- 2、_dispatch_kevent_find
1 | static dispatch_kevent_t _dispatch_kevent_find(uintptr_t ident, short filter) |
- 3、_dispatch_kevent_resume
1 | void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags) |
_dispatch_source_cancel_callout
主要执行一些清理工作
1 | void |
_dispatch_source_latch_and_call
1 | void _dispatch_source_latch_and_call(dispatch_source_t ds) |
管理队列
关于_dispatch_mgr_q
,我们在queue篇中提到过,但是一直没有看到它的使用场合,它和source的关系很紧密,这里重点分析一下:
1 | struct dispatch_queue_s _dispatch_mgr_q = { |
看看管理队列的函数指针do_vtable的指向:_dispatch_queue_mgr_vtable
1 | static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = { |
毫无疑问,这里面最重要的就是do_invoke和do_probe了,同时这也是我们探寻source底层的入口了
接下来就需要一些IO多路复用的知识了:
- 同步阻塞、异步阻塞、同步非阻塞等
- select、epoll、kqueue等
_dispatch_mgr_wakeup
1 | static bool _dispatch_mgr_wakeup(dispatch_queue_t dq) |
1 | void |
除了EVFILT_READ
和EVFILT_WRITE
两种类型外,其它的事件类型都调用_dispatch_source_drain_kevent
1 | void _dispatch_source_drain_kevent(struct kevent *ke) |
_dispatch_mgr_invoke
1 | static dispatch_queue_t _dispatch_mgr_invoke(dispatch_queue_t dq) |
不管是特殊处理的路径,还是正常的路径,二者均调用到了_dispatch_mgr_thread2
:1
2
3
4
5
6
7
8
9
10
11
12
13
14static void _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
size_t i;
for (i = 0; i < cnt; i++) {
// EVFILT_USER未被source使用
if (kev[i].filter == EVFILT_USER) {
_dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
}
else {
_dispatch_source_drain_kevent(&kev[i]);
}
}
}
1 | void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq) |
总结
本篇的篇幅较长,也比较复杂,而且source本身相对于GCD中的其它API也是最陌生的。本文中涉及到的最多的就是IO多路复用和管理队列,抓住这两个重点,就不难理解dispatch_source的底层原理了。