** 前言**html
最近工做中是作日誌分析的平臺,採用了sparkstreaming+kafka,採用kafka主要是看中了它對大數據量處理的高性能,處理日誌類應用再好不過了,採用了sparkstreaming的流處理框架 主要是考慮到它自己是基於spark核心的,之後的批處理能夠一站式服務,而且能夠提供準實時服務到elasticsearch中,能夠實現準實時定位系統日誌。java
** 實現**apache
Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式。api
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。代碼以下:app
SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = Integer.parseInt("4"); Map<String, Integer> topicMap = new HashMap<String, Integer>(); topicMap.put("group-45", numThreads); //接收的參數分別是JavaStreamingConetxt,zookeeper鏈接地址,groupId,kafak的topic JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap);
剛開始的時候系統正常運行,沒有發現問題,可是若是系統異常從新啓動sparkstreaming程序後,發現程序會重複處理已經處理過的數據,這種基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。官方如今也已經不推薦這種整合方式,官網相關地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下面咱們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。框架
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而可以確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。less
代碼以下:elasticsearch
SparkConf sparkConf = new SparkConf().setAppName("log-etl"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); // Create direct kafka stream with brokers and topics JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet );
這種direct方式的優勢以下:性能
1.簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。大數據
2.一次且僅一次的事務機制:基於receiver的方式,在spark和zk中通訊,頗有可能致使數據的不一致。
3.高效率:在receiver的狀況下,若是要保證數據的不丟失,須要開啓wal機制,這種方式下,爲、數據實際上被複制了兩份,一份在kafka自身的副本中,另一份要複製到wal中, direct方式下是不須要副本的。
貌似這種方式很完美,可是仍是有問題的,當業務須要重啓sparkstreaming程序的時候,業務日誌依然會打入到kafka中,當job重啓後只能從最新的offset開始消費消息,形成重啓過程當中的消息丟失。kafka中的offset以下圖(使用kafkaManager實時監控隊列中的消息):
‘’
當中止業務日誌的接受後,先重啓spark程序,可是發現job並無將先前打入到kafka中的數據消費掉。這是由於消息沒有通過zk,topic的offset也就沒有保存
通常有兩種方式處理這種問題,能夠先spark streaming 保存offset,使用spark checkpoint機制,第二種是程序中本身實現保存offset邏輯,我比較喜歡第二種方式,覺得這種方式可控,全部主動權都在本身手中。
先看下大致流程圖,
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl"); Set<String> topicSet = new HashSet<String>(); topicSet.add("group-45"); kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092"); kafkaParam.put("group.id", "simple1"); // transform java Map to scala immutable.map scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam); scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() { public Tuple2<String, String> apply(Tuple2<String, String> v1) { return v1; } }); // init KafkaCluster kafkaCluster = new KafkaCluster(scalaKafkaParam); scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet); immutableTopics = mutableTopics.toSet(); scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get(); // kafka direct stream 初始化時使用的offset數據 Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>(); // 沒有保存offset時(該group首次消費時), 各個partition offset 默認爲0 if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) { System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get()); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { consumerOffsetsLong.put(topicAndPartition, 0L); } } // offset已存在, 使用保存的offset else { scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get(); Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { Long offset = (Long)consumerOffsets.get(topicAndPartition); consumerOffsetsLong.put(topicAndPartition, offset); } } JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); // create direct stream JavaInputDStream<String> message = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParam, consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() { public String call(MessageAndMetadata<String, String> v1) throws Exception { System.out.println("接收到的數據《《==="+v1.message()); return v1.message(); } } ); // 獲得rdd各個分區對應的offset, 並保存在offsetRanges中 final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>(); JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); return rdd; } }); // output javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { public Void call(JavaRDD<String> v1) throws Exception { if (v1.isEmpty()) return null; List<String> list = v1.collect(); for(String s:list){ System.out.println("數據==="+s); } for (OffsetRange o : offsetRanges.get()) { // 封裝topic.partition 與 offset對應關係 java Map TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>(); topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); // 轉換java map to scala immutable.map scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap); scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() { public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) { return v1; } }); // 更新offset到kafkaCluster kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); System.out.println("原數據====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() ); } return null; } }); jssc.start(); jssc.awaitTermination(); }
基本使用這種方式就能夠解決數據丟失的問題。