ConsumerPartitionAssignor
在 rebalance 协议里面,group leader 在收到 JoinGroup Response 之后,会按照其中指定的分配策略完成 partition 分配,每个 partition 分配策略就是一个 ConsumerPartitionAssignor 接口的实现。下图展示了 ConsumerPartitionAssignor 继承结构:
ConsumerPartitionAssignor 接口中定义了五个内部类,我们一个个来说:
Subscription 中保存了 consumer 参加 rebalance 之前已经订阅的 topic(topics字段)、已分配的 partition(ownedPartitions 字段,
List<TopicPartition>
),以及在 consumer 端配置的 groupInstanceId(在 Static Membership 优化中使用)。Assignment 中保存了 partition 的分配结果,partitions 字段(
List<TopicPartition>
)表示的是分配给某 consumer 的 TopicPartition 集合,userData 是用户自定义的数据。GroupAssignment 底层通过
Map<String, Assignment>
维护全部的 partition 分配结果。GroupSubscription 底层通过
Map<String, Subscription>
记录全部 consumer 目前已分配的 partition 信息。这两个 Map 的 Key 都是 consumer 在 consumer group 中的唯一标识 —— MemberId。RebalanceProtocol 是个枚举,可选值有 EAGER、COOPERATIVE 两个,对应了我们前面介绍的两个 rebalance 协议的名称。
接下来看 ConsumerPartitionAssignor 的方法:
- assign() 方法是其中最核心的方法 ,它接收集 kafka 集群元数据信息以及每个 consumer 目前的 partition 订阅情况,返回的是 GroupAssignment,也就是 partition 的分配结果。
- 在每个 consumer 收到 group leader 发来的 partition 分配结果时,会调用 onAssignment() 这个回调方法,此调用发生在解析 SyncGroupResponse 之后。
- supportedProtocols() 方法用于获取当前 ConsumerPartitionAssignor 支持的 rebalance 协议。
AbstractPartitionAssignor 抽象类实现了 ConsumerPartitionAssignor 接口,其中主要是对 assign() 方法进行了简单的实现,具体实现如下:
1 | public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) { |
RangeAssignor
下面来看 ConsumerPartitionAssignor 具体的实现类,RangeAssignor 策略的原理是按照 consumer 总数和 partition 总数进行整除运算来获得一个跨度,然后将 partition 按照跨度进行平均分配,以保证 partition 尽可能均匀地分配给所有的 consumer。
对于每一个 topic,RangeAssignor 策略会将 consumer group 内所有订阅这个 topic 的 consumer 按照名称的字典序排序,然后为每个 consumer 划分固定的 partition 范围,如果不够平均分配,那么字典序靠前的 consumer 会被多分配一个分区。
假设 n = partition_num / consumer_num
,m = partition_num % consumer_num
,那么前 m 个消费者每个分配 n+1 个 partition,后面的 consumer_num - m
个消费者每个分配 n 个 partition。我们举个栗子,假设 consumer group 内有 2 个 consumer,都订阅了 t0 和 t1 两个 topic,并且每个 topic 都有 4 个 partition,那么通过 RangeAssignor 分配的结果如下:
1 | consumer0:t0p0、t0p1、t1p0、t1p1(其中,t0p0 是 topic0 中的 partition0) |
看上去 RangeAssignor 的分配是相对均匀的,但是在有的场景中,分配结果就会出现不均匀的情况。假设上面的示例中,每个 topic 有三个 partition,RangeAssignor 的分配结果如下:
1 | consumer0:t0p0、t0p1、t1p0、t1p1 |
明显可以看到,这种分配结果显然是不均匀的,随着不均匀的 topic 越来越多,会导致 consumer 处理的 partition 不均衡。
RoundRobinAssignor
RoundRobinAssignor 分配策略的原理是将 consumer group 内所有 consumer 以及所有 TopicPartition 按照字典序排序,然后通过轮询方式逐个将 partition 分配给每个 consumer。
如果同一个 consumer group 内所有 consumer 订阅的 topic 都是相同的, RoundRobinAssignor 策略的分配结果是均匀的。依旧是上面两个 t0、t1 两个 topic,每个 topic 三个 partition,RoundRobinAssignor 的分配结果为:
1 | consumer0:t0p0、t0p2、t1p1 |
但如果 consumer group 内的 consumer 订阅了不同的 topic,有可能会导致分配结果不均匀。例如,假设 consumer group 内有 3 个 consumer,订阅了 3 个 topic,每个 topic 有 3 个 partition。其中,consumer0 订阅了 topic0,consumer1 订阅了 t0 和 t1 ,consumer2 订阅的是 t0、t1 和 t2,最终的分配结果如下:
1 | consumer0:t0p0 |
StickyAssignor
StickyAssignor 是一种黏性分配策略,它的主要目的是在分配均匀的前提下,尽可能的与上次分配结果保持一致。StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。
下面我们通过一个示例来说明 StickyAssignor 的分配效果。假设 consumer group 内有 3 个 consumer,他们同时订阅了 4 个topic,每个 topic 有 2 个 partition。最终的分配结果如下:
1 | consumer0:t0p0、t1p1、t3p0 |
此时,consumer1 如果宕机离开 consumer group,那么 consumer group 会执行rebalance,新的分配结果就变成了:
1 | consumer0:t0p0、t1p1、t3p0、t2p0 |
而如果使用 RoundRobinAssignor ,分配的结果是:
1 | consumer0:t0p0、t1p0、t2p0、t3p0 |
比较这两个策略的分配结果可以看出,StickyAssignor 保留了上一次分配的全部结果,只是将 consumer1 负责的 partition 分配给了 consumer0 和 consumer1。
再举个栗子,三个 consumer订阅了三个 topic,每个 topic 三个 partition,consumer0 订阅了 t0,consumer1 订阅了 t0 和 t1,consumer2 订阅了 t0、t1 和 t2。此时采用 RoundRobinAssignor 策略得到的不均匀结果前面说明了,这里不再重复。如果使用 StickyAssignor 策略,分配结果为:
1 | consumer0:t0p0 |
如果此时 consumer0 宕机,RoundRobinAssignor策略产生的新分配结果为:
1 | consumer1:t0p0、t1p1 |
而使用 StickyAssignor 策略产生的新分配结果为:
1 | consumer1:t1p0、t1p1、t0p0 |
显然,StickyAssignor 策略比其他两种分配策略更加优秀。
下面来看一下 StickyAssignor 的实现吧,为啥不看 RangeAssignor、RoundRobinAssignor 的实现呢?因为简单,简单到从原理直接就可以给出代码。
具体实现分析
要看 StickyAssignor 先父类 AbstractStickyAssignor,它的 assign() 方法实现如下:
1 | public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, |
allSubscriptionsEqual() 方法中做了两件事,一件事是检查各个 consumer 订阅的 topic 是否一致,体现在该方法的返回值中,另一件事是填充 consumerToOwnedPartitions 集合,这个集合填充的是当前各个 consumer 处理的 TopicPartition 集合。深入 allSubscriptionsEqual() 方法可以看到填充 consumerToOwnedPartitions 集合的数据来源与 memberData() 这个抽象方法,StrickyAssignor 的 实现是从 Subscription.userData 中得到的。
如果 consumer group 中的 consumer 都订阅了同一个 topic,会进入 constrainedAssign() 方法这个分支,走优化算法。constrainedAssign() 方法的核心步骤是:
1、 执行一些准备工作,例如:
- 计算每个 consumer 能够分配的最大 partition 数(maxQuota)和最小 partition 数(minQuota)
- 用来存储分配 partition 已达上限的 consumer 的集合(maxCapacityMembers 集合)
- 用来存储分配 partition 已达下限的 consumer 的集合(minCapacityMembers 集合)
- 用来存储分配 partition 未达下限的 consumer 的集合(unfilledMembers 集合)。
- 存储全部未分配的 TopicPartition 的 unassignedPartitions 集合。
- 存储从任意 consumer revoke 回来的 TopicPartition 的 allRevokedPartitions 集合。
- 存储最终分配结果的 assignment 集合(
Map<String, List<TopicPartition>>
)。
2、 根据 consumerToOwnedPartitions 重新分配 partition,分配出去的 partition 将从 unassignedPartitions 中删除:
- 碰到 TopicPartition 数超过 maxQuota 的 consumer,多出来的 TopicPartition 进入 allRevokedPartitions 集合。
- 碰到 TopicPartition 数不足 minQuota 的 consumer,consumer 进入 unfilledMembers 集合。
- 碰到 TopicPartition 数等于 maxQuota 或 minQuota 的consumer,consumer 进入 maxCapacityMembers 或 minCapacityMembers 集合。
3、 如果 unfilledMembers、unassignedPartitions 两个集合不为空,则表示同时有未达标的 consumer 和未分配的 partition,开始将 unassignedPartitions 中的 TopicPartition 分配给 unfilledMembers 中的 consumer。
4、 如果 unfilledMembers 还不为空,说明给 maxCapacityMembers 集合中 consumer 分配 partition 分多了,就要开始从 maxCapacityMembers 的 consumer 那里偷 partition 了,但是要有个限度,不能把原来的 maxCapacityMembers 的 consumer 薅到 minQuota 以下。
5、 最后,来处理 unfilledMembers 为空,但是 unassignedPartitions 不为空的情况,这说明 minCapacityMembers 里面有太多 consumer 摸鱼,这里就要将 unassignedPartitions 中 partition 分配给 minCapacityMembers 中的摸鱼 consumer,提高他们的负载,但是也是有上限的,给他们分配再多,不能超过 maxQuato。
经过上面的这一通处理,assignment 集合就是最终的分配结果了。
如果 consumer group 中的 consumer 并没有订阅了同一个 topic,会进入 generalAssign() 方法这个分支。generalAssign() 方法涉及到的核心数据结构如下:
- consumer2AllPotentialPartitions(
Map<String, List<TopicPartition>>
)、partition2AllPotentialConsumers(Map<TopicPartition, List<String>>
),因为 consumer group 中的 consumer 订阅的 topic 不同,所以需要记录每个 consumer 能分配哪些 TopicPartition(consumer2AllPotentialPartitions),每个 TopicPartition 能分配给哪些 consumer(partition2AllPotentialConsumers) 。 - partitionsPerTopic: 每个 topic 有多少过 partition。
- subscriptions:每个 consumer 订阅了哪些 topic。
- currentAssignment(
Map<TopicPartition, ConsumerGenerationPair>
)记录了当前每个 consumer 分配的 partition 集合(就是此轮 rebalance 开始之前的 partition 分配结果)。后面会基于这个集合,得出此轮 rebalance 的结果。 - currentPartitionConsumer:记录了当前每个 Partition 分配给了哪个 Consumer,其实就是这个集合的翻转 currentAssignment (即 consumer –>
List<TopicPartition>
转成了 TopicPartition –> consumer)。 - sortedPartitions:TopicPartition 的有序集合,排序规则是按照 TopicPartition 能分配给多少 consumer 顺序排的。
- unassignedPartitions:记录还未分配出去的 TopicPartition。
- sortedCurrentSubscriptions:consumer 的有序集合,排序规则是按照目前已分配的 partition 数进行排序的。
- reassignablePartitions:可重新分配的 TopicPartition 集合,
- fixedPartitions:不可重新分配的 TopicPartition 集合。
- fixedAssignments:不可修改其分配结果的 consumer 集合。
generalAssign() 算法的核心步骤如下:
1、更新 currentAssignment 集合,这里会删除不存在的 consumer 和 TopicPartition。
2、分配 unassignedPartitions 集合中的 TopicPartition:这里会按照 sortedCurrentSubscriptions 集合的顺序分配 TopicPartition(当然,前提是 consumer 订阅了这个 TopicPartition)。这一轮操作下来之后,可能还有 TopicPartition 未分配出去,也可能有 consumer 没有分配任何 TopicPartition。
3、查询分配结果不能修改分配结果的 consumer,将这些 consumer 记录到 fixedAssignments 集合,同时将这些 consumer 从 currentAssignment 集合删除。(缩小问题范围)
4、接下来按照 sortedPartitions 集合的顺序,处理可以重新分配的 TopicPartition。如果移动一个 TopicPartition 可以提高整体的平衡性的话,就进行移动,直至达到 balance 的状态。
5、如果在步骤 4 中移动了 TopicPartition,但是没有提高平衡性( balance score)的话,就需要根据黏性条件,决定是否回滚到原来的分配结果。
6、最后,将 fixedAssignments 中记录的 consumer 重新添加回 currentAssignment 集合。
在 generalAssign() 方法中判断一次分配结果是否平衡的结果有两个:
- consumer 分配的 TopicPartition 数量只差不超过 1。
- 不存在重新分配 TopicPartition 提高平衡性的可能。这里平衡性(balance score)的计算方式是:所有 consumer 分配的 TopicPartition 数量的差值,这个差值越小,分配结果越平衡。
StickyAssignor 的算法核心与 AbstractStickyAssignor 一致,StickyAssignor 实现了序列化和反序列化 MemebrData 的方法,以及 onAssignment() 方法,这些方法没有什么难度,这里不展开了。
CooperativeStickyAssignor
CooperativeStickyAssignor 与 StickyAssignor 一样,都继承了 AbstractStickyAssignor 抽象类,其 assign() 方法首先会调用 super.assign() 方法完成 partition 分配。在分配过程中,如果一个 TopicPartition 发生了迁移(也就是更换了 consumer),则将该 TopicPartition 记录到 partitionsTransferringOwnership 集合中。最后,调用 adjustAssignment() 方法从分配结果中删除 partitionsTransferringOwnership 集合,从而满足 Incremental Cooperative Rebalancing 协议的需求。
总结
本课时重点介绍了 kafka consumer 中 partition 分配策略的具体实现。下一课时开始介绍 KafkaConsumer 的核心实现。