SparkStreaming+Kafka 處理實時WIFI數據

 

 


以爲有用的話,點個贊啊~~~ O(∩_∩)O~~html


業務背景

經過實時抽取華爲ESIGHT系統的wifi數據,與校園的學生數據、課程數據、地理位置數據等進行關聯,進行校園大數據的流數據處理與分析。java

技術選型

  • Kafka調用ESIGHT的resutful API,接入無線數據;
  • Sparkstreaming將流數據與Hive中的其餘校園數據關聯分析
  • 使用ES For Hadoop將分析結果導出到ES集羣中

Kafka Producer

技術常規,使用kafka接入ESIGHT數據,只須要注意apache

  • 默認的分區方法是否產生數據偏移
  • 若是偏移須要自定義kafka.producer.Partitioner

SparkStreaming 接收Kafka數據流

用spark streaming流式處理kafka中的數據,第一步固然是先把數據接收過來,轉換爲spark streaming中的數據結構Dstream。
接收數據的方式有兩種:api

基於Receiver接收數據

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。數組

須要注意的問題有:緩存

  • 在Receiver的方式中,Spark中的partition和kafka中的partition並非相關的,因此若是咱們加大每一個topic的partition數量,僅僅是增長線程來處理由單一Receiver消費的主題。可是這並無增長Spark在處理數據上的並行度。
  • 對於不一樣的Group和topic咱們可使用多個Receiver建立不一樣的Dstream來並行接收數據,以後能夠利用union來統一成一個Dstream。
  • 若是咱們啓用了Write Ahead Logs複製到文件系統如HDFS,那麼storage level須要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

直連方式讀取kafka數據

這種新的不基於Receiver的直接方式,是在Spark 1.3以後引入的,從而可以確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。ruby

這種方式有以下優勢:markdown

  • 簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。網絡

  • 高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制,會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。數據結構

  • 一次且僅一次(extract-once)的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。

Direct鏈接示例

import org.apache.spark.streaming.kafka.*;

 JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);

但Direct鏈接方式爲了可以進行異常恢復,須要考慮如何維護KafkaOffset的問題。一般由兩種方式維護

  • 使用Spark的checkpoint機制,根據須要按期checkpoint並恢復。因爲項目使用SparkSQL從Hive中拉取數據,可能因爲SparkSQLContext的恢復處理不當,在恢復的時候會失敗;
  • 經過SparkStreaming的API在Zookeeper中維護Kafka的Offset

使用Zookeeper維護KafkaOffset示例


import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;
import lombok.extern.slf4j.*;

@Slf4j
public class KafkaOffsetExample {
	private static KafkaCluster kafkaCluster = null;
	private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
	private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
	private static scala.collection.immutable.Set<String> immutableTopics = null;

	/** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
	private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
			Map<TopicAndPartition, Long> consumerOffsetsLong) {
		KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
		JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
				StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
				consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public String call(MessageAndMetadata<String, String> v1) throws Exception {
						return v1.message();
					}
				});
		return message;
	}

	private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
		Set<String> topicSet = new HashSet<String>();
		topicSet.add(topic);
		scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
		immutableTopics = mutableTopics.toSet();
		scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
				.getPartitions(immutableTopics).right().get();
		
		// kafka direct stream 初始化時使用的offset數據
		Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
		if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
			KafkaOffsetExample.log.warn("沒有保存offset, 各個partition offset 默認爲0");
			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
				consumerOffsetsLong.put(topicAndPartition, 0L);
			}
		}
		else {
			KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
			scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
					.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();

			Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);

			KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
				Long offset = (Long) consumerOffsets.get(topicAndPartition);
				consumerOffsetsLong.put(topicAndPartition, offset);
			}
		}
		return consumerOffsetsLong;
	}
	
	private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message, 
			AtomicReference<OffsetRange[]> offsetRanges) {
		JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;
			public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
				OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
				offsetRanges.set(offsets);
				for (int i = 0; i < offsets.length; i++)
					KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
							offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
							offsets[i].untilOffset());
				return rdd;
			}
		});
		KafkaOffsetExample.log.warn("foreachRDD");
		// output
		javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;

			public void call(JavaRDD<String> rdd) throws Exception {
				if (rdd.isEmpty()) {
					KafkaOffsetExample.log.warn("Empty RDD");
					return;
				}
				for (OffsetRange o : offsetRanges.get()) {
					// 封裝topic.partition 與 offset對應關係 java Map
					TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
					Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
					topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());

					KafkaOffsetExample.log.warn(
							"Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());

					// 轉換java map to scala immutable.map
					scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
							.mapAsScalaMap(topicAndPartitionObjectMap);
					scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
							.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
								private static final long serialVersionUID = 1L;

								@Override
								public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
									return v1;
								}
							});
					// 更新offset到kafkaCluster
					kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
							scalatopicAndPartitionObjectMap);
				}
			}
		});
		return javaDStream;
	}
	
	private static void initKafkaParams() {
		kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
		kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
		kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
		kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
	}
	
	private static KafkaCluster initKafkaCluster() {
		KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
		// transform java Map to scala immutable.map
		scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
		scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
				.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
						return arg0;
					}
				});

		// init KafkaCluster
		KafkaOffsetExample.log.warn("Init KafkaCluster");
		return new KafkaCluster(scalaKafkaParam);
	}
	
	public static void run() {
		initKafkaParams();
		kafkaCluster = initKafkaCluster();

		SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
		JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
		
		// 獲得rdd各個分區對應的offset, 並保存在offsetRanges中
		KafkaOffsetExample.log.warn("initConsumer Offset");
		Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
		kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
		
		JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
		JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
		
		javaDStream.print();
		
		jssc.start();
		try {
			jssc.awaitTermination();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws Exception {
		String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml";
		WIFIConfig.init(testPath);
		KafkaOffsetExample.log.warn(WIFIConfig.toStr());
		
		KafkaOffsetExample.run();
	}
}

  



SparkStreaming 數據處理

根據須要,將流式數據與Hive中的靜態數據關聯,結果經過Elasticsearch For Hadoop導出到ES集羣中。

若是靜態數據須要定時更新,能夠在建立數據流後,在foreachRDD邏輯中,根據實際狀況按期更新靜態數據。

調優

因爲我的經驗較少,處理的數據量不大,如下內容大可能是紙上談兵,僅供參考。

合理的批處理時間(batchDuration)

  • 幾乎全部的Spark Streaming調優文檔都會說起批處理時間的調整,在StreamingContext初始化的時候,有一個參數即是批處理時間的設定。
  • 若是這個值設置的太短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會形成數據不斷堆積,最終致使Spark Streaming發生阻塞。
  • 通常對於batchDuration的設置不會小於500ms,由於太小會致使SparkStreaming頻繁的提交做業,對整個streaming形成額外的負擔。
  • 在平時的應用中,根據不一樣的應用場景和硬件配置,我設在1~10s之間,咱們能夠根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的數據,這樣就是目前狀況的最優值。

 

合理的Kafka拉取量(maxRatePerPartition重要)

spark.streaming.kafka.maxRatePerPartition參數配置指定了每秒每個topic的每個分區獲取的最大消息數。

對於Spark Streaming消費kafka中數據的應用場景,這個配置是很是關鍵的。這個參數默認是沒有上限的,即kafka當中有多少數據它就會直接所有拉出。而根據生產者寫入Kafka的速率以及消費者自己處理數據的速度,同時這個參數須要結合上面的batchDuration,使得每一個partition拉取在每一個batchDuration期間拉取的數據可以順利的處理完畢,作到儘量高的吞吐量,而這個參數的調整能夠參考可視化監控界面中的Input Rate和Processing Time。

 

緩存反覆使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,若是被反覆的使用,最好利用cache(),將該數據流緩存起來,防止過分的調度資源形成的網絡開銷。能夠參考觀察Scheduling Delay參數。

設置合理的GC

長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可讓咱們不過多的關注與內存的分配回收,更加專一於業務邏輯,JVM都會爲咱們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機中,將內存分爲了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是須要耗費必定時間的,尤爲是老年代的GC回收,須要對內存碎片進行整理,一般採用標記-清楚的作法。一樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在一般的使用中建議:

設置年老代爲併發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

設置合理的CPU資源數

CPU的core數量,每一個executor能夠佔用一個或多個core,能夠經過觀察CPU的使用率變化來了解計算資源的使用狀況,例如,很常見的一種浪費是一個executor佔用了多個core,可是總的CPU使用率卻不高(由於一個executor並不總能充分利用多核的能力),這個時候能夠考慮讓麼個executor佔用更少的core,同時worker下面增長更多的executor,或者一臺host上面增長更多的worker來增長並行執行的executor的數量,從而增長CPU利用率。

可是增長executor的時候須要考慮好內存消耗,由於一臺機器的內存分配給越多的executor,每一個executor的內存就越小,以至出現過多的數據spill over甚至out of memory的狀況。

設置合理的parallelism

partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值過小了會致使每片數據量太大,致使內存壓力,或者諸多executor的計算能力沒法利用充分;可是若是太大了則會致使分片太多,執行效率下降。在執行action類型操做的時候(好比各類reduce操做),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操做的時候,默認返回數據的paritition數量(而在進行map類操做的時候,partition數量一般取自parent RDD中較大的一個,並且也不會涉及shuffle,所以這個parallelism的參數沒有影響)。因此說,這兩個概念密切相關,都是涉及到數據分片的,做用方式實際上是統一的。經過spark.default.parallelism能夠設置默認的分片數量,而不少RDD的操做均可以指定一個partition參數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,咱們採用了Direct鏈接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,咱們通常默認設置爲Kafka中Partition的數量。

使用高性能的算子

這裏參考了美團技術團隊的博文,並無作過具體的性能測試,其建議以下:

  • 使用reduceByKey/aggregateByKey替代groupByKey

  • 使用mapPartitions替代普通map

  • 使用foreachPartitions替代foreach

  • 使用filter以後進行coalesce操做

  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操做

  • 使用Kryo優化序列化性能 這個優化原則我自己也沒有通過測試,可是好多優化文檔有提到,這裏也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:

  • 在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸。

  • 將自定義的類型做爲RDD的泛型類型時(好比JavaRDD,Student是自定義類型),全部自定義類型對象,都會進行序列化。所以這種狀況下,也要求自定義的類必須實現Serializable接口。

  • 使用可序列化的持久化策略時(好比MEMORY_ONLY_SER),Spark會將RDD中的每一個partition都序列化成一個大的字節數組。

對於這三種出現序列化的地方,咱們均可以經過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。可是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高不少。
官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之因此默認沒有使用Kryo做爲序列化類庫,是由於Kryo要求最好要註冊全部須要進行序列化的自定義類型,所以對於開發者來講,這種方式比較麻煩。

如下是使用Kryo的代碼示例,咱們只要設置序列化類,再註冊要序列化的自定義類型便可(好比算子函數中使用到的外部變量類型、做爲RDD泛型類型的自定義類型等):

// 建立SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器爲KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 註冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

參考

相關文章
相關標籤/搜索