为多个主题创建多个消费者组

为多个主题创建多个消费者组

本文介绍了为多个主题创建多个消费者组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的情况下,我需要多个主题,每个主题都与多个使用者链接.我想为每个主题设置一个消费者组.我没有在 kafka .net客户端中找到任何方法,因此可以动态创建使用者组并将主题与该使用者组链接.我使用的是 kafka 0.9.0 版本,请告诉我是否需要更改为 kafka服务器设置或在 Zookeeper 上使用?

In my case I need multiple topics with each topic being linked with multiple consumers. I want to set a consumer group for each topic. I did not find any method in kafka .net client so that I can create consumer group dynamically and link the topic with that consumer group. I am using kafka 0.9.0 version, please tell me if I need to change to kafka server setting or on Zookeeper?

推荐答案

我使用Microsoft .NET kafka作为下面的链接构建了一个快速原型.不确定是否能解决您的问题.

I'm built a quick prototype with Microsoft .NET kafka as link below. not sure it's solving your problem or not.

但是,我强烈建议您使用此库,因为它比kafka-net具有更多功能(例如,支持zookeeper来维护偏移量,主题组等)

However, I'm hightly recommend you to use this library because it contain much more feature than kafka-net(e.g. supports zookeeper for maintaining offset, topic group, etc.)

https://github.com/Microsoft/CSharpClient-for-Kafka

示例代码

static void Main(string[] args)
    {
        Task.Factory.StartNew(() =>
        {
            ConsumerConfiguration consumerConfig = new ConsumerConfiguration
            {
                AutoCommit = true,
                AutoCommitInterval = 1000,
                GroupId = "group1",
                ConsumerId = "1",
                AutoOffsetReset = OffsetRequest.SmallestTime,
                NumberOfTries = 20,
                ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
            };
            var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
            var dictionaryMapping = new Dictionary<string, int>();
            dictionaryMapping.Add("topic1", 1);

            var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());

            var messageStream = streams["topic1"][0];

            foreach (var message in messageStream.GetCancellable(new CancellationToken()))
            {
                Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));

                //If you set AutoCommit to false, you can commit by yourself from this command.
                //consumer.CommitOffsets()
            }
        });


        var brokerConfig = new BrokerConfiguration()
        {
            BrokerId = 1,
            Host = "localhost",
            Port = 9092
        };
        var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
        config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
        config.ProducerRetries = 3;
        config.RequiredAcks = -1;
        var kafkaProducer = new Producer(config);

        byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
        var inputMessage = new Message(payloadData);
        var data = new ProducerData<string, Message>("topic1", inputMessage);

        for (int i = 0; i < 10; i++)
        {
            kafkaProducer.Send(data);
        }

        Console.ReadLine();
    }

希望获得帮助.

这篇关于为多个主题创建多个消费者组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 03:04