5三、Spark Streaming:輸入DStream之Kafka數據源實戰

1、基於Receiver的方式java

一、概述apache

基於Receiver的方式: Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的, 而後Spark Streaming啓動的job會去處理那些數據。 然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的 預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此, 即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。 如何進行Kafka數據源鏈接? 1、在maven添加依賴 groupId = org.apache.spark artifactId = spark-streaming-kafka_2.10 version = 1.5.1

2、使用第三方工具類建立輸入DStream JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]); 須要注意的要點: 1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關係的。因此,在KafkaUtils.createStream()中, 提升partition的數量,只會增長一個Receiver中,讀取partition的線程的數量。不會增長Spark處理數據的並行度。 2、能夠建立多個Kafka輸入DStream,使用不一樣的consumer group和topic,來經過多個receiver並行接收數據。 3、若是基於容錯的文件系統,好比HDFS,啓用了預寫日誌機制,接收到的數據都會被複制一份到預寫日誌中。所以, 在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。 Kafka命令: bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create bin/kafka-console-producer.sh --broker-list 192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092 --topic TestTopic [ZK quorum: 192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181

 

二、java版本api

package cn.spark.study.streaming; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** * 基於Kafka receiver方式的實時wordcount程序 * @author Administrator * */
public class KafkaReceiverWordCount { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("KafkaWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 使用KafkaUtils.createStream()方法,建立針對Kafka的輸入數據流
        Map<String, Integer> topicThreadMap = new HashMap<String, Integer>(); // 使用多少個線程去拉取topic的數據
        topicThreadMap.put("WordCount", 1); // 這裏接收的四個參數;第一個:streamingContext // 第二個:ZK quorum; 第三個:consumer group id 能夠本身寫; // 第四個:per-topic number of Kafka partitions to consume
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream( jssc, "192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181", "DefaultConsumerGroup", topicThreadMap); // wordcount邏輯
        JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Tuple2<String, String> tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } } ##運行程序 ##新建一個topic [root@spark1 kafka]# bin/kafka-topics.sh --zookeeper 192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181 --topic WordCount --replication-factor 1 --partitions 1 --create ##啓動生產者,而後能夠輸入一些數據,觀察程序端的輸出統計 [root@spark1 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092 --topic WordCount

 

2、基於Direct的方式maven

一、概述分佈式

這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而可以確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會週期性地查詢Kafka,來得到 每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。 這種方式有以下優勢: 1、簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行 從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。 2、高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制, 會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。 3、一次且僅一次的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據 零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據 是消費一次且僅消費一次。 JavaPairReceiverInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]); Kafka命令: bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create bin/kafka-console-producer.sh --broker-list 192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092 --topic TestTopic 192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181 metadata.broker.list


二、java版本ide

package cn.spark.study.streaming; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** * 基於Kafka Direct方式的實時wordcount程序 * @author Administrator * */
public class KafkaDirectWordCount { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("KafkaDirectWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 首先,要建立一份kafka參數map
        Map<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", "192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092"); // 而後,要建立一個set,裏面放入,你要讀取的topic // 這個,就是咱們所說的,它本身給你作的很好,能夠並行讀取多個topic
        Set<String> topics = new HashSet<String>(); topics.add("WordCount"); // 建立輸入DStream
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); // 執行wordcount操做
        JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Tuple2<String, String> tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }
相關文章
相關標籤/搜索