点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP
如果 C1 宕机,此时 StickyAssignor 的结果:
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器-LMLPHP

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

import java.nio.ByteBuffer;
import java.util.*;

public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {

    @Override
    public Subscription subscription(Set<String> topics) {
        // 在这里添加权重信息到 userData
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(getWeight());
        buffer.flip();
        return new Subscription(new ArrayList<>(topics), buffer);
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Map<String, Assignment> assignments = new HashMap<>();
        Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();

        // 遍历所有订阅的topics
        for (String topic : metadata.topics()) {
            List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
            for (TopicPartition partition : partitions) {
                partitionConsumers.putIfAbsent(partition, new ArrayList<>());
            }
        }

        // 根据权重分配分区
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            Subscription subscription = subscriptionEntry.getValue();
            int weight = subscription.userData().getInt();

            for (String topic : subscription.topics()) {
                List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
                for (TopicPartition partition : partitions) {
                    List<String> consumers = partitionConsumers.get(partition);
                    for (int i = 0; i < weight; i++) {
                        consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会
                    }
                }
            }
        }

        // 随机分配分区给消费者
        Random random = new Random();
        for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {
            List<String> consumers = entry.getValue();
            String assignedConsumer = consumers.get(random.nextInt(consumers.size()));
            assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>()))
                       .partitions().add(entry.getKey());
        }

        return assignments;
    }

    @Override
    public void onAssignment(Assignment assignment, Cluster metadata) {
        // 可以在这里处理分配后的逻辑,比如保存当前分配的快照
    }

    @Override
    public String name() {
        return "weighted";
    }

    private int getWeight() {
        // 获取权重,可以从配置文件或环境变量中获取
        return 10; // 默认权重为10
    }
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
08-07 08:11