kafka的背景知識已經講了不少了,讓咱們如今開始實踐吧,假設你已經準備好JDK
和ZooKeeper
環境。java
下載2.12-1.1.0版本而且解壓它。apache
tar -xvf kafka_2.12-1.1.0.tgz -C /home/${user}/software/kafka/
複製代碼
修改$KAFKA_HOME/config/server.properties文件bootstrap
# 指定代理id,borker.id能夠任意指定,前提是保證集羣內每臺機器的broker.id惟一,第二臺機器設置爲2...以此類推
broker.id=0
# 提供給客戶端響應的端口
port=9092
# kafka數據的存放目錄,而非Kafka的日誌目錄
log.dirs=/tmp/kafka-logs-0
# 設置zookeeper集羣地址
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
# 設置本機地址,設置爲本服務器的ip地址。若是不設置會在建立主題和發送消息時,發生NOT LEADER FOR PARTITION異常。
host.name=192.168.0.1
複製代碼
運行kafka須要使用Zookeeper,因此你須要先啓動Zookeeper。
若是你沒有Zookeeper,你可使用kafka自帶打包和配置好的Zookeeper。bash
./bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼
顯示:服務器
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
複製代碼
表明Zookeeper啓動成功,如今啓動kafka服務。併發
./bin/kafka-server-start.sh -daemon config/server.properties
複製代碼
顯示:socket
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
複製代碼
表示啓動成功工具
建立一個名爲「test」的Topic,只有一個分區和一個備份:post
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
複製代碼
建立好以後,能夠經過運行如下命令,查看已建立的topic信息:測試
./bin/kafka-topics.sh --list --zookeeper localhost:2181
複製代碼
或者,除了手工建立topic外,你也能夠配置你的broker,當發佈一個不存在的topic時自動建立topic。 配置項以下:
auto.create.topics.enable=true
複製代碼
Kafka提供了一個命令行的工具,能夠從輸入文件或者命令行中讀取消息併發送給Kafka集羣。每一行是一條消息。
運行producer(生產者),而後在控制檯輸入幾條消息到服務器。
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
複製代碼
This is a message
This is another message
複製代碼
Kafka也提供了一個消費消息的命令行工具,將存儲的信息輸出出來。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
複製代碼
This is a message
This is another message
複製代碼
若是你有2臺不一樣的終端上運行上述命令,那麼當你在運行生產者時,消費者就能消費到生產者發送的消息。
到目前,咱們只是單一的運行一個broker,沒什麼意思。對於Kafka,一個broker僅僅只是一個集羣的大小,全部讓咱們多設幾個broker。
首先爲每一個broker建立一個配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
複製代碼
如今編輯這些新建的文件,設置如下屬性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
複製代碼
broker.id
是集羣中每一個節點的惟一且永久的名稱,咱們修改端口和日誌目錄是由於咱們如今在同一臺機器上運行,咱們要防止broker在同一端口上註冊和覆蓋對方的數據。
咱們已經運行了zookeeper和剛纔的一個kafka節點,全部咱們只須要在啓動2個新的kafka節點。
./bin/kafka-server-start.sh -daemon config/server-1.properties
複製代碼
./bin/kafka-server-start.sh -daemon config/server-2.properties
複製代碼
如今,咱們建立一個新topic,把備份設置爲:3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
複製代碼
好了,如今咱們已經有了一個集羣了,咱們怎麼知道每一個集羣在作什麼呢?運行命令"describe topics"
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
複製代碼
顯示:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
複製代碼
輸出解釋:第一行是全部分區的摘要,其次,每一行提供一個分區信息,由於咱們只有一個分區,因此只有一行。
"leader":該節點負責該分區的全部的讀和寫,每一個節點的leader都是隨機選擇的。
"replicas":備份的節點列表,不管該節點是不是leader或者目前是否還活着,只是顯示。
"isr":「同步備份」的節點列表,也就是活着的節點而且正在同步leader。
複製代碼
咱們運行這個命令,看看一開始咱們建立的那個節點:
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
複製代碼
顯示:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
複製代碼
這並不奇怪,剛纔建立的主題沒有Replicas,而且在服務器"0"上,咱們建立它的時候,集羣中只有一個服務器,因此是"0"。
讓咱們來發布一些信息在新的topic上:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
複製代碼
顯示:
...
my test message 1
my test message 2
複製代碼
如今,消費這些消息。
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
複製代碼
顯示:
...
my test message 1
my test message 2
複製代碼
咱們要測試集羣的容錯,kill掉leader,Broker1做爲當前的leader,也就是kill掉Broker1。
ps | grep server-1.properties
複製代碼
顯示:
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
複製代碼
執行:
kill -9 7564
複製代碼
在Windows上使用:
wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
複製代碼
顯示:
ProcessId
6016
複製代碼
執行:
taskkill /pid 6016 /f
複製代碼
備份節點之一成爲新的leader,而broker1已經不在同步備份集合裏了。
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
複製代碼
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
複製代碼
可是,消息仍然沒丟:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
複製代碼
...
my test message 1
my test message 2
複製代碼