原文

前言

libevent 封装了底层多路复用接口,让我们可以更方便地跨平台使用异步网络IO。同时, libevent 还实现了定时任务,使用它我们就不用自己实现一遍了,还是比较方便。

libevent 官方提供了 libevent的教程、 以及libevent的接口文档,写得 相当好。我在阅读过一遍之后,开始尝试使用它实现一个负责与物联网设备通信的接入程序, 也就是普通的TCP/UDP服务端,承担接收连接请求、接收数据、下发数据、验证身份、转发 设备请求、管理连接超时、以及实现一些简单的接口,当然还有其它懒得说的功能。这个程 序跟 nginx 是很像的,之前我直接用 epoll 实现过很多个类似的程序,最近看到 libevent 开发的程序很多,于是开始尝试使用它。

然后遇到了问题。我尝试创建多个IO线程,然而事情并不是我想象的那样, event_del() 阻塞了。libevent 的事件操作只能在 dispatch 循环同一个线程中执行,也就是循环退 出后,或者在回调函数中。

一番调试后有了这个echo-server 例子,以及这篇说明,记录我调试的过程。

libevent 入门

libevent 有两个概念,event_base 和 event。

event 是事件,可以设定触发条件(可读/可写/超时/信号),以及条件出发后需要 执行的函数。event 相当于 epoll 中的 epoll_event。

事件循环在 event_base 上执行,event_base 里记录了所有事件的触发条件,循环中检查 条件,如果条件满足,则调用 event 中指定的函数。event_base 相当于 epoll 中 epoll_create() 创建的结构。

例如,要读取一个文件描述符 fd ,可以创建一个读事件,在事件的回调函数中读 fd :

// 事件的回调函数
void event_cb(evutil_socket_t fd, short what, void* arg) {
  // 在这里读 fd
  // ...
}

void* arg = NULL;
// 创建 event_base
struct event_base* ev_base = event_base_new();
// 创建 event, 可以从一个event_base中创建多个event
struct event* ev = event_new(ev_base, fd, EV_READ, event_cb, arg);

// 把事件注册到循环中
event_add(ev, NULL);

// 开始事件循环,如果事件的条件满足,则调用事件的回调函数
event_base_dispatch(ev_base);

设计结构

有两种方式结构能达到我的目的,一种是一个线程监听事件,线程池处理事件;第二种是多 个线程监听事件,事件出发后直接在本线程中执行。我使用的是第二种。

单事件循环,多处理线程

event_base 操作只能与事件循环在同一个线程中,为了在多线程中都可以进行事件处理, 第一个想法是在事件线程中创建 event_base 循环,回调函数中将事件的处理交给线程池。 事件循环中存在一个超时事件A,这个超时事件的回调函数专门负责执行线程池发过来的操作 事件的代码。如果在线程池中还需要操作事件,则将操作事件的代码发给事件线程,并将超 时事件激活以执行这些代码。

我在使用 epoll 时经常使用这种方式,一个线程监听事件,事件触发后再交由线程池处理, 如果要添加或操作事件,则把操作函数发到队列中,由监听事件的线程执行。这种方式需要 在多个线程中交流,会造成一些性能损失,但在我实际的项目中,这些性能损失跟业务消耗 的性能比起来微不足道。但这次开发的程序我还是用了另一种模式。

多事件循环

创建多个事件线程,每个线程创建一个 event_base 事件循环,事件触发时直接在事件线 程中执行。

实际的项目中,事件触发后,不只进行IO操作,还有很多阻塞的任务需要处理,最常见的是 请求数据库。数据库操作也可以写成异步IO放在事件循环中,但为了方便,我都把数据库操 作放到线程池中运行,运行结束后将事件操作放到队列,向上一个结构一样,由一个超时任 务来处理事件操作。

代码说明

代码都上传了,去除了业务相关的逻辑,只实现了 echo-server 功能。

buffer

buffer.cc 和 buffer.h 实现了可变长度的读写缓冲区。libevent 有 evbuffer 结构, 自己实现 buffer 是因为业务的程序中不止用了 libevent,还有很多遗留的IO代码,为 了在IO以后统一业务接口,还是沿用自己实现的 buffer。libevent 的实现是比我的实 现更方便高效的,比我不知高到哪里去,今后我会考虑使用它。

buffer 本质上是个缓存队列,数据从头部插入,从尾部取出。数据取出的顺序与数据插 入的顺序相同。

dispatcher

Dispatcher 是对 event_base 事件循环的封装。 event_base_loop() 函数在此处 运行,使用中每个IO线程拥有一个 Dispather 实例。其中的队列 post_callbacks_ 保存来自其它线程通过 post() 方法对 Dispatcher 的操作。 post() 函数中对超 时事件激活,而在超时事件中则将函数从队列中取出执行。

代码中只演示了一个队列,实际由于业务需要,可能需要多个队列以满足任务优先级的 需求。在我们的业务程序中,关闭连接、释放资源等操作被认为是优先级低的,我们设 立一个单独的队列,在其它队列中的内容都处理完之后,再处理低优先级队列的任务。

thread_pool

一个简单的线程池实现。多个线程循环从任务队列中获取任务后执行。任务队列存在多 个,以实现优先级的目的。在我们的业务中,设备身份认证包含很多密码学和数据库操 作,非常耗时,我们将这个操作的优先级设置很低;相对地,设备数据上传优先级较高。 这样可以保证现有业务不因大量的高性能消耗的设备接入请求中断。

listener

打开监听端口,注册事件,触发事件后调用 accept 函数接收新连接,接收新连接后 调用指定函数(handler)处理连接。

监听链接设置了 SO_REUSEPORT 选项,这样可以在多个线程中同时监听一个端口。

handler

连接处理的方法。listener 接收新连接后,实例化这个类处理新连接。

handler 的读事件接收客户端发来的数据,放到 read_buf 中,再从 read_buf 写 到 write_buf (这里有点多余),随后注册写事件。在写事件中,将 write_buf 的 内容发送给客户端。完成 echo-server 的功能。

实际的程序中,经常需要查询某个客户端连接,将数据从服务端主动发送给客户端,所 以我们将 handler 放到一个哈希表中存储,并设置引用计数。但这个例子里没有这个必 要。

main

程序入口,做一些初始化工作。

总结

搞这些花里胡哨的不见得比新员工学一周go开发的业务程序性能高。如果你真的需要一个 与业务关系不是那么密切,更新不频繁而且对效率要求高的程序,可以尝试使用这里介绍 的方法。

03-05 15:31