Kafka Connect S3 Sink connector: partitioning a large topic by an ID field

Question

We've been working on adding Kafka Connect to our data platform for the last few weeks and think it would be a useful way of extracting data from Kafka into an S3 data lake. We've experimented with the FieldPartitioner and the TimeBasedPartitioner and seen some pretty decent results.

We also need to partition by user ID, but when we tried the FieldPartitioner on a user ID field the connector was extremely slow, especially compared to partitioning by date. I understand that partitioning by an ID creates a lot of output partitions and so won't be as fast, which is fine, but it needs to be able to keep up with the producers.

So far we've tried increasing memory and heap, but we don't usually see any memory issues unless we bump flush.size to a large number. We've also tried small flush sizes, and both very small and large rotate.schedule.interval.ms values. We've also looked at networking, but that seems fine: with other partitioners the network keeps up without trouble.

Before potentially wasting a lot of time on this: has anyone attempted, or succeeded in, partitioning by an ID field with the S3 Sink Connector, especially on larger topics? Or does anyone have suggestions for configuration or setup that would be a good place to look?

Recommended answer

I'm not very familiar with Kafka Connect's connectors, but I will at least try to help.

I don't know whether the connector can be configured at the level of the Kafka topic's partitions; I am assuming there is some way to do that.

One possible approach focuses on the step where your clients produce to the Kafka brokers. My suggestion is to implement your own Partitioner, so that you have finer control over where your data lands on the Kafka side.

Here is a simplified example of such a custom partitioner. Suppose the key your producers send has the format id_name_date. The partitioner extracts the first element (the id) and uses it to choose the partition.

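import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class IdPartitioner implements Partitioner
{
   @Override
   public void configure(Map<String, ?> configs)
   {
     // required by the Partitioner interface; nothing to configure here
   }

   @Override
   public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster)
   {
       try
       {
         // Keys are expected to look like "id_name_date"; the id comes first.
         String pKey = (String) key;
         int id = Integer.parseInt(pKey.substring(0, pKey.indexOf('_')));

         /* getPartitionForId decides which partition number corresponds
            to the received ID. You could also implement the logic directly here. */
         return getPartitionForId(id);
       }
       catch (RuntimeException e)
       {
         // null, non-String, or malformed keys fall back to partition 0
         return 0;
       }
   }

   // Simplified mapping for a 5-partition topic with ids 100..599:
   // the leading digit picks the partition (1xx -> 0, ..., 5xx -> 4).
   // Ids outside that range are not handled in this simplification.
   private int getPartitionForId(int id)
   {
     return (id / 100) - 1;
   }

   @Override
   public void close()
   {
     // maybe some work here if needed
   }
}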
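For the producer to use a custom partitioner like the one above, it has to be named in the producer configuration under the partitioner.class property. The following is only a sketch of how those settings could be assembled: the broker address and the com.example package name are hypothetical, and String serializers are assumed because the partitioner casts the key to String.

```java
import java.util.Properties;

// Builds producer settings that point at a custom partitioner.
// Only standard producer property names are used; the values are illustrative.
final class IdPartitionerProducerConfig {

    static Properties build(String bootstrapServers, String partitionerClassName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: keys and values are plain strings.
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Fully qualified class name of the custom partitioner.
        props.put("partitioner.class", partitionerClassName);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and package name.
        Properties props = build("localhost:9092", "com.example.IdPartitioner");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can then be passed to the KafkaProducer constructor in the usual way.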

Even if you may need some more tuning on the Kafka Connect side, I believe this option may be helpful. Assume you have a topic with 5 partitions, and that getPartitionForId just checks the first digit of the ID to decide the partition (for simplicity, the minimum ID is 100 and the maximum ID is 599).

So if the received key is, for example, 123_tempdata_20201203, the partition method would return 0, that is, the first partition.
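The key-to-partition mapping described above can be tried out without a Kafka cluster. This is a minimal sketch under the stated simplification (5 partitions, ids from 100 to 599, leading digit selects the partition); the class and method names are made up for illustration, and rejecting out-of-range ids is my own addition rather than part of the original answer.

```java
// Standalone illustration of the id -> partition mapping; no Kafka dependency.
final class IdKeyRouting {
    static final int MIN_ID = 100;
    static final int MAX_ID = 599;

    // Extracts the numeric id from a key shaped like "id_name_date".
    static int extractId(String key) {
        int sep = key.indexOf('_');
        if (sep <= 0) {
            throw new IllegalArgumentException("key has no id prefix: " + key);
        }
        return Integer.parseInt(key.substring(0, sep));
    }

    // Leading digit picks the partition: 1xx -> 0, 2xx -> 1, ..., 5xx -> 4.
    static int partitionForId(int id) {
        if (id < MIN_ID || id > MAX_ID) {
            throw new IllegalArgumentException("id out of range: " + id);
        }
        return id / 100 - 1;
    }

    static int partitionForKey(String key) {
        return partitionForId(extractId(key));
    }

    public static void main(String[] args) {
        String[] keys = {
            "123_tempdata_20201203",
            "250_tempdata_20201203",
            "599_tempdata_20201203"
        };
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionForKey(key));
        }
    }
}
```

With this mapping every record for a given id range lands in the same Kafka partition, which is the pre-sorting effect the custom partitioner is meant to achieve.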

(The image shows P1 instead of P0 because I believe the example looks more natural this way, but be aware that the first partition is in fact partition 0. To be honest, I forgot about P0 while drawing it and didn't save the template, so I had to look for an excuse like "it looks more natural".)

Basically this would be a pre-adjustment, or accommodation, before the S3 upload.

I am aware that this may not be the ideal answer, as I don't know the exact specifications of your system. My guess is that there is some way to point topic partitions directly at S3 locations.

If that isn't possible, I hope this at least gives you some further ideas. Cheers!


08-28 19:00