.Net(c#)使用 Kafka 小结

1.开篇

由于项目中必须使用 kafka 来作为消息组件,所以使用 kafka 有一段时间了。不得不感叹 kafka 是一个相当优秀的消息系统。下面直接对使用过程做一总结,希望对大家有用。

1.1.kafka 部署

kafka 的简单搭建我们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。

1.1.1.确保安装了 docker 和 docker-compose

网上很多教程,安装也简单,不作为重点赘述。

1.1.2.编写 docker-compose.yml

将以下内容直接复制到新建空文件docker-compose.yml中。

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

1.1.3.容器构建提交

docker-compose.yml文件的目录下执行以下命令:

docker-compose build # 打包
docker-compose up # 启动, 添加 -d 可以后台启动。

看到日志输出:

Creating network "desktop_default" with the default driver
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1     ... done
Attaching to desktop_zookeeper_1, desktop_kafka_1
zookeeper_1  | ZooKeeper JMX enabled by default
zookeeper_1  | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper_1  | 2020-05-17 03:34:31,794 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
...
zookeeper_1  | 2020-05-17 03:34:31,872 [myid:] - INFO  [main:ZooKeeperServer@836] - tickTime set to 2000
...
kafka_1      | Excluding KAFKA_VERSION from broker config

没有错误输出说明部署成功。

2.kafka 客户端选择

在 github 上能够找到好几个 c#可以使用的 kafka 客户端。大家可以去搜一下,本文就只说明rdkafka-dotnetconfluent-kafka-dotnet

2.1.rdkafka-dotnet

我们生产环境中就使用的该客户端。在该项目 github 首页中可以看到:

var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{
    consumer.OnMessage += (obj, msg) =>
    {
        //...
    };
}

没错,使用它的原因就是它提供了EventConsumer,可以直接异步订阅消息。整体上来说该客户端非常的稳定,性能优良。使用过程中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是通过字典配置的,比如:

var config = new Config();
config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最早

这对于新手来说并不是很友好,很难想到去这样配置。当然如果有 librdkafka 的使用经验会好很多。大多数配置在 librdkafka 项目的CONFIGURATION

还有一个需要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 项目中可以找到。

2.2 confluent-kafka-dotnet

confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,因为配置相对友好,更加全面。比如:

var conf = new ConsumerConfig
{
    AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置
};

对于 EventConsumer 怎么办呢?在项目变更记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,我们可以参照基项目写一个:

public class EventConsumer<TKey, TValue> : IDisposable
{
    private Task _consumerTask;
    private CancellationTokenSource _consumerCts;
    public IConsumer<TKey, TValue> Consumer { get; }
    public ConsumerBuilder<TKey, TValue> Builder { get; set; }
    public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        Builder = new ConsumerBuilder<TKey, TValue>(config);
        Consumer = Builder.Build();
    }
    public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult;
    public event EventHandler<ConsumeException> OnConsumeException;
    public void Start()
    {
        if (Consumer.Subscription?.Any() != true)
        {
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
        }
        if (_consumerTask != null)
        {
            return;
        }
        _consumerCts = new CancellationTokenSource();
        var ct = _consumerCts.Token;
        _consumerTask = Task.Factory.StartNew(() =>
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
                    if (cr == null) continue;
                    OnConsumeResult?.Invoke(this, cr);
                }
                catch (ConsumeException e)
                {
                    OnConsumeException?.Invoke(this, e);
                }
            }
        }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    public async Task Stop()
    {
        if (_consumerCts == null || _consumerTask == null) return;
        _consumerCts.Cancel();
        try
        {
            await _consumerTask;
        }
        finally
        {
            _consumerTask = null;
            _consumerCts = null;
        }
    }
    public void Dispose()
    {
        if (_consumerTask != null)
        {
            Stop().Wait();
        }
        Consumer?.Dispose();
    }
}

使用测试:

static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };
    var eventConsumer = new EventConsumer<Ignore, string>(conf);
    eventConsumer.Consumer.Subscribe(new[] {"test"});
    eventConsumer.OnConsumeResult += (sen, cr) =>
    {
        Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'");
    };
    do
    {
        var line = Console.ReadLine();
        switch (line)
        {
            case "stop":
                eventConsumer.Stop();
                break;
            case "start":
                eventConsumer.Start();
                break;
        }
    } while (true);
}

3.功能扩展

!!!以下讨论都是对confluent-kafka-dotnet。

由于用户终端也使用了 kafka 客户端订阅消息。如果终端长时间没有上线,并且消息过期时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理所有消息,服务端才适合这个角色(:。所以客户端只需每次从登录时的最新点开始读取就可以了,历史性统计就交给服务器去做。

最便捷的方法是每次客户端连接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(如果终端很多m个,并且终端断开连接重连的次数也会很多随机n次,那么也是m*n个group信息),势必对服务端性能造成影响。

另一种方法是在保持groupid不变的情况下,修改消费偏移。那如何去设置位置偏移为最新点呢?

3.1 错误思路 AutoOffsetReset

在配置中存在一个让新手容易产生误解的配置项AutoOffsetReset.Latest自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次创建GroupId时会起作用,当 groupid 已经存在 kafka 记录中时它就不管用了。

3.2 提交偏移 Commit

我们能够在IConsumer<TKey, TValue>中找到该 commit 方法,它有三个重载:

1. 无参函数。就是提交当前客户端`IConsumer<TKey, TValue>.Assignment`记录的偏移。
2. 参数ConsumeResult<TKey, TValue>。一次仅提交一个偏移。当然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。
3. 参数IEnumerable<TopicPartitionOffset> offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。

第三个函数就是我们想要的,我们只需得到对应参数TopicPartitionOffset的值就可以。

3.2.1.TopicPartition的获取

topic 是我们唯一可以确定的。在IConsumer<TKey, TValue>.Assignment中可以得到 topic 和 partition。但遗憾的是它只有不会立即有值。我们只能主动去服务端获取,在IAdminClient中找到了可获取该信息的方法,所以我们做一扩展:

public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout)
{
    using var adv = new AdminClientBuilder(config).Build();
    var topPns = adv.GetTopicPartition(topic, timeout);
    return topPns;
}

public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout)
{
    var mta = client.GetMetadata(timeout);
    var topicPartitions = mta.Topics
        .Where(t => topic == t.Topic)
        .SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId)))
        .ToList();
    return topicPartitions;
}

3.2.2. TopicPartitionOffset获取

我们还差 offset 的值,通过IConsumer<TKey, TValue>.QueryWatermarkOffsets方法可以查到当前水位,而其中 High 水位就是最新偏移。

现在我们可以完成我们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,但是在此之前的卡顿和类似与内存溢出让人不得心安。Commit 还是消费了所有消息:(,只不过暗搓搓的进行。在所有消息消费期间读取所有未消费,然后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法一样接受参数的方法Assign,一试果然灵验。

public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout)
{
    var water = consumer.QueryWatermarkOffsets(partition, timeout);
    if (water == null || water.High == 0) return;
    var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);
    consumer.Assign(offset);
}

3.2.3.实际使用

最终的使用示例:

//...
var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));
topicPartitions?.ToList().ForEach(t =>
{
    eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));
});
eventConsumer.Start();//在消费事件开始之前就可以进行偏移设置
//...

请注意,如果您关闭了自动提交功能,并且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。

4.总结

这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者需要对消费的消息进行提交。事实上,每个消息体里都有偏移信息。不提交对于服务端来说就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有异曲同工之妙。

服务端保存着每一个 groupid 对应的已经提交偏移Committed Offset。当然客户端不提交它是不会变更的(不考虑直接操作服务端的形式)。

客户端保存自己的当前偏移Current Offset,可以通过AssignCommit进行更改,二者区别是Commit将连同提交到服务端对应的偏移中进行更改,而Assign仅改变客户端偏移,这一更改记录在IConsumer<TKey, TValue>.Assignment中,首次启动时客户端异步向服务端请求Committed Offset来对其赋值。这就是在 3.2 节中我们没有立即得到该值的的原因,该值将在可能在几秒中后被赋值,所以写了一个主动获取的方法GetTopicPartition。客户端下一次消费将根据IConsumer<TKey, TValue>.Assignment进行。

使用AdminClientBuilder.GetMetadata函数可以得到对应话题的元数据,包括:topic、partition、Brokers 等。

使用IConsumer<TKey, TValue>.QueryWatermarkOffsets函数可以得到当前服务端的水位,low 为最早的偏移(可能不是 0,考虑消息过期被删除的情况),high 为最新的偏移。

05-18 09:17