kafka 2.8.0 源码环境搭建
安装 JDK
JDK 的安装非常简单,这里我们安装 JDK 8 即可(虽然 JDK 现在的新版本是 16,但是国内生产环境还有相当一大部分还是停留在 8 这个版本上)。首先到 JDK下载地址下载对应系统的 JDK 安装包即可,这里我使用的是 mac 系统,下载 dmg 文件即可。
下载完成之后,双击 dmg 文件,一直下一步即可安装完成。
安装 scala
我们这里安装的 scala 版本是 2.13.1 版本 scala 2.13.1 下载地址。下载完成之后,直接解压到当前目录下即可。
下载 gradle
kafka 的代码依赖是通过 gradle 管理的,我们需要从 下载地址 下载 gradle 6.8 的压缩包,然后直接解压到当前目录下即可。
配置环境变量
安装完 JDK、scala 以及 gradle 之后,我们打开命令行,跳转到当前用户的根目录,打开 bash_profile
1 | sudo vim .bash_profile |
在 bash_profile 文件中配置 JAVA_HOME、SCALA_HOME、GRADLE_HOME 三个环境变量,并将它们添加到 PATH 变量上,如下图所示:
接下来,保存 bash_profile 文件,并执行 source 命令刷新文件:
1 | source .bash_profile |
最后,执行 java -version
,scala -version
以及 gradle -version
命令,检查一下环境变量是否配置成功,得到下面张图展示的输出,即表示配置成功:
安装 Zookeeper
kafka在2.8.0版本之前是依赖 Zookeeper 来存储元数据信息的,从 2.8.0 版本开始,kafka 不再强依赖 Zookeeper ,而是自己实现了 raft 协议来存储元数据。
这里我们依旧搭建一个伪分布式的 Zookeeper 环境。首先我们去 Zookeeper 官网下载 3.6.3 版本的压缩包 Zookeeper-3.6.3下载地址 。下载完成之后,我们直接解压 apache-zookeeper-3.6.3-bin.tar.gz 包到当前目录。
然后进入 ${ZOOKEEPER_HOME}/conf 目录,拷贝 zoo_sample.cfg 文件并重命名为 zoo.cfg:
1 | cp zoo_sample.cfg zoo.cfg |
最后,进入 ${ZOOKEEPER_HOME}/bin 目录,并启动 Zookeeper 服务:
1 | ./zkServer.sh start |
启动 Zookeeper 服务成功的话,会有下图的输出:
下载 kafka 源码
上述基础依赖安装完成之后,我们就可以正式开始搭建 kafka 的源码环境了。
首先,我们执行 git clone 命令下载 kafka 源码:
1 | git clone https://github.com/apache/kafka.git |
下载完成之后,我们进入 kafka 源码目录,执行 git checkout 命令检出 origin/2.8 分支:
1 | git checkout -b 2.8 origin/2.8 |
为了将 kafka 源码导入到 IDEA 编辑器中,我们需要执行 gradle idea
命令,这个命令会下载 kafka 的相关依赖,耗时比较长,执行成功之后,会有如下的输出:
最后,将 kafka 源码导入到 IDEA 中,得到的项目结果如下图所示:
在 kafka 中,很多 request 和 response 类都是在编译过程中生成的,下面我们需要执行一下 ./gradlew jar
来生成这些类,该命令执行时间较长,请耐心等待。
./gradlew jar
命令执行完成之后,我们需要在 IDEA 中找到下图几个模块中的 generated 目录,并将其中 java 目录添加到 classpath 中
- client 模块:
- core 模块:
- metadata 模块:
- raft 模块:
验证环境
下面我们来验证一下 kafka 源码环境是否搭建成功。
首先,我们将 conf 目录下的 log4j.properties 配置文件拷贝到 src/main/scala 目录下,如下图所示:
接下来,我们修改 conf 目录下的 server.properties 文件,将修改其中的 log.dir 配置项,将其指向 kafka 源码目录下的 kafka-logs 目录:
1 | /Users/xxx/source/kafka/kafka-logs = |
server.properties 文件中的其他配置暂时不用修改。
最后,我们在 IDEA 中配置 kafka.Kafka 这个入口类,启动 kafka broker,具体配置如下图所示:
启动成功的话,控制台输出没有异常,且能看到如下输出:
P.S.可能遇到的问题:
将 slf4j-log4j12
这个依赖的 scope 调整成 Runtime
即可,如下图所示:
发送、消费 message
我们这里使用 kafka 自带的脚本工具来验证上面搭建的 kafka 源码环境
首先,我们进入到 ${KAFKA_HOME}/bin
目录,通过 kafka-topics.sh 命令来创建一个名为 test_topic
的 topic:
1 | ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1 |
执行效果如下图所示:
然后我们通过 kafka-console-consumer.sh
命令启动一个命令行的 consumer 来消费 test_topic
这个 topic,如下:
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic |
执行效果如下图所示:
此时命令行会一直 hang 住,当接收到 message 的时候,会打印在命令行中。
接下来,我们通过 kafka-console-producer.sh
命令启动一个命令行的 producer 向 test_topic
这个 topic 中生成数据,如下:
1 | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property parse.key=true |
执行效果如下图所示:
此时命令行会一直 hang 住,当我们输入一条 message 并回车之后,message 会发送到 test_topic
这个 topic 中。
到此为止,kafka 的 broker、producer、consumer 都已经启动好了,我们下面就在 producer 中输入一条带 key 的 message hello YangSizheng
(中间是\t
,不是空格,\t
之前的部分是 key,\t
之后的部分是 value),执行效果如下:
我们输入完 message 并回车之后,就可以在 consumer 处收到该 message 了,效果如下图所示:
总结
本课时我们重点介绍了 kafka 2.8.0 版本的源码环境搭建。
- 首先,我们下载并安装 JDK、scala、gradle 等基础软件,并配置了它们的环境变量。
- 然后,我们安装了 Zookeeper,并启动了 Zookeeper 服务。
- 接下来,我们通过 git 下载了最新的 kafka 源码,并编译启动了 kafka broker
- 最后,我们通过 kafka 自带的命令行 producer 和 consumer 完成了发送和消费 message,达到了验证源码环境的效果。
感谢同学们的观看,课程的相关文章和视频还会放到
- 微信:
- B 站:kafka2.8.0源码环境搭建
- 抖音: