本文介绍了验证kafka主题消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在与kafka一起工作,并且被要求对发送给Kafka的消息进行验证,但是我不喜欢我认为的解决方案,这就是为什么我希望有人可以为此建议我

I'm working with kafka and I've been asked with doing a validation of the message that are sent to Kafka, but I don't like the solutions I've thought that's why I hope someone can advice me on this.

我们有许多生产者不在我们的控制范围内,因此他们可以发送任何格式的任何消息,并且我们最多可以发送8000万条记录,并且应在2小时内处理完.有人要求我:

We have many producers outside our control, so they can send any message with any kind of format, and we could have as much as 80 million records sent, and they should be dealt in less than 2 hours.I'v been asked to:

  • 验证格式(Json,因为它必须与mongoDB兼容).

  • Validate the format (Json since it has to be compatible with mongoDB).

验证发送的某些字段.

重命名一些字段

最后2个请求将使用MongoDB中存储的参数来完成.所有这些工作都应假设我们不是唯一一个制造消费者的公司,因此应该对我们的服务进行简单"调用以进行此验证.有什么想法吗?

The last 2 requestes are to be done using parameters stored in MongoDB.All of this should be done assuming we are not the only one making the consumers, so there should be a "simple" call to our service that makes this validation.Any ideas?

推荐答案

这通常是通过Kafka Streams作业完成的.

This is often done with a Kafka Streams job.

您有生产者发送事件的原始"输入主题.然后,Streams作业将从这些主题中读取内容并将有效记录写入干净"主题中.在Streams中,您可以进行各种处理来检查记录或根据需要丰富记录.

You have "raw" input topics where your producers send events. Then a Streams job reads from these topics and writes valid records into "clean" topics. In Streams you can do all sort of processing to check records or enrich them if needed.

您可能还希望将不良记录写入死信队列主题,以便您检查发生这些错误的原因.

You probably also want to write bad records into a dead letter queue topic so you can check why these happened.

然后,您的消费者可以阅读干净的主题,以确保他们只看到经过验证的数据.

Then your consumers can read from the clean topics to ensure they only see validated data.

此解决方案为记录增加了一些延迟,因为在到达消费者之前必须对其进行处理".您还希望在靠近Kafka集群的地方​​运行Streams作业,因为它要验证的数量取决于它可能需要摄取大量数据.

This solution adds some latency to records as they have to be "processed" before being reaching consumers. You also want to run the Streams job close to the Kafka cluster as depending how much you want to validate, it might need to ingest large amount of data.

另请参见使用Kafka的Streams API处理不良消息,其中有些这些概念中有详细的.

Also see Handling bad messages using Kafka's Streams API where some of these concepts are detailed.

这篇关于验证kafka主题消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-28 04:10