本文已同步至我的博客 liaosi's blog-Kafka 的安裝和使用(Quickstart)
去官網下載地址下載最新版本並解壓:java
tar -xzf kafka_2.11-1.1.0.tgz cd kafka_2.11-1.1.0
vim config/server.properties
命令,最主要的是設置ZooKeeper鏈接的地址和接口:Kafka用到了ZooKeeper,因此要先啓動ZooKeepr。
第一種方式(推薦)是啓動獨立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin
目錄下,執行./zkServer.sh start
。
第二種方式是啓動Kafka內置的一個單節點ZooKeeper實例,方法是在$KAFKA_HOME
目錄下執行:apache
bin/zookeeper-server-start.sh config/zookeeper.properties
啓動了ZooKeeper以後,就能夠啓動Kafka服務了,在$KAFKA_HOME
目錄下執行:bootstrap
bin/kafka-server-start.sh config/server.properties
若是要以守護線程啓動,則執行:vim
bin/kafka-server-start.sh -daemon config/server.properties
建立一個叫作testTopic
的topic,而且它只有一個分區,一個副本。併發
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1 Created topic "testTopic".
partitions參數:表示主題的分區(partition)的數量。
replication-factor參數:表示每一個partition的備份數量。
zookeeper參數:表示zookeeper的主機地址和客戶端鏈接端口。若是zookeeper是集羣,這裏也能夠只寫一個節點就行。app
列出Kafka上的全部topic:oop
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 myTest testTopic
上面說明個人kafka已經建立了兩個主題:myTest和testTopic。測試
Kafka 使用一個簡單的命令行客戶端producer,能夠從文件中或者從標準輸入中讀取消息併發送到Kafka服務端集羣。默認的每一行都將做爲一條獨立的消息ui
運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:命令行
[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic >This is a message >This is another message
Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:
[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning This is a message This is another message
若是你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。
剛纔只是啓動了單個broker,如今啓動有3個broker組成的集羣,這些broker節點也都是在本機上的:
首先爲每一個節點編寫配置文件:
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
在拷貝出的新文件中修改以下3個參數:
config/server-1.properties:
broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
broker.id在集羣中惟一的標註一個節點,由於在同一個機器上,因此必須制定不一樣的端口和日誌文件,避免數據被覆蓋。
剛纔已經啓動可Zookeeper和一個節點,如今啓動另外兩個Kafka節點:
> bin/kafka-server-start.sh -daemon config/server-1.properties ... > bin/kafka-server-start.sh -daemon config/server-2.properties ...
查看進程:
[root@server01 kafka_2.11-1.0.0]# jps -lm 9857 sun.tools.jps.Jps -lm 1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg 9517 kafka.Kafka config/server-2.properties 5566 kafka.Kafka ../config/server.properties 9199 kafka.Kafka config/server-1.properties
建立一個擁有3個副本的topic:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic Created topic "my-replicated-topic".
如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,咱們建立topic的時候指明是3個分區,因此有3行。
leader:負責處理消息的讀和寫的broker,leader是從全部節點中隨機選擇的.
replicas:列出了全部的副本節點,無論節點是否在服務中.
isr:是正在服務中的節點.
向topic發送消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
消費這些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C
這裏接收到的順序不一致時由於發送到了不一樣的partiton所致。
測試一下kafka集羣的容錯能力。對於partition=1,其leader是brokerid=0的節點,如今咱們kill掉它:
> ps -aux | grep server.properties root 32658 0.5 17.4 4305700 329028 pts/0 Sl+ 10:06 0:15 /opt/jdk1.8.0_121/bin/java... > kill -9 32658
再次查詢該topic,能夠看到另一個brokerid=1的節點被選作了leader,brokeid=0的節點再也不出如今 in-sync 副本列表中:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2 Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
雖然最初負責寫消息的leader down掉了,但以前的消息仍是能夠消費的,不過bootstrap-server參數要選擇沒有down掉的節點:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C