问题描述
我在 Thrift
中定义了一组 structs
,如下所示:
I have a set of structs
defined in Thrift
such as the following:
struct Foo {
1: i32 a,
2: i64 b
}
我需要在 C ++
中执行以下操作:
I need to do the following in C++
:
(a)将 Foo
的实例序列化为Thrift兼容的字节(使用 Binary
或 Compact
Thrift协议)
(a) Serialize instances of Foo
into Thrift-compatible bytes (either using the Binary
or Compact
Thrift protocol)
(b)将字节序列化的实例发送到 Kafka
主题
(b) Send the byte-serialized instances to a Kafka
topic
问题
我如何将 Thrift
序列化的实例发送到 Kafka
集群?
How I do send the Thrift
serialized instances to a Kafka
cluster?
预先感谢
推荐答案
找出我自己问题的答案.
Figured out the answer to my own question.
下面的代码片段说明了如何将 Foo
的实例序列化为 Thrift
兼容的字节(使用Thrift Compact
协议).为了使用 Binary
协议,请将 TCompactProtocol
替换为 TBinaryProtocol
.
The code snippet below illustrates how to serialize an instance of Foo
to Thrift
-compatible bytes (using the Thrift Compact
protocol). In order to use the Binary
protocol, replace TCompactProtocol
with TBinaryProtocol
.
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>
using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;
...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;
// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);
发送到Kafka群集
以下代码段说明了如何将Thrift兼容字节发送到Kafka集群.
Sending to Kafka cluster
The following code snippet illustrates how to send the Thrift-compatible bytes to a Kafka cluster.
注意:下面使用的kafka客户端库为 librdkafka .
NOTE: The kafka client library used below is librdkafka.
#include "rdkafkacpp.h"
std::string errstr;
// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);
// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);
auto partition = 1;
// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
serialized_bytes, num_bytes,
NULL, NULL);
if (res != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
} else {
std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
}
producer->flush(10000);
这篇关于在C ++中将序列化的Thrift结构序列化为Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!