twemproxy是twitter开源的redis/memcached 代理,数据分片提供取模,一致性哈希等手段,维护和后端server的长连接,自动踢除server,恢复server,提供专门的状态监控端口供外部工具获取状态监控信息。代码写的比较漂亮,学习了一些Nginx的东西,比如每个请求的处理分为多个阶段,IO模型方面,采用单线程收发包,基于epoll事件驱动模型。文档中提到的Zero Copy技术,通过将消息指针在3个队列之间流转实现比较巧妙。本文主要分析twemproxy的核心部分,即一个请求的从接收到最后发送响应给客户端的流程。

一、大体流程

twemproxy后端支持多个server pool,为每个server pool分配一个监听端口用于接收客户端的连接。客户端和proxy建立连接记作client_conn,发起请求,proxy读取数据,放入req_msg中,设置msg的owner为client_conn,proxy根据策略从server pool中选取一个server并且建立连接记作server_conn,然后转发req_forward,将req_msg指针放入client_conn的output队列中,同时放入server_conn的input队列,然后触发server_conn的写事件,server_conn的写回调函数会从input队列中取出req_msg发送给对应的后端server,发送完成后将req_msg放入server_conn的output队列,当req_msg的响应rsp_msg回来后,调用rsp_filter(用于判断消息是否为空,是否消息可以不用回复等)和rsp_forward,将req_msg从server_conn的output队列中取出,建立req_msg和rsp_msg的对应关系(通过msg的peer字段),通过req_msg的owner找到client_conn,然后启动client_conn的写事件,client_conn的写回调函数从client_conn的output队列中取出req_msg,然后通过peer字段拿到对应的rsp_msg,将其发出去。至此,一次请求从被proxy接收到最后响应给client结束。

可以看出,整个流程,req_msg内容只有一份,req_msg指针在三个队列中的顺序是:

1. req_msg => client_conn.outputq

2. req_msg => server_conn.inputq

3. server_conn.inputq => req_msg

4. req_msg => server_conn.outputq

5. server_conn.outputq => req_msg

6. client_conn.outputq => req_msg

总体来说,proxy既需要接收客户端的连接,也需要维护和后端server的长连接,根据从客户端收到的req根据特定策略选择后端一台server进行转发,同一个客户端的连接上的不同的请求可能会转发到后端不同的server。

twemproxy源码分析-LMLPHP

二、基础数据结构:

后端的每个redis server对应一个server结构:

struct server {
uint32_t idx; /* server index */
struct server_pool *owner; /* owner pool */ // 每个server属于一个server pool
struct string pname; /* name:port:weight (ref in conf_server) */
struct string name; /* name (ref in conf_server) */
uint16_t port; /* port */
uint32_t weight; /* weight */
int family; /* socket family */
socklen_t addrlen; /* socket length */
struct sockaddr *addr; /* socket address (ref in conf_server) */ uint32_t ns_conn_q; /* # server connection */ // 下面队列中conn的个数
struct conn_tqh s_conn_q; /* server connection q */ // proxy和这个redis server之间维护的连接队列 int64_t next_retry; /* next retry time in usec */ // proxy有踢除后端server机制,当proxy给某台server转发请求出错次数达到server_failure_limit次,则next_retry微妙内不会请求该server。可配。
uint32_t failure_count; /* # consecutive failures */
};

twemproxy中使用一堆宏来定义队列等数据结构,如上面struct conn_tqh,nc_connection.h中有定义TAILQ_HEAD(conn_tqh, conn),TAILQ_HEAD宏定义如下:

 /*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}

可以看出结构体是通过宏来定义的,非常恶心,看代码ctags 找不到。conn_tqh是一个队列头部结构体,嵌入到一个server中,链表中每个元素是一个conn结构体,内嵌入一个TAILQ_ENTRY,用于将conn串入server的队列中。宏定义如下:

 #define TAILQ_ENTRY(type)                                               \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}

各个field的关系如下图所示:

twemproxy源码分析-LMLPHP

看一个很重要的结构conn,可以表示client和proxy之间的connection,也可以表示proxy和redis server之间的connection:

struct conn {
TAILQ_ENTRY(conn) conn_tqe; /* link in server_pool / server / free q */ // 队列entry字段,用于和其他的conn串起来
void *owner; /* connection owner - server_pool / server */ // 每个连接属于一个server int sd; /* socket descriptor */
int family; /* socket address family */
socklen_t addrlen; /* socket length */
struct sockaddr *addr; /* socket address (ref in server or server_pool) */ struct msg_tqh imsg_q; /* incoming request Q */ //从名字看出,和conn_tqh类似,这里也是一个消息队列,从连接读入的数据会组织成msg,push到这个消息队列,msg由mbuf队列组成用来存储具体的数据。
struct msg_tqh omsg_q; /* outstanding request Q */ // 需要往这个连接中写的msg push到这个消息队列
struct msg *rmsg; /* current message being rcvd */ //从连接上读到的数据往rmsg指向的msg里面填
struct msg *smsg; /* current message being sent */ //当前正在写的msg指针 conn_recv_t recv; /* recv (read) handler */ //读事件触发时回调
conn_recv_next_t recv_next; /* recv next message handler */ //实际读数据之前,调这个函数来得到当前正在使用的msg
conn_recv_done_t recv_done; /* read done handler */ // 每次接收到一个完整的消息后,回调
conn_send_t send; /* send (write) handler */ //写事件触发时回调
conn_send_next_t send_next; /* write next message handler */ 实际写数据之前,定位当前要写的msg
conn_send_done_t send_done; /* write done handler */ //发送完一个msg则回调一次
conn_close_t close; /* close handler */ //
conn_active_t active; /* active? handler */ conn_ref_t ref; /* connection reference handler */ // 得到一个连接后,将连接加入相应的队列
conn_unref_t unref; /* connection unreference handler */ conn_msgq_t enqueue_inq; /* connection inq msg enqueue handler */ //这四个队列用于存放msg的指针,和Zero Copy密切相关,后续详述
conn_msgq_t dequeue_inq; /* connection inq msg dequeue handler */
conn_msgq_t enqueue_outq; /* connection outq msg enqueue handler */
conn_msgq_t dequeue_outq; /* connection outq msg dequeue handler */ size_t recv_bytes; /* received (read) bytes */ //该连接上读了多少数据
size_t send_bytes; /* sent (written) bytes */ uint32_t events; /* connection io events */
err_t err; /* connection errno */
unsigned recv_active:; /* recv active? */
unsigned recv_ready:; /* recv ready? */
unsigned send_active:; /* send active? */
unsigned send_ready:; /* send ready? */ unsigned client:; /* client? or server? */ //连接属于proxy和client之间时,client为1,连接属于proxy和后端server之间时,client为0
unsigned proxy:; /* proxy? */ // listen fd封装在conn中时,proxy置1,响应的recv回调函数accept连接
unsigned connecting:; /* connecting? */
unsigned connected:; /* connected? */
unsigned eof:; /* eof? aka passive close? */
unsigned done:; /* done? aka close? */
unsigned redis:; /* redis? */ //后端server是redis还是memcached
};

二、

不管是proxy accept了Client的连接从而分配一个conn结构,还是proxy主动和后端server建立连接从而分配一个conn结构,都调用conn_get()函数,如下:

struct conn *conn_get(void *owner, bool client, bool redis)
{
struct conn *conn; conn = _conn_get();
if (conn == NULL) {
return NULL;
} /* connection either handles redis or memcache messages */
conn->redis = redis ? : ; conn->client = client ? : ; if (conn->client) {
/*
* client receives a request, possibly parsing it, and sends a
* response downstream.
*/
conn->recv = msg_recv; // 从conn读数据
conn->recv_next = req_recv_next; // 在真正从conn读数据之前,需要分配一个req_msg,用于承载读进来的数据
conn->recv_done = req_recv_done; //每次读完一个完整的消req_msg被调用 conn->send = msg_send; // 将从server收到的响应rsp_msg发给客户端
conn->send_next = rsp_send_next; // 每次发送rsp_msg之前需要首先确定从哪个开始发
conn->send_done = rsp_send_done; // 每次发送完成一个rsp_msg给客户端,调一次 conn->close = client_close; //用于proxy断开和client的半连接
conn->active = client_active; conn->ref = client_ref; //获取conn后将conn丢进客户端连接队列
conn->unref = client_unref; conn->enqueue_inq = NULL;
conn->dequeue_inq = NULL;
conn->enqueue_outq = req_client_enqueue_omsgq; // proxy每次接收到一个client发过来的req_msg,将req_msg入conn的output 队列
conn->dequeue_outq = req_client_dequeue_omsgq; // 给客户端发送完rsp_msg后将其对应的req_msg从conn的output队列中删除
} else {
/*
* server receives a response, possibly parsing it, and sends a
* request upstream.
*/
conn->recv = msg_recv;
conn->recv_next = rsp_recv_next; //从后端server接收数据之前需要先得到一个rsp_msg,用于承载读到的数据
conn->recv_done = rsp_recv_done; // 每次读完一个完整的rsp_msg,则回调
conn->send = msg_send; // 将req_msg往后端server发
conn->send_next = req_send_next; // 确定发哪个req_msg
conn->send_done = req_send_done; // 每转发完一个即回调 conn->close = server_close;
conn->active = server_active; conn->ref = server_ref;
conn->unref = server_unref; conn->enqueue_inq = req_server_enqueue_imsgq; // proxy将需要转发的req_msg放入对应后端server连接的input队列
conn->dequeue_inq = req_server_dequeue_imsgq; //proxy从input队列中取出req_msg发送给后端server完成后,需要将req_msg从这个后端连接的input队列中删除
conn->enqueue_outq = req_server_enqueue_omsgq; // 继上一步,需要将req_msg放入到后端连接的output队列
conn->dequeue_outq = req_server_dequeue_omsgq; // 收到后端server的rsp_msg后,将rsp_msg对应的req_msg从连接的output队列删除
} conn->ref(conn, owner); log_debug(LOG_VVERB, "get conn %p client %d", conn, conn->client); return conn;
}

如果该连接是proxy和后端server建立的,则client为false,否则为true,如果后端server是redis,则redis为true,如果为memcached,则为false,连接上的多个读写回调函数根据传入的标记不同而不同。

三、 请求具体处理流程

从前面可以看出,当proxy和Client之间有数据可读时,会调用msg_recv(),如下:

rstatus_t
msg_recv(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg; ASSERT(conn->recv_active); conn->recv_ready = ;
do {
msg = conn->recv_next(ctx, conn, true); //req_recv_next()
if (msg == NULL) {
return NC_OK;
} status = msg_recv_chain(ctx, conn, msg);
if (status != NC_OK) {
return status;
}
} while (conn->recv_ready); return NC_OK;
}

代码很短,其实就是反复的做两件事:req_recv_next和msg_recv_chain

req_recv_next获取当前用于接收数据的msg,设置到conn->rmsg中,并且返回rmsg,然后传给msg_recv_chain,每次重新接收一个完整的请求时,rmsg为空,如果某次读取只读取了请求的一部分,则rmsg不为空,下次读取时数据继续追加到上一次的msg中。

重点看msg_recv_chain()做的事情:

1. 从conn->rmsg中拿出最后一个mbuf(一个msg的数据实际上存在mbuf中,一个msg可以包含多个mbuf,同样通过队列组织),下一次读最多读最后一个mbuf的剩余空间大小,如果最后一个mbuf满了,分配一个新的,插入到rmsg的mbuf队列尾部

2. 循环从conn读数据,如果读取到的数据小于参入的buf大小,设置conn->recv_ready为0,表示后续没有数据要读了,外部的while(conn->recv_ready)循环退出。同样,如果读操作返回0表示客户端主动断开连接,将conn的eof标记置位,同时recv_ready也清0

3. 调用msg_parse()解析rmsg数据,如果成功解析到一条完整的命令,则继续调用msg_parsed(msg),由于msg由多个mbuf组成,并且TCP是流式协议,所以一次读可能接收到了多条完整的命令,甚至是部分命令。这时,msg_parsed(msg)会将后面这些多余的数据拷贝到一个新的mbuf中,并且产生一个新的msg,作为conn的rmsg。由于一次读可能会读到多条命令,这就是为什么msg_recv_chain()中有下面这个循环:

for (;;) {
status = msg_parse(ctx, conn, msg); //每次解析一条命令
if (status != NC_OK) {
return status;
} /* get next message to parse */
nmsg = conn->recv_next(ctx, conn, false);
if (nmsg == NULL || nmsg == msg) {
/* no more data to parse */
break;
}
msg = nmsg;
}

4. 同时,每次调msg_parse(ctx,conn,msg)解析出一条新的命令后,都会回调req_recv_done()方法。这个方法对请求进行过滤(req_filter)和转发(req_forward)给后端server。

5. msg_recv_chain()完成。

回到msg_recv,根据conn->recv_ready来判断是否连接中还有未读数据,有则继续读,parse。。

下面看请求转发给后端函数 req_forward():

1. 将解析出来的msg(以后将从客户端发过来的msg叫做req_msg)push进conn的output队列

2. 根据key和策略从server pool中选择一个server,并且获得和server的连接,记作server_conn,将req_msg push到server_conn的输入队列,起server_conn的写事件

至此,req_msg从client接收到解析到转发结束。下面看发消息给后端server,从后端server接收响应,将响应回复给客户端的过程

起server_conn写事件后,回调函数msg_send(struct context *ctx, struct conn *conn):

  rstatus_t
msg_send(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg; ASSERT(conn->send_active); conn->send_ready = ;
do {
msg = conn->send_next(ctx, conn);
if (msg == NULL) {
/* nothing to send */
return NC_OK;
} status = msg_send_chain(ctx, conn, msg);
if (status != NC_OK) {
return status;
} } while (conn->send_ready);

可以看出和接收函数msg_recv类似。req_send_next()从server_conn的input队列中拿出一个待发消息赋值给conn->smsg。然后调用msg_send_chain()对消息进行实际的发送。

msg_send_chain()流程如下:

1. 准备NC_IOV_MAX个iov,遍历smsg的mbuf队列,每个mbuf的数据用一个iov指向。

2. 循环调用req_send_next()不断的取出待发送的smsg,将其加入到一个局部消息队列send_msgq,同时遍历smsg的mbuf队列,每个mbuf用一个iov指向,直到NC_IOV_MAX个iov被装满,或者没有消息需要发送了,记录下装进去的消息总大小,记作nsend。

3. 循环往fd上写,返回成功写的大小nsent,遍历send_msgq中的msg,和msg中的mbuf队列,将已发送成功的mbuf置为空,并且将发送了部分msg的pos指向第一个未发送的字节。并且,如果一个msg的mbuf队列中的所有的mbuf都发送完成了,则将调用req_send_done(),将这个msg(指针)从这个连接的input队列中删除,并且放入到连接的output队列中。从这里可以看出,只有一个msg的所有的mbuf都被发送出去了才会从input队列中删除,如果只发送了部分mbuf,这些mbuf会被标记为空,下次继续发送这个msg时,会略过空的msg,实现一个msg过大或者网络阻塞导致需要多次发送才能发出一个msg的情况。

4. 至此,发送流程分析完成。

Redis接收到消息,处理返回,proxy接收响应,和proxy接收client的数据类似,同样调用msg_recv()

  rstatus_t
msg_recv(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg; ASSERT(conn->recv_active); conn->recv_ready = ;
do {
msg = conn->recv_next(ctx, conn, true); //rsp_recv_next()
if (msg == NULL) {
return NC_OK;
} status = msg_recv_chain(ctx, conn, msg);
if (status != NC_OK) {
return status;
}
} while (conn->recv_ready); return NC_OK;
}

只是,在这里,conn->recv_next指向的函数是rsp_recv_next(),不是req_recv_next()。同理,接收回消息的处理函数是rsp_recv_done(),不是req_recv_done()

rsp_recv_next():

1. 如果当前conn的rmsg为空,则分配一个新的返回,否则返回当前这个rmsg

2. 将上一步返回的rmsg传给msg_recv_chain()处理。这个函数之前在proxy接收client请求时分析了,不再赘述。

收到一个完整的响应rsp_msg后,调rsp_recv_done():

1. 同样和前面类似,在这里调用rsp_filter()和rsp_forward(),而不是req_filter()和req_forward()

重点说rsp_forward():

1. 从连接的output队列中弹出第一个元素,记作req_msg,这个msg即是目前收到的这个msg(rsp_msg)相对应的请求msg

2. 将req_msg的标记done置为1,表示这个请求已完成。

3. 通过以下两个语句将这两个msg建立对应关系:

pmsg->peer = msg; //pmsg为req_msg,msg为rsp_msg
msg->peer = pmsg

4. 从pmsg的owner可以找到所属的client conn,然后启client conn的写事件。

5. 至此,proxy从后端server接收响应分析完成。

最后,看一下proxy将响应写给client的流程。

类似,调用msg_send():

rstatus_t
msg_send(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg; ASSERT(conn->send_active); conn->send_ready = ;
do {
msg = conn->send_next(ctx, conn); // rsp_send_next()
if (msg == NULL) {
/* nothing to send */
return NC_OK;
} status = msg_send_chain(ctx, conn, msg);
if (status != NC_OK) {
return status;
} } while (conn->send_ready); return NC_OK;
}

同理,在这里,conn->send_next函数指针指向rsp_send_next():

1. 从client conn的output队列中拿出msg,记作req_msg,从req_msg的peer字段将相应的rsp_msg拿出来,放在conn的smsg上待发送

发出完成后,调用rsp_send_done(),主要做的事就是将req_msg从client conn的output队列中删除。

05-20 01:48