Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式,能夠從代碼中簡單理解成Receiver方式是經過zookeeper來鏈接kafka隊列,Direct方式是直接鏈接到kafka的節點上獲取數據了。java
1、基於Receiver的方式bootstrap
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。api
java代碼分佈式
JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(jsc, kafkaZookeeper, kafkaGroup, topicmap,StorageLevel.MEMORY_AND_DISK_SER);
須要注意的要點ide
一、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關係的。因此,在KafkaUtils.createStream()中,提升partition的數量,只會增長一個Receiver中,讀取partition的線程的數量。不會增長Spark處理數據的並行度。
二、能夠建立多個Kafka輸入DStream,使用不一樣的consumer group和topic,來經過多個receiver並行接收數據。
三、若是基於容錯的文件系統,好比HDFS,啓用了預寫日誌機制,接收到的數據都會被複制一份到預寫日誌中。所以,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。性能
2、基於Direct的方式ui
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而可以確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。線程
這種方式有以下優勢:
一、簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。
二、高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制,會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。
三、一次且僅一次的事務機制:
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。日誌
java主要代碼:code
咱們主要用的是kafkaUtils.createDirectStream這個接口
JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler) Parameters: jssc - JavaStreamingContext object keyClass - Class of the keys in the Kafka records valueClass - Class of the values in the Kafka records keyDecoderClass - Class of the key decoder valueDecoderClass - Class of the value decoder recordClass - Class of the records in DStream kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form. fromOffsets - Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream messageHandler - Function for translating each message and metadata into the desired type Returns: DStream of R
JavaInputDStream<String> input = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, consumerOffsets, //消費偏移量 new Function<MessageAndMetadata<String, String>, String>() { private static final long serialVersionUID = 1L; public String call(MessageAndMetadata<String, String> v1) throws Exception { return v1.message(); } });//讀取kafka數據 final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); JavaDStream<String> javaDStream = input.transform(new Function<JavaRDD<String>,JavaRDD<String>>(){ private static final long serialVersionUID = 1L; @Override public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); return rdd; } });//保存讀取偏移量 javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>(){ private static final long serialVersionUID = 1L; @Override public void call(JavaRDD<String> t) throws Exception { // TODO Auto-generated method stub if(t.isEmpty()){ return; } for (OffsetRange o : offsetRanges.get()) { System.out.println( o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() ); ConsumerOffset.updateZkOffset(zookeeper,kafkaGroup,kafkaTopic,o.partition(),o.untilOffset()); } } });//更新偏移量