kafka(二)—Kafka 的安裝和使用(Quickstart)

本文已同步至我的博客 liaosi's blog-Kafka 的安裝和使用(Quickstart)

Step1:下載Kafka

官網下載地址下載最新版本並解壓:java

tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

Step2:配置

  1. ZooKeeper的配置,參考前面的ZooKeeper的文章,此處不詳細作說明。
  2. Kafka的配置
    在Kafka的home目錄下,vim config/server.properties命令,最主要的是設置ZooKeeper鏈接的地址和接口:
    zookeeper.connect=localhost:2181

Step3:啓動服務

Kafka用到了ZooKeeper,因此要先啓動ZooKeepr。
第一種方式(推薦)是啓動獨立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin目錄下,執行./zkServer.sh start
第二種方式是啓動Kafka內置的一個單節點ZooKeeper實例,方法是在$KAFKA_HOME目錄下執行:apache

bin/zookeeper-server-start.sh config/zookeeper.properties

啓動了ZooKeeper以後,就能夠啓動Kafka服務了,在$KAFKA_HOME目錄下執行:bootstrap

bin/kafka-server-start.sh config/server.properties

若是要以守護線程啓動,則執行:vim

bin/kafka-server-start.sh -daemon config/server.properties

Step4:建立一個topic

建立一個叫作testTopic的topic,而且它只有一個分區,一個副本。併發

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "testTopic".

partitions參數:表示主題的分區(partition)的數量。
replication-factor參數:表示每一個partition的備份數量。
zookeeper參數:表示zookeeper的主機地址和客戶端鏈接端口。若是zookeeper是集羣,這裏也能夠只寫一個節點就行。app

列出Kafka上的全部topic:oop

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
myTest
testTopic

上面說明個人kafka已經建立了兩個主題:myTest和testTopic。測試

Step5:發送消息

Kafka 使用一個簡單的命令行客戶端producer,能夠從文件中或者從標準輸入中讀取消息併發送到Kafka服務端集羣。默認的每一行都將做爲一條獨立的消息ui

運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:命令行

[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
>This is a message
>This is another message

Step6:啓動consumer

Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:

[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
This is a message
This is another message

若是你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。

Step7:搭建一個多個broker的集羣

剛纔只是啓動了單個broker,如今啓動有3個broker組成的集羣,這些broker節點也都是在本機上的:
首先爲每一個節點編寫配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

在拷貝出的新文件中修改以下3個參數:

config/server-1.properties:

broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id在集羣中惟一的標註一個節點,由於在同一個機器上,因此必須制定不一樣的端口和日誌文件,避免數據被覆蓋。
剛纔已經啓動可Zookeeper和一個節點,如今啓動另外兩個Kafka節點:

> bin/kafka-server-start.sh  -daemon config/server-1.properties 
...
> bin/kafka-server-start.sh  -daemon config/server-2.properties 
...

查看進程:

[root@server01 kafka_2.11-1.0.0]# jps -lm
9857 sun.tools.jps.Jps -lm
1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg
9517 kafka.Kafka config/server-2.properties
5566 kafka.Kafka ../config/server.properties
9199 kafka.Kafka config/server-1.properties

建立一個擁有3個副本的topic:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
Created topic "my-replicated-topic".

如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
    Topic: my-replicated-topic  Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: my-replicated-topic  Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2

下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,咱們建立topic的時候指明是3個分區,因此有3行。
leader:負責處理消息的讀和寫的broker,leader是從全部節點中隨機選擇的.
replicas:列出了全部的副本節點,無論節點是否在服務中.
isr:是正在服務中的節點.

向topic發送消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C

這裏接收到的順序不一致時由於發送到了不一樣的partiton所致。

測試一下kafka集羣的容錯能力。對於partition=1,其leader是brokerid=0的節點,如今咱們kill掉它:

> ps -aux | grep server.properties
root     32658  0.5 17.4 4305700 329028 pts/0  Sl+  10:06   0:15 /opt/jdk1.8.0_121/bin/java...
> kill -9 32658

再次查詢該topic,能夠看到另一個brokerid=1的節點被選作了leader,brokeid=0的節點再也不出如今 in-sync 副本列表中:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2
    Topic: my-replicated-topic  Partition: 1    Leader: 1   Replicas: 0,1   Isr: 1
    Topic: my-replicated-topic  Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2

雖然最初負責寫消息的leader down掉了,但以前的消息仍是能夠消費的,不過bootstrap-server參數要選擇沒有down掉的節點:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C
相關文章
相關標籤/搜索