.Net(c#)使用 Kafka 小结
1.开篇
由于项目中必须使用 kafka 来作为消息组件,所以使用 kafka 有一段时间了。不得不感叹 kafka 是一个相当优秀的消息系统。下面直接对使用过程做一总结,希望对大家有用。
1.1.kafka 部署
kafka 的简单搭建我们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。
1.1.1.确保安装了 docker 和 docker-compose
网上很多教程,安装也简单,不作为重点赘述。
1.1.2.编写 docker-compose.yml
将以下内容直接复制到新建空文件docker-compose.yml
中。
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
1.1.3.容器构建提交
在docker-compose.yml
文件的目录下执行以下命令:
docker-compose build # 打包
docker-compose up # 启动, 添加 -d 可以后台启动。
看到日志输出:
Creating network "desktop_default" with the default driver
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1 ... done
Attaching to desktop_zookeeper_1, desktop_kafka_1
zookeeper_1 | ZooKeeper JMX enabled by default
zookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
...
zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000
...
kafka_1 | Excluding KAFKA_VERSION from broker config
没有错误输出说明部署成功。
2.kafka 客户端选择
在 github 上能够找到好几个 c#可以使用的 kafka 客户端。大家可以去搜一下,本文就只说明rdkafka-dotnet和confluent-kafka-dotnet。
2.1.rdkafka-dotnet
我们生产环境中就使用的该客户端。在该项目 github 首页中可以看到:
var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{
consumer.OnMessage += (obj, msg) =>
{
//...
};
}
没错,使用它的原因就是它提供了EventConsumer,可以直接异步订阅消息。整体上来说该客户端非常的稳定,性能优良。使用过程中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是通过字典配置的,比如:
var config = new Config();
config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最早
这对于新手来说并不是很友好,很难想到去这样配置。当然如果有 librdkafka 的使用经验会好很多。大多数配置在 librdkafka 项目的CONFIGURATION。
还有一个需要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 项目中可以找到。
2.2 confluent-kafka-dotnet
confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,因为配置相对友好,更加全面。比如:
var conf = new ConsumerConfig
{
AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置
};
对于 EventConsumer 怎么办呢?在项目变更记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,我们可以参照基项目写一个:
public class EventConsumer<TKey, TValue> : IDisposable
{
private Task _consumerTask;
private CancellationTokenSource _consumerCts;
public IConsumer<TKey, TValue> Consumer { get; }
public ConsumerBuilder<TKey, TValue> Builder { get; set; }
public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
{
Builder = new ConsumerBuilder<TKey, TValue>(config);
Consumer = Builder.Build();
}
public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult;
public event EventHandler<ConsumeException> OnConsumeException;
public void Start()
{
if (Consumer.Subscription?.Any() != true)
{
throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
}
if (_consumerTask != null)
{
return;
}
_consumerCts = new CancellationTokenSource();
var ct = _consumerCts.Token;
_consumerTask = Task.Factory.StartNew(() =>
{
while (!ct.IsCancellationRequested)
{
try
{
var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
if (cr == null) continue;
OnConsumeResult?.Invoke(this, cr);
}
catch (ConsumeException e)
{
OnConsumeException?.Invoke(this, e);
}
}
}, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
public async Task Stop()
{
if (_consumerCts == null || _consumerTask == null) return;
_consumerCts.Cancel();
try
{
await _consumerTask;
}
finally
{
_consumerTask = null;
_consumerCts = null;
}
}
public void Dispose()
{
if (_consumerTask != null)
{
Stop().Wait();
}
Consumer?.Dispose();
}
}
使用测试:
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var eventConsumer = new EventConsumer<Ignore, string>(conf);
eventConsumer.Consumer.Subscribe(new[] {"test"});
eventConsumer.OnConsumeResult += (sen, cr) =>
{
Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'");
};
do
{
var line = Console.ReadLine();
switch (line)
{
case "stop":
eventConsumer.Stop();
break;
case "start":
eventConsumer.Start();
break;
}
} while (true);
}
3.功能扩展
!!!以下讨论都是对confluent-kafka-dotnet。
由于用户终端也使用了 kafka 客户端订阅消息。如果终端长时间没有上线,并且消息过期时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理所有消息,服务端才适合这个角色(:。所以客户端只需每次从登录时的最新点开始读取就可以了,历史性统计就交给服务器去做。
最便捷的方法是每次客户端连接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(如果终端很多m个,并且终端断开连接重连的次数也会很多随机n次,那么也是m*n个group信息),势必对服务端性能造成影响。
另一种方法是在保持groupid不变的情况下,修改消费偏移。那如何去设置位置偏移为最新点呢?
3.1 错误思路 AutoOffsetReset
在配置中存在一个让新手容易产生误解的配置项AutoOffsetReset.Latest自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次创建GroupId时会起作用,当 groupid 已经存在 kafka 记录中时它就不管用了。
3.2 提交偏移 Commit
我们能够在IConsumer<TKey, TValue>
中找到该 commit 方法,它有三个重载:
1. 无参函数。就是提交当前客户端`IConsumer<TKey, TValue>.Assignment`记录的偏移。
2. 参数ConsumeResult<TKey, TValue>。一次仅提交一个偏移。当然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。
3. 参数IEnumerable<TopicPartitionOffset> offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。
第三个函数就是我们想要的,我们只需得到对应参数TopicPartitionOffset的值就可以。
3.2.1.TopicPartition的获取
topic 是我们唯一可以确定的。在IConsumer<TKey, TValue>.Assignment
中可以得到 topic 和 partition。但遗憾的是它只有不会立即有值。我们只能主动去服务端获取,在IAdminClient
中找到了可获取该信息的方法,所以我们做一扩展:
public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout)
{
using var adv = new AdminClientBuilder(config).Build();
var topPns = adv.GetTopicPartition(topic, timeout);
return topPns;
}
public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout)
{
var mta = client.GetMetadata(timeout);
var topicPartitions = mta.Topics
.Where(t => topic == t.Topic)
.SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId)))
.ToList();
return topicPartitions;
}
3.2.2. TopicPartitionOffset获取
我们还差 offset 的值,通过IConsumer<TKey, TValue>.QueryWatermarkOffsets
方法可以查到当前水位,而其中 High 水位就是最新偏移。
现在我们可以完成我们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,但是在此之前的卡顿和类似与内存溢出让人不得心安。Commit 还是消费了所有消息:(,只不过暗搓搓的进行。在所有消息消费期间读取所有未消费,然后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法一样接受参数的方法Assign
,一试果然灵验。
public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout)
{
var water = consumer.QueryWatermarkOffsets(partition, timeout);
if (water == null || water.High == 0) return;
var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);
consumer.Assign(offset);
}
3.2.3.实际使用
最终的使用示例:
//...
var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));
topicPartitions?.ToList().ForEach(t =>
{
eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));
});
eventConsumer.Start();//在消费事件开始之前就可以进行偏移设置
//...
请注意,如果您关闭了自动提交功能,并且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。
4.总结
这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者需要对消费的消息进行提交。事实上,每个消息体里都有偏移信息。不提交对于服务端来说就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有异曲同工之妙。
服务端保存着每一个 groupid 对应的已经提交偏移Committed Offset
。当然客户端不提交它是不会变更的(不考虑直接操作服务端的形式)。
客户端保存自己的当前偏移Current Offset
,可以通过Assign
和Commit
进行更改,二者区别是Commit
将连同提交到服务端对应的偏移中进行更改,而Assign
仅改变客户端偏移,这一更改记录在IConsumer<TKey, TValue>.Assignment
中,首次启动时客户端异步向服务端请求Committed Offset
来对其赋值。这就是在 3.2 节中我们没有立即得到该值的的原因,该值将在可能在几秒中后被赋值,所以写了一个主动获取的方法GetTopicPartition
。客户端下一次消费将根据IConsumer<TKey, TValue>.Assignment
进行。
使用AdminClientBuilder.GetMetadata
函数可以得到对应话题的元数据,包括:topic、partition、Brokers 等。
使用IConsumer<TKey, TValue>.QueryWatermarkOffsets
函数可以得到当前服务端的水位,low 为最早的偏移(可能不是 0,考虑消息过期被删除的情况),high 为最新的偏移。