目錄html
在WeTest輿情項目中,須要對天天千萬級的遊戲評論信息進行詞頻統計,在生產者一端,咱們將數據按照天天的拉取時間存入了Kafka當中,而在消費者一端,咱們利用了spark streaming從kafka中不斷拉取數據進行詞頻統計。本文首先對spark streaming嵌入kafka的方式進行概括總結,以後簡單闡述Spark streaming+kafka在輿情項目中的應用,最後將本身在Spark Streaming+kafka的實際優化中的一些經驗進行概括總結。(若有任何紕漏歡迎補充來踩,我會第一時間改正^v^)java
用spark streaming流式處理kafka中的數據,第一步固然是先把數據接收過來,轉換爲spark streaming中的數據結構Dstream。接收數據的方式有兩種:1.利用Receiver接收數據,2.直接從kafka讀取數據。git
這種方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對於全部的接收器,從kafka接收來的數據會存儲在spark的executor中,以後spark streaming提交的job會處理這些數據。以下圖:
在使用時,咱們須要添加相應的依賴包:github
<dependency><!-- Spark Streaming Kafka --> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.3</version> </dependency>
而對於Scala的基本使用方式以下:sql
import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
還有幾個須要注意的點:數據庫
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
在spark1.3以後,引入了Direct方式。不一樣於Receiver的方式,Direct方式沒有receiver這一層,其會週期性的獲取Kafka中每一個topic的每一個partition中的最新offsets,以後根據設定的maxRatePerPartition來處理每一個batch。其形式以下圖:
這種方法相較於Receiver方式的優點在於:apache
以上主要是對官方文檔[1]的一個簡單翻譯,詳細內容你們能夠直接看下官方文檔這裏再也不贅述。bootstrap
不一樣於Receiver的方式,是從Zookeeper中讀取offset值,那麼天然zookeeper就保存了當前消費的offset值,那麼若是從新啓動開始消費就會接着上一次offset值繼續消費。而在Direct的方式中,咱們是直接從kafka來讀數據,那麼offset須要本身記錄,能夠利用checkpoint、數據庫或文件記錄或者回寫到zookeeper中進行記錄。這裏咱們給出利用Kafka底層API接口,將offset及時同步到zookeeper中的通用類,我將其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一個通用類,而KafkaCluster是kafka源碼中的一個類,因爲包名權限的緣由我把它單獨提出來,ComsumerMain簡單展現了通用類的使用方法,在每次建立KafkaStream時,都會先從zooker中查看上次的消費記錄offsets,而每一個batch處理完成後,會同步offsets到zookeeper中。數組
上文闡述了Spark如何從Kafka中流式的讀取數據,下面我整理向Kafka中寫數據。與讀數據不一樣,Spark並無提供統一的接口用於寫入Kafka,因此咱們須要使用底層Kafka接口進行包裝。
最直接的作法咱們能夠想到以下這種方式:緩存
input.foreachRDD(rdd => // 不能在這裏建立KafkaProducer rdd.foreachPartition(partition => partition.foreach{ case x:String=>{ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") println(x) val producer = new KafkaProducer[String,String](props) val message=new ProducerRecord[String, String]("output",null,x) producer.send(message) } } ) )
可是這種方式缺點很明顯,對於每一個partition的每條記錄,咱們都須要建立KafkaProducer,而後利用producer進行輸出操做,注意這裏咱們並不能將KafkaProducer的新建任務放在foreachPartition外邊,由於KafkaProducer是不可序列化的(not serializable)。顯然這種作法是不靈活且低效的,由於每條記錄都須要創建一次鏈接。如何解決呢?
import java.util.concurrent.Future import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object KafkaSink { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook { // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) }
// 廣播KafkaSink val kafkaProducer: Broadcast[KafkaSink[String, String]] = { val kafkaProducerConfig = { val p = new Properties() p.setProperty("bootstrap.servers", Conf.brokers) p.setProperty("key.serializer", classOf[StringSerializer].getName) p.setProperty("value.serializer", classOf[StringSerializer].getName) p } log.warn("kafka producer init done!") ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig)) }
這樣咱們就能在每一個executor中愉快的將數據輸入到kafka當中:
//輸出到kafka segmentedStream.foreachRDD(rdd => { if (!rdd.isEmpty) { rdd.foreach(record => { kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2) // do something else }) } })
WeTest輿情監控對於天天爬取的千萬級遊戲玩家評論信息都要實時的進行詞頻統計,對於爬取到的遊戲玩家評論數據,咱們會生產到Kafka中,而另外一端的消費者咱們採用了Spark Streaming來進行流式處理,首先利用上文咱們闡述的Direct方式從Kafka拉取batch,以後通過分詞、統計等相關處理,回寫到DB上(至於Spark中DB的回寫方式可參考我以前總結的博文:Spark踩坑記——數據庫(Hbase+Mysql)),由此高效實時的完成天天大量數據的詞頻統計任務。
Spark streaming+Kafka的使用中,當數據量較小,不少時候默認配置和使用便可以知足狀況,可是當數據量大的時候,就須要進行必定的調整和優化,而這種調整和優化自己也是不一樣的場景須要不一樣的配置。
幾乎全部的Spark Streaming調優文檔都會說起批處理時間的調整,在StreamingContext初始化的時候,有一個參數即是批處理時間的設定。若是這個值設置的太短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會形成數據不斷堆積,最終致使Spark Streaming發生阻塞。並且,通常對於batchDuration的設置不會小於500ms,由於太小會致使SparkStreaming頻繁的提交做業,對整個streaming形成額外的負擔。在平時的應用中,根據不一樣的應用場景和硬件配置,我設在1~10s之間,咱們能夠根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,以下圖:
對於Spark Streaming消費kafka中數據的應用場景,這個配置是很是關鍵的,配置參數爲:spark.streaming.kafka.maxRatePerPartition。這個參數默認是沒有上線的,即kafka當中有多少數據它就會直接所有拉出。而根據生產者寫入Kafka的速率以及消費者自己處理數據的速度,同時這個參數須要結合上面的batchDuration,使得每一個partition拉取在每一個batchDuration期間拉取的數據可以順利的處理完畢,作到儘量高的吞吐量,而這個參數的調整能夠參考可視化監控界面中的Input Rate和Processing Time,以下圖:
Spark中的RDD和SparkStreaming中的Dstream,若是被反覆的使用,最好利用cache(),將該數據流緩存起來,防止過分的調度資源形成的網絡開銷。能夠參考觀察Scheduling Delay參數,以下圖:
長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可讓咱們不過多的關注與內存的分配回收,更加專一於業務邏輯,JVM都會爲咱們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機中,將內存分爲了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是須要耗費必定時間的,尤爲是老年代的GC回收,須要對內存碎片進行整理,一般採用標記-清楚的作法。一樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在一般的使用中建議:
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
CPU的core數量,每一個executor能夠佔用一個或多個core,能夠經過觀察CPU的使用率變化來了解計算資源的使用狀況,例如,很常見的一種浪費是一個executor佔用了多個core,可是總的CPU使用率卻不高(由於一個executor並不總能充分利用多核的能力),這個時候能夠考慮讓麼個executor佔用更少的core,同時worker下面增長更多的executor,或者一臺host上面增長更多的worker來增長並行執行的executor的數量,從而增長CPU利用率。可是增長executor的時候須要考慮好內存消耗,由於一臺機器的內存分配給越多的executor,每一個executor的內存就越小,以至出現過多的數據spill over甚至out of memory的狀況。
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值過小了會致使每片數據量太大,致使內存壓力,或者諸多executor的計算能力沒法利用充分;可是若是太大了則會致使分片太多,執行效率下降。在執行action類型操做的時候(好比各類reduce操做),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操做的時候,默認返回數據的paritition數量(而在進行map類操做的時候,partition數量一般取自parent RDD中較大的一個,並且也不會涉及shuffle,所以這個parallelism的參數沒有影響)。因此說,這兩個概念密切相關,都是涉及到數據分片的,做用方式實際上是統一的。經過spark.default.parallelism能夠設置默認的分片數量,而不少RDD的操做均可以指定一個partition參數來顯式控制具體的分片數量。
在SparkStreaming+kafka的使用中,咱們採用了Direct鏈接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,咱們通常默認設置爲Kafka中Partition的數量。
這裏參考了美團技術團隊的博文,並無作過具體的性能測試,其建議以下:
這個優化原則我自己也沒有通過測試,可是好多優化文檔有提到,這裏也記錄下來。
在Spark中,主要有三個地方涉及到了序列化:
對於這三種出現序列化的地方,咱們均可以經過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。可是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高不少。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之因此默認沒有使用Kryo做爲序列化類庫,是由於Kryo要求最好要註冊全部須要進行序列化的自定義類型,所以對於開發者來講,這種方式比較麻煩。
如下是使用Kryo的代碼示例,咱們只要設置序列化類,再註冊要序列化的自定義類型便可(好比算子函數中使用到的外部變量類型、做爲RDD泛型類型的自定義類型等):
// 建立SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器爲KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 註冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
通過種種調試優化,咱們最終要達到的目的是,Spark Streaming可以實時的拉取Kafka當中的數據,而且可以保持穩定,以下圖所示:
固然不一樣的應用場景會有不一樣的圖形,這是本文詞頻統計優化穩定後的監控圖,咱們能夠看到Processing Time這一柱形圖中有一Stable的虛線,而大多數Batch都可以在這一虛線下處理完畢,說明總體Spark Streaming是運行穩定的。