consumer group rebalance 原理
在开始分析 KafkaConsumer 的具体实现之前,我们先来介绍一下 KafkaConsumer 涉及到的一些基础理论。在第一课时介绍 Consumer Group 时提到,对于同一个 Consumer Group 来说,同一个 Topic 的不同 partition 会分配给不同的 consumer 进行消费,那如何分配 partition,如何在有新 consumer 加入以及 consumer 宕机的时候重新分配 partition,就是我们说的 consumer group rebalance。
初始方案
在 Kafka 最初始的解决方案中,是依赖 Zookeeper 的 Watcher 实现的。该方案中,每个 Consumer Group 在 Zookeeper 下都维护了一个对应的 /consumers/{group_id}/ids
路径,该路径下使用临时节点记录该 Consumer Group 中的 Consumer Id,这个 Consumer Id 临时节点在 Consumer 启动时创建。另外,kafka 还会创建 owners
和 offsets
两个节点,这两个节点与 ids
节点同级,其中 owners
记录了 consumer 与 partition 的分配关系;offsets
节点用来记录了对应 Consumer Group 在相应 partition 上的消费位置。
除了 consumer group 相关的 Zookeeper 节点之外,kafka 还会在 Zookeeper 上维护集群的元数据,如下图所示:
每个 Consumer 都会在 /consumers/{group_id}/ids
和 /brokers/ids
节点上注册一个 Watcher,当 /consumers/{group_id}/ids
的子节点发生变化时,表示 Consumer Group中的消费者出现了变化;当 /brokers/ids
子节点发生变化时,表示 kafka 集群中的 broker 出现了增减。通过上述两个 Watcher,consumer 就可以监控 Consumer Group 状态以及 Kafka 集群的状态了。
原理上,该方案是可行的,但是重度依赖于 Zookeeper 集群,这就会导致一些问题:
当 consumer 很多的时候,一个 consumer 或是一个 broker 的变化(加入、退出 consumer group或 broker 重启),就会导致全部 consumer 被通知,这个时候,很多 consumer 并不关心这个事件,过多无用的 Watcher 通知会给 Zookeeper 集群带来压力,也会给 consumer 带来压力。
每个 consumer 都依赖 Zookeeper 集群来判断 Consumer Group 状态、Broker 的状态,但是不同 consumer 在同一时刻可能连接到 Zookeeper 集群的不同节点,由于Zookeeper 只保证
最终一致性
(不保证强一致性),那不同 consumer 看到的元数据就可能不一样,这就会造成多余的 Reblance 操作。
Eager Rebalance Protocol
为了解决上述问题,Kafka 在后续版本对 Rebalance 方案进行了改进(也就是 Eager Rebalance Protocol),改进方案的核心设计思想是:将全部的 consumer group 分成多个子集,每个 consumer group 集合在 broker 对应一个 GroupCoordinator,由 GroupCoordinator 管理对应 consumer groups 的 rebalance(每个 broker 都拥有成为 GroupCoordinator 的能力)。
kafka 通过加一层 GroupCoordinator 的方式,让所有 consumer 不再直接依赖 Zookeeper,而是靠 GroupCoordinator 在Zookeeper 上添加 Watcher。下面简单描述一下该升级方案的 rebalance 流程:
1、 当前 consumer 准备加入 consumer group 或 GroupCoordinator发生故障转移时,consumer 并不知道GroupCoordinator 的 host 和 port,所以 consumer 会向 Kafka 集群中的任一 broker 节点发送 FindCoordinatorRequest 请求,收到请求的 broker 节点会返回 ConsumerMetadataResponse 响应,其中就包含了负责管理该 Consumer Group 的 GroupCoordinator 的地址。
2、 接下来,consumer 会连接到 GroupCoordinator 节点,并周期性的发送心跳请求。GroupCoordinator 会通过心跳消费确定 consumer 是否正常在线,长时间收不到一个心跳信息时,GroupCoordinator 会认为 consumer 宕机了,就会为该 consumer group 触发新一轮的 Rebalance 操作。
3、 在 consumer 收到中带有 IllegalGeneration 异常的心跳响应时,就表明 GroupCoordinator 发起了 Rebalance 操作。此时 consumer 会向 GroupCoordinator 发送 JoinGroupRequest ,向 GroupCoordinator 表明自己要加入指定的Consumer Group。
4、 GroupCoordinator 等待一个 consumer group 中全部 consumer 都发送了 JoinGroupRequest 请求之后,就会结合Zookeeper 中的 partition 的元数据,进行 partition 的分配。
5、 GroupCoordinator 在分配完 partition 之后,会将 partition 与 consumer 之间的映射关系写入到 Zookeeper 中保存,同时还会将分配结果通过 JoinGroupResponse 返回给 consumer。
6、 consumer 根据根据 JoinGroupResponse 响应中的分配结果消费对应的 partition,同时会定时发送HeartbeatRequest 请求表明自己在线。如果后续出现 consumer 加入或下线、broker 上下线、partition 增加等状况时,GroupCoordinator 返回的 HeartbeatResponse 会包含 IllegalGeneration 异常,接下来就会进入步骤3。
上述方案看起来已经比较完美的了,但是有个问题是 rebalance 的策略是在 GroupCoordinator 中实现的,扩展性上多多少少有点问题,当我们要使用新 rebalance 策略时,需要修改 GroupCoordinator 的行为。
Eager Rebalance Protocol 升级
为了解决上述问题,Kafka 0.9 版本中进行了重新设计,将 partition 分配的工作放到了 consumer 这一端进行处理,Consumer Group 管理的工作则依然由GroupCoordinator 处理。
该版本的 rebalance 协议将 JoinGroupRequest 的处理过程拆分成了两个阶段,分别是 Join Group 阶段和 Synchronizing Group State 阶段。
1、 当 consumer 通过 FindCoordinatorRequest 查找到其 Consumer Group 对应的 GroupCoordinator 之后,就会进入 Join Group 阶段。下面是 FindCoordinatorRequest 和 FindCoordinatorResponse 的具体格式:
1 | // FindCoordinator请求结构 |
2、 Consumer 先向 GroupCoordinator 发送 JoinGroupRequest 请求,其中包含 consumer 的相关信息,如下图所示:
下面是 JoinGroupRequest 的格式:
1 | JoinGroup Request (Version: 7) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols] TAG_BUFFER |
3、 GroupCoordinator 收到 JoinGroupRequest 后会暂存该 consumer 信息,然后等待全部 consumer 的 JoinGroupRequest 请求。JoinGroup Request 中的 session.timeout.ms
和 rebalance_timeout_ms
( max.poll.interval.ms
)是就是用来告诉 GroupCoordinator 等待多久的。
4、 GroupCoordinator 会根据全部 consumer 的 JoinGroupRequest 请求来确定 Consumer Group 中可用的 consumer,从中选取一个 consumer 成为 Group Leader,同时还会决定 partition 分配策略,最后会将这些信息封装成JoinGroupResponse 返回给 Group Leader Consumer,下面是 JoinGroupResponse 的具体格式。
1 | JoinGroup Response (Version: 7) => throttle_time_ms error_code generation_id protocol_type protocol_name leader member_id [members] TAG_BUFFER |
5、 每个 consumer 都会收到 JoinGroupResponse 响应,但是只有 Group Leader 收到的 JoinGroupResponse 响应中封装的所有 consumer 信息以及 Group Leader 信息。当其中一个 consumer 确定了自己的 Group Leader后,会根据 consumer 信息、kafka 集群元数据以及 partition 分配策略计算 partition 的分片结果。其他非 Group Leader consumer 收到 JoinResponse 为空响应,也就不会进行任何操作,只是原地等待。
6、 接下来,所有 consumer 进入 Synchronizing Group State 阶段,所有 consumer 会向 GroupCoordinator 发送 SyncGroupRequest。其中,Group Leader Consumer 的 SyncGroupRequest 请求包含了 partition 分配结果,普通 consumer 的 SyncGroupRequest 为空请求。
SyncGroup Request 请求的格式如下:
1 | SyncGroup Request (Version: 5) => group_id generation_id member_id group_instance_id protocol_type protocol_name [assignments] TAG_BUFFER |
7、 GroupCoordinator 接下来会将 partition 分配结果封装成 SyncGroupResponse 返回给所有 consumer。
SyncResponse 的格式如下:
1 | SyncGroup Response (Version: 5) => throttle_time_ms error_code protocol_type protocol_name assignment TAG_BUFFER |
8、 consumer 收到 SyncGroupResponse 后进行解析,就可以明确 partition 与 consumer 的映射关系。
8、 当然,后续 consumer 还是会与 GroupCoordinator 保持定期的心跳。触发 rebalance 的条件也是心跳响应中包含 IllegalGeneration 异常。
1 | Heartbeat Request (Version: 4) => group_id generation_id member_id group_instance_id TAG_BUFFER |
很明显,这次 rebalance 协议的升级是将 rebalance 的行为迁移到了 consumer 端,也就解决了 Eager Rebalance Protocol
最开始版本中的扩展性问题。
Stop The World
在 Eager Rebalance Protocol 中都存在一个比较严重的问题,那就是长时间的 Stop The World
,也就是在整个 rebalance 的过程中,所有 partition 都会被 回收
(revoke),consumer 是无法消费任何 partition 的。
例如,有一个 consumer 退出 consumer group 时,会发送 LeaveGroup Request 请求到 GroupCoordinator,如下图所示:
LeaveGroup Request 的格式如下:
1 | LeaveGroup Request (Version: 4) => group_id [members] TAG_BUFFER |
GroupCoordinator 将会在下个心跳响应中通知 consumer group 中剩余的 consumer 进行新一轮的 rebalance。在新一轮的 rebalance 完成之前,剩余的 consumer 都不会消费任何 message。有新 consumer 加入 consumer group 也是一样。
在实际生产中发布 consumer 是非常常见的操作,这样的话,每个consumer 实例的更新都会导致两次上述的 rebalance 操作,影响整个 consumer group 消费。
还有一个问题就是 consumer 出现长 GC 的时候,如下图所示,此时 GroupCoordinator 会因为长时间收不到 consumer 心跳,认为 consumer 下线,触发 rebalance。
Static Membership
为了解决上述问题,kafka 在 2.3 版本中引入了 Static Membership 协议,相关 JIRA 参考。
Static Membership 优化协议的核心是:
- 在 consumer 端增加
group.instance.id
配置(group.instance.id
是 consumer 的唯一标识)。如果 consumer 启动的时候明确指定了group.instance.id
配置值,consumer 会在 JoinGroup Request 中携带该值,表示该 consumer 为 static member。 为了保证group.instance.id
的唯一性,我们可以考虑使用 hostname、ip 等。 - 在 GroupCoordinator 端会记录 group.instance.id → member.id 的映射关系,以及已有的 partition 分配关系。当 GroupCoordinator 收到已知 group.instance.id 的 consumer 的 JoinGroup Request 时,不会进行 rebalance,而是将其原来对应的 partition 分配给它。
Static Membership 协议可以让 consumer group 只在下面的 4 种情况下进行 rebalance:
- 有新 consumer 加入 consumer group 时。
- Group Leader 重新加入 Group 时。
- consumer 下线时间超过阈值(
session.timeout.ms
) - GroupCoordinator 收到 static member 的 LeaveGroup Request 时。
这样的话,在使用 Static Membership 协议的场景下,只要在 consumer 重新部署的时候,不发送 LeaveGroup Request 且在 session.timeout.ms
时长内重启成功,就不会触发 rebalance。
Incremental Cooperative Rebalance
在 kafka 2.4 版本中,为了进一步减少 rebalance 带来的 Stop The World
,提出了 Incremental Cooperative Rebalance
协议。其核心思想就是使用将一次全局的 rebalance,改成多次小规模 rebalance,最终收敛到 rebalance 的状态。
在开始介绍 Incremental Cooperative Rebalance 协议之前,我们先来明确 Eager Rebalance 协议中回收(revoke)全部 partition 的根本原因 —— 一个 partition 只能分配给一个 consumer。如下图所示,Eager Rebalance 协议中,consumer 为了满足这个要求,在发送 JoinGroup Request 的时候就停止了所有 partition 的消费,直至收到 SyncGroup Response(也就是收到新的 partition 分配结果之后)。如果将分布式系统简化成一个多线程应用,整个 rebalance 过程就类似于一个内存屏障(Sync Barrier),用来同步所有 consumer 的状态。
说明完 rebalance 的本质之后,我们开始正式介绍 Incremental Cooperative Rebalance 协议,该协议最核心的思想就是:
- consumer 比较新旧两个 partition 分配结果,只停止消费回收(revoke)的 partition,对于两次都分配给自己的 partition,consumer 根本没有必要停止消费,这也就解决了
Stop The World
的问题。 - 通过多轮的局部 rebalance 来最终实现全局的 rebalance。下面会通过示例说明每轮 rebalance 都做了什么。
上图就展示了一个 consumer 在一次 rebalance 中比较操作: owned partitions
和 assigned partitions
分别是该 consumer 在 rebalance 前后要处理的 partition 集合,其中,consumer 在整个 rebalance 过程中无需停止对 unchanged partitions
集合中 partition 的消费。
介绍完 Incremental Cooperative Rebalance 协议的核心思想之后,我们通过示例来说明 Incremental Cooperative Rebalance 协议的工作原理。
A New Consumer Joins
如上图所示,当前有 consumer 1 和 consumer 2,分别消费 P1 ~ P3、P4~P6,6个 partition,此时 consumer3 加入到 consumer group 中,触发第一轮 rebalance:
consumer 3 会向 GroupCoordinator 发送 JoinGroupRequest 触发第一轮 rebalance。
GroupCoordinator 会在下一轮心跳响应中通知 consumer 1 和 consumer 2 需要进行 rebalance。
consumer 1 和 consumer 2 会将自己当前正在处理的 partition 信息封装到 JoinGroup Request 中(metadata 字段)发往 GroupCoordinator:
- consumer 1 发送的 JoinGroup Request(assigned: P1、P2、P3)
- consumer 2 发送的 JoinGroup Request(assigned: P4、P5、P6)。
- consumer 3 发送的 JoinGroup Request(assigned: )。
此时的 consumer 1 和 consumer 2 并不会停止对 partition 的消费。
经过 GroupCoordinator 处理之后,此次选举 consumer 1 作为 Group Leader,GroupCoordinator 发送给 consumer 1 的 JoinGroup Response 中同样包含各个 consumer 目前处理的 partition 信息:c1 ( P1、P2、P3 ),c2 ( P4、P5、P6)。
consumer 1 在进行 partition 分配的时候发现有三个 consumer 参与,决定把 P1、P2 继续分配给 consumer 1,P3 回收(revoke);把 P4、P5 继续分配给 consumer 2,P6 回收。consumer 1 在完成上述分配之后,会将分配结果封装成 SyncGroup Request 发送给 GroupCoordinator。
GroupCoordinator 会根据 consumer 1 的 SyncGroup Request,生成 SyncGroup Response 返回给三个 consumer:
- consumer 1 收到的 SyncGroup Response(assigned: P1、P2,revoked:P3)
- consumer 2 收到的 SyncGroup Response(assigned: P4、P5,revoked:P6)。
- consumer 3 收到的 SyncGroup Response(assigned: ,revoked:)。
到此为止,第一轮 rebalance 结束,在第一轮 rebalance 的过程中,consumer 1 依旧在消费 P1、P2、P3,consumer 2 依旧在消费 P4、P5、P6。
在 consumer 1 和 consumer 2 收到包含 revoked 的 SyncGroup Response 之后,会立刻停止对 P3 和 P6 的消费,并立即发起第二轮 rebalance。
- consumer 1 发送的 JoinGroup Request(assigned: P1、P2)
- consumer 2 发送的 JoinGroup Request(assigned: P4、P5)。
- consumer 3 发送的 JoinGroup Request(assigned: )。
接下来,GroupCoordinator 将收到的全部 JoinGroup Request 整理后,将目前的分配关系封装到 JoinGroup Response 中返回给 Group Leader(这里假设还是 consumer 1),其他非 Group Leader 的 consumer 得到空 JoinGroup Response。
consumer 1 此时发现该 topic 有 P1 到 P6 六个 partition,但目前 partition 只分配了 4 个 partition,还有一个 consumer 空闲,自然会选择将 P3、P6 分给 consumer 3。
经过 SyncGroup Request、SyncGruop Response 的交互之后:
- consumer 1 收到的 SyncGroup Response(assigned: P1、P2,revoked:)
- consumer 2 收到的 SyncGroup Response(assigned: P4、P5,revoked:)。
- consumer 3 收到的 SyncGroup Response(assigned: P3、P6 ,revoked:)。
在 consumer 3 收到该 SyncGroup Response 之后,会立刻开始消费 P3、P6。到此为止,第二轮 rebalance 结束,整个 rebalance 也完成了。
An Existing Consumer Bounces
如上图所示,当前有三个 consumer,consumer 2 离开 consumer group 且离开时间超过了 session.timeout 时长,此时 GroupCoordinator 会触发第一轮 rebalance。
首先,GroupCoordinator 会在下一轮心跳响应中通知 consumer 1 和 consumer 3 发起第一轮 rebalance。
consumer 1 和 consumer 3 会将自己当前正在处理的 partition 信息封装到 JoinGroup Request 中(metadata 字段)发往 GroupCoordinator:
- consumer 1 发送的 JoinGroup Request(assigned: P1、P2)
- consumer 3 发送的 JoinGroup Request(assigned: P3、P6)。
假设 GroupCoordinator 在这里选择 consumer 1 作为 Group Leader,GroupCoordinator 会将 partition 目前的分配状态通过 JoinGroup Response 发送给 consumer 1。
consumer 1 发现 P4、P5 两个 partition 并未出现(处于 lost 状态),此时 consumer 1 并不会立即解决当前的不平衡问题,返回的 partition 分配结果不变(同时会携带一个 delay 时间,
scheduled.rebalance.max.delay.ms
,默认 5 分钟)。GroupCoordinator 会根据 consumer 1 的 SyncGroup Request,生成 SyncGroup Response 返回给两个存活的 consumer:- consumer 1 收到的 SyncGroup Response(delay,assigned: P1、P2,revoked:)
- consumer 3 收到的 SyncGroup Response(delay,assigned: P3、P6,revoked:)。
到此为止,第一轮 rebalance 结束。整个 rebalance 过程中,consumer 1 和 consumer 3 并不会停止消费。
在
scheduled.rebalance.max.delay.ms
这个时间段内,consumer 2 故障恢复,重新加入到 consumer group 时,会向 GroupCoordinator 发送 JoinGroup Request,触发第二轮的 rebalance。GroupCoordinator 在下一次心跳响应中会通知 consumer 1 和 consumer 3 参与第二轮 rebalance。consumer 1 和 consumer 3 在收到心跳之后,会发送 JoinGroup Request 参与第二轮 rebalance:
- consumer 1 发送的 JoinGroup Request(assigned: P1、P2)
- consumer 3 发送的 JoinGroup Request(assigned: P3、P6)。
在此次第二轮 rebalance 中,consumer 1 依旧被选为 Group Leader,它会发现 delay 的时间(
scheduled.rebalance.max.delay.ms
)是否已经到了,如果没到,则依旧不会立即解决当前的不平衡问题,继续返回目前的分配结果:- consumer 1 收到的 SyncGroup Response(remaining delay,assigned: P1、P2,revoked:)
- consumer 2 收到的 SyncGroup Response(remaining delay,assigned:,revoked:)
- consumer 3 收到的 SyncGroup Response(remaining delay,assigned: P3、P6,revoked:)。
我们看到返回的 SyncGroup Response 中更新了 delay 的剩余时间(remaining delay = delay - pass_time)。到此为止,第二轮 rebalance 结束。整个 rebalance 过程中,consumer 1 和 consumer 3 并不会停止消费。
当 remaining delay 时间到期之后,consumer 全部重新送 JoinGroup Request,触发第三轮 rebalance:
- consumer 1 发送的 JoinGroup Request(assigned: P1、P2)
- consumer 2 发送的 JoinGroup Request(assigned: )
- consumer 3 发送的 JoinGroup Request(assigned: P3、P6)。
在此次 rebalance 中,consumer 1 依旧被选为 Group Leader,它会发现 delay 已经到期了,开始解决不平衡的问题,对 partition 进行重新分配。最新的分配结果最终通过 SyncGroup Response 返回到各个 consumer:
- consumer 1 收到的 SyncGroup Response(assigned:P1、P2,revoked:)
- consumer 2 收到的 SyncGroup Response(assigned:P4、P5,revoked:)
- consumer 3 收到的 SyncGroup Response(assigned:P3、P6,revoked:)。
到此为止,第三轮 rebalance 结束。整个 rebalance 过程中,consumer 1 和 consumer 3 的消费都不会停止。
An existing worker leaves permanently
如上图所示,依旧是三个 consumer,其中的 consumer 2 退出了 consumer group,与上面介绍的重启(bounces)场景不同的是:consumer 2 是退出 consumer group 的时间超过了 scheduled.rebalance.max.delay.ms
的值,其他 consumer 会认为其永久退出了,直接进行上图中第二轮 rebalance,处理不均衡的情况,这里就不再重复了。
最后,我们深入 Incremental Cooperative Rebalance
协议本质,如下图所示,它是使用多轮 rebalance 的方式来实现 Syncchronization Barrier 的效果,也就保证了不会一次回收(revoke)全部 consumer 的全部 partition,从而避免 Stop The World
。
总结
本课时重点介绍了 consumer group rebalance 协议的演进和各个版本协议的原理。
下一课时将正式开始分析 kafka consumer 的代码。
本课时相关文档和视频会同步到:
微信订阅号:杨四正
B站:杨四正