Kafka Notes

1. Check version compatibility between Kafka and CDH:
  https://www.cloudera.com/documentation/enterprise/release-notes/topics/rn_consolidated_pcm.html#pcm_kafka
  
2. Download the CSD package KAFKA-1.2.0.jar:
   http://archive.cloudera.com/csds/kafka/
   Upload it to the /opt/cloudera/csd directory.
   
3. Download KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel, KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel.sha1, and manifest.json:
  http://archive.cloudera.com/kafka/parcels/latest/
  Upload them to the /opt/cloudera/parcel-repo directory.
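
Before activating the parcel in Cloudera Manager, it is worth confirming the download is intact. A minimal sketch, assuming the three files above are already in /opt/cloudera/parcel-repo:

cd /opt/cloudera/parcel-repo
# compute the parcel's SHA-1 and compare it with the published checksum
sha1sum KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel
cat KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel.sha1
# Cloudera Manager looks for a checksum file ending in .sha; rename the .sha1
# file if CM does not detect the parcel (verify against your CM version's docs)
mv KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel.sha1 KAFKA-3.0.0-1.3.0.0.p0.40-el6.parcel.sha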

## Create a topic; --replication-factor sets the number of replicas
kafka-topics --create --topic kafka-test --zookeeper host0:2181,host2:2181,host7:2181 --partitions 1 --replication-factor 1
# List topics
kafka-topics --list --zookeeper host0:2181,host2:2181,host7:2181
# Describe a topic
kafka-topics --zookeeper host0:2181,host2:2181,host7:2181 --topic "direct_test" --describe
# Start a Kafka console producer
kafka-console-producer --broker-list host0:9092,host2:9092,host7:9092 --topic direct_test
## Start a console consumer
kafka-console-consumer --zookeeper host0:2181,host2:2181,host7:2181 --topic direct_test
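# The consumer above only shows messages produced after it starts; adding
# --from-beginning replays the topic from its first offset:
kafka-console-consumer --zookeeper host0:2181,host2:2181,host7:2181 --topic direct_test --from-beginning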
## Delete a topic
kafka-topics --delete --zookeeper host0:2181,host2:2181,host7:2181 --topic direct_test
## Increase the number of partitions for a topic (partitions can only be increased, never decreased)
kafka-topics --alter --zookeeper host0:2181,host2:2181,host7:2181  --partitions 5 --topic flink_hbase
## kafka-topics --alter cannot change the replication factor; use kafka-reassign-partitions, as sketched below
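# A minimal sketch of raising flink_hbase to 3 replicas; the broker ids 0/1/2
# and the file name increase-rf.json are illustrative assumptions:
cat > increase-rf.json <<'EOF'
{"version":1,
 "partitions":[
   {"topic":"flink_hbase","partition":0,"replicas":[0,1,2]},
   {"topic":"flink_hbase","partition":1,"replicas":[1,2,0]},
   {"topic":"flink_hbase","partition":2,"replicas":[2,0,1]},
   {"topic":"flink_hbase","partition":3,"replicas":[0,2,1]},
   {"topic":"flink_hbase","partition":4,"replicas":[1,0,2]}
 ]}
EOF
kafka-reassign-partitions --zookeeper host0:2181,host2:2181,host7:2181 --reassignment-json-file increase-rf.json --execute
kafka-reassign-partitions --zookeeper host0:2181,host2:2181,host7:2181 --reassignment-json-file increase-rf.json --verify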
## Query the minimum (earliest) offset:
kafka-run-class kafka.tools.GetOffsetShell --broker-list host0:9092,host2:9092,host7:9092 --topic flink_hbase --time -2
## Query the maximum (latest) offset:
kafka-run-class kafka.tools.GetOffsetShell --broker-list host0:9092,host2:9092,host7:9092 --topic flink_hbase --time -1
## View per-partition consumed offsets and lag for a topic's consumer group
kafka-run-class kafka.tools.ConsumerOffsetChecker --group client_kafka-test_0 --topic kafka-test --zookeeper host0:2181,host2:2181,host7:2181
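# ConsumerOffsetChecker is deprecated from Kafka 0.9 onward; kafka-consumer-groups
# gives the same per-partition offset/lag view (brokers taken from above):
kafka-consumer-groups --bootstrap-server host0:9092,host2:9092,host7:9092 --describe --group client_kafka-test_0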
## Delete a consumer group (inside the zookeeper-client shell)
ls /consumers
rmr /consumers/group1

Set the offset of consumer group DynamicRangeGroup, topic DynamicRange, partition 0 to 1288 with the following ZooKeeper command:
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
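
A minimal zookeeper-client session around that command (stop the group's consumers first, or they will overwrite the offset on their next commit):

zookeeper-client -server host0:2181
# inside the ZooKeeper shell:
get /consumers/DynamicRangeGroup/offsets/DynamicRange/0
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
get /consumers/DynamicRangeGroup/offsets/DynamicRange/0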

spark-shell --master local[2] --jars /root/tanj/kafka_2.11-0.8.2.1.jar,/root/tanj/spark-streaming-kafka_2.11-1.6.0.jar

import org.apache.spark.SparkConf
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

val ssc = new StreamingContext(sc, Durations.seconds(5))
// Direct (receiver-less) stream from the brokers; count words in 5-second batches
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc
  , Map("metadata.broker.list" -> "host0:9092,host2:9092,host7:9092"
  , "group.id" -> "StreamingWordCountSelfKafkaDirectStreamScala")
  , Set("direct_test")).map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
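
To exercise the stream, type a few space-separated lines into the console producer started earlier against direct_test; the word counts print in the spark-shell every five seconds.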

Further reading:
http://blog.csdn.net/sun_qiangwei/article/details/52080147#t2
http://blog.csdn.net/qq_21234493/article/details/51340138
