深潜 kafka producer —— 核心架构
kafka 自定义了一套网络协议,我们可以使用任意语言来实现这套协议,实现向 kafka 集群 push message 以及从 kafka 集群 pull message 的效果。在 kafka 2.8.0 版本的源码中的 clients 模块就是官方默认提供的 Java 版本 producer、consumer 实现,我们本课时重点关注其中的 producer 部分实现。
kafka producer 示例演示
按照国际惯例,先来一个 demo 示例,带同学们了解一下 kafka Producer 的基本使用,示例的具体代码如下:
1 | public class ProducerDemo { |
在执行 ProducerDemo 之前,我们执行kafka-console-consumer.sh
命令启动命令行 consumer:
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic |
然后执行 ProducerDemo 可以在控制台看到如下输出:
在 kafka-console-consumer.sh
命令行中看到如下输出:
kafka producer 架构概述
了解了 kafka producer 的基本使用之后,我们开始深入 producer 的架构进行介绍,千言万语不及不急一张图,下图就是 kafka producer 的核心架构:
这里描述一下上图中涉及到的核心组件在,这里涉及到两个线程,一个是我们的业务线程(也就是图中的主线程),另一个是 Sender 线程,我们一个个来说。
首先是主线程的逻辑:
- 首先是 ProducerInterceptors 对 message 进行过滤或是修改。
- 使用 Serializer 对 message 的 key 和 value 进行序列化。
- Partitioner 为根据一定策略为 message 选择合适的 partition。
- 将 message 封装成 ProducerRecord 写入到 RecordAccumulator 中暂存,RecordAccumulator 对象中维护了多个队列,可以看做是 message 的缓冲区,用来实现 message 的批量发送。
下面来看 Sender 线程的逻辑:
- Sender 线程从 RecordAccumulator 中批量获取 message 数据,构造 ClientRequest。
- 将构造好的 ClientRequest 交给 NetworkClient 客户端发送。
- NetworkClient 客户端将请求放入KafkaChannel的缓存。
- NetworkClient 执行网络 I/O,完成请求的发送。
- NetworkClient 收到响应,调用 ClientRequest 的回调函数,最终触发每个 message 上注册的回调函数。
KafkaProducer.send() 核心
介绍完 kafka producer 的核心架构和流程之后,我们开始深入分析 KafkaProducer.send() 方法,即主线程的核心逻辑,还是开局一张图,后面都好说:
下面来描述一下 KafkaProducer.send() 方法的核心逻辑,也就是上图的核心步骤:
- 主线程首先会调用 ProducerInterceptors.onSend() 方法,对 message 进行拦截或是修改。这里
- 然后,通过 waitOnMetadata()方法更新 Kafka 集群的信息,其底层实际上是通过唤醒 Sender 线程来更新 Metadata,Metadata 中保存的是 Kafka 集群元数据。
- 接下来,执行 Serializer.serialize()方法完成 message key 和 value 的序列化。
- 随后调用 partition() 为 message 选择合适的 partition。
- 调用 append()方法,将 message 写入到 RecordAccumulator 中暂存。
- 最后,唤醒 Sender 线程,后续就由 Sender 线程从 RecordAccumulator 中批量发送 message 到 kafka 集群。
ProducerInterceptor
首先来看 ProducerInterceptors,其中维护了一个 ProducerInterceptor 集合,其 onSend()方法、onAcknowledgement()方法、onSendError()方法,实际上,是循环调用 ProducerInterceptor 集合的方法。
我们可以通过实现 ProducerInterceptor 接口的 onSend() 方法来拦截或修改待发送的 message,也可以通过实现 onAcknowledgement()方法、onSendError()方法先于用户的 Callback,对kafka集群响应进行预处理。
Kafka Metadata
在我们通过 KafkaProducer 发送 message 的时候,我们只明确指定了 message 要写入哪个 topic ,并没有明确指定要写入的 partition。
但是同一个 topic 的 partition 可能位于 kafka 的不同 broker 上,所以 producer 需要明确的知道该 topic 下所有 partition 的元信息(即所在 broker 的 IP、端口等信息),这样才能与 partition 所在 broker 建立网络连接并发送 message。
在 KafkaProducer 中,使用 Node、TopicPartition、PartitionInfo 三个类来记录 Kafka 集群元数据:
- Node 表示 kafka 集群中的一个节点,其中维护了节点的 host、ip、port 等基础信息。
- TopicPartition 用来抽象一个 topic 中的的一个 partition,其中维护 topic 的名称以及 partition 的编号信息。
- PartitionInfo 用来抽象一个 partition 的信息,其中:
- leader 字段记录了 leader replica 所在节点的 id
- replica 字段记录了全部 replica 所在的节点信息
- inSyncReplicas 字段记录了ISR集合中所有replica 所在的节点信息。
kafka producer 会将上述三个维度的基础信息封装成 Cluster 对象使用,下面是 Cluster 包含的信息:
再向上一层,Cluster对象会被维护到Metadata中,Metadata同时还维护了Cluster的版本号、过期时间、监听器等等信息,如下图所示:
经过上面的分析,我们可以得到下面这张简图:
静态数据结构分析完了之后,我们来看 KafkaProducer.waitOnMetadata()方法是如何工作的:
1 | private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException { |
这里具体如何更新元数据,我们将在介绍 Sender 线程工作流程的时候,详细分析。
序列化器
分布式系统中各个节点相互通信,必然涉及到内存对象与字节流之间的转换,也就是序列化与反序列化。
kafka 中的序列化器接口是 Serializer,负责将对象转换成字节数组;反序列化器是 Deserializer 接口,负责将字节数组转换成内存中的对象。
下面展示了 Serializer 和 Deserializer 接口的实现类:
从上图中我们可以看出,kafka 自带了常用 Java 类型的 Serializer 实现和 Deserializer 实现,当然,我们也可以自定义Serializer和Deserializer实现来处理复杂类型。
下面我们以 StringSerializer 实现为例说明一下 Serializer 的核心实现:
- configure()方法是在执行序列化操作之前的配置,例如,在StringSerializer.configure()方法中会选择合适的编码类型(encoding),默认是UTF-8
- serializer()方法是真正进行序列化的地方,将传入的Java对象序列化为byte[]。
partition选择
在 waitOnMetadata() 方法拿到最新的集群元数据之后,下面就要开始确定待发送的 message 要发送到哪个 partition 了。
如果我们明确指定了目标 partition,则以用户指定的为准,但是一般情况下,业务并不会指定 message 需要写入到哪个 partition,此时就会通过 Partitioner 结合 集群元数据计算出一个目标 partition。
下图展示了 Partitioner 接口的全部实现:
从名字也能看出,DefaultPartitioner 是默认实现,其中的 partition() 方法中:
- 如果 message 存在的 key 的话,则取 key 的 hash 值(使用的是murmur2这种高效率低碰撞的Hash算法),然后与 partition 总数取模,得到目标 partition 编号,这样可以保证同一 key 的 message 进入同一 partition。
- 如果 message 没有 key,则通过 StickyPartitionCache.partition() 方法计算目标 partition。
这里解释一下 StickyPartitionCache 的功能。我们前面介绍整个 KafkaProducer 流程的时候说过,RecordAccumulator 是一个缓冲区,主线程发送的 message 会先进入 RecordAccumulator,然后 Sender 线程攒够了 message 的时候进行批量发送。
触发 Sender 线程批量发送堆积 message 的条件主要有两方面:
- message 的延迟时间到了,也就是说,我们的业务场景对 message 发送有延迟要求,message 不能一直在 producer 端缓存。我们可以通过 linger.ms 配置降低 message 的发送延迟。
- message 堆积的足够多,达到了一定阈值,才适合批量发送,这样有效负载较高。批量发送的 batch.size 默认值是 16KB。
StickyPartitionCache 主要实现的是”黏性选择”,就是尽可能的先往一个 partition 发送 message,让发往这个 partition 的缓冲区快速填满,这样的话,就可以降低 message 的发送延迟。我们不用担心出现 partition 数据量不均衡的情况,因为只要业务运行时间足够长,message 还是会均匀的发送到每个 partition 上的。
下面来看 StickyPartitionCache 的实现,其中维护了一个 ConcurrentMap(indexCache字段),key 是 topic,value 是当前黏住了哪个 partition。
在 partition() 方法中,StickyPartitionCache 会先从 indexCache 字段中获取黏住的 partition,如果没有,则调用 nextPartition() 方法向 indexCache 中写入一个。在 nextPartition() 方法中,会先获取目标 topic 中可用的 partition,并从中随机选择一个写入 indexCache。
最后,同学们可能问,什么时候更新黏住的 partition 呢?我们看一下 KafkaProducer.doSend()方法中,有这么一个片段:
1 | // 尝试向RecordAccumulator中追加message |
RecordAccumulator.append()方法我们后面分析。
UniformStickyPartitioner 这个 Partitioner 底层也是依赖 StickyPartitionCache 实现黏性发送的,不再展开介绍。
再来看 RoundRobinPartitioner 实现,从名字也可以看出,它是按照轮训的策略来计算目标 partition,其中也维护了一个 ConcurrentMap 集合(topicCounterMap字段),其中的 key 是 topic 的名称,value 是一个递增的 AtomicInteger。
在 RoundRobinPartitioner.partition() 方法中,会先查找目标 topic 的 partition 总数,然后自增上述 AtomicInteger 值并与 partition 总数取模,得到目标 partition 的编号。
总结
本课时我们首先介绍了 KafkaProducer 的基础使用,然后介绍了 KafkaProducer 的核心架构,最后介绍了 KafkaProducer.send() 方法中主线程的核心操作。
下一课时,我们将开始介绍 KafkaProducer 中 RecordAccumulator 相关的内容。
本课时的文章和视频讲解,还会放到:
微信公众号:
B 站搜索:杨四正