Setting Up a Kafka Source Code Environment on Linux

This article describes how to set up a Kafka source code environment, targeting IntelliJ IDEA on Linux; other operating systems or IDEs can follow the same approach.

1. Install and configure the JDK
Confirm that the JDK version is at least 1.7, preferably 1.8 or above. Use the java -version command to check the current JDK version, for example:

lenmom@M1701:~/workspace/software/hadoop-2.7.3/bin$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
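
If java is not yet resolvable from the shell, the JDK can be wired into /etc/profile the same way as the tools below; a minimal sketch, assuming an illustrative install path:

# Illustrative JDK install location; adjust to where your JDK actually lives.
export JAVA_HOME=/home/lenmom/software/jdk1.8.0_191
export PATH=$JAVA_HOME/bin:$PATH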


2. Download, install, and configure Gradle
Download from https://gradle.org/releases/. I used version 5.2.1 (the Kafka 2.1.0 source officially uses 4.10.2; 5.2.1 worked for me on Linux but failed on Windows, so if you are setting up on Windows, 4.10.2 is recommended). Generally you only need to unpack the downloaded archive and add $GRADLE_HOME/bin to the PATH environment variable, where $GRADLE_HOME is the Gradle root directory. You can then use the gradle -v command to verify that Gradle is configured. The installation steps are as follows:

wget https://downloads.gradle.org/distributions/gradle-5.2.1-bin.zip
unzip gradle-5.2.1-bin.zip -d /home/lenmom/software/

Configure the environment variables:

vim /etc/profile

Append the following:

export GRADLE_HOME=/home/lenmom/software/gradle-5.2.1
export PATH=$GRADLE_HOME/bin:$PATH

Save and exit, then run source /etc/profile to make the environment variables take effect.
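
To double-check the result, run the verification command mentioned above; the banner should report the unpacked version:

source /etc/profile
gradle -v   # the banner should report Gradle 5.2.1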

 

3. Download, install, and configure Scala
Download from http://www.scala-lang.org/download/all.html. The latest version at the time of writing is 2.12.8, but I used 2.11.8. As with Gradle, simply unpack the archive and add $SCALA_HOME/bin to the PATH environment variable, where $SCALA_HOME is the Scala root directory. Use the scala -version command to verify that Scala is configured, for example:

lenmom@M1701:~/workspace/software/hadoop-2.7.3/bin$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
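
The matching /etc/profile entries mirror the Gradle setup above (the install path is illustrative):

# Illustrative Scala install location; adjust to where the archive was unpacked.
export SCALA_HOME=/home/lenmom/software/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH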

 

4. Build the Kafka source environment
Kafka can be downloaded from http://kafka.apache.org/downloads; the version used here is 2.1.0. Unpack the source archive and run the gradle idea command in the Kafka root directory to build the project. If you use Eclipse, run gradle eclipse instead. The build details look like this:

lenmom@M1701:~/workspace/open-source/kafka-2.1.0-src$ gradle idea
Starting a Gradle Daemon (subsequent builds will be faster)

> Configure project :
Building project 'core' with Scala version 2.11.8
Building project 'streams-scala' with Scala version 2.11.8

> Task :idea
Generated IDEA project at file:///home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-2.1.0-src.ipr

Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 19m 51s
28 actionable tasks: 28 executed

Then import Kafka into IDEA. That is not quite the end of it: IDEA also needs the Scala plugin. Search for "scala" under Settings -> Plugins and install it (the screenshot that accompanied the original post showed the plugin already installed).

 

 

5. Change the Scala version in kafka_source_home/gradle.properties

vim /home/lenmom/workspace/open-source/kafka-2.1.0-src/gradle.properties

Change scalaVersion from 2.11.11 to 2.11.8.

Also change the Scala version in the Gradle dependency definitions:

vim /home/lenmom/workspace/open-source/kafka-2.1.0-src/gradle/dependencies.gradle

Change def defaultScala211Version = '2.11.11' to def defaultScala211Version = '2.11.8'.
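
For reference, both edits can also be applied non-interactively; a sketch with sed, assuming the files contain exactly the lines quoted above:

KAFKA_SRC=/home/lenmom/workspace/open-source/kafka-2.1.0-src
# Point scalaVersion at the locally installed Scala.
sed -i 's/^scalaVersion=.*/scalaVersion=2.11.8/' $KAFKA_SRC/gradle.properties
# Keep the default 2.11 entry in dependencies.gradle consistent with it.
sed -i "s/defaultScala211Version = '2.11.11'/defaultScala211Version = '2.11.8'/" $KAFKA_SRC/gradle/dependencies.gradle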

If you change scalaVersion, you need to re-run the gradle idea command to rebuild. Installing some other Scala version at the OS level, say 2.12.8, often causes no problems, but in some cases Kafka will throw runtime exceptions that are caused precisely by this Scala version mismatch, such as the error below:

[2019-02-10 17:09:21,119] FATAL (kafka.Kafka$)
java.lang.NoSuchMethodError: scala.collection.TraversableOnce.$init$(Lscala/collection/TraversableOnce;)V
at kafka.message.MessageSet.<init>(MessageSet.scala:72)
at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:129)
at kafka.message.MessageSet$.<init>(MessageSet.scala:32)
at kafka.message.MessageSet$.<clinit>(MessageSet.scala)
at kafka.server.Defaults$.<init>(KafkaConfig.scala:52)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:686)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)

So, to save yourself unnecessary trouble, check the scalaVersion configured in the Kafka source's gradle.properties file before installing Scala.

6. Configure the Kafka source environment

6.1 Once scalaVersion is confirmed, copy the log4j.properties file from the config directory into core/src/main/scala, so that Kafka can emit log output when it runs:

cp /home/lenmom/workspace/open-source/kafka-2.1.0-src/config/log4j.properties  /home/lenmom/workspace/open-source/kafka-2.1.0-src/core/src/main/scala/


6.2 Configure the server.properties file. Generally only the following options need to be changed:

# Whether topics may be deleted; set to true so topics can be removed.
# Enabling this makes it easy to delete temporary topics that are no longer
# needed after Kafka has been running for a while.
delete.topic.enable=true
# Disable automatic topic creation.
auto.create.topics.enable=false
# Directory for log files; the default is /tmp/kafka-logs.
# Point it at a directory inside the source tree so the data is easy to find and clean up.
log.dir=/home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-logs

# ZooKeeper connection string for Kafka; this assumes a ZooKeeper service is running locally.
zookeeper.connect=localhost:2181

6.3 Create the Kafka log directory:

mkdir /home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-logs

 

6.4 Make sure ZooKeeper is running

For ZooKeeper installation and startup, see my earlier posts.
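
As a quick reference, with a standalone ZooKeeper distribution unpacked locally (the path is illustrative), it can be started and probed like this:

# Start the standalone server with its bundled script.
/home/lenmom/software/zookeeper/bin/zkServer.sh start
# The "ruok" four-letter command should answer "imok" on the default client port 2181.
echo ruok | nc localhost 2181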

 

6.5 Start Kafka

Create an IDEA run configuration with the following parameters:

Main class:    
kafka.Kafka

VM options:    
-Dkafka.logs.dir=/home/lenmom/workspace/open-source/kafka-2.1.0-src/logs
-Dlog4j.configuration=file:/home/lenmom/workspace/open-source/kafka-2.1.0-src/config/log4j.properties

Program arguments: 
/home/lenmom/workspace/open-source/kafka-2.1.0-src/config/server.properties

Environment variables:
JMX_PORT=9999

Use classpath of module:
core_main

 

Here the Main class is set to kafka.Kafka, and the configuration file needed at startup, config/server.properties, is passed as the program argument. JMX_PORT is set to make it easy to collect Kafka's own metrics data.

With this in place, the Kafka service runs smoothly (the first launch includes a fairly long compilation step). Part of the startup log is shown below:

    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-02-10 19:27:20,203] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-02-10 19:27:20,205] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-02-10 19:27:20,210] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-02-10 19:27:20,367] INFO Loading logs. (kafka.log.LogManager)
[2019-02-10 19:27:20,403] INFO Logs loading complete in 36 ms. (kafka.log.LogManager)
[2019-02-10 19:27:20,461] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-02-10 19:27:20,467] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-02-10 19:27:21,958] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-02-10 19:27:22,098] INFO [SocketServer brokerId=0] Started 1 acceptor threads (kafka.network.SocketServer)
[2019-02-10 19:27:22,200] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,204] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,207] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,272] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-02-10 19:27:22,533] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-02-10 19:27:22,539] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2019-02-10 19:27:22,543] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(M1701,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
[2019-02-10 19:27:22,760] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,785] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,791] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-02-10 19:27:22,903] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-02-10 19:27:22,907] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-10 19:27:22,907] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-02-10 19:27:22,961] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:6000,blockEndProducerId:6999) by writing to Zk with path version 7 (kafka.coordinator.transaction.ProducerIdManager)
[2019-02-10 19:27:23,044] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-02-10 19:27:23,049] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-02-10 19:27:23,050] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-02-10 19:27:23,209] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-02-10 19:27:23,230] INFO [SocketServer brokerId=0] Started processors for 1 acceptors (kafka.network.SocketServer)
[2019-02-10 19:27:23,235] WARN Error while loading kafka-version.properties :null (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-10 19:27:23,238] INFO Kafka version : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-10 19:27:23,238] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-10 19:27:23,244] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2019-02-10 19:27:27,404] WARN Client session timed out, have not heard from server in 4179ms for sessionid 0x1000144f4a50007 (org.apache.zookeeper.ClientCnxn)
[2019-02-10 19:27:27,406] INFO Client session timed out, have not heard from server in 4179ms for sessionid 0x1000144f4a50007, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2019-02-10 19:27:29,323] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-02-10 19:27:29,324] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-02-10 19:27:29,328] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000144f4a50007, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
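
Once the broker logs "started", a quick smoke test from another terminal confirms it is usable. This sketch relies on the standard scripts in the source tree's bin directory and assumes the jars have been built (e.g. with the gradle command at the end of this post); note that in Kafka 2.1.0 the topic tool still talks to ZooKeeper:

cd /home/lenmom/workspace/open-source/kafka-2.1.0-src
# Create a single-partition test topic, then list topics to confirm the broker responds.
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./bin/kafka-topics.sh --list --zookeeper localhost:2181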

If the error message

Failed to load class "org.slf4j.impl.StaticLoggerBinder"

appears, the fix is to download

log4j-1.2.17.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar

and add these jars as dependencies under File -> Project Structure -> Project Settings -> Modules -> core -> core_main -> Dependencies.
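
One way to fetch the three jars is directly from Maven Central; the URLs below follow the standard repository layout for the versions named above:

wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar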

 

Following the same steps, I also managed to set up a working debugging environment on Windows.

 

Alternatively, build from the command line, skipping the tests and checkstyle validation:

gradle build -x test -x checkstyleMain -x checkstyleTest -x checkstyleScoverage -x spotbugsMain 
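
After a successful command-line build, the broker can also be launched from the shell instead of the IDE run configuration; a sketch, assuming the standard bin/ scripts pick up the freshly built classes:

cd /home/lenmom/workspace/open-source/kafka-2.1.0-src
export JMX_PORT=9999   # same metrics port as in the IDE configuration
./bin/kafka-server-start.sh config/server.properties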