Consumer 基础入门
在 Kafka Clients 模块中不仅提供了前面介绍的 KafkaProducer,还提供了 KafkaConsumer 的实现,核心实现位于 org.apache.kafka.clients.consumer 包中。
废话不多说了,先来看一下 KafkaConsumer 的基础使用:
1 | public class ConsumerDemo { |
我们可以通过前文介绍的 ProducerDemo 发送 message,然后使用上面的 ConsumerDemo 来消费 message,这里就不再展示效果了。
Consumer 接口
了解了 KafkaConsumer 的基本使用之后,我们来看 KafkaConsumer 的具体实现,首先是其实现的接口 —— Consumer,对 Consumer 接口中方法进行分类之后,我们可以得到如下三大类:
- 拉取元数据(查 topic、partition、offset):
- listTopics() 方法:查询 kafka 集群中全部 topic 的元信息。
- partitionsFor() 方法:查询指定 topic 的元数据。
- groupMetadata() 方法:查询当前 consumer group 的元信息。
- beginningOffsets() 方法、endOffsets() 方法:查询指定 partition 的最小 offset 值(因为日志压缩和日志保留策略的存在,所以最小 offset 不一定是 0)以及最大 offset 值。
- offsetsForTimes() 方法:根据时间戳查询指定 partition 中的 offset 值,返回 offset 值是大于指定时间的最小 offset(
earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition
)。
- 控制消费行为:
- assign(
Collection<TopicPartition>
) 方法:用来手动给 consumer 分配其订阅的 topic 以及 partition 集合,该方法与 subscribe() 方法冲突。 - subscribe()方法:订阅指定的 topic,后续将由 kafka 集群通过 rebalance 协议为 consumer 自动分配 partition。
- commit()、commitAsync() 方法:提交(异步提交)当前 consumer 的消费位置(也就是其已消费完成的 message 的 offset 值),默认情况下,consumer 的消费位置存储在
__consumer_offsets
这个特殊的 topic 中。 - poll() 方法:从当前 consumer 负责消费的 partition 中批量拉取 message。
- seek(TopicPartition)方法:指定 consumer 从指定 partition 的哪个 offset 位置开始后续消费。
- pause()、resume()方法:暂停/继续 consumer,暂停后 poll() 方法会返回集合。
- assign(
- 获取消费装填:
- assignment():查询当前 consumer 消费的 partition。
- position(TopicPartition) 方法:获取 consumer 下次从指定 partition 拉取的 message 的 offset 值。
- committed() 方法:当前 consumer group 在指定 partition 的消费位置 。
ConsumerNetworkClient
在前面介绍 KafkaProducer 的时候详细介绍了 NetworkClient 的实现,它负责管理 Producer 与 kafka 集群中各个 broker 之间的网络连接,底层依赖 KSelector 来完成网络读写操作,通过一系列 handle*() 方法处理正常响应、超时以及网络异常等。
ConsumerNetworkClient 是专门为 Consumer 端提供的客户端,它底层依赖 NetworkClient 的上述功能来完成网络操作。依旧本着是先看数据结构,再看行为的思路,来看 ConsumerNetworkClient 的核心字段,如下:
send
作为一个 Client,ConsumerNetworkClient 自然提供了发送请求的方法 —— send() 方法,它会将待发送的请求封装成ClientRequest 对象,保存到unsent集合中等待发送,具体实现如下:
1 | public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder, |
我们可以看到,这里创建 ClientRequest 时指定的 callback 对象是 RequestFutureCompletionHandler,其中关联了发送请求对应的 RequestFuture 对象(future 字段), ClientResponse 对象(response 字段)以及相关的 RuntimeException(e 字段)。在其 onComplete() 方法中,会初始化 ClientResponse 对象并将当前 RequestFutureCompletionHandler 对象添加到 pendingCompletion 集合;onFailure() 方法中会初始化 Exception 并将当前 RequestFutureCompletionHandler 对象添加到 pendingCompletion 集合。
RequestFutureCompletionHandler 中的 future 字段是 RequestFuture 类型, RequestFuture 是 ConsumerNetworkClient 发出的一个异步请求的 Future 对象,其中的 result 字段(AtomicReference<Object>
类型)用来记录请求结果(具体 result 类型由 RequestFuture 的泛型指定),completedLatch 字段(CountDownLatch)用来实现 RequestFuture 的 awaitDone() 方法的阻塞效果,listeners 集合记录了(ConcurrentLinkedQueue)用来记录相关监听器。
在 RequestFutureCompletionHandler.fireCompletion() 方法中会根据 e 字段调用 RequestFuture 的相应方法,完成(或异常完成)该 RequestFuture,关闭 completedLatch,唤醒阻塞在其上的线程。
RequestFuture 中的方法都比较简单,熟悉 JDK Future 的胖友应该都能看懂。这里说一下 RequestFuture 中两个有意思的方法,一个是 compose(RequestFutureAdapter) 方法,它主要是将当前 RequestFuture 对象转换成另一个 RequestFuture 对象,目的是为了转换 result 类型,具体实现如下:
1 | public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) { |
另一处是 chain() 方法,它是为了将多个有依赖关系的 RequestFuture 组成一个责任链,具体实现原理与 compose() 方法类似,也是通过监听器的方式实现的。
poll
ConsumerNetworkClient 中最核心的方法是 poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) 方法,这三个参数的含义如下:
timer(Timer 类型):Timer 是一个辅助类,用来帮助 NetworkClient.poll() 方法这种阻塞的方法计算超时时间。
pollCondition(PollCondition 类型):PollCondition 类似于一个钩子方法,通过外部提供的 shouldBlock() 钩子方法来控制 ConsumerNetworkClient.poll() 是否该进行阻塞。这里说一下 PollCondition 接口,它只有两个实现:
RequestFuture 中的 shouldBlock() 方法实际就是 !isDone() 方法,也就是说,我们在 ConsumerNetworkClient.poll() 方法中根据某个请求的状态,决定是否阻塞等待其响应。
disableWakeup(boolean 类型):标识此次 poll() 方法是否是可被唤醒的,也就是,是否关注 wakeup 字段的值。如果 disableWakeup 字段设置为 true,则在 poll() 方法中不会关注 wakeup 字段值;如果设置为 false,则 wakeup 为 true,则抛出 WakeupException 并退出此次 poll() 方法调用。
好了,下面来看 poll() 方法的核心逻辑:
1、 加锁,ConsumerNetworkClient 是线程安全,毕竟有很多线程会并发使用 ConsumerNetworkClient 这个对象来发送请求并接收响应。
2、 执行 handlePendingDisconnects() 方法处理 pendingDisconnects 集合,此处会清理 UnsentRequests 中每个 Node 对应的 ClientRequest 集合,并触发每个 ClientRequest 关联的 RequestFutureCompletionHandler 的 onFailure() 方法。同时,还会调用 NetworkClient 的 disconnect() 方法,清理 InFlightRequests 集合并触发相关回调。
3、 执行 trySend() 方法,尝试发送 unsent 集合中的请求。具体逻辑是:遍历每个 Node 对应的 ClientRequest 列表,每次循环都调用 NetworkClient.ready() 方法检测当前是否具备发送条件,其中包括网络连接状态、InFlightRequests 队列状态等等。检测通过之后,则调用 NetworkClient.send() 方法将请求放入InFlightRequests 队里中等待响应,也放入 KafkaChannel 的 send 字段中等待发送。同时,会将请求从 unsent 中删除。
4、 接下来根据条件决定 NetworkClient.poll() 方法的阻塞时长,这里的条件有两个:
- pendingCompletion 集合是否为空,如果不为空,则表示有 RequestFutureCompletionHandler 等待执行,所以阻塞时长为 0。
- pollCondition.shouldBlock() 这个钩子方法的返回值,如果返回 false,则表示无需阻塞。其实就是看自己关注的请求是否已经完成了。
5、 在 NetworkClient.poll() 方法中会更新连接状态,在 poll() 方法结束之后,这里会再次检查连接状态并更新处理 unsent 集合中未发送的请求,具体逻辑与步骤1基本一致,不再重复。
6、 检查 disableWakeup、wakeup,决定是否抛出 WakeupException 异常。检查当前线程是否被中断,如果被中断了,则抛出 InterruptException 异常。
7、 再次调用 trySend() 方法,之所以再次调用 trySend() 方法,是因为经历了一次 poll() 之后,可能有的连接、InFlightRequests 等条件准备好了。
8、 调用 failExpiredRequests() 方法处理 unsent 集合中过期的请求,这里会调用过期请求关联的 RequestFutureCompletionHandler.onFailure() 方法。
9、 清空 unsent 集合。
10、 执行 firePendingCompletedRequests() 方法,触发 pendingCompletion 集合中全部的 RequestFutureCompletionHandler,这里不仅仅会完成 RequestFuture,还会触发注册在其上的监听器。
其他方法简介
- awaitMetadataUpdate()方法:循环调用 poll() 方法,直到 Metadata 更新完毕(版本号增加),实现阻塞等待Metadata 更新完成。
- awaitPendingRequests()方法:阻塞等待 unsent 和 InFightRequests 中的请求全部完成(正常完成或异常完成都可以)。
- leastLoadedNode() 方法:查找 kafka 集群中负载最低的Node。
- hasReadyNodes() 方法:查询当前 kafka 集群是否有 Node 可以发送请求。
- disconnectAsync() 方法:关闭与指定 Node 的连接,这里会写入 pendingDisconnects 集合,并调用 NetworkClient 的 wakeup() 方法,尽快处理断开的连接。
SubscriptionState
通过前面的介绍我们知道,在 consumer 启动之后会经历 rebalance 来确定自己消费的 partition,紧接着 consumer 会从 __consumer_offsets
这个内部 topic 拉取 offset 确认自己从哪个位置开始消费。
KafkaConsumer 在内存中使用 SubscriptionState 来维护自身消费的 topic、partition 以及 offset。下面来看 SubscriptionState 的核心字段:
这里我们来关注一下 assignment 字段,TopicPartitionState 用来记录当前 consumer 消费对应 partition 的消费状态,核心字段如下:
TopicPartitionState 提供了非常多的方法来读写上述字段,这里不再展开,没啥难度。
接下来看 PartitionStates,其中最核心的就是 map 字段(LinkedHashMap 类型),其中的 Key 是 TopicPartition,Value 是 TopicPartitionState,这里之所以使用 LinkedHashMap 类型的 Map,是为了更新 TopicPartition 的顺序,保证 Fetch Request 不会出现饥饿的状态,胖友们可以看一下 updateAndMoveToEnd() 、moveToEnd() 方法,都是用来调整 TopicPartition 消费状态顺序的。另外,PartitionStates 中提供了一些其他的方法都是用来读写 map 集合,不再多说。
到此为止,SubscriptionState 的核心数据结构就介绍完了。SubscriptionState 本身提供的行为基本都是用来修改上述字段的,这里不再展开了。
ConsumerCoordinator
在 KafkaConsumer 中通过 ConsumerCoordinator 实现与服务端的 GroupCoordinator 的完成 rebalance 协议中的一系列交互,ConsumerCoordinator 继承了 AbstractCoordinator 抽象类。AbstractCoordinator 的核心字段如下: