问题描述
在我的情况下,我需要多个主题,每个主题都与多个使用者链接.我想为每个主题设置一个消费者组.我没有在 kafka
.net客户端中找到任何方法,因此可以动态创建使用者组并将主题与该使用者组链接.我使用的是 kafka 0.9.0
版本,请告诉我是否需要更改为 kafka服务器
设置或在 Zookeeper
上使用?
In my case I need multiple topics with each topic being linked with multiple consumers. I want to set a consumer group for each topic. I did not find any method in kafka
.net client so that I can create consumer group dynamically and link the topic with that consumer group. I am using kafka 0.9.0
version, please tell me if I need to change to kafka server
setting or on Zookeeper
?
推荐答案
我使用Microsoft .NET kafka作为下面的链接构建了一个快速原型.不确定是否能解决您的问题.
I'm built a quick prototype with Microsoft .NET kafka as link below. not sure it's solving your problem or not.
但是,我强烈建议您使用此库,因为它比kafka-net具有更多功能(例如,支持zookeeper来维护偏移量,主题组等)
However, I'm hightly recommend you to use this library because it contain much more feature than kafka-net(e.g. supports zookeeper for maintaining offset, topic group, etc.)
https://github.com/Microsoft/CSharpClient-for-Kafka
示例代码
static void Main(string[] args)
{
Task.Factory.StartNew(() =>
{
ConsumerConfiguration consumerConfig = new ConsumerConfiguration
{
AutoCommit = true,
AutoCommitInterval = 1000,
GroupId = "group1",
ConsumerId = "1",
AutoOffsetReset = OffsetRequest.SmallestTime,
NumberOfTries = 20,
ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
};
var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
var dictionaryMapping = new Dictionary<string, int>();
dictionaryMapping.Add("topic1", 1);
var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());
var messageStream = streams["topic1"][0];
foreach (var message in messageStream.GetCancellable(new CancellationToken()))
{
Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));
//If you set AutoCommit to false, you can commit by yourself from this command.
//consumer.CommitOffsets()
}
});
var brokerConfig = new BrokerConfiguration()
{
BrokerId = 1,
Host = "localhost",
Port = 9092
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
config.ProducerRetries = 3;
config.RequiredAcks = -1;
var kafkaProducer = new Producer(config);
byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
var inputMessage = new Message(payloadData);
var data = new ProducerData<string, Message>("topic1", inputMessage);
for (int i = 0; i < 10; i++)
{
kafkaProducer.Send(data);
}
Console.ReadLine();
}
希望获得帮助.
这篇关于为多个主题创建多个消费者组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!