转载自:http://www.verydemo.com/demo_c92_i210679.html
这个程序虽然我调试过,也分析过,但是没有记录笔记。发现下边这篇文章分析得直白透彻,拿来借用,聊以自省:做开发要一直把源码研究透彻,学习其中的处理手段!
kafka c接口librdkafka介绍之二:生产者接口
源码结构:
kafka/
├── examples
│ ├── Makefile
│ ├── rdkafka_example
│ ├── rdkafka_example.c
│ ├── rdkafka_performance
│ └── rdkafka_performance.c
├── librdkafka.a
├── librdkafka.conf
├── librdkafka.so
├── librdkafka.so.0 -> librdkafka.so
├── LICENSE
├── LICENSE.pycrc
├── Makefile
├── rdaddr.c
├── rdaddr.d
├── rdaddr.h
├── rdaddr.o
├── rd.c
├── rdcrc32.c
├── rdcrc32.d
├── rdcrc32.h
├── rdcrc32.o
├── rdfile.c
├── rdfile.d
├── rdfile.h
├── rdfile.o
├── rdgz.c
├── rdgz.d
├── rdgz.h
├── rdgz.o
├── rd.h
├── rdkafka.c
├── rdkafka.d
├── rdkafka.h
├── rdkafka.o
├── rdrand.c
├── rdrand.d
├── rdrand.h
├── rdrand.o
├── rdtime.h
├── rdtypes.h
├── README.md
└── rpm
├── librdkafka-build.sh
├── librdkafka.spec.in
└── librdkafka-VER.txt
文件数:
find . -name "*.[hc]" | wc -l
18
代码行数:
find . -name "*.[hc]" | xargs wc -l
89 ./rdtime.h
246 ./examples/rdkafka_example.c
351 ./examples/rdkafka_performance.c
109 ./rd.h
47 ./rdtypes.h
183 ./rdaddr.h
37 ./rd.c
201 ./rdaddr.c
139 ./rdfile.c
42 ./rdgz.h
103 ./rdcrc32.h
50 ./rdrand.c
121 ./rdgz.c
45 ./rdrand.h
520 ./rdkafka.h
1334 ./rdkafka.c
88 ./rdfile.h
133 ./rdcrc32.c
3838 total
核心代码:
rdkafka.h
rdkafka.c
二、生产者接口分析
librdkafka 生产者接口支持应用程序向kafka推送数据,这可以满足一大类通过网络向kafka直接传送数据的需求。
推送流程:
这里通过librdkafka自带的例子来说明(例子可参考前文kafka c接口librdkafka介绍之一)
发送的关键两步:
step 1) 创建kafka实例
代码清单如下:
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, broker, NULL)
step 2)发送数据
代码清单如下:
- rd_kafka_produce(rk, topic, partition, RD_KAFKA_OP_F_FREE, opbuf, len)
下面就看看这两步背后librdkafka做了什么?
先看看rd_kafka_new做了什么
代码清单如下:
- rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, const char *broker,
- const rd_kafka_conf_t *conf) {
- rd_kafka_t *rk;
- rd_sockaddr_list_t *rsal;
- const char *errstr;
- static int rkid = 0;
- int err;
- /* If broker is NULL, default it to localhost. */
- if (!broker)
- broker = "localhost";
- /* 获取broker的地址信息 */
- if (!(rsal = rd_getaddrinfo(broker, RD_KAFKA_PORT_STR, AI_ADDRCONFIG,
- AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP,
- &errstr))) {
- rd_kafka_log(NULL, LOG_ERR, "GETADDR",
- "getaddrinfo failed for '%s': %s",
- broker, errstr);
- return NULL;
- }
- /*
- * Set up the handle.
- */
- rk = calloc(1, sizeof(*rk));
- rk->rk_type = type;
- rk->rk_broker.rsal = rsal;
- rk->rk_broker.s = -1;
- if (conf)
- rk->rk_conf = *conf;
- else
- rk->rk_conf = rd_kafka_defaultconf;
- rk->rk_refcnt = 2; /* One for caller, one for us. */
- if (rk->rk_type == RD_KAFKA_CONSUMER)
- rk->rk_refcnt++; /* Add another refcount for recv thread */
- /* 设置kafka初始状态为DOWN */
- rd_kafka_set_state(rk, RD_KAFKA_STATE_DOWN);
- pthread_mutex_init(&rk->rk_lock, NULL);
- /* 初始化生产者消息队列 */
- rd_kafka_q_init(&rk->rk_op);
- /* 初始化消费者和错误消息队列 */
- rd_kafka_q_init(&rk->rk_rep);
- /* 花开两朵,各表一枝,这里讨论生产者 */
- switch (rk->rk_type)
- {
- case RD_KAFKA_CONSUMER:
- /* Set up consumer specifics. */
- rk->rk_consumer.offset_file_fd = -1;
- assert(rk->rk_conf.consumer.topic);
- rk->rk_consumer.topic = strdup(rk->rk_conf.consumer.topic);
- rk->rk_consumer.partition = rk->rk_conf.consumer.partition;
- rk->rk_consumer.offset = rk->rk_conf.consumer.offset;
- rk->rk_consumer.app_offset = rk->rk_conf.consumer.offset;
- /* File-based load&store of offset. */
- if (rk->rk_conf.consumer.offset_file) {
- char buf[32];
- int r;
- char *tmp;
- mode_t mode;
- /* If path is a directory we need to generate the
- * filename (which is a good idea). */
- mode = rd_file_mode(rk->rk_conf.consumer.offset_file);
- if (mode == 0) {
- /* Error: bail out. */
- int errno_save = errno;
- rd_kafka_destroy0(rk);
- errno = errno_save;
- return NULL;
- }
- if (S_ISDIR(mode))
- rk->rk_conf.consumer.offset_file =
- rd_tsprintf("%s/%s-%"PRIu32,
- rk->rk_conf.consumer.
- offset_file,
- rk->rk_consumer.topic,
- rk->rk_consumer.partition);
- rk->rk_conf.consumer.offset_file =
- strdup(rk->rk_conf.consumer.offset_file);
- /* Open file, or create it. */
- if ((rk->rk_consumer.offset_file_fd =
- open(rk->rk_conf.consumer.offset_file,
- O_CREAT|O_RDWR |
- (rk->rk_conf.consumer.offset_file_flags &
- RD_KAFKA_OFFSET_FILE_FLAGMASK),
- 0640)) == -1) {
- rd_kafka_destroy0(rk);
- return NULL;
- }
- /* Read current offset from file, or default to 0. */
- r = read(rk->rk_consumer.offset_file_fd,
- buf, sizeof(buf)-1);
- if (r == -1) {
- rd_kafka_destroy0(rk);
- return NULL;
- }
- buf[r] = '\0';
- rk->rk_consumer.offset = strtoull(buf, &tmp, 10);
- if (tmp == buf) /* empty or not an integer */
- rk->rk_consumer.offset = 0;
- else
- rd_kafka_dbg(rk, "OFFREAD",
- "Read offset %"PRIu64" from "
- "file %s",
- rk->rk_consumer.offset,
- rk->rk_conf.consumer.offset_file);
- }
- break;
- case RD_KAFKA_PRODUCER:
- break;
- }
- /* Construct a conveniant name for this handle. */
- /* 为broker命名 */
- snprintf(rk->rk_broker.name, sizeof(rk->rk_broker.name), "%s#%s-%i",
- broker, rd_kafka_type2str(rk->rk_type), rkid++);
- /* Start the Kafka thread. */
- /* 最关键部分,为生产者/消费者启动了单独的线程,线程函数是rd_kafka_thread_main */
- if ((err = pthread_create(&rk->rk_thread, NULL,
- rd_kafka_thread_main, rk))) {
- rd_sockaddr_list_destroy(rk->rk_broker.rsal);
- free(rk);
- return NULL;
- }
- return rk;
- }
接下来看看rd_kafka_thread_main里面做了什么?
代码清单如下:
- static void *rd_kafka_thread_main (void *arg) {
- rd_kafka_t *rk = arg;
- /* 根据kafka接口状态的不同选择不同的动作 */
- while (!rk->rk_terminate) {
- switch (rk->rk_state)
- {
- case RD_KAFKA_STATE_DOWN:
- /* ..connect() will block until done (or failure) */
- if (rd_kafka_connect(rk) == -1)
- sleep(1); /*Sleep between connection attempts*/
- break;
- case RD_KAFKA_STATE_CONNECTING:
- break;
- case RD_KAFKA_STATE_UP:
- /* .._wait_*() blocks for as long as the
- * state remains UP. */
- if (rk->rk_type == RD_KAFKA_PRODUCER)
- rd_kafka_wait_op(rk);/* 对于生产者,需要执行rd_kafka_wait_op操作 */
- else
- rd_kafka_consumer_wait_io(rk);
- break;
- }
- }
- rd_kafka_destroy(rk);
- return NULL;
- }
继续追踪rd_kafka_wait_op函数。
代码清单如下:
- /**
- * Producer: Wait for PRODUCE events from application.
- *
- * Locality: Kafka thread
- */
- static void rd_kafka_wait_op (rd_kafka_t *rk) {
- /* 循环取消息,发送消息,销毁消息 */
- while (!rk->rk_terminate && rk->rk_state == RD_KAFKA_STATE_UP) {
- rd_kafka_op_t *rko =
- rd_kafka_q_pop(&rk->rk_op, RD_POLL_INFINITE);
- rd_kafka_produce_send(rk, rko);
- rd_kafka_op_destroy(rk, rko);
- }
- }
至此,一个异步的消息处理接口已经活生生的展现在大家的面前:
应用程序中的线程负责向队列扔消息,接口启动的线程负责循环从队列里取消息并向kafka broker发送消息。
既然有了多线程对队列的操作,异步操作,自然就会有了锁、条件变量的身影。
创建工作完成了,接下来就是应用生产消息了,这里没有特别之处,就是向队列里扔数据。
代码清单如下:
- /**
- * Produce one single message and send it off to the broker.
- *
- * See rdkafka.h for 'msgflags'.
- *
- * Locality: application thread
- */
- void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
- int msgflags,
- char *payload, size_t len) {
- rd_kafka_op_t *rko;
- /* 分配空间 */
- rko = calloc(1, sizeof(*rko));
- rko->rko_type = RD_KAFKA_OP_PRODUCE;
- rko->rko_topic = topic;
- rko->rko_partition = partition;
- rko->rko_flags |= msgflags;
- rko->rko_payload = payload;
- rko->rko_len = len;
- /* 消息入队 */
- rd_kafka_q_enq(&rk->rk_op, rko);
- }
入队操作也是常规操作:
代码清单如下:
- /**
- * Enqueue the 'rko' op at the tail of the queue 'rkq'.
- *
- * Locality: any thread.
- */
- static inline void rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
- /* 上面讲到过,入队和出队是不同的线程来完成的,因此需要加锁 */
- pthread_mutex_lock(&rkq->rkq_lock);
- TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
- (void)rd_atomic_add(&rkq->rkq_qlen, 1);
- /* 有数据入队了,通知等待线程 */
- pthread_cond_signal(&rkq->rkq_cond);
- pthread_mutex_unlock(&rkq->rkq_lock);
- }
三、消息处理的细节
3.1 消息存储
尾队列存储
3.2 消息操作接口
入队:
代码清单如下:
- /**
- * Enqueue the 'rko' op at the tail of the queue 'rkq'.
- *
- * Locality: any thread.
- */
- static inline void rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
- pthread_mutex_lock(&rkq->rkq_lock);
- TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
- (void)rd_atomic_add(&rkq->rkq_qlen, 1);
- pthread_cond_signal(&rkq->rkq_cond);
- pthread_mutex_unlock(&rkq->rkq_lock);
- }
出队:
代码清单如下:
- /**
- * Pop an op from a queue.
- *
- * Locality: any thread.
- */
- static rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms) {
- rd_kafka_op_t *rko;
- rd_ts_t last;
- pthread_mutex_lock(&rkq->rkq_lock);
- while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
- (timeout_ms == RD_POLL_INFINITE || timeout_ms > 0)) {
- if (timeout_ms != RD_POLL_INFINITE) {
- last = rd_clock();
- if (pthread_cond_timedwait_ms(&rkq->rkq_cond,
- &rkq->rkq_lock,
- timeout_ms) ==
- ETIMEDOUT) {
- pthread_mutex_unlock(&rkq->rkq_lock);
- return NULL;
- }
- timeout_ms -= (rd_clock() - last) / 1000;
- } else
- pthread_cond_wait(&rkq->rkq_cond, &rkq->rkq_lock);
- }
- if (rko) {
- TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
- (void)rd_atomic_sub(&rkq->rkq_qlen, 1);
- }
- pthread_mutex_unlock(&rkq->rkq_lock);
- return rko;
- }
3.3 入队、出队同步
pthread_mutex_lock/unlock
pthread_cond_wait
3.4 消息发送
rd_kafka_produce_send->rd_kafka_send_request->rd_kafka_send
在rd_kafka_send中最终调用了sendmsg系统调用
代码清单如下:
- static int rd_kafka_send (rd_kafka_t *rk, const struct msghdr *msg) {
- int r;
- r = sendmsg(rk->rk_broker.s, msg, 0);
- if (r == -1) {
- rd_kafka_fail(rk, "Send failed: %s", strerror(errno));
- return -1;
- }
- rk->rk_broker.stats.tx_bytes += r;
- rk->rk_broker.stats.tx++;
- return r;
- }
四、生产者接口存在的问题与解决办法
从上面的分析可以看出,librdkafka c接口接收消息和处理消息是异步的,换句话说,生产者接口存在
问题1: 应用程序调用生产者接口时,是无法马上得知消息是否发送成功的。
另外,如果kafka broker出现了不可用等情况,kafka c接口依然会不断接收数据,存入队列,但队列没有消费者,导致OOM的风险
问题2:当kafka broker不可用时,kafka c接口的消息队列不断扩大,内存消耗一直增长。
受限于kafka当前的通信协议,应用程序暂时无法得知消息是否发送成功,但我们还是有办法来解决这个问题(不优雅的解决办法)
生产者接口最终都是通过调用rd_kafka_send来发送消息的。
代码清单如下:
- static int rd_kafka_send (rd_kafka_t *rk, const struct msghdr *msg) {
- int r;
- r = sendmsg(rk->rk_broker.s, msg, 0);
- if (r == -1) {
- rd_kafka_fail(rk, "Send failed: %s", strerror(errno));
- return -1;
- }
- rk->rk_broker.stats.tx_bytes += r;
- rk->rk_broker.stats.tx++;
- return r;
- }
当发送消息失败时,会调用rd_kafka_fail函数来处理发送失败的情况。
代码清单如下:
- /**
- * Failure propagation to application.
- * Will tear down connection to broker and trigger a reconnect.
- *
- * If 'fmt' is NULL nothing will be logged or propagated to the application.
- *
- * Locality: Kafka thread
- */
- static void rd_kafka_fail (rd_kafka_t *rk, const char *fmt, ...) {
- va_list ap;
- pthread_mutex_lock(&rk->rk_lock);
- rk->rk_err.err = errno;
- rd_kafka_set_state(rk, RD_KAFKA_STATE_DOWN);
- if (rk->rk_broker.s != -1) {
- close(rk->rk_broker.s);
- rk->rk_broker.s = -1;
- }
- if (fmt) {
- va_start(ap, fmt);
- vsnprintf(rk->rk_err.msg, sizeof(rk->rk_err.msg), fmt, ap);
- va_end(ap);
- rd_kafka_log(rk, LOG_ERR, "FAIL", "%s", rk->rk_err.msg);
- /* Send ERR op back to application for processing. */
- rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR,
- RD_KAFKA_RESP_ERR__FAIL, 0,
- strdup(rk->rk_err.msg),
- strlen(rk->rk_err.msg), 0);
- }
- pthread_mutex_unlock(&rk->rk_lock);
- }
从这里可以看到,当发送消息失败时,接口会通过rd_kafka_set_state函数设置kafka为RD_KAFKA_STATE_DOWN状态。
在应用程序中通过判断rk的状态可以得知kafka的当前状态。c接口封装好了访问当前状态的宏。
代码清单如下:
- #define rd_kafka_state(rk) ((rk)->rk_state)
当应用程序检测到kafka状态为DOWN时,就不再通过c接口向kafka发送消息,这样就可以保证不出现OOM的问题。
这样是否解决了所有问题了呢?
答案是否定的。在实施过程中,笔者发现如果kafka broker不可用时,c接口在第一次获知这个不幸之前(通过sendmsg失败?),依然会向broker发送数据,而向一个已经关闭的socket发送数据,会收到对方的rst信号,这将在应用程序中产生SIGPIPE信号,如果不处理,应用程序也会异常退出!!!
比较简单粗暴的处理办法是:
代码清单如下:
- sigemptyset(&set);
- sigaddset(&set, SIGPIPE);
- sigprocmask(SIG_BLOCK, &set, NULL);