GCD源码分析6 —— dispatch_source篇

前言

相比较而言,GCD中最不引人注目的就是dispatch_source了,它是BSD系统内核惯有功能kqueue的包装,可以一个监视某些类型事件的发生。当这些事件发生时,它自动将一个block放入dispatch queue的执行例程中。如果您对IO多路复用有一定的了解,那么就很容易能理解dispatch_source

kqueue

kqueue是IO多路复用在BSD系统中的一种实现,它的接口主要包括 kqueue()、kevent() 两个系统调用和 struct kevent 结构:

  • 1、kqueue() 生成一个内核事件队列,返回该队列的文件描述符。

    int     kqueue(void);
    
  • 2、kevent() 提供向内核注册/反注册事件和返回就绪事件或错误事件。

    int     kevent(int kq, 
             const struct kevent *changelist, int nchanges,
             struct kevent *eventlist, int nevents,
             const struct timespec *timeout);
    
  • 3、struct kevent 就是kevent()操作的最基本的事件结构。

    struct kevent { 
       uintptr_t ident;        /* 事件 ID */ 
       short     filter;       /* 事件过滤器 */ 
       u_short   flags;        /* 行为标识 */ 
       u_int     fflags;       /* 过滤器标识值 */ 
       intptr_t  data;         /* 过滤器数据 */ 
       void      *udata;       /* 应用透传数据 */ 
    };
    

在一个 kqueue 中,{ident, filter} 确定一个唯一的事件:

  • 1、ident 事件的 id,一般设置为文件描述符。
  • 2、filter 可以将 kqueue filter 看作事件。内核检测 ident 上注册的 filter 的状态,状态发生了变化,就通知应用程序。kqueue 定义了较多的 filter:
    #define EVFILT_READ         (-1)
    #define EVFILT_WRITE        (-2)
    #define EVFILT_AIO          (-3)    /* attached to aio requests */
    #define EVFILT_VNODE        (-4)    /* attached to vnodes */
    #define EVFILT_PROC         (-5)    /* attached to struct proc */
    #define EVFILT_SIGNAL       (-6)    /* attached to struct proc */
    #define EVFILT_TIMER        (-7)    /* timers */
    #define EVFILT_MACHPORT     (-8)    /* Mach portsets */
    #define EVFILT_FS           (-9)    /* Filesystem events */
    #define EVFILT_USER         (-10)   /* User events */
    
  • 3、行为标志flags:
    #define EV_ADD              0x0001      /* add event to kq (implies enable) */
    #define EV_DELETE           0x0002      /* delete event from kq */
    #define EV_ENABLE           0x0004      /* enable event */
    #define EV_DISABLE          0x0008      /* disable event (not reported) */
    

事件类型

在source.h中可以发现dispatch_source可以支持以下事件类型:

DISPATCH_SOURCE_TYPE_DATA_ADD:      自定义事件
DISPATCH_SOURCE_TYPE_DATA_OR:       自定义事件
DISPATCH_SOURCE_TYPE_MACH_SEND:     Mach端口发送事件。
DISPATCH_SOURCE_TYPE_MACH_RECV:     Mach端口接收事件。
DISPATCH_SOURCE_TYPE_PROC:          进程相关的事件。
DISPATCH_SOURCE_TYPE_READ:          读文件事件。
DISPATCH_SOURCE_TYPE_WRITE:         写文件事件。
DISPATCH_SOURCE_TYPE_VNODE:         文件属性更改事件。
DISPATCH_SOURCE_TYPE_SIGNAL:        接收信号事件。
DISPATCH_SOURCE_TYPE_TIMER:         定时器事件。
DISPATCH_SOURCE_TYPE_MEMORYPRESSURE:内存压力事件。

使用步骤

由于dispatch_source的使用场合较少,所以很有必要对dispatch_source的使用进行介绍:

// 1、创建dispatch源,这里使用加法来合并dispatch源数据,最后一个参数是指定dispatch队列
dispatch_source_t source = dispatch_source_create(dispatch_source_type, handler, mask, dispatch_queue);

// 2、设置响应dispatch源事件的block,在dispatch源指定的队列上运行
dispatch_source_set_event_handler(source, ^{ 
  //可以通过dispatch_source_get_data(source)来得到dispatch源数据
});

// 3、dispatch源创建后处于suspend状态,所以需要启动dispatch源
dispatch_resume(source);

// 4、合并dispatch源数据
dispatch_source_merge_data(source, value);

定时器

在使用定时器时,NSTimer是首先被想到的,但是由于NSTimer会受RunLoop影响,当RunLoop处理的任务很多时,就会导致NSTimer的精度降低,所以在一些对定时器精度要求很高的情况下,我们会考虑CADisplaylink,但是实际上也可以考虑使用GCD定时器。

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);

dispatch_source_set_timer(self.timer, dispatch_time(DISPATCH_TIME_NOW, 3.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 0);

//设置回调
dispatch_source_set_event_handler(timer, ^{
    // 处理逻辑
});
//启动timer
dispatch_resume(self.timer);

监视文件

dispatch_source_t fileMonitor(const char* filename) 
{ 
    int fd = open(filename, O_EVTONLY); 
    if (fd == -1) return NULL; 

    dispatch_queue_t queue = dispatch_get_global_queue(0, 0); 
    dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fd, DISPATCH_VNODE_RENAME, queue); 

    // 保存原文件名
    int length = strlen(filename); 
    char* newString = (char*)malloc(length + 1); 
    newString = strcpy(newString, filename); 
    dispatch_set_context(source, newString); 

    // 设置事件发生的回调
    dispatch_source_set_event_handler(source, ^{ 
        // 获取原文件名
        const char*  oldFilename = (char*)dispatch_get_context(source); 

        // 文件名变化逻辑处理
        ... 
    }); 

    // 设置取消回调 
    dispatch_source_set_cancel_handler(source, ^{ 
        char* fileStr = (char*)dispatch_get_context(source); 
        free(fileStr); 
        close(fd); 
    }); 

    // 启动
    dispatch_resume(source); 

   return source; 
}

常用API源码分析

dispatch_source_s

struct dispatch_source_s {
    DISPATCH_STRUCT_HEADER(dispatch_source_s, dispatch_source_vtable_s);
    DISPATCH_QUEUE_HEADER;
    // Instruments always copies DISPATCH_QUEUE_MIN_LABEL_SIZE, which is 64,
    // so the remainder of the structure must be big enough
    union {
        char _ds_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
        struct {
            char dq_label[8];
            dispatch_kevent_t ds_dkev;

            dispatch_source_handler_function_t ds_handler_func;
            void *ds_handler_ctxt;

            void *ds_cancel_handler;

            unsigned int ds_is_level:1,
            ds_is_adder:1,
            ds_is_installed:1,
            ds_needs_rearm:1,
            ds_is_armed:1,
            ds_is_legacy:1,
            ds_cancel_is_block:1,
            ds_handler_is_block:1;

            unsigned int ds_atomic_flags;

            unsigned long ds_data;
            unsigned long ds_pending_data;
            unsigned long ds_pending_data_mask;

            TAILQ_ENTRY(dispatch_source_s) ds_list;

            unsigned long ds_ident_hack;

            struct dispatch_timer_source_s ds_timer;
        };
    };
};

如果大家对dispatch_queue_s的定义还有印象的话,那么可以发现dispatch_source_s的定义就是dispatch_queue_s的覆盖版:

struct dispatch_queue_s {
    DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s);
    DISPATCH_QUEUE_HEADER;
    char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE];   // must be last
};

由于dispatch_source_s的第三项是一个联合体,其中的char _ds_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];dispatch_queue_s中的char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE];就是基本上一回事,所以这样就可以利用dispatch_queue_s的函数对dispatch_source_s进行初始化,之后再针对不同的字段进行覆盖。

dispatch_source_create

按照惯例,还是先从创建说起:

dispatch_source_t 
dispatch_source_create(dispatch_source_type_t type,
    uintptr_t handle,
    unsigned long mask,
    dispatch_queue_t q)
{
    // 注意这里和kevent扯上关系了
    const struct kevent *proto_kev = &type->ke;

    dispatch_source_t ds = NULL;
    dispatch_kevent_t dk = NULL;

    // 基本校验
    ...

    // 申请dispatch_source_t内存空间
    ds = calloc(1ul, sizeof(struct dispatch_source_s));

    // 申请dispatch_kevent_s内存空间
    dk = calloc(1ul, sizeof(struct dispatch_kevent_s));

    // 设置dispatch_kevent_s字段
    dk->dk_kevent = *proto_kev;
    dk->dk_kevent.ident = handle;
    dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
    dk->dk_kevent.fflags |= (uint32_t)mask;
    dk->dk_kevent.udata = dk;
    TAILQ_INIT(&dk->dk_sources);

    // 调用队列方法_dispatch_queue_init初始化ds
    _dispatch_queue_init((dispatch_queue_t)ds);
    strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

    // 设置dispatch_source_t特有字段,对上述的_ds_pad的空间进行覆盖
    ds->do_vtable = &_dispatch_source_kevent_vtable;//设置针对source的source_kevent_vtable
    ds->do_ref_cnt++; // 引用计数
    ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;//处于暂停状态,需要手动开启。
    ds->do_targetq = q;// 表示事件触发的回调在该队列执行

    // Dispatch Source
    ds->ds_ident_hack = dk->dk_kevent.ident;
    ds->ds_dkev = dk;
    ds->ds_pending_data_mask = dk->dk_kevent.fflags;
    if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
        if (proto_kev->filter != EVFILT_MACHPORT) {
            ds->ds_is_level = true;
        }
        ds->ds_needs_rearm = true;
    } else if (!(EV_CLEAR & proto_kev->flags)) {
        // we cheat and use EV_CLEAR to mean a "flag thingy"
        ds->ds_is_adder = true;
    }

    // If its a timer source, it needs to be re-armed
    if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
        ds->ds_needs_rearm = true;
    }

    // Some sources require special processing
    if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
        static dispatch_once_t pred;
        dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
    } else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
        ds->ds_timer.flags = mask;
    }

    _dispatch_retain(ds->do_targetq);
    return ds;

out_bad:
    free(ds);
    free(dk);
    return NULL;
}
  • 1、dispatch_source_type_s
    dispatch_source_type_s有两个字段,一个kevent,另一个是mask
    ```c
    struct dispatch_source_type_s {
    struct kevent ke;
    uint64_t mask;
    };

// 以_dispatch_source_type_timer为例:
const struct dispatch_source_type_s _dispatch_source_type_timer = {
.ke = {
.filter = DISPATCH_EVFILT_TIMER,
},
.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};

>* 2、do_vtable
```c
const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
    .do_type = DISPATCH_SOURCE_KEVENT_TYPE,
    .do_kind = "kevent-source",
    .do_invoke = _dispatch_source_invoke,
    .do_dispose = _dispatch_source_dispose,
    .do_probe = _dispatch_source_probe,
    .do_debug = _dispatch_source_kevent_debug,
};

dispatch_source_set_event_handler

void dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
    handler = _dispatch_Block_copy(handler);
    dispatch_barrier_async_f((dispatch_queue_t)ds,
        handler, _dispatch_source_set_event_handler2);
}

先对handler进行了copy,然后调用了dispatch_barrier_async_f,这个方法和queue篇中的dispatch_async_f极其相似:把block设置到续体上,然后压入目标队列,同时可以发现,dispatch_barrier_async_f方法中直接把ds当成队列使用了,这也无可厚非,它们极其相似的定义就决定了这是可行的。

void dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
{
    dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());

    if (!dc) return _dispatch_barrier_async_f_slow(dq, context, func);

    dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
    dc->dc_func = func;
    dc->dc_ctxt = context;

    _dispatch_queue_push(dq, dc);
}

再来关注下_dispatch_source_set_event_handler2

static void _dispatch_source_set_event_handler2(void *context)
{
    struct Block_layout *bl = context;

    dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();

    if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
        Block_release(ds->ds_handler_ctxt);
    }
    ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
    ds->ds_handler_ctxt = bl;
    ds->ds_handler_is_block = true;
}

这个方法主要是保存一下上下文,回头使用。

dispatch_source_set_cancel_handler

void dispatch_source_set_cancel_handler(dispatch_source_t ds,
    dispatch_block_t handler)
{
    handler = _dispatch_Block_copy(handler);
    dispatch_barrier_async_f((dispatch_queue_t)ds,
                             handler, _dispatch_source_set_cancel_handler2);
}

设置cancel回调,可以看到source是支持cancel的。

static void _dispatch_source_set_cancel_handler2(void *context)
{
    dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
    dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

    if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
        Block_release(ds->ds_cancel_handler);
    }
    ds->ds_cancel_handler = context;
    ds->ds_cancel_is_block = true;
}

dispatch_source_cancel

void dispatch_source_cancel(dispatch_source_t ds)
{
    _dispatch_retain(ds);

    // 设置取消标志
    dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);

    // 唤醒source,执行取消操作,解除source注册,最后进行资源释放
    _dispatch_wakeup(ds);

    _dispatch_release(ds);
}

dispatch_source_get_xxx

unsigned long dispatch_source_get_mask(dispatch_source_t ds)
{
    return ds->ds_pending_data_mask;// mask
}

uintptr_t dispatch_source_get_handle(dispatch_source_t ds)
{
    return (int)ds->ds_ident_hack;// 回调函数地址存储在ds_ident_hack字段中
}

unsigned long dispatch_source_get_data(dispatch_source_t ds)
{
    return ds->ds_data;
}

dispatch_source_merge_data

Merges data into a dispatch source of type DISPATCH_SOURCE_TYPE_DATA_ADD or DISPATCH_SOURCE_TYPE_DATA_OR and submits its event handler block to its target queue

void dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{   
    struct kevent kev = {
        .fflags = (typeof(kev.fflags))val,
        .data = val,
    };

    _dispatch_source_merge_kevent(ds, &kev);
}
void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
    struct kevent fake;

    // 如果source已经被取消或者引用计数为0,直接返回
    if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
        return;
    }

    // 发生错误之后的清理工作
    if (ke->flags & EV_ERROR) {
        if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
            fake = *ke;
            fake.flags &= ~EV_ERROR;
            fake.fflags = NOTE_EXIT;
            fake.data = 0;
            ke = &fake;
        } else {
            // log the unexpected error
            dispatch_assume_zero(ke->data);
            return;
        }
    }

    if (ds->ds_is_level) {
        // EV_EOF
        dispatch_assert(ke->data >= 0l);
        ds->ds_pending_data = ~ke->data;
    } 
    else if (ds->ds_is_adder) {
        dispatch_atomic_add(&ds->ds_pending_data, ke->data);
    } 
    else {
        dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
    }

    // EV_DISPATCH and EV_ONESHOT sources 仅仅触发一次
    if (ds->ds_needs_rearm) {
        ds->ds_is_armed = false;
    }

    // 唤醒source
    _dispatch_wakeup(ds);
}

dispatch_resume

void dispatch_resume(dispatch_object_t dou)
{
    if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
        return;
    }
    switch (dispatch_atomic_sub(&dou._do->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_INTERVAL) + DISPATCH_OBJECT_SUSPEND_INTERVAL) {
    case DISPATCH_OBJECT_SUSPEND_INTERVAL:
        _dispatch_wakeup(dou._do);
        break;
    case 0:
        DISPATCH_CLIENT_CRASH("Over-resume of an object");
        break;
    default:
        break;
    }
}

由前面的创建过程可以知晓,一般情况下,创建一个source,其do_suspend_cnt都是DISPATCH_OBJECT_SUSPEND_INTERVAL,表示暂停状态,需要调用dispatch_resume手动启动。
很明显,上面的switch分支应该走DISPATCH_OBJECT_SUSPEND_INTERVAL,从而调用_dispatch_wakeup

_dispatch_wakeup在queue篇中也有分析到,它主要和do_invoke有关,而source中的do_invoke指向了_dispatch_source_invoke,这个方法会执行所有的source actions,每一个action都会保证在合适的队列上执行,如果当前的队列与action不对应,那么正确的队列将会返回,然后该action将会在返回的正确队列上执行,本文的核心重点就是这个了。

dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds)
{    
    // 获取当前线程所在的队列
    dispatch_queue_t dq = _dispatch_queue_get_current();

    // 如果source还没有安装,则调用_dispatch_kevent_merge进行安装到管理队列上
    if (!ds->ds_is_installed) {
        // The source needs to be installed on the manager queue.
        if (dq != &_dispatch_mgr_q) {
            return &_dispatch_mgr_q;
        }

        _dispatch_kevent_merge(ds);
    } 
    else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
        // source已经被取消,需要从管理队列中卸载掉,卸载完成后,取消回调需要发送到目标队列
        // 可以看到,如果source的ds_dkev不为空,就需要管理队列,否则目标队列就可以了
        if (ds->ds_dkev) {
            if (dq != &_dispatch_mgr_q) {
                return &_dispatch_mgr_q;
            }
            _dispatch_kevent_release(ds);
            return ds->do_targetq;
        } 
        else if (ds->ds_cancel_handler) {
            if (dq != ds->do_targetq) {
                return ds->do_targetq;
            }
        }   
        // cancel回调
        _dispatch_source_cancel_callout(ds);
    } 
    else if (ds->ds_pending_data) {
        // source有未决的数据,需要通过在目标队列上通过回调传送
        if (dq != ds->do_targetq) {
            return ds->do_targetq;
        }
        // 
        _dispatch_source_latch_and_call(ds);

        // 有些source需要进行rearmed,必须切到管理队列上
        if (ds->ds_needs_rearm) {
            return &_dispatch_mgr_q;
        }
    } 
    else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
        // 在管理队列上进行rearmed
        if (dq != &_dispatch_mgr_q) {
            return &_dispatch_mgr_q;
        }
        _dispatch_kevent_resume(ds->ds_dkev, 0, 0);
        ds->ds_is_armed = true;
    }

    return NULL;
}

该函数中多次提到了管理队列_dispatch_mgr_q,有些情况必须切到管理队列上执行,比如:

  • ds_is_installed为假;
  • ds_dkev不为空;
  • ds_needs_rearm
    关于_dispatch_mgr_q,后文会详细分析。

_dispatch_kevent_merge

  • Find existing kevents, and merge any new flags if necessary
void _dispatch_kevent_merge(dispatch_source_t ds)
{
    static dispatch_once_t pred;
    dispatch_kevent_t dk;
    typeof(dk->dk_kevent.fflags) new_flags;
    bool do_resume = false;

    if (ds->ds_is_installed) {
        return;
    }
    ds->ds_is_installed = true;

    // 初始化_dispatch_source_init_tail_queue_array
    dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array
        );

    // 寻找是否有相同的事件(dispatch kevent)被观察
    dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);

    if (dk) {
        // 如果存在有相同的事件,则需要检查是否有新的flags需要更新,从这里可以看出,kevent
        // 并不依赖于source,而是独立存在的。
        new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
        dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
        free(ds->ds_dkev);
        ds->ds_dkev = dk;
        do_resume = new_flags;
    }
    else {
        // 如果不存在,则直接将dk插入到事件队列中
        dk = ds->ds_dkev;
        _dispatch_kevent_insert(dk);
        new_flags = dk->dk_kevent.fflags;
        do_resume = true;
    }

    // 将source的ds_list插入到kevent的source队列中
    TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

    // 如果flags有更新,则需要重新注册kevent事件
    if (do_resume) {
        dk->dk_kevent.flags |= EV_ADD;
        _dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
        ds->ds_is_armed = true;
    }
}
  • 1、_dispatch_source_init_tail_queue_array
static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
    unsigned int i;
    for (i = 0; i < DSL_HASH_SIZE; i++) {
        TAILQ_INIT(&_dispatch_sources[i]);
    }

    TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
    TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);

    TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
    TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}
#define DSL_HASH_SIZE 256u  
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

#define DISPATCH_TIMER_INDEX_WALL 0
#define DISPATCH_TIMER_INDEX_MACH 1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
    {
        .dk_kevent = {
            .ident = DISPATCH_TIMER_INDEX_WALL,
            .filter = DISPATCH_EVFILT_TIMER,
            .udata = &_dispatch_kevent_timer[0],
        },
        .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
    },
    {
        .dk_kevent = {
            .ident = DISPATCH_TIMER_INDEX_MACH,
            .filter = DISPATCH_EVFILT_TIMER,
            .udata = &_dispatch_kevent_timer[1],
        },
        .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
    },
};
  • 2、_dispatch_kevent_find
static dispatch_kevent_t _dispatch_kevent_find(uintptr_t ident, short filter)
{
    uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
    dispatch_kevent_t dki;

    TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
        // ident和filter都相同才算同一个
        if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
            break;
        }
    }
    return dki;
}
  • 3、_dispatch_kevent_resume
void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
    switch (dk->dk_kevent.filter) {
    case DISPATCH_EVFILT_TIMER:
    case DISPATCH_EVFILT_CUSTOM_ADD:
    case DISPATCH_EVFILT_CUSTOM_OR:
        // these types not registered with kevent
        return;
    case EVFILT_MACHPORT:
        _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
        break;
    case EVFILT_PROC:
        if (dk->dk_kevent.flags & EV_ONESHOT) {
            return;
        }
        // fall through
    default:
        _dispatch_update_kq(&dk->dk_kevent);
        if (dk->dk_kevent.flags & EV_DISPATCH) {
            dk->dk_kevent.flags &= ~EV_ADD;
        }
        break;
    }
}

_dispatch_source_cancel_callout

主要执行一些清理工作

void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
    ds->ds_pending_data_mask = 0;
    ds->ds_pending_data = 0;
    ds->ds_data = 0;

#ifdef __BLOCKS__
    // block的释放和相关清理
    if (ds->ds_handler_is_block) {
        Block_release(ds->ds_handler_ctxt);
        ds->ds_handler_is_block = false;
        ds->ds_handler_func = NULL;
        ds->ds_handler_ctxt = NULL;
    }
#endif

    // 如果ds->ds_cancel_handler为空,直接返回
    if (!ds->ds_cancel_handler) {
        return;
    }

    if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
        // 如果回调是block
        dispatch_block_t b = ds->ds_cancel_handler;
        if (ds->ds_atomic_flags & DSF_CANCELED) {
            // 执行取消回调
            b();
        }
        Block_release(ds->ds_cancel_handler);
        ds->ds_cancel_is_block = false;
#endif
    } 
    else {
        // 如果回调是函数
        dispatch_function_t f = ds->ds_cancel_handler;
        if (ds->ds_atomic_flags & DSF_CANCELED) {
            f(ds->do_ctxt);
        }
    }

    // 执行完后,设置ds_cancel_handler为空
    ds->ds_cancel_handler = NULL;
}

_dispatch_source_latch_and_call

void _dispatch_source_latch_and_call(dispatch_source_t ds)
{
    unsigned long prev;

    if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
        return;
    }
    prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
    if (ds->ds_is_level) {
        ds->ds_data = ~prev;
    } else {
        ds->ds_data = prev;
    }
    if (dispatch_assume(prev)) {
        if (ds->ds_handler_func) {
            ds->ds_handler_func(ds->ds_handler_ctxt, ds);
        }
    }
}

管理队列

关于_dispatch_mgr_q,我们在queue篇中提到过,但是一直没有看到它的使用场合,它和source的关系很紧密,这里重点分析一下:

struct dispatch_queue_s _dispatch_mgr_q = {
    .do_vtable = &_dispatch_queue_mgr_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],

    .dq_label = "com.apple.libdispatch-manager",
    .dq_width = 1,
    .dq_serialnum = 2,
};

看看管理队列的函数指针do_vtable的指向:_dispatch_queue_mgr_vtable

static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
    .do_type = DISPATCH_QUEUE_MGR_TYPE,
    .do_kind = "mgr-queue",
    .do_invoke = _dispatch_mgr_invoke,
    .do_debug = dispatch_queue_debug,
    .do_probe = _dispatch_mgr_wakeup,
};

毫无疑问,这里面最重要的就是do_invoke和do_probe了,同时这也是我们探寻source底层的入口了
接下来就需要一些IO多路复用的知识了:

  • 同步阻塞、异步阻塞、同步非阻塞等
  • select、epoll、kqueue等

_dispatch_mgr_wakeup

static bool _dispatch_mgr_wakeup(dispatch_queue_t dq)
{
    static const struct kevent kev = {
        .ident = 1,
        .filter = EVFILT_USER,
#ifdef EV_TRIGGER
        .flags = EV_TRIGGER,
#endif
#ifdef NOTE_TRIGGER
        .fflags = NOTE_TRIGGER,
#endif
    };
    _dispatch_update_kq(&kev);

    return false;
}
void
_dispatch_update_kq(const struct kevent *kev)
{
    struct kevent kev_copy = *kev;
    kev_copy.flags |= EV_RECEIPT;

    // 从kqueue中删除kevent
    if (kev_copy.flags & EV_DELETE) {
        switch (kev_copy.filter) {
        case EVFILT_READ:
            // 清除标记
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
                _dispatch_rfd_ptrs[kev_copy.ident] = 0;
                return;
            }
        case EVFILT_WRITE:
            // 清除标记
            if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
                FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
                _dispatch_wfd_ptrs[kev_copy.ident] = 0;
                return;
            }
        default:
            break;
        }
    }

    // 监视kev_copy中的事件,如果列表里有任何就绪的fd,则把该事件对应的结构体放进
    // 第4个参数kev_copy列表里面。该方法调用后会阻塞,直到有事件就绪。
    int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);


    switch (kev_copy.data) {
    case 0:
        return;
    case EBADF:
        break;
    default:
        // 如果是因为kevent正在读数据或者写数据,导致上面的注册失败,那么需要设置_dispatch_select_workaround为true,然后回头调用select
        switch (kev_copy.filter) {
        case EVFILT_READ:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_rfds);
            _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;//读到的数据
            break;
        case EVFILT_WRITE:
            _dispatch_select_workaround = true;
            FD_SET((int)kev_copy.ident, &_dispatch_wfds);
            _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;//写入的数据
            break;
        default:
            _dispatch_source_drain_kevent(&kev_copy); 
            break;
        }
        break;
    }
}

除了EVFILT_READEVFILT_WRITE两种类型外,其它的事件类型都调用_dispatch_source_drain_kevent

void _dispatch_source_drain_kevent(struct kevent *ke)
{
    static dispatch_once_t pred;
    dispatch_kevent_t dk = ke->udata;
    dispatch_source_t dsi;

    // machport事件类型的处理
    if (ke->filter == EVFILT_MACHPORT) {
        return _dispatch_drain_mach_messages(ke);
    }

    // flags设置
    if (ke->flags & EV_ONESHOT) {
        dk->dk_kevent.flags |= EV_ONESHOT;
    }

    // 遍历dk->dk_sources,对里面的每一个source,执行_dispatch_source_merge_kevent
    // 表示事件已经就绪
    TAILQ_FORACH(dsi, &dk->dk_sources, ds_list) {
        _dispatch_source_merge_kevent(dsi, ke);
    }
}

_dispatch_mgr_invoke

static dispatch_queue_t _dispatch_mgr_invoke(dispatch_queue_t dq)
{
    static const struct timespec timeout_immediately = { 0, 0 };
    struct timespec timeout;
    const struct timespec *timeoutp;
    struct timeval sel_timeout, *sel_timeoutp;
    fd_set tmp_rfds, tmp_wfds;
    struct kevent kev[1];
    int k_cnt, k_err, i, r;

    // 设置线程私有数据TSD
    _dispatch_thread_setspecific(dispatch_queue_key, dq);

    // 无限循环
    for (;;) {
        _dispatch_run_timers();

        // looking for the first unsuspended timer which has its target
        // time set. Given timers are kept in order, if we hit an timer that's
        // unset there's no point in continuing down the list
        timeoutp = _dispatch_get_next_timer_fire(&timeout);

        // 参考_dispatch_update_kq中描述的原因,是一种特殊的处理
        if (_dispatch_select_workaround) {
            FD_COPY(&_dispatch_rfds, &tmp_rfds);
            FD_COPY(&_dispatch_wfds, &tmp_wfds);
            if (timeoutp) {
                sel_timeout.tv_sec = timeoutp->tv_sec;
                sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
                sel_timeoutp = &sel_timeout;
            } else {
                sel_timeoutp = NULL;
            }

            // 这里调用跨平台的多路复用select
            r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);

            // 错误处理
            if (r == -1) {
                if (errno != EBADF) {
                    dispatch_assume_zero(errno);
                    continue;
                }
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
                        continue;
                    }
                    r = dup(i);
                    if (r != -1) {
                        close(r);
                    } else {
                        FD_CLR(i, &_dispatch_rfds);
                        FD_CLR(i, &_dispatch_wfds);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_wfd_ptrs[i] = 0;
                    }
                }
                continue;
            }

            // 有事件就绪
            if (r > 0) {
                for (i = 0; i < FD_SETSIZE; i++) {
                    if (i == _dispatch_kq) {
                        continue;
                    }
                    if (FD_ISSET(i, &tmp_rfds)) {
                        FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
                        _dispatch_rfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                    if (FD_ISSET(i, &tmp_wfds)) {
                        FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
                        EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
                        _dispatch_wfd_ptrs[i] = 0;
                        _dispatch_mgr_thread2(kev, 1);
                    }
                }
            }

            timeoutp = &timeout_immediately;
        }

        // 正常的路径
        k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
        k_err = errno;

        switch (k_cnt) {
        case -1:
            if (k_err == EBADF) {
                DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
            }
            dispatch_assume_zero(k_err);
            continue;
        default:
            _dispatch_mgr_thread2(kev, (size_t)k_cnt);
            // fall through
        case 0:
            _dispatch_force_cache_cleanup();
            continue;
        }
    }

    return NULL;
}

不管是特殊处理的路径,还是正常的路径,二者均调用到了_dispatch_mgr_thread2

static void _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
{
    size_t i;

    for (i = 0; i < cnt; i++) {
        // EVFILT_USER未被source使用
        if (kev[i].filter == EVFILT_USER) {
            _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
        } 
        else {
            _dispatch_source_drain_kevent(&kev[i]);
        }
    }
}
void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
{
    _dispatch_queue_drain(dq);
    _dispatch_force_cache_cleanup();
}

总结

本篇的篇幅较长,也比较复杂,而且source本身相对于GCD中的其它API也是最陌生的。本文中涉及到的最多的就是IO多路复用和管理队列,抓住这两个重点,就不难理解dispatch_source的底层原理了。

-------------本文结束 感谢您的阅读-------------

本文标题:GCD源码分析6 —— dispatch_source篇

文章作者:lingyun

发布时间:2018年02月10日 - 00:02

最后更新:2018年02月12日 - 18:02

原始链接:https://tsuijunxi.github.io/2018/02/10/GCD源码分析6 —— dispatch-source篇/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。