我正在为librdkafka使用consumer.c example file,并且试图弄清楚如何将rkm有效负载(在244行打印)转换为json,以便可以从json中获取参数的值。

现在,我正在使用jansson,但是存在一些问题,可以根据需要进行扩展。

librdkafka或我不知道的标准C库中有此功能吗?

最佳答案

只需将消息有效负载(二进制)和长度传递给Jansson,如下所示:

/* For each consumed message .. */
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);

if (!rkmessage) {
   /* No message available */
   continue;

} else if (rkmessage->err) {
   /* Handle consumer event/error (typically not fatal) */
   handle_consumer_error(rkmessage->err, "%s", rd_kafka_message_errstr(rkmessage));

} else if (rkmessage->len > 0) {
   /* Parse JSON value */
   json_error_t err;
   json_t *json = json_loadb(rkm->payload, rkm->len, JSON_ALLOW_NUL, &err);
   if (!json) {
      handle_consumer_error(RD_KAFKA_RESP_ERR__BAD_MSG,
                            "Failed to parse JSON for message at %s [%"PRId32"] offset %"PRIu64: line %d: %s\n",
                            rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition),
                            rkmessage->offset, err.line, err.text);
   } else {
      /* Process the message */
      process_message(json);

      json_decref(json);
   }
}

rd_kafka_message_destroy(rkmessage);


有关更多信息,请参见Jansson文档:https://jansson.readthedocs.io/en/latest/tutorial.html

10-02 07:28