以爲有用的話,點個贊啊~~~ O(∩_∩)O~~html
業務背景
經過實時抽取華爲ESIGHT系統的wifi數據,與校園的學生數據、課程數據、地理位置數據等進行關聯,進行校園大數據的流數據處理與分析。java
技術選型
- Kafka調用ESIGHT的resutful API,接入無線數據;
- Sparkstreaming將流數據與Hive中的其餘校園數據關聯分析
- 使用ES For Hadoop將分析結果導出到ES集羣中
Kafka Producer
技術常規,使用kafka接入ESIGHT數據,只須要注意apache
- 默認的分區方法是否產生數據偏移
- 若是偏移須要自定義kafka.producer.Partitioner
SparkStreaming 接收Kafka數據流
用spark streaming流式處理kafka中的數據,第一步固然是先把數據接收過來,轉換爲spark streaming中的數據結構Dstream。
接收數據的方式有兩種:api
基於Receiver接收數據
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。數組
須要注意的問題有:緩存
- 在Receiver的方式中,Spark中的partition和kafka中的partition並非相關的,因此若是咱們加大每一個topic的partition數量,僅僅是增長線程來處理由單一Receiver消費的主題。可是這並無增長Spark在處理數據上的並行度。
- 對於不一樣的Group和topic咱們可使用多個Receiver建立不一樣的Dstream來並行接收數據,以後能夠利用union來統一成一個Dstream。
- 若是咱們啓用了Write Ahead Logs複製到文件系統如HDFS,那麼storage level須要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER
直連方式讀取kafka數據
這種新的不基於Receiver的直接方式,是在Spark 1.3以後引入的,從而可以確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。ruby
這種方式有以下優勢:markdown
-
簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。網絡
-
高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制,會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。數據結構
-
一次且僅一次(extract-once)的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。
Direct鏈接示例
import org.apache.spark.streaming.kafka.*;
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);
但Direct鏈接方式爲了可以進行異常恢復,須要考慮如何維護KafkaOffset的問題。一般由兩種方式維護
- 使用Spark的checkpoint機制,根據須要按期checkpoint並恢復。因爲項目使用SparkSQL從Hive中拉取數據,可能因爲SparkSQLContext的恢復處理不當,在恢復的時候會失敗;
- 經過SparkStreaming的API在Zookeeper中維護Kafka的Offset
使用Zookeeper維護KafkaOffset示例
import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.HasOffsetRanges; import org.apache.spark.streaming.kafka.KafkaCluster; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.OffsetRange; import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig; import kafka.common.TopicAndPartition; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; import scala.Predef; import scala.Tuple2; import scala.collection.JavaConversions; import lombok.extern.slf4j.*; @Slf4j public class KafkaOffsetExample { private static KafkaCluster kafkaCluster = null; private static HashMap<String, String> kafkaParam = new HashMap<String, String>(); private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null; private static scala.collection.immutable.Set<String> immutableTopics = null; /** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */ private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc, Map<TopicAndPartition, Long> consumerOffsetsLong) { KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset"); JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(), consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(MessageAndMetadata<String, String> v1) throws Exception { return v1.message(); } }); return message; } private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) { Set<String> topicSet = new HashSet<String>(); topicSet.add(topic); scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet); immutableTopics = mutableTopics.toSet(); scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster .getPartitions(immutableTopics).right().get(); // kafka direct stream 初始化時使用的offset數據 Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>(); if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) { KafkaOffsetExample.log.warn("沒有保存offset, 各個partition offset 默認爲0"); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { consumerOffsetsLong.put(topicAndPartition, 0L); } } else { KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset"); scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster .getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get(); Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); KafkaOffsetExample.log.warn("put data in consumerOffsetsLong"); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { Long offset = (Long) consumerOffsets.get(topicAndPartition); consumerOffsetsLong.put(topicAndPartition, offset); } } return consumerOffsetsLong; } private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message, AtomicReference<OffsetRange[]> offsetRanges) { JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { private static final long serialVersionUID = 1L; public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); for (int i = 0; i < offsets.length; i++) KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}", offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(), offsets[i].untilOffset()); return rdd; } }); KafkaOffsetExample.log.warn("foreachRDD"); // output javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { private static final long serialVersionUID = 1L; public void call(JavaRDD<String> rdd) throws Exception { if (rdd.isEmpty()) { KafkaOffsetExample.log.warn("Empty RDD"); return; } for (OffsetRange o : offsetRanges.get()) { // 封裝topic.partition 與 offset對應關係 java Map TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>(); topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); KafkaOffsetExample.log.warn( "Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset()); // 轉換java map to scala immutable.map scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions .mapAsScalaMap(topicAndPartitionObjectMap); scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap .toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) { return v1; } }); // 更新offset到kafkaCluster kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); } } }); return javaDStream; } private static void initKafkaParams() { kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST); kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT); kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET); kafkaParam.put("group.id", WIFIConfig.GROUP_ID); } private static KafkaCluster initKafkaCluster() { KafkaOffsetExample.log.warn("transform java Map to scala immutable.map"); // transform java Map to scala immutable.map scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam); scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap .toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> apply(Tuple2<String, String> arg0) { return arg0; } }); // init KafkaCluster KafkaOffsetExample.log.warn("Init KafkaCluster"); return new KafkaCluster(scalaKafkaParam); } public static void run() { initKafkaParams(); kafkaCluster = initKafkaCluster(); SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); // 獲得rdd各個分區對應的offset, 並保存在offsetRanges中 KafkaOffsetExample.log.warn("initConsumer Offset"); Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC); kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong); final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>(); JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges); javaDStream.print(); jssc.start(); try { jssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml"; WIFIConfig.init(testPath); KafkaOffsetExample.log.warn(WIFIConfig.toStr()); KafkaOffsetExample.run(); } }
SparkStreaming 數據處理
根據須要,將流式數據與Hive中的靜態數據關聯,結果經過Elasticsearch For Hadoop導出到ES集羣中。
若是靜態數據須要定時更新,能夠在建立數據流後,在foreachRDD邏輯中,根據實際狀況按期更新靜態數據。
調優
因爲我的經驗較少,處理的數據量不大,如下內容大可能是紙上談兵,僅供參考。
合理的批處理時間(batchDuration)
- 幾乎全部的Spark Streaming調優文檔都會說起批處理時間的調整,在StreamingContext初始化的時候,有一個參數即是批處理時間的設定。
- 若是這個值設置的太短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會形成數據不斷堆積,最終致使Spark Streaming發生阻塞。
- 通常對於batchDuration的設置不會小於500ms,由於太小會致使SparkStreaming頻繁的提交做業,對整個streaming形成額外的負擔。
-
在平時的應用中,根據不一樣的應用場景和硬件配置,我設在1~10s之間,咱們能夠根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的數據,這樣就是目前狀況的最優值。
合理的Kafka拉取量(maxRatePerPartition重要)
spark.streaming.kafka.maxRatePerPartition參數配置指定了每秒每個topic的每個分區獲取的最大消息數。
對於Spark Streaming消費kafka中數據的應用場景,這個配置是很是關鍵的。這個參數默認是沒有上限的,即kafka當中有多少數據它就會直接所有拉出。而根據生產者寫入Kafka的速率以及消費者自己處理數據的速度,同時這個參數須要結合上面的batchDuration,使得每一個partition拉取在每一個batchDuration期間拉取的數據可以順利的處理完畢,作到儘量高的吞吐量,而這個參數的調整能夠參考可視化監控界面中的Input Rate和Processing Time。
緩存反覆使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,若是被反覆的使用,最好利用cache(),將該數據流緩存起來,防止過分的調度資源形成的網絡開銷。能夠參考觀察Scheduling Delay參數。
設置合理的GC
長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可讓咱們不過多的關注與內存的分配回收,更加專一於業務邏輯,JVM都會爲咱們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機中,將內存分爲了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是須要耗費必定時間的,尤爲是老年代的GC回收,須要對內存碎片進行整理,一般採用標記-清楚的作法。一樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在一般的使用中建議:
設置年老代爲併發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
設置合理的CPU資源數
CPU的core數量,每一個executor能夠佔用一個或多個core,能夠經過觀察CPU的使用率變化來了解計算資源的使用狀況,例如,很常見的一種浪費是一個executor佔用了多個core,可是總的CPU使用率卻不高(由於一個executor並不總能充分利用多核的能力),這個時候能夠考慮讓麼個executor佔用更少的core,同時worker下面增長更多的executor,或者一臺host上面增長更多的worker來增長並行執行的executor的數量,從而增長CPU利用率。
可是增長executor的時候須要考慮好內存消耗,由於一臺機器的內存分配給越多的executor,每一個executor的內存就越小,以至出現過多的數據spill over甚至out of memory的狀況。
設置合理的parallelism
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值過小了會致使每片數據量太大,致使內存壓力,或者諸多executor的計算能力沒法利用充分;可是若是太大了則會致使分片太多,執行效率下降。在執行action類型操做的時候(好比各類reduce操做),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操做的時候,默認返回數據的paritition數量(而在進行map類操做的時候,partition數量一般取自parent RDD中較大的一個,並且也不會涉及shuffle,所以這個parallelism的參數沒有影響)。因此說,這兩個概念密切相關,都是涉及到數據分片的,做用方式實際上是統一的。經過spark.default.parallelism能夠設置默認的分片數量,而不少RDD的操做均可以指定一個partition參數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,咱們採用了Direct鏈接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,咱們通常默認設置爲Kafka中Partition的數量。
使用高性能的算子
這裏參考了美團技術團隊的博文,並無作過具體的性能測試,其建議以下:
-
使用reduceByKey/aggregateByKey替代groupByKey
-
使用mapPartitions替代普通map
-
使用foreachPartitions替代foreach
-
使用filter以後進行coalesce操做
-
使用repartitionAndSortWithinPartitions替代repartition與sort類操做
-
使用Kryo優化序列化性能 這個優化原則我自己也沒有通過測試,可是好多優化文檔有提到,這裏也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:
-
在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸。
-
將自定義的類型做爲RDD的泛型類型時(好比JavaRDD,Student是自定義類型),全部自定義類型對象,都會進行序列化。所以這種狀況下,也要求自定義的類必須實現Serializable接口。
-
使用可序列化的持久化策略時(好比MEMORY_ONLY_SER),Spark會將RDD中的每一個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,咱們均可以經過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。可是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高不少。
官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之因此默認沒有使用Kryo做爲序列化類庫,是由於Kryo要求最好要註冊全部須要進行序列化的自定義類型,所以對於開發者來講,這種方式比較麻煩。
如下是使用Kryo的代碼示例,咱們只要設置序列化類,再註冊要序列化的自定義類型便可(好比算子函數中使用到的外部變量類型、做爲RDD泛型類型的自定義類型等):
// 建立SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器爲KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 註冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))