



在我的情况下,我需要多个主题,每个主题都与多个使用者链接.我想为每个主题设置一个消费者组.我没有在 kafka .net客户端中找到任何方法,因此可以动态创建使用者组并将主题与该使用者组链接.我使用的是 kafka 0.9.0 版本,请告诉我是否需要更改为 kafka服务器设置或在 Zookeeper 上使用?

In my case I need multiple topics with each topic being linked with multiple consumers. I want to set a consumer group for each topic. I did not find any method in kafka .net client so that I can create consumer group dynamically and link the topic with that consumer group. I am using kafka 0.9.0 version, please tell me if I need to change to kafka server setting or on Zookeeper?


我使用Microsoft .NET kafka作为下面的链接构建了一个快速原型.不确定是否能解决您的问题.

I'm built a quick prototype with Microsoft .NET kafka as link below. not sure it's solving your problem or not.


However, I'm hightly recommend you to use this library because it contain much more feature than kafka-net(e.g. supports zookeeper for maintaining offset, topic group, etc.)



static void Main(string[] args)
        Task.Factory.StartNew(() =>
            ConsumerConfiguration consumerConfig = new ConsumerConfiguration
                AutoCommit = true,
                AutoCommitInterval = 1000,
                GroupId = "group1",
                ConsumerId = "1",
                AutoOffsetReset = OffsetRequest.SmallestTime,
                NumberOfTries = 20,
                ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
            var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
            var dictionaryMapping = new Dictionary<string, int>();
            dictionaryMapping.Add("topic1", 1);

            var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());

            var messageStream = streams["topic1"][0];

            foreach (var message in messageStream.GetCancellable(new CancellationToken()))
                Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));

                //If you set AutoCommit to false, you can commit by yourself from this command.

        var brokerConfig = new BrokerConfiguration()
            BrokerId = 1,
            Host = "localhost",
            Port = 9092
        var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
        config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
        config.ProducerRetries = 3;
        config.RequiredAcks = -1;
        var kafkaProducer = new Producer(config);

        byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
        var inputMessage = new Message(payloadData);
        var data = new ProducerData<string, Message>("topic1", inputMessage);

        for (int i = 0; i < 10; i++)




07-28 03:04