登录
注册
node.js 学习社区
Node.js代码阅读笔记之libeio

李佩芸

2014-12-11 03:54

近日读node.js代码,随手记下了一些,不对的地方还请大家指正~

在cnodejs里发帖,有些字符有特殊含义,比如下划线“_”会标识斜体。。。(恕我愚钝)
看不清楚可以看这里:
http://www.codingguy.net/?p=195


这个库全称貌似为Enhanced IO,用多线程实现了异步IO操作,为什么不用libev?因为libev用epoll(linux平台),不支持regular file。没错,libeio就是给node.js的fs模块用的。

从demo.c看如何使用

翻开代码,有个demo.c,操作很多,常用文件io都包括了。精简一下代码,得到一个minidemo.c:

int respipe [2];

void want_poll (void)
{
    char dummy;
    printf ("want_poll ()\n");
    write (respipe [1], &dummy, 1);
}

void done_poll (void)
{
    char dummy;
    printf ("done_poll ()\n");
    read (respipe [0], &dummy, 1);
}

int res_cb (eio_req *req)
{
    printf ("res_cb(%d|%s) = %d\n", req->type, req->data ? req->data : "?",
            EIO_RESULT (req));
    if (req->result < 0)
        abort ();
    return 0;
}

void event_loop (void)
{
    // an event loop. yeah.
    struct pollfd pfd;
    pfd.fd     = respipe [0];
    pfd.events = POLLIN;
    printf ("\nentering event loop\n");
    while (eio_nreqs ()) {
        poll (&pfd, 1, -1);
        printf ("eio_poll () = %d\n", eio_poll ());
    }
    printf ("leaving event loop\n");
}

int main (void)
{
    printf ("pipe ()\n");
    if (pipe (respipe)) abort ();
    printf ("eio_init ()\n");
if (eio_init (want_poll, done_poll)) abort ();
    eio_mkdir ("eio-test-dir", 0777, 0, res_cb, "mkdir");
    event_loop ();
}

这个demo很简单,大体流程如下:
enter image description here


  • 创建pipe

  • 初始化eio,注册wantpoll与donepoll

  • 发出异步操作:mkdir

  • 启动event loop

第一次看到这段代码,往往被pipe搞迷糊,为什么创建pipe呢?先不管为什么,我们先看是怎么用的。

创建匿名管道后,fd放在respipe数组中,respipe[ 0 ]用于读pipe,respipe[ 1 ]用于写pipe。在wantpoll和donepoll函数中,分别对pipe进行写和读。而wantpoll与donepoll则通过eio_init注册,即,赋值到全局变量中。

接着发出异步操作,创建一个目录eiomkdir(……),这里的rescb是操作完成之后的回调函数。

最后启动event loop。在这个函数中,有个大循环,eionreqs()表明尚未完成的请求数量,如果有未完成的请求,则poll respipe[0],即等待可读,若可读则调用eiopoll取回结果。eiopoll的执行也会回调rescb函数。

关于前面提到的pipe的作用,这与wantpoll与donepoll机制有关。wantpoll在响应队列第一次装入请求包的时候被调用;donepoll在响应队列为空的时候被调用。wantpoll向pipe写入数据、donepoll从pipe读出数据,读之前通过poll来等待数据可读,若可读,就说明有wantpoll调用,有异步请求被处理。接下来就可以调用eiopoll去接收结果并做回调了。这样,起到了类似锁的作用,同步了生产者消费者类型的资源访问,避免了无意义的循环空转,节约处理器cycle。

小窥源代码

看过demo,我们了解了hello world般的流程。但为了清楚eio接口函数的细节,以及异步io的实现方式,我们要继续读eio.[ch]的代码。话说eio的代码文件并不多,简单介绍一下名字:


  • ecb.h

这个头文件叫libecb(只有一个ecb.h的lib),全称为The e compiler builtins header/library。它把gcc相关的许多功能封装起来,并兼容不同的gcc版本。比如 attribute (attrlist)这些编译器相关的属性,可以用ecb_attribute(attrlist)这个宏来写,且不用考虑编译器版本(3.1以上才支持这个玩意)。


  • eio.[ch]

就这么一对儿c文件和h文件,先不解释了

下面翻开源代码,从demo中出现的几个接口函数开始,分析一下eio的工作流程。

一切的开始:eio_init

使用eio,第一步就是调用eioinit做一堆初始化。初始化mutex、队列、各种计数器,还有注册wantpoll与done_poll回调函数,这两个函数是由用户提供的。

X_MUTEX_CREATE (wrklock);
X_MUTEX_CREATE (reslock);
X_MUTEX_CREATE (reqlock);
X_COND_CREATE  (reqwait);
reqq_init (&req_queue);
reqq_init (&res_queue);
wrk_first.next =
wrk_first.prev = &wrk_first;
started  = 0;
idle     = 0;
nreqs    = 0;
nready   = 0;
npending = 0;
want_poll_cb = want_poll;
done_poll_cb = done_poll;

业务逻辑的入口:eio_xxx

初始化之后,即可调用eio的异步io函数了。我们以前面demo中创建目录为例,调用eio_mkdir(…)函数。如其他异步io函数,函数体非常简洁,但是逻辑并不简洁:

eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data)
{
    REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
}

真是那各种宏啊,淡定一下,找找定义:

#define REQ(rtype)                                              \
eio_req *req;                                                 \
req = (eio_req *)calloc (1, sizeof *req);                     \
if (!req)                                                     \
return 0;                                                   \
req->type    = rtype;                                         \
req->pri     = pri;                                                    \
req->finish  = cb;                                                       \
req->data    = data;                                               \
req->destroy = eio_api_destroy;

这个宏创建了一个eio_req变量,注意是堆空间哦。然后根据rtype,也就是io操作的类型,对其赋值。那些cb等参数都不是宏里面的,从使用场景可以看出是eio _mkdir的参数。接下来是PATH,同样找到定义如下:

#define PATH                                                           \
req->flags |= EIO_FLAG_PTR1_FREE;                             \
req->ptr1 = strdup (path);                                        \
if (!req->ptr1)                                                      \
{                                                                         \
    eio_api_destroy (req);                                               \
    return 0;                                                               \
}

文件io操作嘛,自然要有path。接下来又填写了mode。以上这些操作都是为了构建一个eioreq变量。这个eioreq的结构定义如下:

struct eio_req
{
eio_req volatile *next; /* private ETP */
eio_ssize_t result;  /* result of syscall, e.g. result = read (... */
off_t offs;      /* read, write, truncate, readahead, sync_file_range, fallocate: file offset, mknod: dev_t */
size_t size;     /* read, write, readahead, sendfile, msync, mlock, sync_file_range, fallocate: length */
void *ptr1;      /* all applicable requests: pathname, old name; readdir: optional eio_dirents */
void *ptr2;      /* all applicable requests: new name or memory buffer; readdir: name strings */
eio_tstamp nv1;  /* utime, futime: atime; busy: sleep time */
eio_tstamp nv2;  /* utime, futime: mtime */
int type;        /* EIO_xxx constant ETP */
int int1;        /* all applicable requests: file descriptor; sendfile: output fd; open, msync, mlockall, readdir: flags */
long int2;       /* chown, fchown: uid; sendfile: input fd; open, chmod, mkdir, mknod: file mode, sync_file_range, fallocate: flags */
long int3;       /* chown, fchown: gid */
int errorno;     /* errno value on syscall return */
#if __i386 || __amd64
unsigned char cancelled;
#else
sig_atomic_t cancelled;
#endif
unsigned char flags; /* private */
signed char pri;     /* the priority */
void *data;
eio_cb finish;
void (*destroy)(eio_req *req); /* called when request no longer needed */
void (*feed)(eio_req *req);    /* only used for group requests */
EIO_REQ_MEMBERS
eio_req *grp, *grp_prev, *grp_next, *grp_first; /* private */
};

初始化了一个新的eioreq,接下只用了一个SEND完成其余工作,SEND定义很简单,只是调用了eiosubmit函数:

#define SEND eio_submit (req); return req

异步IO的抽象:eio_submit

eiosubmit只是etpsubmit的一个封装:

void eio_submit (eio_req *req)
{
    etp_submit (req);
}

这个etp_submit几乎是所有io操作的入口,其中的关键代码如下:

X_LOCK (reqlock);
++nreqs;
++nready;
reqq_push (&req_queue, req);
X_COND_SIGNAL (reqwait);
X_UNLOCK (reqlock);
etp_maybe_start_thread ();

先做计数器累加,然后把刚刚初始化的eioreq放在reqqueue,这是request队列,然后调动etpmaybestartthread(),在这个函数中会判断是否调用etpstart_thread()启动工作线程。判断逻辑如下:

if (ecb_expect_true (etp_nthreads () >= wanted))
    return;
if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
    return;

1. 先判断工作线程数量,如果已到达上限(wanted,默认是4),则直接返回,不做任何操作。
2. 如果当前的工作线程数量与已处理请求数量的和小于总的请求数量(包括完成与未完成的),则直接返回,不做任何操作
条件都满足了,接下来会调用etpstartthread()函数,通过thread_create创建线程。

线程?!没错,就是线程:eio_proc

在etpstartthread()中创建线程,线程函数etp_proc的代码大致如下:

for (;;) {
    for (;;) {
        self->req = req = reqq_shift (&req_queue);
        if (req)
            break;
    }
        ETP_EXECUTE (self, req);
        if (!reqq_push (&res_queue, req) && want_poll_cb)
        want_poll_cb ();
}

流程概括如下:


  1. 从req_queue队列(即,待处理队列)中取出一个eio请求

  2. 调用ETP_EXECUTE完成io的实质操作,这会是一个阻塞过程。

  3. 将eio请求插入res_queue队列(即,已处理队列)

  4. 调用wantpollcb回调函数

这里要注意wantpollcb的回调条件,当且仅当入队前res_queue为空,才会调用。

执行同步IO:eio_execute

线程函数中有ETPEXECUTE出现,这个宏指代了eioexecute函数。在这个函数里面,根据eioreq的type域进行switch case,调用相应的io函数完成实质操作,并把结果写到eioreq中。示例代码如下:

/* ... */
switch (req->type)
{
    /* ... */
    case EIO_MKDIR:     req->result = mkdir     (req->ptr1, (eio_mode_t)req->int2); break;
    /* ... */
}
/* ... */

单纯从异步IO操作的执行看,前面介绍的流程已经能完成了,但是我们不仅要执行IO,还要获取执行的结果,并触发想要的回调。这是通过eio_poll来实现的:

接收报告:eio_poll

如同很多函数一样,eiopoll只是套了个壳儿,进而调用etppoll。删减了一些代码,挑出重要逻辑如下:

for (;;) {
    ETP_REQ *req;
    etp_maybe_start_thread ();
    req = reqq_shift (&res_queue);
    if (req) {
        if (!res_queue.size && done_poll_cb)
        done_poll_cb ();
    }
    --nreqs;
}

流程要点如下:

调用etpmaybestartthread(),上面已经讲过,在满足条件的情况下会创建工作线程。
从res
queue中取出一个eioreq
如果res
queue为空,则触发donepollcb回调

总结

就这样吧,再给出一张图来整体贯穿一下:
enter image description here

原文引自:http://cnodejs.org/topic/4f2c0391ab3b872077002f00

回复 · 0

发表回复

你可以在回复中 @ 其他人