這是我參與更文挑戰的第7天,活動詳情查看: 更文挑戰java
本文正在參加「Java主題月 - Java 開發實戰」,詳情查看 活動連接shell
Kafka是一種高吞吐量的分佈式發佈訂閱的消息隊列系統apache
Kafka是一種高吞吐量的分佈式發佈訂閱的消息隊列系統,Kafka對消息進行保存時是經過tipic進行分組的。今天咱們僅實現Kafka集羣的配置。理論的抽空在聊
複製代碼
瞭解kafka的都知道。kafka是經過Zookeeper實現分佈操做的。不論是broker,consumer,仍是provide信息都是存儲在Zookeeper中的。當broker掛掉都是Zookeeper來進行從新分配選擇的。因此實現kafka集羣前咱們得先實現Zookeeper的集羣配置。windows
首先咱們從官網上下載Zookeeper到本地。我這裏下載的是Zookeeper-3.4.6.tar.gz版本的。讀者能夠根據本身狀況下載。下載好以後進行文件解壓。而後找到conf文件中的zoo_sample.cfg文件。該文件是Zookeeper官網給咱們提供的一套樣板。咱們賦值該文件到同級下並更名爲zoo.cfg.以下圖api
而後咱們來看看這個配置文件裏面都有些啥服務器
tickTime=2000
複製代碼
initLimit=10
複製代碼
syncLimit=5
複製代碼
dataDir
複製代碼
dataLogDir
複製代碼
clientPort=2181
複製代碼
server.1=192.168.1.130:28881:38881
server.2=192.168.1.130:28882:38882
server.3=192.168.1.130:28883:38883
複製代碼
知道配置文件裏的意思應該就知道如何修改了吧markdown
dataDir+dataLogDir+clientPort
複製代碼
上面的格式咱們能夠簡單的總結爲 server.num=B:C:D。 num:是正整數表明的服務的惟一標識。這個要和後面說道的myid文件保持一致。網絡
B: 標識Zookeeper集羣中某一個服務的ip或者域名 192.168.1.130app
C:表示server.num這個服務於集羣中leader進行信息交流的端口。在kafka中咱們leader和follower須要進行數據備份。具體服務就是經過這個地方制定的端口進行通訊的。socket
D:表示萬一leader宕機了,咱們就經過這個端口來進行再follower中選舉新的leader。
網上的不少教程也就介紹到這裏。稍微好點就提了一下建立myid文件的事,我當時就糾結在這裏。由於我根本不知道穿件的myid的類型。我就隨便建立txt文件。結果是錯的。這裏咱們建立myid我有兩種方式。還有myid裏面的內容就是咱們對應的配置文件中server.num中的num。
第一種就是咱們經過cmd窗口到咱們要建立myid的文件夾下
執行以下命令
echo 1 > myid
複製代碼
第二種是咱們先建立TXT文件將對應的內容寫入。而後txt後綴刪掉就能夠了。
順便提一下myid應該放在咱們conf/zoo.cfg文件中指定的dataDir 的對應的文件路徑下。
所謂的集羣就是講上面的Zookeeper複製成多個,將上面提到的幾個重要的屬性更改掉就好了。
若是你到這一步說明你離成功已經不遠了。下面咱們只須要開啓服務就好了。開啓服務在咱們解壓的bin目錄下。
上面咱們配置的Zookeeper在開啓第一個時候回報錯。爲何呢。緣由就是咱們開啓了一個服務,。可是咱們的配置文件配置的是集羣的信息。這個時候就回去尋找其餘服務。可是這個時候其餘的服務尚未開啓呢。因此這個錯誤是正常。等咱們集羣中的全部的服務都開啓了就不會報錯。這裏你們不要被嚇到。
除此以外,還有一點就是Zookeeper的安裝目錄(解壓目錄)是絕對不能包含漢字的。我上面的截圖有漢字那是我計算機上設置的。實際的路徑是沒有漢字的。不要被上面的圖片誘導。
當全部的服務都開啓了,咱們如何查看咱們的服務是否開啓成功呢。這很簡單。咱們從新打開一個新的cmd窗口。直接執行jps就能夠看到咱們的服務了。QuorumPeerMain就是咱們的服務主類
上面咱們就完成了Zookeeper的集羣的配置。實際上Kafka中就自帶有Zookeeper的服務。可是爲了數據的高可用性。咱們最好選擇本身搭建Zookeeper集羣。這也是官網上的建議。
這裏個人Kafka版本選擇的是0.8.1.1。建議單價不要選擇過高的版本。剛出的版本可能有未知的bug。
一樣這裏的集羣就是講Kafka複製多個。這裏我選擇其中一個進行講解。其餘的都是同樣的主要就是講端口改掉就好了。
將官網下載的Kafka解壓更名爲kafka1(其餘的更名數字遞增就行。或者自定義別的名字)。找到config/server.properties文件。
一樣的先來了解裏面的參數含義吧
broker.id=1
複製代碼
port=9091
複製代碼
host.name=192.168.1.130
複製代碼
advertised.host.name=192.168.1.130
複製代碼
broker服務將通知消費者和生產者 換言之,就是消費者和生產者就是經過這個主機(IP)來進行通訊的。若是沒有設置就默認採用host.name。
num.network.threads=2
複製代碼
num.io.threads=8
複製代碼
socket.send.buffer.bytes=1048576
複製代碼
socket.receive.buffer.bytes=1048576
複製代碼
socket.request.max.bytes=104857600
複製代碼
log.dirs=\logs
複製代碼
num.partitions=2
複製代碼
log.retention.hours=168
複製代碼
log.segment.bytes=536870912
複製代碼
達到要求 都會刪除該文件 在建立topic時能夠從新制定。若沒有.則選取該默認值
log.retention.check.interval.ms=60000
複製代碼
log.cleaner.enable=false
複製代碼
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
複製代碼
zookeeper.connection.timeout.ms=1000000
複製代碼
broker.id + port
複製代碼
set ivyPath=%USERPROFILE%\.ivy2\cache
set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar
call :concat %snappy%
set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
call :concat %library%
set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
call :concat %compiler%
set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
call :concat %log4j%
set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
call :concat %slf%
set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
call :concat %zookeeper%
set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
call :concat %jopt%
for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
call :concat %%i
)
for %%i in (%BASE_DIR%\core\lib\*.jar) do (
call :concat %%i
)
for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
call :concat %%i
)
複製代碼
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)
複製代碼
首先第一行提示 set JMX_PORT to default value 9999 這個錯誤是由於我沒有設置這個值。這卻是小事。可是後面報說找不到或沒法加載主類kafka.Kafka這就讓我費解。在這裏我也是卡了一天了。後來在網上找到了一個方法。我不知道這是否是Kafka的bug。反正用這個方法我是解決了這個錯誤了。
解決辦法就是將kafka-run-class.bat文件中
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*
複製代碼
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*
複製代碼
到這一步咱們離kafka的成功又不遠了。咱們新開cmd窗口cd到kafka的bin目錄中。
可是在執行開啓以前咱們須要先執行
Set JMX_PORT=19091(每一個服務數字不能同樣)
複製代碼
kafka-server-start.bat ..\config\server.properties
複製代碼
kafka-run-class.bat kafka.admin.TopicCommand %*
複製代碼
建立Topic
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
複製代碼
查看Topic
kafka-topics.bat --describe --zookeeper 192.168.1.130:2181 --topic my-replicated-topic
複製代碼
生產topic消息
kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic
複製代碼
消費topic消息
kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic
複製代碼
資源因爲大小限制暫時沒法上傳!後續再上傳