本文主要介紹如何在單節點上安裝 Kafka 並測試 broker、producer 和 consumer 功能。html
進入下載頁面:http://kafka.apache.org/downloads.html ,選擇 Binary downloads下載 (Source download須要編譯才能使用),這裏我下載 kafka_2.11-0.8.2.1
,其對應的 Scala 版本爲 2.11
:java
$ wget http://apache.fayea.com/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
解壓並進入目錄:node
$ tar -xzvf kafka_2.11-0.8.2.1.tgz $ cd kafka_2.11-0.8.2.1
查看目錄結構:nginx
tree -L 2
.
├── bin
│ ├── kafka-console-consumer.sh
│ ├── kafka-console-producer.sh
│ ├── kafka-consumer-offset-checker.sh
│ ├── kafka-consumer-perf-test.sh
│ ├── kafka-mirror-maker.sh
│ ├── kafka-preferred-replica-election.sh
│ ├── kafka-producer-perf-test.sh
│ ├── kafka-reassign-partitions.sh
│ ├── kafka-replay-log-producer.sh
│ ├── kafka-replica-verification.sh
│ ├── kafka-run-class.sh
│ ├── kafka-server-start.sh
│ ├── kafka-server-stop.sh
│ ├── kafka-simple-consumer-shell.sh
│ ├── kafka-topics.sh
│ ├── windows
│ ├── zookeeper-server-start.sh
│ ├── zookeeper-server-stop.sh
│ └── zookeeper-shell.sh
├── config
│ ├── consumer.properties
│ ├── log4j.properties
│ ├── producer.properties
│ ├── server.properties
│ ├── test-log4j.properties
│ ├── tools-log4j.properties
│ └── zookeeper.properties
├── libs
│ ├── jopt-simple-3.2.jar
│ ├── kafka_2.11-0.8.2.1.jar
│ ├── kafka_2.11-0.8.2.1-javadoc.jar
│ ├── kafka_2.11-0.8.2.1-scaladoc.jar
│ ├── kafka_2.11-0.8.2.1-sources.jar
│ ├── kafka_2.11-0.8.2.1-test.jar
│ ├── kafka-clients-0.8.2.1.jar
│ ├── log4j-1.2.16.jar
│ ├── lz4-1.2.0.jar
│ ├── metrics-core-2.2.0.jar
│ ├── scala-library-2.11.5.jar
│ ├── scala-parser-combinators_2.11-1.0.2.jar
│ ├── scala-xml_2.11-1.0.2.jar
│ ├── slf4j-api-1.7.6.jar
│ ├── slf4j-log4j12-1.6.1.jar
│ ├── snappy-java-1.1.1.6.jar
│ ├── zkclient-0.3.jar
│ └── zookeeper-3.4.6.jar
├── LICENSE
└── NOTICE
4 directories, 45 files
運行 kafka ,須要依賴 zookeeper,你能夠使用已有的 zookeeper 集羣或者利用 kafka 提供的腳本啓動一個 zookeeper 實例:shell
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
默認的,zookeeper 會監聽在 *:2181/tcp
。apache
中止剛纔啓動的 zookeeper 實例:windows
$ bin/zookeeper-server-stop.sh
啓動Kafka server:api
$ bin/kafka-server-start.sh config/server.properties &
config/server.properties 中有一些默認的配置參數,這裏僅僅列出參數,不作解釋:tomcat
broker.id=0 port=9092 #host.name=localhost #advertised.host.name=<hostname routable by clients> #advertised.port=<port accessible by clients> num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
若是你像我同樣是在虛擬機中測試 kafka,那麼你須要修改 kafka 啓動參數中 JVM 內存大小。查看 kafka-server-start.sh 腳本,修改 KAFKA_HEAP_OPTS
處 -Xmx
和 -Xms
的值。bash
啓動成功以後,會看到以下日誌:
[2015-03-17 11:19:30,528] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2015-03-17 11:19:30,604] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2015-03-17 11:19:30,605] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2015-03-17 11:19:30,687] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2015-03-17 11:19:30,756] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-03-17 11:19:30,887] INFO Registered broker 0 at path /brokers/ids/0 with address cdh1:9092. (kafka.utils.ZkUtils$)
[2015-03-17 11:19:30,928] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-03-17 11:19:31,048] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
從日誌能夠看到:
中止 Kafka server :
$ bin/kafka-server-stop.sh
在啓動 kafka-server 以後啓動,運行producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在另外一個終端運行 consumer:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在 producer 端輸入字符串並回車,查看 consumer 端是否顯示。
接下來參考 Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node 這篇文章,基於 config/server.properties 配置文件建立多個 broker 的 kafka 集羣。
建立第一個 broker:
$ cp config/server.properties config/server1.properties
編寫 config/server1.properties 並修改下面配置:
broker.id=1 port=9092 log.dir=/tmp/kafka-logs-1
建立第二個 broker:
$ cp config/server.properties config/server2.properties
編寫 config/server2.properties 並修改下面配置:
broker.id=2 port=9093 log.dir=/tmp/kafka-logs-2
建立第三個 broker:
$ cp config/server.properties config/server3.properties
編寫 config/server3.properties 並修改下面配置:
broker.id=3 port=9094 log.dir=/tmp/kafka-logs-3
接下來分別啓動這三個 broker:
$ JMX_PORT=9999 ; nohup bin/kafka-server-start.sh config/server1.properties & $ JMX_PORT=10000 ; nohup bin/kafka-server-start.sh config/server2.properties & $ JMX_PORT=10001 ; nohup bin/kafka-server-start.sh config/server3.properties &
下面是三個 broker 監聽的網絡接口和端口列表:
Broker 1 Broker 2 Broker 3
----------------------------------------------
Kafka *:9092/tcp *:9093/tcp *:9094/tcp
JMX *:9999/tcp *:10000/tcp *:10001/tcp
在 Kafka 0.8 中有兩種方式建立一個新的 topic:
auto.create.topics.enable
參數,當 broker 接收到一個新的 topic 上的消息時候,會經過 num.partitions
和 default.replication.factor
兩個參數自動建立 topic。bin/kafka-topics.sh
命令建立一個名稱爲 zerg.hydra
的 topic:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra --partitions 3 --replication-factor 2
使用下面查看建立的 topic:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list test zerg.hydra
還能夠查看更詳細的信息:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic zerg.hydra Topic:zerg.hydra PartitionCount:3 ReplicationFactor:2 Configs: Topic: zerg.hydra Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: zerg.hydra Partition: 1 Leader: 3 Replicas: 3,0 Isr: 3,0 Topic: zerg.hydra Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
默認的,Kafka 持久化 topic 到 log.dir
參數定義的目錄。
$ tree /tmp/kafka-logs-{1,2,3} /tmp/kafka-logs-1 # first broker (broker.id = 1) ├── zerg.hydra-0 # replica of partition 0 of topic "zerg.hydra" (this broker is leader) │ ├── 00000000000000000000.index │ └── 00000000000000000000.log ├── zerg.hydra-2 # replica of partition 2 of topic "zerg.hydra" │ ├── 00000000000000000000.index │ └── 00000000000000000000.log └── replication-offset-checkpoint /tmp/kafka-logs-2 # second broker (broker.id = 2) ├── zerg.hydra-0 # replica of partition 0 of topic "zerg.hydra" │ ├── 00000000000000000000.index │ └── 00000000000000000000.log ├── zerg.hydra-1 # replica of partition 1 of topic "zerg.hydra" (this broker is leader) │ ├── 00000000000000000000.index │ └── 00000000000000000000.log └── replication-offset-checkpoint /tmp/kafka-logs-3 # third broker (broker.id = 3) ├── zerg.hydra-1 # replica of partition 1 of topic "zerg.hydra" │ ├── 00000000000000000000.index │ └── 00000000000000000000.log ├── zerg.hydra-2 # replica of partition 2 of topic "zerg.hydra" (this broker is leader) │ ├── 00000000000000000000.index │ └── 00000000000000000000.log └── replication-offset-checkpoint 6 directories, 15 files
以 sync
模式啓動一個 producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic zerg.hydra
而後,輸入如下內容:
Hello, world!
Rock: Nerf Paper. Scissors is fine.
在另外一個終端運行:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning
注意,生產環境一般不會添加 --from-beginning
參數。
觀察輸出,你會看到下面內容:
Hello, world!
Rock: Nerf Paper. Scissors is fine.
把 consumer 停掉再啓動,你還會看到相同的輸出結果。
例如,將 apache 或者 nginx 或者 tomcat 等產生的日誌 push 到 kafka,只須要執行下面代碼便可:
$ tail -n 0 -f /var/log/nginx/access.log | bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic zerg.hydra