本文介绍了在 Spark Executors 上向 Kafka 提交偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从 Kafka 获取事件,在 Spark 上丰富/过滤/转换它们,然后将它们存储在 ES 中.我正在向 Kafka 提交偏移量

I am getting events from Kafka, enriching/filtering/transforming them on Spark and then storing them in ES. I am committing back the offsets to Kafka

我有两个问题:

(1) 我目前的 Spark 工作非常慢

我有 50 个分区用于一个主题和 20 个执行程序.每个 executor 有 2 个内核和 4g 内存.我的驱动程序有 8g 内存.我消耗了 1000 个事件/分区/秒,我的批处理间隔是 10 秒.这意味着,我在 10 秒内消耗了 500000 个事件

I have 50 partitions for a topic and 20 executors. Each executor has 2 cores and 4g of memory each. My driver has 8g of memory. I am consuming 1000 events/partition/second and my batch interval is 10 seconds. This means, I am consuming 500000 events in 10 seconds

我的ES集群如下:

20 个分片/索引

3 个主实例 c5.xlarge.elasticsearch

3 master instances c5.xlarge.elasticsearch

12 个实例 m4.xlarge.elasticsearch

12 instances m4.xlarge.elasticsearch

磁盘/节点 = 1024 GB 所以总共 12 TB

disk / node = 1024 GB so 12 TB in total

而且我遇到了巨大的调度和处理延迟

And I am getting huge scheduling and processing delays

(2) 如何在执行器上提交偏移量?

目前,我在执行器上丰富/转换/过滤我的事件,然后使用 BulkRequest 将所有内容发送到 ES.这是一个同步过程.如果我得到正面反馈,我会将偏移量列表发送给驱动程序.如果没有,我会发回一个空列表.在驱动程序上,我向 Kafka 提交了偏移量.我相信,应该有一种方法,我可以在执行程序上提交偏移量,但我不知道如何将 kafka Stream 传递给执行程序:

Currently, I enrich/transform/filter my events on executors and then send everything to ES using BulkRequest. It's a synchronous process. If I get positive feedback, I send the offset list to driver. If not, I send back an empty list. On the driver, I commit offsets to Kafka. I believe, there should be a way, where I can commit offsets on executors but I don't know how to pass kafka Stream to executors:

((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);

这是向需要Kafka Stream的Kafka提交偏移量的代码

This is the code for committing offsets to Kafka which requires Kafka Stream

这是我的整体代码:

 kafkaStream.foreachRDD( // kafka topic
                rdd -> { // runs on driver
                    rdd.cache();
                    String batchIdentifier =
                            Long.toHexString(Double.doubleToLongBits(Math.random()));

                    LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");

                    Instant batchStart = Instant.now();

                    List<OffsetRange> offsetsToCommit =
                            rdd.mapPartitionsWithIndex( // kafka partition
                                    (index, eventsIterator) -> { // runs on worker

                                        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                                        LOGGER.info(
                                                "@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
                                        );

                                        if (!eventsIterator.hasNext()) {
                                            return Collections.emptyIterator();
                                        }

                                        // get single ES documents
                                        List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);

                                        // build request wrappers
                                        List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);

                                        LOGGER.info(
                                                "@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
                                        );

                                        BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);

                                        if (!bulkItemResponses.hasFailures()) {
                                            return Arrays.asList(offsetRanges).iterator();
                                        }

                                        elasticSearchRepository.close();
                                        return Collections.emptyIterator();
                                    },
                                    true
                            ).collect();

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );

                    OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];

                    for (int i = 0; i < offsets.length ; i++) {
                        offsets[i] = offsetsToCommit.get(i);
                    }

                    try {
                        offsetManagementMapper.commit(offsets);
                    } catch (Exception e) {
                        // ignore
                    }

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
                                    "in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );
                    rdd.unpersist();
                });

推荐答案

您可以将偏移逻辑移到 rdd 循环上方...我正在使用以下模板以获得更好的偏移处理和性能

You can move the offset logic above the rdd loop ... I am using below template for better offset handling and performance

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));



        kafkaStream.foreachRDD( kafkaStreamRDD -> {
            //fetch kafka offsets for manually commiting it later
            OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

            //filter unwanted data
            kafkaStreamRDD.filter(
                    new Function<ConsumerRecord<String, String>, Boolean>() {
                @Override
                public Boolean call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                    if(kafkaRecord!=null) {
                        if(!StringUtils.isAnyBlank(kafkaRecord.key() , kafkaRecord.value())) {
                            return Boolean.TRUE;
                        }
                    }
                    return Boolean.FALSE;
                }
            }).foreachPartition( kafkaRecords -> {

                // init connections here

                while(kafkaRecords.hasNext()) {
                    ConsumerRecord<String, String> kafkaConsumerRecord = kafkaRecords.next();
                    // work here
                }

            });
            //commit offsets
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
        });

这篇关于在 Spark Executors 上向 Kafka 提交偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 08:28