Kafka learning: a first hands-on experience

 


Questions to answer:

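1. Does Kafka need ZooKeeper?
2. What is Kafka?
3. What concepts does Kafka involve?
4. How do you simulate a client sending and receiving messages as a first test? (Kafka installation steps)
5. How does a Kafka cluster interact with ZooKeeper?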

 

1. Does Kafka need ZooKeeper?

Yes. Kafka (the version used here) requires ZooKeeper. You can use the ZooKeeper bundled with the Kafka package or download ZooKeeper separately.

2. What is Kafka?

Kafka is a distributed messaging system: it is distributed, partitioned, and replicated.

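3. Common Kafka concepts:

producer: a program that sends messages to Kafka.

consumer: a program that subscribes to and reads messages from Kafka.

topic: Kafka organizes messages by topic; a topic is a named category of messages.

broker: a Kafka server. Kafka runs as a cluster made up of one or more servers, and each of these servers is called a broker.

partition: a topic consists of one or more partitions. Partitions spread a topic across several places for storage, which increases parallelism.

replication: each partition has one or more replicas, which serve as backups of that partition.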
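To connect these concepts to what a broker actually keeps on disk: each partition is stored as its own directory named <topic>-<partition> inside the broker's log.dirs (for example /tmp/kafka-logs/kafka-test02-0 for partition 0 of the topic created later). The helper below is a hypothetical sketch, not a Kafka tool; it assumes that naming layout and lists the partition numbers of one topic found under a log directory.

```shell
# list_partitions LOG_DIR TOPIC (hypothetical helper)
# Prints the partition numbers of TOPIC found under LOG_DIR, one per line, sorted.
# Assumes the <topic>-<partition> directory layout described above.
list_partitions() {
  log_dir=$1
  topic=$2
  for d in "$log_dir/$topic"-*; do
    [ -d "$d" ] || continue                    # glob matched nothing, or not a directory
    [ "${d%-*}" = "$log_dir/$topic" ] || continue  # skip other topics sharing the prefix
    p=${d##*-}                                 # text after the last '-' is the partition
    case $p in
      ''|*[!0-9]*) continue ;;                 # not a numeric partition suffix
    esac
    echo "$p"
  done | sort -n
}
```

For example, `list_partitions /tmp/kafka-logs kafka-test02` would print 0 for the single-partition topic created in the steps below.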


4. Installation steps
(1) Download the kafka_2.10-0.9.0.0.tgz package into /usr/local, extract it, and create a symlink:
   tar zxvf  kafka_2.10-0.9.0.0.tgz
   ln -sv kafka_2.10-0.9.0.0 kafka
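(2) Configure the Java runtime. Kafka needs ZooKeeper to start, and ZooKeeper needs a Java runtime.
   Install the JDK and set the environment variables JAVA_HOME, PATH, and CLASSPATH.
   Output like the following means the environment is set up: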
   [root@kafka bin]# echo $JAVA_HOME
   /usr/local/java/
   [root@kafka bin]# echo $CLASSPATH
   .:/usr/local/java//lib:/usr/local/java//jre/lib
   [root@kafka ~]# echo $PATH
   /usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/java//bin:/root/bin
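Before starting ZooKeeper, these variables can also be checked from a script. A minimal sketch, assuming a POSIX shell; check_java_env is a hypothetical helper name, not part of Kafka or the JDK:

```shell
# check_java_env (hypothetical helper): verify the variables configured above.
# Returns 0 when JAVA_HOME points to a directory containing an executable bin/java
# and $JAVA_HOME/bin appears in PATH; otherwise prints the problem and returns 1.
check_java_env() {
  if [ -z "${JAVA_HOME:-}" ]; then
    echo "JAVA_HOME is not set" >&2; return 1
  fi
  home=${JAVA_HOME%/}                      # tolerate a trailing slash such as /usr/local/java/
  if [ ! -x "$home/bin/java" ]; then
    echo "no executable java under $home/bin" >&2; return 1
  fi
  case ":$PATH:" in
    *":$home/bin:"*|*":$home//bin:"*) ;;   # accept the doubled slash seen in the PATH above
    *) echo "$home/bin is not in PATH" >&2; return 1 ;;
  esac
  return 0
}
```

For example, `check_java_env && bin/zookeeper-server-start.sh config/zookeeper.properties` would start ZooKeeper only when the check passes.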
(3) Go to the Kafka directory and find the ZooKeeper configuration file in its config directory. ZooKeeper can be started with the default settings in zookeeper.properties:
cd /usr/local/kafka/config
[root@kafka config]# ls zookeeper.properties
zookeeper.properties
  
[root@kafka config]# egrep -v '^#|^$' zookeeper.properties
dataDir=/tmp/zookeeper    
clientPort=2181
maxClientCnxns=0
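When scripting the startup, values such as clientPort can be read from this file instead of being hard-coded. A sketch of a hypothetical prop_get helper; it assumes simple key=value lines like the ones shown above:

```shell
# prop_get FILE KEY (hypothetical helper): print the value of KEY from a simple
# key=value properties file. Sketch only: skips full-line '#' comments and trims
# surrounding spaces/tabs, but does not handle every .properties feature
# (line continuations, ':' separators, escape sequences).
prop_get() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }                     # full-line comment
    {
      i = index($0, "=")
      if (i == 0) next                      # not a key=value line
      k = substr($0, 1, i - 1); v = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      gsub(/^[ \t]+|[ \t]+$/, "", v)
      if (k == key) { val = v; found = 1 }  # last occurrence wins
    }
    END { if (found) print val; else exit 1 }
  ' "$1"
}
```

With the file above, `prop_get config/zookeeper.properties clientPort` prints 2181; a missing key produces no output and a non-zero exit status.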
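(4) Start ZooKeeper with the script shipped with Kafka, passing the configuration file as an argument. As the default configuration above shows, ZooKeeper listens on port 2181.
The console consumer used later connects to this address (localhost:2181) through its --zookeeper option: that consumer relies on ZooKeeper for coordination, while the messages themselves are fetched from the broker.
Start ZooKeeper: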
[root@kafka kafka]# ./bin/zookeeper-server-start.sh  config/zookeeper.properties 

[2016-07-08 21:52:14,446] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-07-08 21:52:14,449] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-07-08 21:52:14,449] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-07-08 21:52:14,449] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-07-08 21:52:14,449] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-07-08 21:52:14,475] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-07-08 21:52:14,475] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2016-07-08 21:52:14,490] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:host.name=kafka (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.version=1.7.0_71 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.home=/usr/local/java/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.class.path=.:/usr/local/java//lib:/usr/local/java//jre/lib:/usr/local/kafka/bin/../libs/jetty-security-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-scaladoc.jar:/usr/local/kafka/bin/../libs/zookeeper-3.4.6.jar:/usr/local/kafka/bin/../libs/slf4j-log4j12-1.7.6.jar:/usr/local/kafka/bin/../libs/kafka-clients-0.9.0.0.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-javadoc.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-sources.jar:/usr/local/kafka/bin/../libs/lz4-1.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-0.9.0.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/usr/local/kafka/bin/../libs/kafka-tools-0.9.0.0.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.6.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/usr/local/kafka/bin/../libs/javax.annotation-api-1.2.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.5.0.jar:/usr/local/kafka/bin/../libs/jetty-io-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0.jar:/usr/local/kafka/bin/../libs/jersey-common-2.22.1.jar:/usr/local/kafka/bin/../libs/jersey-media-jaxb-2.22.1.jar:/usr/local/kafka/bin/../libs/log4j-1.2.17.jar:/usr/local/kafka/bin/../libs/jersey-client-2.22.1.jar:/usr/local/kafka/bin/../libs/jetty-http-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/connect-file-0.9.0.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-3.2.jar:/usr/local/kafka/bin/../libs/zkclient-0.7.jar:/usr/local/kafka/bin/../libs/javax.inject-1.jar:/usr/local/kafka/bin/../libs/jetty-util-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/usr/local/kafka/bin/../libs/scala-library-2.10.5.jar:/usr/local/kafka/bin/../libs/connect-json-0.9.0.0.jar:/usr/local/kafka/bin/../libs/connect-api-0.9.0.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/javassist-3.18.1-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/jetty-server-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/argparse4j-0.5.0.jar:/usr/local/kafka/bin/../libs/jackson-core-2.5.4.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.5.4.jar:/usr/local/kafka/bin/../libs/jersey-server-2.22.1.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.0.1.jar:/usr/local/kafka/bin/../libs/jersey-guava-2.22.1.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.5.4.jar:/usr/local/kafka/bin/../libs/validation-api-1.1.0.Final.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.22.1.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/usr/local/kafka/bin/../libs/javax.inject-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-test.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,490] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,491] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,491] INFO Server environment:user.dir=/usr/local/kafka_2.10-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,500] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,500] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,500] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-08 21:52:14,511] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
These messages show that ZooKeeper has started. You can confirm the process and the port with jps and netstat:
[root@kafka ~]# jps |grep -vi jps
21380 QuorumPeerMain         # ZooKeeper process
[root@kafka ~]# netstat -tlnp|grep 2181
tcp        0      0 :::2181                     :::*                        LISTEN      21380/java    # ZooKeeper service port
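Since the Kafka broker needs a reachable ZooKeeper when it starts, a startup script can wait on the jps check shown above. In this sketch, has_jvm is a hypothetical helper that reads jps output (lines of "<pid> <main class>") and matches the main-class column exactly, which is stricter than a grep substring match:

```shell
# has_jvm NAME (hypothetical helper): read `jps` output from stdin; if a JVM whose
# main class is exactly NAME is listed, print its PID and succeed, else return 1.
has_jvm() {
  awk -v name="$1" '$2 == name { print $1; found = 1 } END { exit !found }'
}
```

For example, `until jps | has_jvm QuorumPeerMain >/dev/null; do sleep 1; done` waits for the ZooKeeper process to appear. A listed process does not by itself prove that port 2181 is already accepting connections, so the netstat check remains useful.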

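(5) Start Kafka with the script shipped with Kafka, passing the Kafka configuration file. The file's effective settings (comment and blank lines filtered out) are shown below; the # notes on the right are explanations, not part of the file: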
[root@kafka config]# egrep -v '^$|^#'  server.properties
broker.id=0                 # broker ID (unique per broker)
listeners=PLAINTEXT://:9092
port=9092                   # listening port (deprecated in 0.9: used only when listeners is not set)
host.name=127.0.0.1         # host name (likewise used only when listeners is not set)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs    # directory where Kafka stores its message logs (the data)
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2181      # ZooKeeper cluster address
zookeeper.connection.timeout.ms=6000  # ZooKeeper connection timeout (ms)
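The `#` notes in the listing above are annotations for the reader. In a Java .properties file, `#` starts a comment only at the beginning of a line, so a trailing note would become part of the value (broker.id, for example, would then no longer be a plain integer). A minimal sketch of a hypothetical strip_inline_notes filter that turns such an annotated listing back into a usable file; it assumes no real value contains whitespace followed by `#`:

```shell
# strip_inline_notes (hypothetical helper): copy stdin to stdout, removing trailing
# "   # note" annotations as well as comment-only and blank lines.
strip_inline_notes() {
  sed -e 's/[[:space:]][[:space:]]*#.*$//' \
      -e '/^[[:space:]]*#/d' \
      -e '/^[[:space:]]*$/d'
}
```

For example, `strip_inline_notes < annotated.txt > config/server.properties`, where annotated.txt is a hypothetical file holding the listing above.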
Start Kafka:
[root@kafka kafka]# ./bin/kafka-server-start.sh  config/server.properties
A series of log messages is printed after startup.
As before, you can verify that Kafka is running with jps or netstat:
[root@kafka ~]# jps |grep -i kafka
21646 Kafka
[root@kafka ~]# netstat -tlnp|grep 9092
tcp        0      0 :::9092                     :::*                        LISTEN      21646/java
ZooKeeper and Kafka are now both running.
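(6) Simulate a client sending and receiving messages as a first test. First, create a topic.
Run the topic script: bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic kafka-test02
kafka-topics.sh is the topic-management script shipped with Kafka; run it with --help to see its full usage. The options used here:
--create                  # create a topic
--zookeeper localhost:2181  # ZooKeeper address (host:port), not the broker port 9092
--partitions 1            # number of partitions; a topic needs at least one, here we create one
--replication-factor 1    # number of replicas of each partition (cannot exceed the number of brokers)
--topic kafka-test02      # name of the new topic
Running it: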
[root@kafka kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test02
Created topic "kafka-test02".  # topic kafka-test02 was created successfully
You can also list the existing topics:
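Topic names may only contain ASCII letters, digits, '.', '_' and '-'. Both kafka_test02 and kafka-test02 are therefore legal, but they name two different topics, so the producer and consumer must use exactly the same spelling. A sketch of a hypothetical valid_topic_name check for scripts; it covers only the character set and not the other checks the broker performs (such as length limits):

```shell
# valid_topic_name NAME (hypothetical helper): succeed if NAME is non-empty and
# uses only ASCII letters, digits, '.', '_' and '-'.
valid_topic_name() {
  case $1 in
    ''|*[!a-zA-Z0-9._-]*) return 1 ;;   # empty, or contains a disallowed character
    *) return 0 ;;
  esac
}
```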

[root@kafka kafka]#  bin/kafka-topics.sh --list  --zookeeper localhost:2181
--topic
kafak-test
kafka-test02
my_first_topic
(7) In one shell, use kafka-console-producer.sh, which ships with Kafka, to start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test02
# kafka-test02 is the topic just created. The producer is given the broker address (localhost:9092), so its messages are sent to Kafka, that is, to the broker.
[root@kafka kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092  --topic  kafka-test02
hello world       # typed in the producer shell and sent


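In a second shell, start a console consumer for the same topic (--from-beginning reads the topic from its earliest message):
bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic kafka-test02 --from-beginning
# The consumer is given the ZooKeeper address (localhost:2181). This ZooKeeper-based console consumer uses ZooKeeper for coordination (consumer group membership and offsets); the messages themselves are fetched from the broker.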


[root@kafka kafka]# bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic kafka-test02 --from-beginning

hello world       # the consumer shell receives the hello world message
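kafka-console-producer.sh reads standard input and sends each line as one message, so test messages can also be piped in instead of typed. A small sketch; gen_messages is a hypothetical helper that only generates numbered lines:

```shell
# gen_messages COUNT [PREFIX] (hypothetical helper): print COUNT numbered lines.
# Each line becomes one message when piped into the console producer.
gen_messages() {
  n=$1
  prefix=${2:-hello world}
  i=1
  while [ "$i" -le "$n" ]; do
    echo "$prefix $i"
    i=$((i + 1))
  done
}
```

Usage, assuming the broker and topic from the steps above are running: `gen_messages 5 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test02`. The consumer shell should then print "hello world 1" through "hello world 5".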

(8) Use jps to see the producer and consumer processes:
[root@kafka ~]# jps
22380 ConsoleProducer
22468 ConsoleConsumer
22575 Jps
21646 Kafka
21380 QuorumPeerMain
This experiment ran on a single broker.


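5. A multi-broker cluster on a single node
To run several brokers, the Kafka configuration has to be modified: each broker needs its own configuration file. The starting point is the single-broker configuration: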
[root@kafka config]# egrep -v '^$|^#'  server.properties
broker.id=0                 # broker ID (unique per broker)
listeners=PLAINTEXT://:9092
port=9092                   # listening port (deprecated in 0.9: used only when listeners is not set)
host.name=127.0.0.1         # host name (likewise used only when listeners is not set)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs    # directory where Kafka stores its message logs (the data)
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2181      # ZooKeeper cluster address
zookeeper.connection.timeout.ms=6000  # ZooKeeper connection timeout (ms)
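Each additional broker on the same host needs at least its own broker.id, its own listening port, and its own log directory; the rest, including zookeeper.connect, can stay the same so that all brokers join the same cluster. A sketch of a hypothetical make_broker_config helper that derives such a file; the port numbering 9092+ID and the directory /tmp/kafka-logs-ID are assumed choices, not Kafka defaults:

```shell
# make_broker_config SRC ID DEST (hypothetical helper): copy SRC to DEST, giving the
# new broker its own broker.id, listener/port and log directory; other lines are kept.
make_broker_config() {
  src=$1 id=$2 dest=$3
  port=$((9092 + id))                      # assumed scheme: broker 1 -> 9093, 2 -> 9094
  sed -e "s|^broker\.id=.*|broker.id=$id|" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^port=.*|port=$port|" \
      -e "s|^log\.dirs=.*|log.dirs=/tmp/kafka-logs-$id|" \
      "$src" > "$dest"
}
```

For example, `make_broker_config config/server.properties 1 config/server-1.properties` followed by `bin/kafka-server-start.sh config/server-1.properties` in another shell would start a second broker next to the first one.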

Reference blog: http://www.aboutyun.com/thread-12847-1-1.html
