转载:from:http://www.verydemo.com/demo_c92_i210679.html

这个程序虽然我调试过,也分析过,但是没有记录笔记,发现下边这篇文章分析直接透彻,拿来借用,聊以自省,开发一直要研究透彻源码,学习其中的处理手段!

kafka c接口librdkafka介绍之二:生产者接口

librdkafka当前版本可以从github上获取:https://github.com/edenhill/librdkafka
一、librdkafka基本信息:
 源码结构:
kafka/
├── examples
│   ├── Makefile
│   ├── rdkafka_example
│   ├── rdkafka_example.c
│   ├── rdkafka_performance
│   └── rdkafka_performance.c
├── librdkafka.a
├── librdkafka.conf
├── librdkafka.so
├── librdkafka.so.0 -> librdkafka.so
├── LICENSE
├── LICENSE.pycrc
├── Makefile
├── rdaddr.c
├── rdaddr.d
├── rdaddr.h
├── rdaddr.o
├── rd.c
├── rdcrc32.c
├── rdcrc32.d
├── rdcrc32.h
├── rdcrc32.o
├── rdfile.c
├── rdfile.d
├── rdfile.h
├── rdfile.o
├── rdgz.c
├── rdgz.d
├── rdgz.h
├── rdgz.o
├── rd.h
├── rdkafka.c
├── rdkafka.d
├── rdkafka.h
├── rdkafka.o
├── rdrand.c
├── rdrand.d
├── rdrand.h
├── rdrand.o
├── rdtime.h
├── rdtypes.h
├── README.md
└── rpm
    ├── librdkafka-build.sh
    ├── librdkafka.spec.in
    └── librdkafka-VER.txt

文件数:
find . -name "*.[h|c]" | wc -l
18

代码行数:
find . -name "*.[h|c]" | xargs wc -l
    89 ./rdtime.h
   246 ./examples/rdkafka_example.c
   351 ./examples/rdkafka_performance.c
   109 ./rd.h
    47 ./rdtypes.h
   183 ./rdaddr.h
    37 ./rd.c
   201 ./rdaddr.c
   139 ./rdfile.c
    42 ./rdgz.h
   103 ./rdcrc32.h
    50 ./rdrand.c
   121 ./rdgz.c
    45 ./rdrand.h
   520 ./rdkafka.h
  1334 ./rdkafka.c
    88 ./rdfile.h
   133 ./rdcrc32.c
  3838 total

核心代码:
rdkafka.h
rdkafka.c

二、生产者接口分析
 librdkafka 生产者接口支持应用程序向kafka推送数据,这可以满足一大类通过网络向kafka直接传送数据的需求。
 推送流程:
这里通过librdkafka自带的例子来说明(例子可参考前文kafka c接口librdkafka介绍之一)
发送的关键两步:
step 1)  创建kafka实例

点击(此处)折叠或打开

  1. rk = rd_kafka_new(RD_KAFKA_PRODUCER, broker, NULL)

step 2)发送数据

点击(此处)折叠或打开

  1. rd_kafka_produce(rk, topic, partition, RD_KAFKA_OP_F_FREE, opbuf, len)

下面就看看这两步背后librdkafka做了什么?
先看看rd_kafka_new做了什么

点击(此处)折叠或打开

  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, const char *broker,
  2. const rd_kafka_conf_t *conf) {
  3. rd_kafka_t *rk;
  4. rd_sockaddr_list_t *rsal;
  5. const char *errstr;
  6. static int rkid = 0;
  7. int err;
  8. /* If broker is NULL, default it to localhost. */
  9. if (!broker)
  10. broker = "localhost";
  11. /* 获取broker的地址信息 */
  12. if (!(rsal = rd_getaddrinfo(broker, RD_KAFKA_PORT_STR, AI_ADDRCONFIG,
  13. AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP,
  14. &errstr))) {
  15. rd_kafka_log(NULL, LOG_ERR, "GETADDR",
  16. "getaddrinfo failed for '%s': %s",
  17. broker, errstr);
  18. return NULL;
  19. }
  20. /*
  21. * Set up the handle.
  22. */
  23. rk = calloc(1, sizeof(*rk));
  24. rk->rk_type = type;
  25. rk->rk_broker.rsal = rsal;
  26. rk->rk_broker.s = -1;
  27. if (conf)
  28. rk->rk_conf = *conf;
  29. else
  30. rk->rk_conf = rd_kafka_defaultconf;
  31. rk->rk_refcnt = 2; /* One for caller, one for us. */
  32. if (rk->rk_type == RD_KAFKA_CONSUMER)
  33. rk->rk_refcnt++; /* Add another refcount for recv thread */
  34. /* 设置kafka初始状态为DOWN */
  35. rd_kafka_set_state(rk, RD_KAFKA_STATE_DOWN);
  36. pthread_mutex_init(&rk->rk_lock, NULL);
  37. /* 初始化生产者消息队列 */
  38. rd_kafka_q_init(&rk->rk_op);
  39. /* 初始化消费者和错误消息队列 */
  40. rd_kafka_q_init(&rk->rk_rep);
  41. /* 花开两朵,各表一枝,这里讨论生产者 */
  42. switch (rk->rk_type)
  43. {
  44. case RD_KAFKA_CONSUMER:
  45. /* Set up consumer specifics. */
  46. rk->rk_consumer.offset_file_fd = -1;
  47. assert(rk->rk_conf.consumer.topic);
  48. rk->rk_consumer.topic =    strdup(rk->rk_conf.consumer.topic);
  49. rk->rk_consumer.partition = rk->rk_conf.consumer.partition;
  50. rk->rk_consumer.offset = rk->rk_conf.consumer.offset;
  51. rk->rk_consumer.app_offset = rk->rk_conf.consumer.offset;
  52. /* File-based load&store of offset. */
  53. if (rk->rk_conf.consumer.offset_file) {
  54. char buf[32];
  55. int r;
  56. char *tmp;
  57. mode_t mode;
  58. /* If path is a directory we need to generate the
  59. * filename (which is a good idea). */
  60. mode = rd_file_mode(rk->rk_conf.consumer.offset_file);
  61. if (mode == 0) {
  62. /* Error: bail out. */
  63. int errno_save = errno;
  64. rd_kafka_destroy0(rk);
  65. errno = errno_save;
  66. return NULL;
  67. }
  68. if (S_ISDIR(mode))
  69. rk->rk_conf.consumer.offset_file =
  70. rd_tsprintf("%s/%s-%"PRIu32,
  71. rk->rk_conf.consumer.
  72. offset_file,
  73. rk->rk_consumer.topic,
  74. rk->rk_consumer.partition);
  75. rk->rk_conf.consumer.offset_file =
  76. strdup(rk->rk_conf.consumer.offset_file);
  77. /* Open file, or create it. */
  78. if ((rk->rk_consumer.offset_file_fd =
  79. open(rk->rk_conf.consumer.offset_file,
  80. O_CREAT|O_RDWR |
  81. (rk->rk_conf.consumer.offset_file_flags &
  82. RD_KAFKA_OFFSET_FILE_FLAGMASK),
  83. 0640)) == -1) {
  84. rd_kafka_destroy0(rk);
  85. return NULL;
  86. }
  87. /* Read current offset from file, or default to 0. */
  88. r = read(rk->rk_consumer.offset_file_fd,
  89. buf, sizeof(buf)-1);
  90. if (r == -1) {
  91. rd_kafka_destroy0(rk);
  92. return NULL;
  93. }
  94. buf[r] = '\0';
  95. rk->rk_consumer.offset = strtoull(buf, &tmp, 10);
  96. if (tmp == buf) /* empty or not an integer */
  97. rk->rk_consumer.offset = 0;
  98. else
  99. rd_kafka_dbg(rk, "OFFREAD",
  100. "Read offset %"PRIu64" from "
  101. "file %s",
  102. rk->rk_consumer.offset,
  103. rk->rk_conf.consumer.offset_file);
  104. }
  105. break;
  106. case RD_KAFKA_PRODUCER:
  107. break;
  108. }
  109. /* Construct a conveniant name for this handle. */
  110. /* 为broker命名 */
  111. snprintf(rk->rk_broker.name, sizeof(rk->rk_broker.name), "%s#%s-%i",
  112. broker, rd_kafka_type2str(rk->rk_type), rkid++);
  113. /* Start the Kafka thread. */
  114. /* 最关键部分,为生产者/消费者启动了单独的线程,线程函数是rd_kafka_thread_main */
  115. if ((err = pthread_create(&rk->rk_thread, NULL,
  116. rd_kafka_thread_main, rk))) {
  117. rd_sockaddr_list_destroy(rk->rk_broker.rsal);
  118. free(rk);
  119. return NULL;
  120. }
  121. return rk;
  122. }

接下来看看rd_kafka_thread_main里面做了什么?

点击(此处)折叠或打开

  1. static void *rd_kafka_thread_main (void *arg) {
  2. rd_kafka_t *rk = arg;
  3. /* 根据kafka接口状态的不同选择不同的动作 */
  4. while (!rk->rk_terminate) {
  5. switch (rk->rk_state)
  6. {
  7. case RD_KAFKA_STATE_DOWN:
  8. /* ..connect() will block until done (or failure) */
  9. if (rd_kafka_connect(rk) == -1)
  10. sleep(1); /*Sleep between connection attempts*/
  11. break;
  12. case RD_KAFKA_STATE_CONNECTING:
  13. break;
  14. case RD_KAFKA_STATE_UP:
  15. /* .._wait_*() blocks for as long as the
  16. * state remains UP. */
  17. if (rk->rk_type == RD_KAFKA_PRODUCER)
  18. rd_kafka_wait_op(rk);/* 对于生产者,需要执行rd_kafka_wait_op操作 */
  19. else
  20. rd_kafka_consumer_wait_io(rk);
  21. break;
  22. }
  23. }
  24. rd_kafka_destroy(rk);
  25. return NULL;
  26. }

继续追踪rd_kafka_wait_op函数。

点击(此处)折叠或打开

  1. /**
  2. * Producer: Wait for PRODUCE events from application.
  3. *
  4. * Locality: Kafka thread
  5. */
  6. static void rd_kafka_wait_op (rd_kafka_t *rk) {
  7. /* 循环取消息,发送消息,销毁消息 */
  8. while (!rk->rk_terminate && rk->rk_state == RD_KAFKA_STATE_UP) {
  9. rd_kafka_op_t *rko =
  10. rd_kafka_q_pop(&rk->rk_op, RD_POLL_INFINITE);
  11. rd_kafka_produce_send(rk, rko);
  12. rd_kafka_op_destroy(rk, rko);
  13. }
  14. }

至此,一个异步的消息处理接口已经活生生的展现在大家的面前:

应用程序中的线程负责向队列扔消息,接口启动的线程负责循环从队列里取消息并向kafka broker发送消息。

既然有了多线程对队列的操作,异步操作,自然就会有了锁、条件变量的身影。

创建工作完成了,接下来就是应用生产消息了,这里没有特别之处,就是向队列里扔数据。

点击(此处)折叠或打开

  1. /**
  2. * Produce one single message and send it off to the broker.
  3. *
  4. * See rdkafka.h for 'msgflags'.
  5. *
  6. * Locality: application thread
  7. */
  8. void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
  9. int msgflags,
  10. char *payload, size_t len) {
  11. rd_kafka_op_t *rko;
  12. /* 分配空间 */
  13. rko = calloc(1, sizeof(*rko));
  14. rko->rko_type = RD_KAFKA_OP_PRODUCE;
  15. rko->rko_topic = topic;
  16. rko->rko_partition = partition;
  17. rko->rko_flags |= msgflags;
  18. rko->rko_payload = payload;
  19. rko->rko_len = len;
  20. /* 消息入队 */
  21. rd_kafka_q_enq(&rk->rk_op, rko);
  22. }

入队操作也是常规操作:

点击(此处)折叠或打开

  1. /**
  2. * Enqueue the 'rko' op at the tail of the queue 'rkq'.
  3. *
  4. * Locality: any thread.
  5. */
  6. static inline void rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
  7. /* 上面讲到过,入队和出队是不同的线程来完成的,因此需要加锁 */
  8. pthread_mutex_lock(&rkq->rkq_lock);
  9. TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
  10. (void)rd_atomic_add(&rkq->rkq_qlen, 1);
  11. /* 有数据入队了,通知等待线程 */
  12. pthread_cond_signal(&rkq->rkq_cond);
  13. pthread_mutex_unlock(&rkq->rkq_lock);
  14. }

三、消息处理的细节
3.1 消息存储
尾队列存储

3.2 消息操作接口
入队:

点击(此处)折叠或打开

  1. /**
  2. * Enqueue the 'rko' op at the tail of the queue 'rkq'.
  3. *
  4. * Locality: any thread.
  5. */
  6. static inline void rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
  7. pthread_mutex_lock(&rkq->rkq_lock);
  8. TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
  9. (void)rd_atomic_add(&rkq->rkq_qlen, 1);
  10. pthread_cond_signal(&rkq->rkq_cond);
  11. pthread_mutex_unlock(&rkq->rkq_lock);
  12. }

出队:

点击(此处)折叠或打开

  1. /**
  2. * Pop an op from a queue.
  3. *
  4. * Locality: any thread.
  5. */
  6. static rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms) {
  7. rd_kafka_op_t *rko;
  8. rd_ts_t last;
  9. pthread_mutex_lock(&rkq->rkq_lock);
  10. while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
  11. (timeout_ms == RD_POLL_INFINITE || timeout_ms > 0)) {
  12. if (timeout_ms != RD_POLL_INFINITE) {
  13. last = rd_clock();
  14. if (pthread_cond_timedwait_ms(&rkq->rkq_cond,
  15. &rkq->rkq_lock,
  16. timeout_ms) ==
  17. ETIMEDOUT) {
  18. pthread_mutex_unlock(&rkq->rkq_lock);
  19. return NULL;
  20. }
  21. timeout_ms -= (rd_clock() - last) / 1000;
  22. } else
  23. pthread_cond_wait(&rkq->rkq_cond, &rkq->rkq_lock);
  24. }
  25. if (rko) {
  26. TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
  27. (void)rd_atomic_sub(&rkq->rkq_qlen, 1);
  28. }
  29. pthread_mutex_unlock(&rkq->rkq_lock);
  30. return rko;
  31. }

3.3 入队、出队同步
pthread_mutex_lock/unlock
pthread_cond_wait

3.4 消息发送
rd_kafka_produce_send->rd_kafka_send_request->rd_kafka_send
在rd_kafka_send中最终调用了sendmsg系统调用

点击(此处)折叠或打开

  1. static int rd_kafka_send (rd_kafka_t *rk, const struct msghdr *msg) {
  2. int r;
  3. r = sendmsg(rk->rk_broker.s, msg, 0);
  4. if (r == -1) {
  5. rd_kafka_fail(rk, "Send failed: %s", strerror(errno));
  6. return -1;
  7. }
  8. rk->rk_broker.stats.tx_bytes += r;
  9. rk->rk_broker.stats.tx++;
  10. return r;
  11. }

四、生产者接口存在的问题与解决办法
从上面的分析可以看出,librdkafka c接口接收消息和处理消息是异步的,换句话说,生产者接口存在
问题1: 应用程序调用生产者接口时,是无法马上得知消息是否发送成功的。
另外,如果kafka broker出现了不可用等情况,kafka c接口依然会不断接收数据,存入队列,但队列没有消费者,导致OOM的风险
问题2:当kafka broker不可用时,kafka c接口的消息队列不断扩大,内存消耗一直增长。

受限于kafka当前的通信协议,应用程序暂时无法得知消息是否发送成功的,但我们还是有办法来解决这个问题(不优雅的解决办法)
生产者接口最终都是通过调用rd_kafka_send来发送消息的。
点击(此处)折叠或打开

  1. static int rd_kafka_send (rd_kafka_t *rk, const struct msghdr *msg) {
  2. int r;
  3. r = sendmsg(rk->rk_broker.s, msg, 0);
  4. if (r == -1) {
  5. rd_kafka_fail(rk, "Send failed: %s", strerror(errno));
  6. return -1;
  7. }
  8. rk->rk_broker.stats.tx_bytes += r;
  9. rk->rk_broker.stats.tx++;
  10. return r;
  11. }

当发送消息失败时,会调用rd_kafka_fail函数来处理发送失败的情况。

点击(此处)折叠或打开

  1. /**
  2. * Failure propagation to application.
  3. * Will tear down connection to broker and trigger a reconnect.
  4. *
  5. * If 'fmt' is NULL nothing will be logged or propagated to the application.
  6. *
  7. * Locality: Kafka thread
  8. */
  9. static void rd_kafka_fail (rd_kafka_t *rk, const char *fmt, ...) {
  10. va_list ap;
  11. pthread_mutex_lock(&rk->rk_lock);
  12. rk->rk_err.err = errno;
  13. rd_kafka_set_state(rk, RD_KAFKA_STATE_DOWN);
  14. if (rk->rk_broker.s != -1) {
  15. close(rk->rk_broker.s);
  16. rk->rk_broker.s = -1;
  17. }
  18. if (fmt) {
  19. va_start(ap, fmt);
  20. vsnprintf(rk->rk_err.msg, sizeof(rk->rk_err.msg), fmt, ap);
  21. va_end(ap);
  22. rd_kafka_log(rk, LOG_ERR, "FAIL", "%s", rk->rk_err.msg);
  23. /* Send ERR op back to application for processing. */
  24. rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR,
  25. RD_KAFKA_RESP_ERR__FAIL, 0,
  26. strdup(rk->rk_err.msg),
  27. strlen(rk->rk_err.msg), 0);
  28. }
  29. pthread_mutex_unlock(&rk->rk_lock);
  30. }

从这里可以看到,当发送消息失败时,接口会通过rd_kafka_set_state函数设置kafka为RD_KAFKA_STATE_DOWN状态。
在应用程序中通过判断rk的状态可以得知kafka的当前状态。c接口封装好了访问当前状态的宏。

点击(此处)折叠或打开

  1. #define rd_kafka_state(rk) ((rk)->rk_state)

当应用程序检测到kafka状态为DOWN时,就不再通过c接口向kafka发送消息,这样就可以保证不出现OOM的问题。
这样是否解决了所有问题了呢?
答案是否定的。在实施过程中,笔者发现如果kafka broker不可用时,c接口在第一次获知这个不幸之前(通过sendmsg失败?),依然会向broker发送数据,而向一个已经关闭的socket发送数据,会收到对方的rst信号,这将在应用程序中产生SIGPIPE信号,如果不处理,应用程序也会异常退出!!!
比较简单粗暴的处理办法是:

点击(此处)折叠或打开

  1. sigemptyset(&set);
  2. sigaddset(&set, SIGPIPE);
  3. sigprocmask(SIG_BLOCK, &set, NULL);
05-04 06:32