> tar -xzf kafka_2.10-0.8.2.0.tgz> cd kafka_2.10-0.8.2.0
Kafka 使用 ZooKeeper 所以須要首先啓動 ZooKeeper 服務。若是你沒有ZooKeeper 服務,可使用Kafka自帶腳本啓動一個應急的單點的 ZooKeeper 實例。 java
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)...
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
運行以下命令查看此topic信息:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
咱們也能夠經過配置
,讓brokers
自動建立該topic,
當向一個不存在的topic發佈消息時 。
運行producer 而後輸入幾條消息到控制檯,輸入回車發送到服務端。 node
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
不加參數運行命令會顯示詳細使用文檔. apache
首先咱們爲每一個broker 準備一個配置文件: dom
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Now 編輯這些新文件,內容以下:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
以前已經開啓過Zookeeper 和 broker了,如今只須要再啓動兩就好: socket
> bin/kafka-server-start.sh config/server-1.properties &...
> bin/kafka-server-start.sh config/server-2.properties &...
建立一個新的topic 併爲其設置副本因子:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
OK,如今咱們已經搭建好一個集羣了,可是咱們還不知道他是怎麼工做的?使用"describe topics"命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0輸出的第一行全部分區的概述, 附加的每行是關於每一個分區的說明。由於咱們只有一個分區因此,這裏只有一行。
咱們再查看下原來的 topic: 工具
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
這沒什麼驚訝的—原來的 topic 沒有設置副本,因此
Replicas是
0, 咱們建立它時集羣中只有一個節點。
讓咱們發佈幾條消息到咱們的新 topic: 測試
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's consume these messages:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
如今讓咱們測試下容錯性。這裏 Broker 1 是 leader 因此咱們 kill 掉它:
> ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java
...
> kill -9 7564
而後leader 切換爲其餘節點而且節點1 也再也不 in-sync 設置中了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
可是消息仍然能夠消費, 即便是以前的leader 寫下的日誌:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C