(基于0.10版本)

Group Management Protocol

Kafka的coordiantor要做的事情就是group management,就是要对一个团队(或者叫组)的成员进行管理。Group management就是要做这些事情:

  • 维持group的成员组成。这包括允许新的成员加入,检测成员的存活性,清除不再存活的成员。
  • 协调group成员的行为。

Kafka为其设计了一个协议,就收做Group Management Protocol.

很明显,consumer group所要做的事情,是可以用group management 协议做到的。而cooridnator, 及这个协议,也是为了实现不依赖Zookeeper的高级消费者而提出并实现的。只不过,Kafka对高级消费者的成员管理行为进行了抽象,抽象出来group management功能共有的逻辑,以此设计了Group Management Protocol, 使得这个协议不只适用于Kafka consumer(目前Kafka Connect也在用它),也可以作为其它"group"的管理协议。

那么,这个协议抽象出来了哪些group management共有的逻辑呢? Kafka Consumer的AbstractCoordinator的注释给出了一些答案。

AbstractCoordinator

首先,AbstractorCoordinator是位于broker端的coordinator的客户端。这段注释里的"The cooridnator"都是指broker端的那个cooridnator,而不是AbstractCoordiantor。AbstractCoordinator和broker端的coordinator的分工,可以从注释里大致看出来。这段注释说,Kafka的group management protocol包括以下的动作序列:

  • Group Registration:Group的成员需要向cooridnator注册自己,并且提供关于成员自身的元数据(比如,这个消费成员想要消费的topic)
  • Group/Leader Selection:cooridnator确定这个group包括哪些成员,并且选择其中的一个作为leader。
  • State Assignment: leader收集所有成员的metadata,并且给它们分配状态(state,可以理解为资源,或者任务)。
  • Group Stabilization: 每个成员收到leader分配的状态,并且开始处理。

这里边有三个角色:coordinator, group memeber, group leader.

有这么几个情况:

  1. 所有的成员要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。这里存在着3个角色,其分工并不像storm的nimbus和supervisor或者其它的master-slave系统一样,而更类似于Yarn的resource manager, application master和node manager. 它们也都是为了解决扩展性的问题。单个Kafka集群可能会存在着比broker的数量大得多的消费者和消费者组,而消费者的情况可能是不稳定的,可能会频繁变化,每次变化都需要一次协调,如果由broker来负责实际的协调工作,会给broker增加很多负担。所以,从group memeber里选出来一个做为leader,由leader来执行性能开销大的协调任务, 这样把负载分配到client端,可以减轻broker的压力,支持更多数量的消费组。
  2. 但是leader和follower具体的行为是怎么样的呢?follower的心跳直接发给leader吗?state assign是leader直接发送给follower的吗?
    1. 这里肯定与YARN有所不同,毕竟Kafka并不存在像NodeManager一样的东西。也就是说如果leader至少需要向coordinator发heartbeat。
    2. YARN的RM是只负责资源分配的,Kafka的coordinator按照上面注释的说法还需要确定group的成员,即使在leader确定后,leader也不负责确定group的成员,可以推断出,所有group member都需要发心跳给coordinator,这样coordinator才能确定group的成员。为什么心跳不直接发给leader呢?或许是为了可靠性。毕竟,leader和follower之间是可能存在着网络分区的情况的。但是,coordinator作为broker,如果任何group member无法与coordinator通讯,那也就肯定不能作为这个group的成员了。这也决定了,这个Group Management Protocol不应依赖于follower和leader之间可靠的网络通讯,因为leader不应该与follower直接交互。而应该通过coordinator来管理这个组。这种行为与YARN有明显的区别,因为YARN的每个节点都在集群内部,而Kafka的client却不是集群的一部分,可能存在于这种网络环境和地理位置。
    3. 对于Kafka consumer,它的实际上必须跟coordinator保持连接,因为它还需要提交offset给coordinator。所以coordinator实际上负责commit offset,那么,即使leader来确定状态的分配,但是每个partition的消费起始点,还需要coordinator来确定。这就带来了一问题,每个partition的消费开始的offset是由leader向coordinator请求,然后做为state分配,还是leader只分配partition,而follower去coordinator处请求开始消费的offset?

要回答这些问题,就要看代码了。AbstractCoordinator的注释还没完,它接下来这么说:

这是说AbstractCoordinator的实现必须实现三个方法: metadata(), performAssignment(String, String, Map)和onJoinComplete(int, String, String, ByteBuffer)。

从这三个方法入手,可以了解Group Management Protocol的一些细节。

Metadata

metadata()

这个方法返回的是这个group member所支持的协议,以及适用于生个协议的protocol。这些数据会提交给coordinator,coordinator会考虑到所有成员支持的协议,来为它们选择一个通用的协议。

下面看一下ConsumerCoordinator对它的实现:

    @Override
public List<ProtocolMetadata> metadata() {
List<ProtocolMetadata> metadataList = new ArrayList<>();
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
return metadataList;
}

在这里,consumer提供给每个协议的metadata都是一样的,是Subscription对象包含的数据。Subscription是PartitionAssignor的一个内部类,它有两个field

    class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
...
}

也就是说,consumer提供给coordinator的有两部分信息:1. 它订阅了哪些topic 2. userData。对于consumer, userData实际上是一个空数组。不过PartitionAssignor这么定义Subscription是有其用意的,userData是干啥的呢?再看一下PartitionAssgnor的注释。这也有助于了解ConsumerCoordinator#metadata()方法时使用的assignors是哪来的。

PartitionAssignor

这段注释也回答了一些之前在分析AbstractCoordinator的注释时提出的问题。这段注释提供了以下几点信息

  1. PartitionAssignor这个接口是用来定义KafkaConsumer所用的“分区分配策略”. 用户可以实现这个接口,以定义自己所需的策略。
  2. consumer group的成员把它们所订阅的topic发送给coordinator。然后coordinator来选择一个leader, 然后由coordinator把这个group的所有成员的订阅情况发给leader,由leader来执行分区的分配。
  3. leader调用PartitionAssignor的assign方法,来执行分区,然后把结果发给coordinator。由coordinator来转发分配的结果到每个group的成员。
  4. 有时候,需要利用各个consumer的额外的信息来决定分配结果,比如consumer所在的机架情况。这时候,在实现PartitionAssignor时,就可以覆盖subscription(Set)方法,在其返回的Subscription对象中提供自己需要的userData。

俺觉得,某些资源调度框架可能会受益于自定的PartitionAssignor,除了rack-aware之外,它们还可以根据每个机器上分配的consumer个数以及机器的性能来更好地进行负载匀衡。而且,这个东东也可以用来实现partition分配的“粘性”,即某个consumer可以一直被分配特定的分区,以便于它维持本地的状态。

performAssignment

这里leader Id, allMemeberMetadata都是Coordinator通过JoinGroupRespone发给leader的。leader基于这些信息做出分配,然后把分配结果写在SyncGroupRequest里发回给cooridnator,由Cooridnator把每个member被分配的状态发给这个member。

下面来看一下ConsumerCooridnator对这个方法的实现:

    @Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
//根据coordinator选择的协议确定PartitionAssignor
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); //确定当前group订阅的所有topic,以及每个member订阅了哪些topic
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
} // the leader will begin watching for changes to any of the topics the group is interested in,
// which ensures that all metadata changes will eventually be seen
//leader会监听这个group订部的所有topic的metadata的变化
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription()); // update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
//根据需要更新metadata,并且记录assign时用的metadata到assignmentSnapshot里
client.ensureFreshMetadata();
assignmentSnapshot = metadataSnapshot; log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
groupId, assignor.name(), subscriptions); //执行分配。metadata.fetch会获得当前的metadata,由于KafkaConsumer是单线程的,所以这里fetch的metadata和前边保存的是一致的
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions); log.debug("Finished assignment for group {}: {}", groupId, assignment); //生成groupAssignment。它指明了哪个group member该消费哪个TopicPartition
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
} return groupAssignment;
}

这里的Assignor有两种: RangeAssignor和RoundRobinAssignor。

两者都是把一个Topic的分区依次分给所有订阅这个topic的consumer.以t表示topic, c表示consumer,p表示partition, 字母后边的数字表示topic, partiton, consumer的id。

RangeAssignor与RoundRobinAssignor的区别在于对于一个topic的分区的分配,是否会受到其它topic分区分配的影响。

RangeAssignor

RangeAssignor对于每个topic,都是从consumer0开始分配。比如,topic0有3个分区,订阅它的有两个consumer。那么consumer0会分到t0p0和t0p1, 而consumer1会分到t0p2.

如果它两个consumer也都订阅了另一个有三个分区的topic1, 那么consumer0还会分到t1p0和t1p1,而consumer1会分到t1p2。具体的算法RangeAssignor的JavaDoc有描述。

可见RangeAssignor有某些情况下是不公平的,在上边的例子中,如果这两个consumer订阅了更多有三个分区的topic,那么consumer0分配的partition数量会一直是consumer1的两倍。

RoundRobinAssignor

RoundRobinAssignor会首先把这个group订阅的所有TopicPartition排序,排序是先按topic排序,同一个topic的分区按partition id排序。具体的算法RoundRobinAssignor的JavaDoc有描述。比如,假如有两个各有三个分区的topic,它们的TopicPartition排序后为t0p0 t0p1 t0p2 t1p0 t1p1 t1p2。

分配时会把这个排序后的TopicPartition列表依次分配给订阅它们的consumer。比如c0和c1都订阅了这两个topic, 那么分配结果是

t0p0t0p1 t0p2t1p0t1p1t1p2
c0c1c0c1c0c1

这样c0分到了: t0p0, t0p2, t1p2.    c1分到了: t0p1, t1p0, t1p2

如果有三个consumer,

c0订阅了t0, t1, t3.

c1订阅了t0, t2, t4。

c2订阅了t0, t2, t4。

t0有两个分区,而其它topic都只有一个分区。

那么排序后的TopicPartition以及分配的结果为

t0p0t0p1t1p0t2p0t3p0t4p0
c0c1c0c1c0c1

可见c3干脆就分不到分区了。所以RoundRobinAssignor也不能保证绝对公平。不过这只是比较极端的例子。

onJoinComplete

    /**
* Invoked when a group member has successfully joined a group.
* @param generation The generation that was joined
* @param memberId The identifier for the local member in the group
* @param protocol The protocol selected by the coordinator
* @param memberAssignment The assignment propagated from the group leader
*/
protected abstract void onJoinComplete(int generation,
String memberId,
String protocol,
ByteBuffer memberAssignment);

ConsumerCoordinator对它的实现是:

    @Override
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// if we were the assignor, then we need to make sure that there have been no metadata updates
// since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
} PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer); // set the flag to refresh last committed offsets
subscriptions.needRefreshCommits(); // update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions()); // give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment); // reschedule the auto commit starting from now
if (autoCommitEnabled)
autoCommitTask.reschedule(); // execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition assignment",
listener.getClass().getName(), groupId, e);
}
}

首先,对于leader来说,它要检查一下进行分配时的metadata跟当前的metadata是否一致,不一致的话,就标记下需要重新协调一次assign.

如果不存在上边的情况,就做以下几个事情:

  • 设置“需要刷新last committed offset"的标志
  • 更新这个conumser所采集的TopicPartition集合
  • 调用Assignor的onAssignment方法,设Assignor来处理一下自己的内部状态
  • 重新调度autoCommit任务。这个任务用于周期性地 commit offset
  • 调用ConsumerRebalanceListener。这个Listener是用户传给KafkaConsumer的。

这里需要注意的是,所有KafkaConsumer的操作都是在一个线程完成的,而且大部分都是在poll这个方法调用中完成。所以上边代码中的

subscriptions.needReassignment()和subscriptions.needRefreshCommits()

这些方法,都是改变了subscription对象的状态,并没有直正执行reassign和refresh commit操作。KafkaConsumer在执行poll方法时,会检查这subscription对象的状态,然后执行所需要的操作。所以,代码里这两句

        // set the flag to refresh last committed offsets
subscriptions.needRefreshCommits(); // update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());

当freshCommit执行时,第二句assignFromSubscribed已经执行完了,所以是获取分配给这个consumer的所有partition的last committed offset.

Kafka Client-side Assignment Proposal

Kafka Cooridnator的具体行为,可以参照这篇wiki。

04-25 04:13
查看更多