Kafka Installation and Experiments

This post continues from the previous one:

http://www.cnblogs.com/charlesblc/p/6038112.html

It mainly follows this article:

http://www.open-open.com/lib/view/open1435884136903.html

along with this one, which I have been following throughout this series:

http://blog.csdn.net/ymh198816/article/details/51998085

I downloaded the Kafka package:

http://apache.fayea.com/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz

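I copied it to machine 06 and, as the instructions require, started ZooKeeper first.

ZooKeeper reported an error, though, which looked like a Java version problem, so I set PATH and JAVA_HOME: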

export PATH=/home/work/.jumbo/opt/sun-java8/bin/:$PATH
export JAVA_HOME=/home/work/.jumbo/opt/sun-java8/
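
Before starting ZooKeeper, it can help to confirm which Java the shell actually resolves after these exports. The following is a minimal sketch, not part of the original setup: the helper name `java_major` is hypothetical, and it assumes version strings shaped like `1.8.0_101` (legacy scheme) or `11.0.2`.

```shell
# Hypothetical helper: extract the major Java version from a version string.
# "1.8.0_101" -> 8 (legacy 1.x scheme), "11.0.2" -> 11.
java_major() {
    case "$1" in
        1.*) echo "$1" | cut -d. -f2 ;;
        *)   echo "$1" | cut -d. -f1 ;;
    esac
}

# Report what the current shell resolves, if java is on PATH.
if command -v java >/dev/null 2>&1; then
    # `java -version` prints to stderr; the version is the quoted field on line 1.
    ver=$(java -version 2>&1 | awk -F'"' 'NR==1 {print $2}')
    echo "java on PATH: $(command -v java), major version: $(java_major "$ver")"
fi
echo "JAVA_HOME=${JAVA_HOME:-<unset>}"
```

If the reported path is not under the directory that was prepended to PATH, the exports were probably made in a different shell session.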

Then start ZooKeeper:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Log output:
[2016-11-09 20:50:01,032] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Then start Kafka:

nohup bin/kafka-server-start.sh config/server.properties & 

The broker port is now listening:
$ netstat -nap | grep 9092
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp        0      0 0.0.0.0:9092                0.0.0.0:*                   LISTEN      19508/java          

Test with the command-line tools that ship with Kafka:

$ bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test
zookeeper is not a recognized option

The producer rejects --zookeeper, so use --broker-list instead. Note that the port changes too, from 2181 (ZooKeeper) to 9092 (the broker):

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

It starts successfully, with no warnings.

In another terminal:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

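Judging from the warning, the consumer can also connect directly to the broker on port 9092 by passing --bootstrap-server (the consumer-side counterpart of the producer's --broker-list). I did not try this.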
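
The deprecation warning names --bootstrap-server as the replacement for --zookeeper. A minimal sketch of that untested alternative follows; the helper name `new_consumer_cmd` is hypothetical, and the default `localhost:9092` is the broker address used elsewhere in this post. It only prints the command so it can be reviewed before running.

```shell
# Hypothetical helper: print the console-consumer command for the new consumer.
# Arguments: topic, then an optional bootstrap server (defaults to this post's broker).
new_consumer_cmd() {
    topic="$1"
    bootstrap="${2:-localhost:9092}"
    echo "bin/kafka-console-consumer.sh --bootstrap-server $bootstrap --topic $topic --from-beginning"
}

# Print the command for the topic used above. To run it from the Kafka
# install directory:  eval "$(new_consumer_cmd test)"
new_consumer_cmd test
```

With this form the consumer talks to the broker directly instead of going through ZooKeeper.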

Then type some text in the first terminal:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
hihi
[2016-11-10 11:28:34,658] WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
oh yeah

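The LEADER_NOT_AVAILABLE warning typically appears when the topic does not exist yet and is auto-created on the first send; the message still arrives. The second terminal shows the output, so the connection works: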
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].



hihi
oh yeah

Next, look at Kafka with a monitoring tool.

Download KafkaOffsetMonitor-assembly-0.2.0.jar (address: link)

Copy it to /home/work/data/installed/ on machine 06.

Then start it:

$ java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
>  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
>  --zk localhost:2181 \
>  --port 8089 \
>  --refresh 10.seconds \
>  --retain 1.days

It can then be opened in a browser:

http://[hostname of machine 06]:8089/

 

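Configure a new sink in Flume.

I set up two channels feeding two sinks, but got an error:

 Could not configure sink  kafka due to: Channel memorychannel2 not in active set.

It turned out the channel name was written with the wrong letter case. The corrected configuration file: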

agent.sources = origin
agent.channels = memoryChannel memoryChannel2
agent.sinks = hdfsSink kafka

# For each one of the sources, the type is defined
agent.sources.origin.type =  exec
agent.sources.origin.command = tail -f /home/work/data/LogGenerator_jar/logs/generator.log
# The channel can be defined as follows.
agent.sources.origin.channels = memoryChannel memoryChannel2

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /output/Logger
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.rollInterval = 1
agent.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel

agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.brokerList=localhost:9092
agent.sinks.kafka.requiredAcks=1
agent.sinks.kafka.batchSize=100
agent.sinks.kafka.channel = memoryChannel2

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

agent.channels.memoryChannel2.type = memory
agent.channels.memoryChannel2.capacity = 100
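
The "not in active set" error came from a channel name that did not match its declaration, and channel names are case-sensitive. A small pre-flight check can catch this before starting the agent. This is only a sketch: the function name `check_flume_channels` is hypothetical, and it assumes the agent is named `agent` as in the configuration above.

```shell
# Hypothetical helper: report channel names that a source or sink references
# but that are missing from the "agent.channels" declaration.
check_flume_channels() {
    conf="$1"
    # Declared channels: the value of "agent.channels = a b c", whitespace-normalized.
    declared=$(sed -n 's/^agent\.channels[[:space:]]*=[[:space:]]*//p' "$conf" | tr -s '[:space:]' ' ')
    # Referenced channels: values of "agent.sources.X.channels" and "agent.sinks.X.channel".
    referenced=$(grep -E '^agent\.(sources|sinks)\.[^.]+\.channels?[[:space:]]*=' "$conf" \
        | sed 's/^[^=]*=[[:space:]]*//')
    missing=0
    for ch in $referenced; do
        case " $declared " in
            *" $ch "*) ;;                                  # exact, case-sensitive match
            *) echo "undeclared channel: $ch"; missing=1 ;;
        esac
    done
    return $missing
}

# Usage (the configuration path is an assumption):
#   check_flume_channels conf/flume-conf.properties
```

Run against the broken configuration, it would print `undeclared channel: memorychannel2` and return a non-zero status.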

After startup, the log looks normal:

10 Nov 2016 12:50:31,394 INFO  [hdfs-hdfsSink-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter$8.call:618)  - Renaming /output/Logger/2016-11-10.1478753429757.tmp to /output/Logger/2016-11-10.1478753429757
10 Nov 2016 12:50:31,417 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:231)  - Creating /output/Logger/2016-11-10.1478753429758.tmp
10 Nov 2016 12:50:32,518 INFO  [hdfs-hdfsSink-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:357)  - Closing /output/Logger/2016-11-10.1478753429758.tmp
10 Nov 2016 12:50:32,527 INFO  [hdfs-hdfsSink-call-runner-9] (org.apache.flume.sink.hdfs.BucketWriter$8.call:618)  - Renaming /output/Logger/2016-11-10.1478753429758.tmp to /output/Logger/2016-11-10.1478753429758
10 Nov 2016 12:50:32,535 INFO  [hdfs-hdfsSink-roll-timer-0] (org.apache.flume.sink.hdfs.HDFSEventSink$1.run:382)  - Writer callback called.

However, Kafka still was not receiving the data from Flume. Checking the log turned up these warnings:

10 Nov 2016 12:50:25,662 WARN  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.translateOldProps:345)  - topic is deprecated. Please use the parameter kafka.topic
10 Nov 2016 12:50:25,662 WARN  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.translateOldProps:356)  - brokerList is deprecated. Please use the parameter kafka.bootstrap.servers
10 Nov 2016 12:50:25,662 WARN  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.translateOldProps:366)  - batchSize is deprecated. Please use the parameter flumeBatchSize
10 Nov 2016 12:50:25,662 WARN  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.translateOldProps:376)  - requiredAcks is deprecated. Please use the parameter kafka.producer.acks
10 Nov 2016 12:50:25,662 WARN  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:300)  - Topic was not specified. Using default-flume-topic as the topic.

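The last warning explains the problem: no topic was configured, so the sink was writing to default-flume-topic. I reconfigured the Flume sink with an explicit topic: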

agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.topic=test1
agent.sinks.kafka.brokerList = localhost:9092
agent.sinks.kafka.channel = memoryChannel2
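
The sink above still uses `topic` and `brokerList`, which the warnings flag as deprecated. As a sketch, the same sink can be written with the replacement names the warnings suggest. The batch size and acks values are carried over from the earlier configuration, and `OUT` is a hypothetical scratch file; the result would be merged into the real agent configuration by hand.

```shell
# Write a Kafka sink fragment that uses the property names suggested by the
# deprecation warnings. OUT is a hypothetical scratch path, not a Flume file.
OUT="${OUT:-$(mktemp)}"
cat > "$OUT" <<'EOF'
agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.kafka.topic = test1
agent.sinks.kafka.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka.flumeBatchSize = 100
agent.sinks.kafka.kafka.producer.acks = 1
agent.sinks.kafka.channel = memoryChannel2
EOF
cat "$OUT"
```

The doubled `kafka.kafka.` is intentional: the first `kafka` is the sink's name in this agent, the second is part of the property name.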

After restarting, the log shows Flume starting normally:

10 Nov 2016 13:03:39,263 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:171)  - Starting Sink kafka
10 Nov 2016 13:03:39,264 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:171)  - Starting Sink hdfsSink
10 Nov 2016 13:03:39,265 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:182)  - Starting Source origin

Then run the log generator:

cd /home/work/data/LogGenerator_jar;
java -jar LogGenerator.jar

In the monitoring page above, topic test1 is now visible, but no valid consumer is listed.

Start a command-line consumer:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

The received messages are printed:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[INFO][main][2016-11-10 12:53:56][com.comany.log.generator.LogGenerator] - orderNumber: 971581478753636880 | orderDate: 2016-11-10 12:53:56 | paymentNumber: Paypal-21032218 | paymentDate: 2016-11-10 12:53:56 | merchantName: Apple | sku: [ skuName: 高腰闊腿休閒褲 skuNum: 1 skuCode: z1n6iyh653 skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: 塑身牛仔褲 skuNum: 1 skuCode: naaiy2z1jn skuPrice: 399.0 totalSkuPrice: 399.0; skuName: 高腰闊腿休閒褲 skuNum: 2 skuCode: 4iaz6zkxs6 skuPrice: 1000.0 totalSkuPrice: 2000.0; ] | price: [ totalPrice: 4399.0 discount: 10.0 paymentPrice: 4389.0 ]
[INFO][main][2016-11-10 12:53:56][com.comany.log.generator.LogGenerator] - orderNumber: 750331478753636880 | orderDate: 2016-11-10 12:53:56 | paymentNumber: Wechat-44874259 | paymentDate: 2016-11-10 12:53:56 | merchantName: 暴雪公司 | sku: [ skuName: 人字拖鞋 skuNum: 1 skuCode: 26nl39of2h skuPrice: 299.0 totalSkuPrice: 299.0; skuName: 灰色連衣裙 skuNum: 1 skuCode: vhft1qmcgo skuPrice: 299.0 totalSkuPrice: 299.0; skuName: 灰色連衣裙 skuNum: 3 skuCode: drym8nikkb skuPrice: 899.0 totalSkuPrice: 2697.0; ] | price: [ totalPrice: 3295.0 discount: 20.0 paymentPrice: 3275.0 ]
[INFO][main][2016-11-10 12:53:56][com.comany.log.generator.LogGenerator] - orderNumber: 724421478753636881 | orderDate: 2016-11-10 12:53:56 | paymentNumber: Paypal-62225213 | paymentDate: 2016-11-10 12:53:56 | merchantName: 哈毒婦 | sku: [ skuName: 高腰闊腿休閒褲 skuNum: 1 skuCode: 43sqzs1ebd skuPrice: 399.0 totalSkuPrice: 399.0; skuName: 塑身牛仔褲 skuNum: 3 skuCode: h5lzonfqkq skuPrice: 299.0 totalSkuPrice: 897.0; skuName: 圓腳牛仔褲 skuNum: 3 skuCode: ifbhzs2s2d skuPrice: 1000.0 totalSkuPrice: 3000.0; ] | price: [ totalPrice: 4296.0 discount: 20.0 paymentPrice: 4276.0 ]

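Back in the monitoring UI, "console-consumer-75910" now appears under "Topic List" -> "test1" -> "Active Consumers".

Clicking into it shows that new messages have arrived and been consumed. The tool also offers a graphical view of the same data.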

 

 

Finally, I deleted the old topics. This also requires setting delete.topic.enable to true in the broker configuration (config/server.properties).

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1
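
Since the delete command depends on that broker setting, it can be worth checking the setting first. The sketch below is one way to do it; the function name `delete_topic_enabled` is hypothetical, and `config/server.properties` is the broker configuration file used to start Kafka earlier in this post.

```shell
# Hypothetical helper: succeed only if delete.topic.enable is set to true
# (and not commented out) in the given broker properties file.
delete_topic_enabled() {
    grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}

# Usage from the Kafka install directory:
#   delete_topic_enabled config/server.properties \
#       || echo "set delete.topic.enable=true and restart the broker"
#   bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1
#   bin/kafka-topics.sh --list --zookeeper localhost:2181   # check that test1 no longer appears
```

The broker reads server.properties at startup, so a change to this setting needs a broker restart before the delete takes effect.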

 

 

The next step is installing and configuring Storm; see the next post: http://www.cnblogs.com/charlesblc/p/6050565.html

 

I also found this blog, which covers Kafka in some depth and is worth reading when time allows:

http://blog.csdn.net/lizhitao/article/category/2194509

 

(End)
