kafka集羣搭建

kafka的背景知識已經講了不少了,讓咱們如今開始實踐吧,假設你已經準備好JDKZooKeeper環境。java

1. 下載代碼

下載2.12-1.1.0版本而且解壓它。apache

tar -xvf kafka_2.12-1.1.0.tgz -C /home/${user}/software/kafka/
複製代碼

3. 修改配置

修改$KAFKA_HOME/config/server.properties文件bootstrap

# 指定代理id,borker.id能夠任意指定,前提是保證集羣內每臺機器的broker.id惟一,第二臺機器設置爲2...以此類推
broker.id=0
# 提供給客戶端響應的端口
port=9092
# kafka數據的存放目錄,而非Kafka的日誌目錄
log.dirs=/tmp/kafka-logs-0
# 設置zookeeper集羣地址
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
# 設置本機地址,設置爲本服務器的ip地址。若是不設置會在建立主題和發送消息時,發生NOT LEADER FOR PARTITION異常。
host.name=192.168.0.1
複製代碼

2. 啓動服務

運行kafka須要使用Zookeeper,因此你須要先啓動Zookeeper。
若是你沒有Zookeeper,你可使用kafka自帶打包和配置好的Zookeeper。bash

./bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼

顯示:服務器

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
複製代碼

表明Zookeeper啓動成功,如今啓動kafka服務。併發

./bin/kafka-server-start.sh  -daemon config/server.properties
複製代碼

顯示:socket

[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
複製代碼

表示啓動成功工具

3. 建立Topic

建立一個名爲「test」的Topic,只有一個分區和一個備份:post

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
複製代碼

建立好以後,能夠經過運行如下命令,查看已建立的topic信息:測試

./bin/kafka-topics.sh --list --zookeeper localhost:2181
複製代碼

或者,除了手工建立topic外,你也能夠配置你的broker,當發佈一個不存在的topic時自動建立topic。 配置項以下:

auto.create.topics.enable=true 
複製代碼

4. 發送消息

Kafka提供了一個命令行的工具,能夠從輸入文件或者命令行中讀取消息併發送給Kafka集羣。每一行是一條消息。
運行producer(生產者),而後在控制檯輸入幾條消息到服務器。

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
複製代碼
This is a message
This is another message
複製代碼

5. 消費消息

Kafka也提供了一個消費消息的命令行工具,將存儲的信息輸出出來。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
複製代碼
This is a message
This is another message
複製代碼

若是你有2臺不一樣的終端上運行上述命令,那麼當你在運行生產者時,消費者就能消費到生產者發送的消息。

6. 設置多個broker集羣

到目前,咱們只是單一的運行一個broker,沒什麼意思。對於Kafka,一個broker僅僅只是一個集羣的大小,全部讓咱們多設幾個broker。

首先爲每一個broker建立一個配置文件:

cp config/server.properties config/server-1.properties 
cp config/server.properties config/server-2.properties
複製代碼

如今編輯這些新建的文件,設置如下屬性:

config/server-1.properties: 
    broker.id=1 
    listeners=PLAINTEXT://:9093 
    log.dir=/tmp/kafka-logs-1

config/server-2.properties: 
    broker.id=2 
    listeners=PLAINTEXT://:9094 
    log.dir=/tmp/kafka-logs-2
複製代碼

broker.id是集羣中每一個節點的惟一且永久的名稱,咱們修改端口和日誌目錄是由於咱們如今在同一臺機器上運行,咱們要防止broker在同一端口上註冊和覆蓋對方的數據。

咱們已經運行了zookeeper和剛纔的一個kafka節點,全部咱們只須要在啓動2個新的kafka節點。

./bin/kafka-server-start.sh  -daemon config/server-1.properties
複製代碼
./bin/kafka-server-start.sh  -daemon config/server-2.properties
複製代碼

如今,咱們建立一個新topic,把備份設置爲:3

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
複製代碼

好了,如今咱們已經有了一個集羣了,咱們怎麼知道每一個集羣在作什麼呢?運行命令"describe topics"

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
複製代碼

顯示:

Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
複製代碼

輸出解釋:第一行是全部分區的摘要,其次,每一行提供一個分區信息,由於咱們只有一個分區,因此只有一行。

"leader":該節點負責該分區的全部的讀和寫,每一個節點的leader都是隨機選擇的。  
"replicas":備份的節點列表,不管該節點是不是leader或者目前是否還活着,只是顯示。  
"isr":「同步備份」的節點列表,也就是活着的節點而且正在同步leader。  
複製代碼

咱們運行這個命令,看看一開始咱們建立的那個節點:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
複製代碼

顯示:

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
複製代碼

這並不奇怪,剛纔建立的主題沒有Replicas,而且在服務器"0"上,咱們建立它的時候,集羣中只有一個服務器,因此是"0"。

讓咱們來發布一些信息在新的topic上:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
複製代碼

顯示:

...
my test message 1
my test message 2
複製代碼

如今,消費這些消息。

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
複製代碼

顯示:

...
my test message 1
my test message 2
複製代碼

咱們要測試集羣的容錯,kill掉leader,Broker1做爲當前的leader,也就是kill掉Broker1。

ps | grep server-1.properties
複製代碼

顯示:

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
複製代碼

執行:

kill -9 7564
複製代碼

在Windows上使用:

wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
複製代碼

顯示:

ProcessId
6016
複製代碼

執行:

taskkill /pid 6016 /f
複製代碼

備份節點之一成爲新的leader,而broker1已經不在同步備份集合裏了。

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
複製代碼
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0
複製代碼

可是,消息仍然沒丟:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
複製代碼
...
my test message 1
my test message 2
複製代碼

文章轉載自:kafka安裝和啓動
相關文章
相關標籤/搜索