通过上一课时的介绍我们了解到,业务线程使用 KafkaProducer.send() 方法发送 message 的时候,会先将其写入RecordAccumulator 中进行缓冲,当 RecordAccumulator 中缓存的 message 达到一定阈值的时候,会由 IO 线程批量形成请求,发送到 kafka 集群。本课时我们就重点来看一下 RecordAccumulator 这个缓冲区的结构。
首先,我们从上图中可以看出,RecordAccumulator 会由业务线程写入、Sender 线程读取,这是一个非常明显的生产者-消费者模式,所以我们需要保证 RecordAccumulator 是线程安全的。
RecordAccumulator 中维护了一个 ConcurrentMap<TopicPartition, Deque<ProducerBatch>>
类型的集合,其中的 Key 是 TopicPartition 用来表示目标 partition,Value 是 ArrayDeque<ProducerBatch>
队列,用来缓冲发往目标 partition 的消息。 这里的 ArrayDeque 并不是线程安全的集合,后面我们会看到加锁的相关操作。
在每个 ProducerBatch 中都维护了一个 MemoryRecordsBuilder 对象,MemoryRecordsBuilder 才是真正存储 message 的地方。RecordAccumulator 、ProducerBatch、MemoryRecordsBuilder 这三个核心类的关系如下图所示:
message 格式
既然我们准备深入 KafkaProducer 进行分析,那我们就需要了解 message 在 kafka 内部的格式,而不是简单知道 message 是个 KV。kafka 目前的 message 的格式有三个版本:
- V0:kafka0.10 版本之前
- V1:kafka 0.10 ~ 0.11 版本
- V2:kafka 0.11.0 之后的版本
V0 版本
在使用 V0 版本的 message 时,message 在 RecordAccumulator 中只是简单的堆积,并没有进行聚合,每个 message 都有独立的元信息,如下图所示:
其中唯一要说明就是 attributes 部分,其中的低 3 位用来标识当前使用的压缩算法,高 5 位没有使用。
V1 版本
V1 版本 与 V0 版本的格式基本类似,就是多了一个 timestamp 字段,具体结构如下:
其中 attributes 部分的低 3 位依旧用来标识当前使用的压缩算法,第 4 位用来标识时间戳的类型。
在 V1 版本中引入 timestamp 主要是为了接口下面几个问题:
- 更准确的日志保留策略。在前面我们已经简单描述过了根据 message 存在时间的保留策略,在使用 V0 版本的时候,kafka broker 会直接根据磁盘上的 segment 文件的最后修改时间来判断是否执行删除操作,但是这种方案比较大的弊端就是如果发生 replica 迁移或是 replica 扩容,新增加 replica 中的 segment 文件就都是新创建的,其中包含的旧 message 就不会被删除。
- 更准确的日志切分策略。在前面我们已经提到过 segment 文件会定时、定量进行切分,在 V0 版本使用 segment 创建时间进行切分的话,也会存在上述同样的问题,导致出现单个大文件,也可能因为没有 message 写入,切分出很小的 segment 文件。
V1 版本中的压缩
对于常见压缩算法来说,压缩内容越多,压缩效果比例越高。但是单条 message 的长度一般都不会特别长,如果要让我们来解决这个矛盾的话,就是将多条 message 放到一起再压缩。kafka 也确实是这么干的,在 V1 版本中,kafka 使用了 wrapper message 的方式来提高压缩效率。简单理解,wrapper message 也是一条 message,但是其中的 value 值则是多条普通 message 组成的 message 集合,这些内部的普通 message 也称为 inner message。如下图所示:
为了进一步降低 message 的无效负载,kafka 只在 wrapper message 中记录完整的 offset 值,inner message 中的 offset 只是相对于 wrapper message offset 的一个偏移量,如下图所示:
当 wrapper message 发送到 kafka broker 之后,broker 无需进行解压缩,直接存储即可,当 consumer 拉取 message 的时候,也是原封不动的进行传递,真正的解压缩在 consumer 完成,这样就可以节省 broker 解压缩和重新压缩的资源。
再谈 V1 版本中的时间戳
V1 版本 message 中的 timestamp 类型由 attributes 中的第 4 位标识,有 CreateTime 和 LogAppendTime 两种类型:
- CreateTime:timestamp 字段中记录的是 producer 生产这条 message 的时间戳
- LogAppendTime:timestamp 字段中记录的是 broker 将该 message 写入 segment 文件的时间戳。
在 producer 生成 message 的时候,message 中的时间戳是 CreateTime,wrapper message 中的 timestamp 是所有 inner message timestamp 的最大值。
当 message 传递到 broker 的时候,broker 会按照自身的 log.message.timestamp.type 配置(或 topic 的 message.timestamp.type 配置)(默认值为CreateTime)修改 wrapper message 的时间戳。如果 broker 使用的是 CreateTime,我们还可以设置 max.message.time.difference.ms 参数,当 message 中的时间戳与 broker 本地时间之差大于该配置值时,broker 会拒绝写入这条 message。
如果 broker 或是 topic 使用 LogAppendTime,那么会将 broker 本地时间直接设置到 message 的 timestamp 字段中,并将 attributes 中的 timestamp type 位修改为 1。如果是压缩 message,只会修改 wrapper message 中的 timestamp 和 timestamp type,不会修改 inner message,这是为了避免解压缩和重新压缩。也就是说,broker 只关心 wrapper message 的时间戳,忽略 inner message 的时间戳。
当 message 被拉去到 consumer 的时候,consumer 只会根据 timestampe type 的值进行处理。如果 wrapper message 为 CreateTime,则 consumer 使用 inner message 的 timestamp 作为 CreateTime;如果 wrapper message 为 LogAppendTime ,则 consumer 使用 wrapper message 作为所有 inner message 的 LogAppendTime,忽略 inner message 的 timestamp 值。
最后,message 中的 timestamp 也是时间戳索引的重要基础,这个我们后面介绍 broker 的时候,详细介绍。
V2 版本
在 kafka 0.11 版本之后,开始使用 V2 版本的 message 格式,同时也兼容 V0、V1 版本的 message,当然,使用旧版本的 message 也就无法使用 kafka 中的一些新特性。
V2 版本的 message 格式参考了 Protocol Buffer 的一些特性,引入了Varints(变长整型)和 ZigZag 编码,其中,Varints 是使用一个或多个字节来序列化整数的一种方法,数值越小,占用的字节数就越少,说白了,还是为了减少 message 的体积。ZigZag 编码是为了解决 Varints 对负数编码效率低的问题,ZigZag 会将有符号整数映射为无符号整数,从而提高 Varints 对绝对值较小的负数的编码效率 ,如下图所示:
了解了 V2 版本格式的理论基础之后,我们来看 V2 中message 的格式(也被称为 Record):
其中需要关注的是,所有标识长度的字段都是 varint(或 varlong),也就是变长字段,timestamp 和 offset 都是 delta 值,也就是偏移量。另外,就是 attribute 字段中的所有位都废弃了,并添加 header 扩展。
除了基础的 Record 格式之外,V2 版本中还定义了一个 Record Batch 的结构,同学们可以对比 V1 版本格式,Record 是内层结构,Record Batch 是外层结构,如下图所示:
Record Batch 中包含的字段非常多,我们一个个来看:
- baseOffset:当前 RecordBatch 的起始位移,Record 中的 offset delta 与该 baseOffset 相加才能得到真正的 offset 值。当 RecordBatch 还在 producer 端的时候,offset 是 producer 分配的一个值,不是 partition 分配的,别搞混了。
- batchLength:RecordBatch 的总长度。
- partitionLeaderEpoch:用于标记目标 partition 中 leader replica 的纪元信息,后面介绍具体实现时会再次看到该值的相关实现。
- magic:V2 版本中的魔数值 2。
- crc 校验码:参与校验的部分是从属性值到 RecordBatch 末尾的全部数据,partitionLeaderEpoch 不在 CRC 里面是因为每次 broker 收到 RecordBatch 的时候,都会赋值 partitionLeaderEpoch,如果包含在 CRC 里面会导致需要重新计算CRC。这个实现后面会说。
- attributes:从 V1 版本中的 8 位扩展到 16 位,0~2 位表示压缩类型,第 3 位表示时间戳类型,第 4 位表示是否是事务型记录。所谓“事务”是Kafka的新功能,开启事务之后,只有在事务提交之后,事务型 consumer 才可以看到记录。5表示是否是 Control Record,这类记录总是单条出现,被包含在一个 control record batch 里面,它可以用于标记“事务是否已经提交”、“事务是否已经中止” 等,它只会在 broker 内处理,不会被传输给 consumer 和 producer,即对客户端是透明的。
- lastOffsetDelta:RecordBatch 最后一个 Record 的相对位移,用于 broker 确认 RecordBatch 中 Records 的组装正确性。
- firstTimestamp:RecordBatch 第一条 Record 的时间戳。
- maxTimestamp:RecordBatch 中最大的时间戳,一般情况下是最后一条消息的时间戳,用于 broker 确认RecordBatch 中 Records 的组装正确性。
- producer id:生产者编号,用于支持幂等性(Exactly Once 语义),参考KIP-98 - Exactly Once Delivery and Transactional Messaging。
- producer epoch:生产者纪元,用于支持幂等性(Exactly Once 语义)。
- base sequence:基础序号,用于支持幂等性(Exactly Once 语义),用于校验是否是重复 Record。
- records count:Record 的数量。
通过分析 V2 版本的消息格式我们知道,kafka message 不仅提供了类似事务、幂等等新功能,还对空间占用提供了足够的优化,总体提升很大
MemoryRecordsBuilder
了解了 kafka message 格式的演变之后,我们回到 KafkaProducer 的代码。
每个 MemoryRecordsBuilder 底层依赖一个 ByteBuffer 完成 message 的存储,我们后面会深入介绍 KafkaProducer 对 ByteBuffer 的管理。在 MemoryRecordsBuilder 中会将 ByteBuffer 封装成 ByteBufferOutputStream,ByteBufferOutputStream 实现了 OutputStream,这样我们就可以按照流的方式写入数据了。同时,ByteBufferOutputStream 提供了自动扩容底层 ByteBuffer 的能力。
还有一个需要关注的是 compressionType 字段,它用来指定当前 MemoryRecordsBuilder 使用哪种压缩算法来压缩 ByteBuffer 中的数据,kafka 目前已支持的压缩算法有:GZIP、SNAPPY、LZ4、ZSTD 四种,注意:只有 kafka V2 版本协议支持 ZSTD 压缩算法。
1 | public MemoryRecordsBuilder(ByteBuffer buffer,...) { // 省略其他参数 |
这样,我们得到的 appendStream 就如下图所示:
了解了 MemoryRecordsBuilder 底层的存储方式之后,我们来看 MemoryRecordsBuilder 的核心方法。首先是 appendWithOffset() 方法,逻辑并不复杂,需要明确的是 ProducerBatch 对标的 V2 中的 RecordBatch,我们写入的数据对标 V2 中的 Record:
1 | public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { |
appendDefaultRecord() 方法中会计算 Record 中的 offsetDelta、timestampDelta,然后完成 Record 写入,最后更新RecordBatch 的元数据,具体实现如下:
1 | private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, |
MemoryRecordsBuilder 中另一个需要关注的方法是 hasRoomFor() 方法,它主要用来估计当前 MemoryRecordsBuilder 是否还有空间来容纳此次写入的 Record,具体实现如下:
1 | public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { |
ProducerBatch
接下来我们向上走一层,来看 ProducerBatch 的实现,其中最核心方法是 tryAppend() 方法,核心步骤如下:
- 通过 MemoryRecordsBuilder的hasRoomFor() 方法检查当前 ProducerBatch 是否还有足够的空间来存储此次待写入的 Record。
- 调用 MemoryRecordsBuilder.append() 方法将 Record 追加到底层的 ByteBuffer 中。
- 创建 FutureRecordMetadata 对象,FutureRecordMetadata 继承了 Future 接口,对应此次 Record 的发送。
- 将 FutureRecordMetadata 对象以及 Record 关联的 Callback 回调封装成Thunk 对象,记录到 thunks (List
类型)中。
下面是 ProducerBatch.tryAppend() 方法的具体实现:
1 | public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { |
除了 MemoryRecordsBuilder 之外,ProducerBatch 中还记录了很多其他关键信息:
这里我们先来关注 ProduceRequestResult 这个类,其中维护了一个 CountDownLatch 对象(count 值为 1),实现了类似于Future的功能。当 ProducerBatch 形成的请求被 broker 端响应(正常响应、超时、异常响应)或是 KafkaProducer 关闭的时候,都会调用 ProduceRequestResult.done() 方法,该方法就会调用 CountDownLatch 对象的 countDown() 方法唤醒阻塞在 CountDownLatch 对象的 await() 方法的线程。这些线程后续可以通过 ProduceRequestResult 的 error 字段来判断此次请求是成功还是失败。
在 ProduceRequestResult 中还有一个 baseOffset 字段,用来记录 broker 端为关联 ProducerBatch 中第一条 Record 分配的 offset 值,这样,每个 Record 的真实 offset 就可以根据自身在 ProducerBatch 的位置计算出来了(Record 的真实 offset = ProduceRequestResult.baseOffset + relativeOffset)。
接下来看 FutureRecordMetadata,它实现了 JDK 中的 Future 接口,表示一个 Record 的状态。FutureRecordMetadata 中除了维护一个关联的 ProduceRequestResult 对象之外,还维护了一个 relativeOffset 字段,relativeOffset 用来记录对应 Record 在 ProducerBatch 中的偏移量。
在 FutureRecordMetadata 中,有两个值得注意的方法,一个是 get() 方法,其中会依赖ProduceRequestResult中的CountDown来实现阻塞等待,并调用 value() 方法返回 RecordMetadata 对象:
1 | public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
另一个是 value() 方法,其中会将 partition信息、baseOffset、relativeOffset、时间戳(LogAppendTime或CreateTime)等信息封装成 RecordMetadata 对象返回:
1 | RecordMetadata value() { |
最后来看 ProducerBatch 中的 thunks 集合,其中的每个 Thunk 对象对应一个 Record 对象,在 Thunk 对象中记录了对应 Record 关联的 Callback 对象以及关联的 FutureRecordMetadata 对象。
了解了 ProducerBatch 写入数据的相关内容之后,我们回到 ProducerBatch 来关注其 done() 方法。当 KafkaProducer 收到 ProducerBatch 对应的正常响应、或超时、或关闭生产者时,都会调用 ProducerBatch 的 done()方法。在 done() 方法中,ProducerBatch 首先会更新 finalState 状态,然后调用 completeFutureAndFireCallbacks() 方法触发各个 Record 的 Callback 回调,具体实现如下:
1 | public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { |
在 completeFutureAndFireCallbacks() 方法中,会遍历 thunks 集合触发每个 Record 的 Callback,更新 ProduceRequestResult 中的 baseOffset、logAppendTime、error字段,并调用其 done() 方法释放阻塞在其上的线程,具体实现如下:
1 | private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { |
BufferPool
前面提到,MemoryRecordsBuilder 底层使用 ByteBuffer 来存储写入的 Record 数据,但是创建 ByteBuffer 对象本身是一种比较消耗资源的行为,所以 KafkaProducer 使用 BufferPool 来实现 ByteBuffer 的统一管理。BufferPool 说白了就是一个 ByteBuffer 的资源池,当需要 ByteBuffer 的时候,我们就从其中获取,当使用完成之后,就将 ByteBuffer 归还到 BufferPool 中。
BufferPool 是一个比较简单的资源池实现,它只会针对特定大小(poolableSize 字段)的 ByteBuffer 进行管理,对于其他大小的 ByteBuffer 选择视而不见(Netty 里面 Buffer 池更加复杂,之后介绍 Netty 源码的时候会详细说)。
一般情况下,我们会调整 ProducerBatch 的大小(batch.size 配置(指定 Record 个数)* 单个 Record 的预估大小),使每个 ProducerBatch 可以缓存多条 Record。但当出现一条 Record 的字节数大于整个 ProducerBatch 的意外情况时,就不会尝试从 BufferPool 申请 ByteBuffer,而是直接新分配 ByteBuffer 对象,待其被使用完后直接丢弃由GC回收。
下面来看一下 BufferPool 的核心字段:
BufferPool 分配 ByteBuffer 的核心逻辑位于 allocate() 方法中,逻辑并不复杂,直接上代码和注释:
1 | public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { |
从 BufferPool 中分配出来的 ByteBuffer 在使用之后,会调用 deallocate() 方法来释放,具体实现如下:
1 | public void deallocate(ByteBuffer buffer, int size) { |
RecordAccumulator
分析完 MemoryRecordsBuilder、ProducerBatch 以及 BufferPool 与写入相关的方法之后,我们再来看 RecordAccumulator 的实现。
分析一个类的时候,还是要先看其数据结构,然后再来看其行为(方法)。RecordAccumulator 中的关键字段如下:
在前面分析 KafkaProducer.doSend() 方法发送 message 的时候,直接调用了 RecordsAccumulator.append() 方法,这也是调用 ProducerBatch.tryAppend() 方法将消息追加到底层 MemoryRecordsBuilder 的地方。下面我们就来看 RecordAccumulator.append() 方法的核心逻辑:
- 在 batches 集合中查找目标 partition 对应的 ArrayDeque
集合,如果查找失败,则创建新的ArrayDeque ,并添加到 batches 集合中。 - 对步骤 1 中拿到的 ArrayDeque
集合进行加锁。这里使用 synchronized 代码块进行加锁。 - 执行 tryAppend() 方法,尝试向 ArrayDeque
中的最后一个 ProducerBatch 写入 Record。
1 | private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs) { |
- synchronized 块执行结束,自动释放锁。
- 如果步骤 3 中的追加操作成功,则返回 RecordAppendResult。
- 如果步骤 3 中的追加 Record 失败,则可能是因为当前使用的 ProducerBatch 已经被填满了。这里会判断abortOnNewBatch 参数是否为 true,如果是的话,会立即返回 RecordAppendResult 结果(其中的 abortForNewBatch 字段设置为 true),返回的 RecordAppendResult 中如果 abortForNewBatch 为true,会再触发一次 RecordAccumulator.append()方法。
- 如果 abortForNewBatch 参数不为 true,则会开始从 BufferPool 中分配新的 ByteBuffer,并封装成新的 ProducerBatch 对象。
- 再次对 ArrayDeque
加锁,并尝试将 Record 追加到新建的 ProducerBatch 中,同时将新建的ProducerBatch追加到对应的 Deque 尾部。 - 将新建的 ProducerBatch 添加到 incomplete集合中。synchronized块结束,自动解锁。
- 返回 RecordAppendResult,RecordAppendResult 会中 batchIsFull 字段和 newBatchCreated 字段会作为唤醒Sender 线程的条件。KafkaProducer.doSend() 方法中唤醒 Sender 线程的代码片段如下:
1 | if (result.batchIsFull || result.newBatchCreated) { |
下面来看 RecordAccumulator.append() 方法的具体实现:
1 | public RecordAppendResult append(TopicPartition tp, long timestamp, |
这里我们清晰的看到,对 ArrayDeque
除了两次 ArrayDeque 加锁操作,我们还看到第二次加锁后重试,这主要是为了防止多个线程并发从 BufferPool 申请空间后,造成内部碎片。这种场景如下图所示,线程 A 发现最后一个 ProducerBatch 空间不足,申请空间并创建一个新ProducerBatch 对象添加到 ArrayDeque 的尾部,然后线程 B 与线程 A 并发执行,也将新创建一个 ProducerBatch 添加到ArrayDeque 尾部。从上面 tryAppend() 方法的逻辑中我们可以看到,后续的写入只会在 ArrayDeque 尾部的 ProducerBatch 上进行,这样就会导致下图中的 ProducerBatch3 不再被写入,从而出现内部碎片:
了解了 RecordAccumulator 对 Record 写入的支持之后,我们再来看 RecordAccumulator.ready()方法,它是 Sender 线程发送 Record 到 kafka broker 之前被调用的,该方法会根据集群元数据,获取能够接收待发送 Record 的节点集合,具体筛选条件如下:
- batchs 集合中的 ArrayDeque 中有多个 RecordBatch 或是第一个 RecordBatch 是否满了。
- 等待时间是否足够长。这主要是两个方面,如果有重试的话,需要超过 retryBackoffMs 的退避时长;如果没有重试的话,需要超过 linger.ms 配置指定的等待时长(linger.ms 默认是 0)。
- 是否有其他线程在等待 BufferPool 释放空间。
- 是否有线程调用了 flush() 方法,正在等待 flush 操作完成。
下面来看是 ready 方法的代码,它会遍历batches集合中每个分区,首先查找目标 partition 的 leader replica 所在的Node,只有知道 Node 信息,KafkaProducer 才知道往哪里发。然后针对每个 ArrayDeque 进行处理,如果满足上述四个条件,则将对应的 Node 信息记录到 readyNodes 集合中。最后,ready() 方法返回的是 ReadyCheckResult 对象,其中记录了满足发送条件的 Node 集合、在遍历过程中找不到 leader replica 的 topic 以及下次调用 ready() 方法进行检查的时间间隔。
1 | public ReadyCheckResult ready(Cluster cluster, long nowMs) { |
调用 RecordAccumulator.ready() 方法得到的 readyNodes 集合后,此集合还要经过 NetworkClient 的过滤(在介绍Sender线程的时候再详细介绍)之后,才能得到最终能够发送消息的 Node 集合。
之后,Sender 线程会调用 RecordAccumulator.drain() 方法会根据上述 Node 集合获取要发送的 ProducerBatch,返回Map<Integer, List
1 | public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { |
总结
本课时首先介绍了 kafka 中 message 格式的演变,详细分析了 V0、V1、V2 三个版本 message 的格式变迁。
然后介绍了 KafkaProducer 中 RecordAccumulator 相关的核心内容,它是业务线程和 Sender 线程之间数据的中转站,主要涉及到了 MemoryRecordsBuilder、ProducerBatch、BufferPool 等底层组件,以及 RecordAccumulator 的核心方法。
下一课时,我们将开始介绍 KafkaProducer 中 Sender 线程相关的内容。
本课时的文章和视频讲解,还会放到:
微信公众号:
B 站搜索:杨四正