從官方這裏下載Kafka的壓縮包並解壓node
> tar -xzf kafka_2.12-2.2.0.tgz
> cd kafka_2.12-2.2.0
複製代碼
啓動Kafka自帶的ZooKeeper,若是啓動有問題,嘗試sudoshell
> bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼
啓動Kafka一個broker,默認啓動端口是9092apache
> bin/kafka-server-start.sh config/server.properties
複製代碼
建立一個叫test
的topic,只包含一個分區,一個副本(分區的備份)bootstrap
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
複製代碼
咱們經過命令行啓動一個消費者,指定剛剛建立叫test的topic。執行完能夠看到等待輸出小程序
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
複製代碼
啓動一個生產者,一樣指定test主題。這個腳本容許咱們從控制檯輸入,一行即一條消息bash
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
複製代碼
能夠看到命令行停住,咱們在控制檯輸入幾句話分佈式
This is a message
Another message
複製代碼
能夠看到消費者控制檯出現咱們輸入的信息,相似一個聊天室的小程序post
Kafka是一個分佈式集羣系統,上面咱們只是開了一個broker,下面來嘗試開三個broker學習
首先,複製多兩份配置文件ui
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
複製代碼
每一個配置文件修改如下屬性
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
複製代碼
broker.id
:標識每一個節點的惟一值listeners
:這個broker啓動監聽的地址端口log.dirs
:日誌路徑在以前已經啓動了9092端口的broker基礎上,咱們用這兩份配置文件啓動多兩個broker
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
複製代碼
而後咱們建立一個分區爲3,副本爲3的主題
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic my-replicated-topic
複製代碼
能夠用看一下這個topic的狀況
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
複製代碼
能夠看到輸出
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
複製代碼
能夠看到,Kafka把分區平均地分到每一個broker上,每一個分區都有一個Leader以及3個副本。能夠嘗試鏈接不一樣broker進行發送,消費消息,這裏再也不演示
咱們先用broker0發佈幾條消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
This is a 3-replicated message
333333
複製代碼
接着咱們能夠"掐斷"broker0,看一下會怎樣
> ps aux|grep server.properties
root 1202 0.2 2.4 7047864 394736 s001 S.....
> sudo kill -9 1202
複製代碼
而後看一下這個topic的狀況
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
複製代碼
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,2
Topic: my-replicated-topic Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
複製代碼
能夠看到分區0的Leader已經變成broker1,這時候若是咱們嘗試消費test,會發現消費不了,由於test只有一個副本在broker0上,而後broker0已經斷線了
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
[2019-05-15 21:44:05,436] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
...
複製代碼
可是若是咱們鏈接broker1去從新消費,依然能夠消費到broker0被"掐斷"前發送的消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning
This is a 3-replicated message
333333
複製代碼