整合Kafka到Spark Streaming——代碼示例和挑戰

做者Michael G. Noll是瑞士的一位工程師和研究員,效力於Verisign,是Verisign實驗室的大規模數據分析基礎設施(基礎Hadoop)的技術主管。本文,Michael詳細的演示瞭如何將Kafka整合到Spark Streaming中。 期間, Michael還提到了將Kafka整合到 Spark Streaming中的一些現狀,很是值得閱讀,雖然有一些信息在Spark 1.2版本中已發生了一些變化,好比HA策略: 經過Spark Contributor、Spark佈道者陳超咱們瞭解到 ,在Spark 1.2版本中,Spark Streaming開始支持fully HA模式(選擇使用),經過添加一層WAL(Write Ahead Log),每次收到數據後都會存在HDFS上,從而避免了之前版本中的數據丟失狀況,可是不可避免的形成了必定的開銷,須要開發者自行衡量。php

如下爲譯文html

做爲一個實時大數據處理工具, Spark Sreaming 近日一直被普遍關注,與 Apache Storm 的對比也常常出現。可是依我說,缺乏與Kafka整合,任何實時大數據處理工具都是不完整的,所以我將一個示例Spark Streaming應用程序添加到 kafka-storm-starter ,而且示範如何從Kafka讀取,以及如何寫入到Kafka。在這個過程當中,我還使用Avro做爲數據格式,以及Twitter Bijection進行數據序列化。java

在本篇文章,我將詳細地講解這個Spark Streaming示例;同時,我還會穿插當下Spark Streaming與Kafka整合的一些焦點話題。免責聲明:這是我首次試驗Spark Streaming,僅做爲參考。node

當下,這個Spark Streaming示例被上傳到GitHub,下載訪問: kafka-storm-starter。項目的名稱或許會讓你產生某些誤解,不過,不要在乎這些細節:)git

什麼是Spark Streaming

Spark Streaming 是Apache Spark的一個子項目。Spark是個相似於Apache Hadoop的開源批處理平臺,而Spark Streaming則是個實時處理工具,運行在Spark引擎之上。github

Spark Streaming vs. Apache Storm

Spark Streaming與Apache Storm有一些類似之處,後者是當下最流行的大數據處理平臺。前不久,雅虎的Bobby Evans 和Tom Graves曾發表過一個「 Spark and Storm at Yahoo! 」的演講,在這個演講中,他們對比了兩個大平臺,並提供了一些選擇參考。相似的,Hortonworks的P. Taylor Goetz也分享過名爲 Apache Storm and Spark Streaming Compared 的講義。web

這裏,我也提供了一個很是簡短的對比:對比Spark Streaming,Storm的產業採用更高,生產環境應用也更穩定。可是從另外一方面來講,對比Storm,Spark擁有更清晰、等級更高的API,所以Spark使用起來也更加愉快,最起碼是在使用Scala編寫Spark應用程序的狀況(毫無疑問,我更喜歡Spark中的API)。可是,請別這麼直接的相信個人話,多看看上面的演講和講義。算法

無論是Spark仍是Storm,它們都是Apache的頂級項目,當下許多大數據平臺提供商也已經開始整合這兩個框架(或者其中一個)到其商業產品中,好比Hortonworks就同時整合了Spark和Storm,而Cloudera也整合了Spark。shell

附錄:Spark中的Machines、cores、executors、tasks和receivers 

本文的後續部分將講述許多Spark和Kafka中的parallelism問題,所以,你須要掌握一些Spark中的術語以弄懂這些環節。數據庫

  • 一個Spark集羣必然包含了1個以上的工者做節點,又稱爲從主機(爲了簡化架構,這裏咱們先拋棄開集羣管理者不談)。

  • 一個工做者節點能夠運行一個以上的executor

  • Executor是一個用於應用程序或者工做者節點的進程,它們負責處理tasks,並將數據保存到內存或者磁盤中。每一個應用程序都有屬於本身的executors,一個executor則包含了必定數量的cores(也被稱爲slots)來運行分配給它的任務。

  • Task是一個工做單元,它將被傳送給executor。也就是說,task將是你應用程序的計算內容(或者是一部分)。SparkContext將把這些tasks發送到executors進行執行。每一個task都會佔用父executor中的一個core(slot)。

  • Receiver( API  文檔 )將做爲一個長期運行的task跑在一個executor上。每一個receiver都會負責一個所謂的input DStream(好比從Kafka中讀取的一個輸入流),同時每一個receiver( input DStream)佔用一個core/slot。

  • input DStream:input DStream是DStream的一個類型,它負責將Spark Streaming鏈接到外部的數據源,用於讀取數據。對於每一個外部數據源(好比Kafka)你都須要配置一個input DStream。一個Spark Streaming會經過一個input DStream與一個外部數據源進行鏈接,任何後續的DStream都會創建標準的DStreams。

在Spark的執行模型,每一個應用程序都會得到本身的executors,它們會支撐應用程序的整個流程,並以多線程的方式運行1個以上的tasks,這種隔離途徑很是相似Storm的執行模型。一旦引入相似YARN或者Mesos這樣的集羣管理器,整個架構將會變得異常複雜,所以這裏將不會引入。你能夠經過Spark文檔中的 Cluster Overview 瞭解更多細節。

整合Kafka到Spark Streaming

概述

簡而言之,Spark是支持Kafka的,可是這裏存在許多不完善的地方。

Spark代碼庫中的 KafkaWordCount 對於咱們來講是個很是好的起點,可是這裏仍然存在一些開放式問題。

特別是我想了解如何去作:

  • 從kafaka中並行讀入。在Kafka,一個話題(topic)能夠有N個分區。理想的狀況下,咱們但願在多個分區上並行讀取。這也是 Kafka spout in Storm 的工做。

  • 從一個Spark Streaming應用程序向Kafka寫入,一樣,咱們須要並行執行。

在完成這些操做時,我一樣碰到了Spark Streaming和/或Kafka中一些已知的問題,這些問題大部分都已經在Spark mailing list中列出。在下面,我將詳細總結Kafka集成到Spark的現狀以及一些常見問題。

Kafka中的話題、分區(partitions)和parallelism

詳情能夠查看我以前的博文: Apache Kafka 0.8 Training Deck and Tutorial Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node 

Kafka將數據存儲在話題中,每一個話題都包含了一些可配置數量的分區。話題的分區數量對於性能來講很是重要,而這個值通常是消費者parallelism的最大數量:若是一個話題擁有N個分區,那麼你的應用程序最大程度上只能進行N個線程的並行,最起碼在使用Kafka內置Scala/Java消費者API時是這樣的。

與其說應用程序,不如說Kafka術語中的消費者羣(consumer group)。消費者羣,經過你選擇的字符串識別,它是邏輯消費者應用程序集羣範圍的識別符。同一個消費者羣中的全部消費者將分擔從一個指定Kafka話題中的讀取任務,同時,同一個消費組中全部消費者從話題中讀取的線程數最大值便是N(等同於分區的數量),多餘的線程將會閒置。

多個不一樣的Kafka消費者羣能夠並行的運行:毫無疑問,對同一個Kafka話題,你能夠運行多個獨立的邏輯消費者應用程序。這裏,每一個邏輯應用程序都會運行本身的消費者線程,使用一個惟一的消費者羣id。而每一個應用程序一般可使用不一樣的read parallelisms(見下文)。當在下文我描述不一樣的方式配置read parallelisms時,我指的是如何完成這些邏輯消費者應用程序中的一個設置。

這裏有一些簡單的例子

  • 你的應用程序使用「terran」消費者羣id對一個名爲「zerg.hydra」的kafka話題進行讀取,這個話題擁有10個分區。若是你的消費者應用程序只配置一個線程對這個話題進行讀取,那麼這個線程將從10個分區中進行讀取。

  • 同上,可是此次你會配置5個線程,那麼每一個線程都會從2個分區中進行讀取。

  • 同上,此次你會配置10個線程,那麼每一個線程都會負責1個分區的讀取。

  • 同上,可是此次你會配置多達14個線程。那麼這14個線程中的10個將平分10個分區的讀取工做,剩下的4個將會被閒置。

這裏咱們不妨看一下現實應用中的複雜性——Kafka中的再平衡事件。在Kafka中,再平衡是個生命週期事件(lifecycle event),在消費者加入或者離開消費者羣時都會觸發再平衡事件。這裏咱們不會進行詳述,更多再平衡詳情可參見個人 Kafka training deck 一文。

你的應用程序使用消費者羣id「terran」,而且從1個線程開始,這個線程將從10個分區中進行讀取。在運行時,你逐漸將線程從1個提高到14個。也就是說,在同一個消費者羣中,parallelism忽然發生了變化。毫無疑問,這將形成Kafka中的再平衡。一旦在平衡結束,你的14個線程中將有10個線程平分10個分區的讀取工做,剩餘的4個將會被閒置。所以如你想象的同樣,初始線程之後只會讀取一個分區中的內容,將不會再讀取其餘分區中的數據。

如今,咱們終於對話題、分區有了必定的理解,而分區的數量將做爲從Kafka讀取時parallelism的上限。可是對於一個應用程序來講,這種機制會產生一個什麼樣的影響,好比一個Spark Streaming job或者 Storm topology從Kafka中讀取數據做爲輸入。

1. Read parallelism: 一般狀況下,你指望使用N個線程並行讀取Kafka話題中的N個分區。同時,鑑於數據的體積,你指望這些線程跨不一樣的NIC,也就是跨不一樣的主機。在Storm中,這能夠經過TopologyBuilder#setSpout()設置Kafka spout的parallelism爲N來實現。在Spark中,你則須要作更多的事情,在下文我將詳述如何實現這一點。

2. Downstream processing parallelism: 一旦使用Kafka,你但願對數據進行並行處理。鑑於你的用例,這種等級的parallelism必然與read parallelism有所區別。若是你的用例是計算密集型的,舉個例子,對比讀取線程,你指望擁有更多的處理線程;這能夠經過從多個讀取線程shuffling或者網路「fanning out」數據處處理線程實現。所以,你經過增加網絡通訊、序列化開銷等將訪問交付給更多的cores。在Storm中,你經過shuffle grouping 將Kafka spout shuffling到下游的bolt中。在Spark中,你須要經過DStreams上的 repartition 轉換來實現。

一般狀況下,你們都渴望去耦從Kafka的parallelisms讀取,並當即處理讀取來的數據。在下一節,我將詳述使用 Spark Streaming從Kafka中的讀取和寫入。

從Kafka中讀取

Spark Streaming中的Read parallelism

相似Kafka,Read parallelism中也有分區的概念。瞭解Kafka的per-topic話題與RDDs in Spark 中的分區沒有關聯很是重要。

Spark Streaming中的 KafkaInputDStream (又稱爲Kafka鏈接器)使用了Kafka的高等級消費者API ,這意味着在Spark中爲Kafka設置 read parallelism將擁有兩個控制按鈕。

1. Input DStreams的數量。 由於Spark在每一個Input DStreams都會運行一個receiver(=task),這就意味着使用多個input DStreams將跨多個節點並行進行讀取操做,所以,這裏寄但願於多主機和NICs。

2. Input DStreams上的消費者線程數量。 這裏,相同的receiver(=task)將運行多個讀取線程。這也就是說,讀取操做在每一個core/machine/NIC上將並行的進行。

在實際狀況中,第一個選擇顯然更是你們指望的。

爲何會這樣?首先以及最重要的,從Kafka中讀取一般狀況下會受到網絡/NIC限制,也就是說,在同一個主機上你運行多個線程不會增長讀的吞吐量。另外一方面來說,雖然不常常,可是有時候從Kafka中讀取也會遭遇CPU瓶頸。其次,若是你選擇第二個選項,多個讀取線程在將數據推送到blocks時會出現鎖競爭(在block生產者實例上,BlockGenerator的「+=」方法真正使用的是「synchronized」方式)。

input DStreams創建的RDDs分區數量:KafkaInputDStream將儲存從Kafka中讀取的每一個信息到Blocks。從個人理解上,一個新的Block由 spark.streaming.blockInterval在毫秒級別創建,而每一個block都會轉換成RDD的一個分區,最終由DStream創建。若是個人這種假設成立,那麼由KafkaInputDStream創建的RDDs分區數量由batchInterval / spark.streaming.blockInterval決定,而batchInterval則是數據流拆分紅batches的時間間隔,它能夠經過StreamingContext的一個構造函數參數設置。舉個例子,若是你的批時間價格是2秒(默認狀況下),而block的時間間隔是200毫秒(默認狀況),那麼你的RDD將包含10個分區。若是有錯誤的話,能夠提醒我。

選項1:控制input DStreams的數量

下面這個例子能夠從 Spark Streaming Programming Guide 中得到:

val ssc: StreamingContext = ??? // ignore for nowval kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)val numInputDStreams = 5val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }

在這個例子中,咱們創建了5個input DStreams,所以從Kafka中讀取的工做將分擔到5個核心上,寄但願於5個主機/NICs(之因此說是寄但願於,由於我也不肯定Spark Streaming task佈局策略是否會將receivers投放到多個主機上)。全部Input Streams都是「terran」消費者羣的一部分,而Kafka將保證topic的全部數據能夠同時對這5個input DSreams可用。換句話說,這種「collaborating」input DStreams設置能夠工做是基於消費者羣的行爲是由Kafka API提供,經過KafkaInputDStream完成。

在這個例子中,我沒有提到每一個input DSream會創建多少個線程。在這裏,線程的數量能夠經過KafkaUtils.createStream方法的參數設置(同時,input topic的數量也能夠經過這個方法的參數指定)。在下一節中,咱們將經過實際操做展現。

可是在開始以前,在這個步驟我先解釋幾個Spark Streaming中常見的幾個問題,其中有些由於當下Spark中存在的一些限制引發,另外一方面則是因爲當下Kafka input DSreams的一些設置形成:

當你使用我上文介紹的多輸入流途徑,而這些消費者都是屬於同一個消費者羣,它們會給消費者指定負責的分區。這樣一來則可能致使syncpartitionrebalance的失敗,系統中真正工做的消費者可能只會有幾個。爲了解決這個問題,你能夠把再均衡嘗試設置的很是高,從而得到它的幫助。而後,你將會碰到另外一個坑——若是你的receiver宕機(OOM,亦或是硬件故障),你將中止從Kafka接收消息。

Spark用戶討論 markmail.org/message/…

這裏,咱們須要對「中止從Kafka中接收」問題 作一些解釋 。當下,當你經過ssc.start()開啓你的streams應用程序後,處理會開始並一直進行,即便是輸入數據源(好比Kafka)變得不可用。也就是說,流不能檢測出是否與上游數據源失去連接,所以也不會對丟失作出任何反應,舉個例子來講也就是重連或者結束執行。相似的,若是你丟失這個數據源的一個receiver,那麼 你的流應用程序可能就會生成一些空的RDDs 

這是一個很是糟糕的狀況。最簡單也是最粗糙的方法就是,在與上游數據源斷開鏈接或者一個receiver失敗時,重啓你的流應用程序。可是,這種解決方案可能並不會產生實際效果,即便你的應用程序須要將Kafka配置選項auto.offset.reset設置到最小——由於Spark Streaming中一些已知的bug,可能致使你的流應用程序發生一些你意想不到的問題,在下文Spark Streaming中常見問題一節咱們將詳細的進行介紹。

選擇2:控制每一個input DStream上小發着線程的數量

在這個例子中,咱們將創建一個單一的input DStream,它將運行3個消費者線程——在同一個receiver/task,所以是在同一個core/machine/NIC上對Kafka topic 「zerg.hydra」進行讀取。

val ssc: StreamingContext = ??? // ignore for nowval kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val consumerThreadsPerInputDstream = 3val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

KafkaUtils.createStream方法被重載,所以這裏有一些不一樣方法的特徵。在這裏,咱們會選擇Scala派生以得到最佳的控制。

結合選項1和選項2

下面是一個更完整的示例,結合了上述兩種技術:

val ssc: StreamingContext = ???val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val numDStreams = 5val topics = Map("zerg.hydra" -> 1)val kafkaDStreams = (1 to numDStreams).map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }

咱們創建了5個input DStreams,它們每一個都會運行一個消費者線程。若是「zerg.hydra」topic擁有5個分區(或者更少),那麼這將是進行並行讀取的最佳途徑,若是你在乎系統最大吞吐量的話。

Spark Streaming中的並行Downstream處理

在以前的章節中,咱們覆蓋了從Kafka的並行化讀取,那麼咱們就能夠在Spark中進行並行化處理。那麼這裏,你必須弄清楚Spark自己是如何進行並行化處理的。相似Kafka,Spark將parallelism設置的與(RDD)分區數量有關, 經過在每一個RDD分區上運行task進行 。在有些文檔中,分區仍然被稱爲「slices」。

在任何Spark應用程序中,一旦某個Spark Streaming應用程序接收到輸入數據,其餘處理都與非streaming應用程序相同。也就是說,與普通的Spark數據流應用程序同樣,在Spark Streaming應用程序中,你將使用相同的工具和模式。更多詳情可見Level of Parallelism in Data Processing 文檔。

所以,咱們一樣將得到兩個控制手段:

1. input DStreams的數量 ,也就是說,咱們在以前章節中read parallelism的數量做爲結果。這是咱們的立足點,這樣一來,咱們在下一個步驟中既能夠保持原樣,也能夠進行修改。

2. DStream轉化的重分配 。這裏將得到一個全新的DStream,其parallelism等級可能增長、減小,或者保持原樣。在DStream中每一個返回的RDD都有指定的N個分區。DStream由一系列的RDD組成,DStream.repartition則是經過RDD.repartition實現。接下來將對RDD中的全部數據作隨機的reshuffles,而後創建或多或少的分區,並進行平衡。同時,數據會在全部網絡中進行shuffles。換句話說,DStream.repartition很是相似Storm中的shuffle grouping。

所以,repartition是從processing parallelism解耦read parallelism的主要途徑。在這裏,咱們能夠設置processing tasks的數量,也就是說設置處理過程當中全部core的數量。間接上,咱們一樣設置了投入machines/NICs的數量。

一個DStream轉換相關是 union 。這個方法一樣在StreamingContext中,它將從多個DStream中返回一個統一的DStream,它將擁有相同的類型和滑動時間。一般狀況下,你更願意用StreamingContext的派生。一個union將返回一個由Union RDD支撐的UnionDStream。Union RDD由RDDs統一後的全部分區組成,也就是說,若是10個分區都聯合了3個RDDs,那麼你的聯合RDD實例將包含30個分區。換句話說,union會將多個 DStreams壓縮到一個 DStreams或者RDD中,可是須要注意的是,這裏的parallelism並不會發生改變。你是否使用union依賴於你的用例是否須要從全部Kafka分區進行「in one place」信息獲取決定,所以這裏大部分都是基於語義需求決定。舉個例子,當你須要執行一個不用元素上的(全局)計數。

注意: RDDs是無序的。所以,當你union RDDs時,那麼結果RDD一樣不會擁有一個很好的序列。若是你須要在RDD中進行sort。

你的用例將決定須要使用的方法,以及你須要使用哪一個。若是你的用例是CPU密集型的,你但願對zerg.hydra topic進行5 read parallelism讀取。也就是說,每一個消費者進程使用5個receiver,可是卻能夠將processing parallelism提高到20。

val ssc: StreamingContext = ???val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val readParallelism = 5val topics = Map("zerg.hydra" -> 1)val kafkaDStreams = (1 to readParallelism).map { _ =>
  KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }//> collection of five *input* DStreams = handled by five receivers/tasksval unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it//> single DStreamval processingParallelism = 20val processingDStream = unionDStream(processingParallelism)//> single DStream but now with 20 partitions

在下一節中,我將把全部部分結合到一塊兒,而且聯合實際數據處理進行講解。

寫入到Kafka

寫入到Kafka須要從foreachRDD輸出操做進行:

通用的輸出操做者都包含了一個功能(函數),讓每一個RDD都由Stream生成。這個函數須要將每一個RDD中的數據推送到一個外部系統,好比將RDD保存到文件,或者經過網絡將它寫入到一個數據庫。須要注意的是,這裏的功能函數將在驅動中執行,同時其中一般會伴隨RDD行爲,它將會促使流RDDs的計算。

注意: 重提「功能函數是在驅動中執行」,也就是Kafka生產者將從驅動中進行,也就是說「功能函數是在驅動中進行評估」。當你使用foreachRDD從驅動中讀取Design Patterns時,實際過程將變得更加清晰。

在這裏,建議你們去閱讀Spark文檔中的 Design Patterns for using foreachRDD一節,它將詳細講解使用foreachRDD讀外部系統中的一些經常使用推薦模式,以及常常出現的一些陷阱。

在咱們這個例子裏,咱們將按照推薦來重用Kafka生產者實例,經過生產者池跨多個RDDs/batches。 我經過 Apache Commons Pool 實現了這樣一個工具,已經上傳到GitHub 。這個生產者池自己經過 broadcast variable 提供給tasks。

最終結果看起來以下:

val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)}stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })})

須要注意的是, Spark Streaming每分鐘都會創建多個RDDs,每一個都會包含多個分區,所以你無需爲Kafka生產者實例創建新的Kafka生產者,更不用說每一個Kafka消息。上面的步驟將最小化Kafka生產者實例的創建數量,同時也會最小化TCP鏈接的數量(一般由Kafka集羣肯定)。你可使用這個池設置來精確地控制對流應用程序可用的Kafka生產者實例數量。若是存在疑惑,儘可能用更少的。

完整示例

下面的代碼是示例Spark Streaming應用程序的要旨(全部代碼參見 這裏 )。這裏,我作一些解釋:

  • 並行地從Kafka topic中讀取Avro-encoded數據。咱們使用了一個最佳的read parallelism,每一個Kafka分區都配置了一個單線程 input DStream。

  • 並行化Avro-encoded數據到pojos中,而後將他們並行寫到binary,序列化能夠經過Twitter Bijection 執行。

  • 經過Kafka生產者池將結果寫回一個不一樣的Kafka topic。

// Set up the input DStream to read from Kafka (in parallel)val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "zookeeper1:2181",
    "group.id" -> "spark-streaming-test",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "input-topic"
  val numPartitionsOfInputTopic = 5
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
  unifiedStream.repartition(sparkProcessingParallelism)}// We use accumulators to track global "counters" across the tasks of our streaming appval numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.val producerPool = {
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)}// We also use a broadcast variable for our Avro Injection (Twitter Bijection)val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])// Define the actual data flow of the streaming jobkafkaStream.map { case bytes =>
  numInputMessages += 1
  // Convert Avro binary data to pojo
  converter.value.invert(bytes) match {
    case Success(tweet) => tweet    case Failure(e) => // ignore if the conversion failed
  }}.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
      numOutputMessages += 1
    }
    producerPool.value.returnObject(p)
  })})// Run the streaming jobssc.start()ssc.awaitTermination()

更多的細節和解釋能夠在這裏看全部源代碼。

就我本身而言,我很是喜歡 Spark Streaming代碼的簡潔和表述。在Bobby Evans和 Tom Graves講話中沒有提到的是,Storm中這個功能的等價代碼是很是繁瑣和低等級的: kafka-storm-starter 中的 KafkaStormSpec 會運行一個Stormtopology來執行相同的計算。同時,規範文件自己只有很是少的代碼,固然是除下說明語言,它們能更好的幫助理解;同時,須要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函數,好比map和foreach步驟。取而代之的是,你必須編寫完整的類來得到相同的功能,你能夠查看 AvroDecoderBolt 。這感受是將Spark的API轉換到Java,在這裏使用匿名函數是很是痛苦的。

最後,我一樣也很是喜歡 Spark的說明文檔 ,它很是適合初學者查看,甚至還包含了一些 進階使用 。關於Kafka整合到Spark,上文已經基本介紹完成,可是咱們仍然須要瀏覽mailing list和深挖源代碼。這裏,我不得不說,維護幫助文檔的同窗作的實在是太棒了。

知曉Spark Streaming中的一些已知問題

你可能已經發如今Spark中仍然有一些還沒有解決的問題,下面我描述一些個人發現:

一方面,在對Kafka進行讀寫上仍然存在一些含糊不清的問題,你能夠在相似Multiple Kafka Receivers and Union  How to scale more consumer to Kafka stream mailing list的討論中發現。

另外一方面,Spark Streaming中一些問題是由於Spark自己的固有問題致使,特別是故障發生時的數據丟失問題。換句話說,這些問題讓你不想在生產環境中使用Spark。

  • 在Spark 1.1版本的驅動中,Spark並不會考慮那些已經接收卻由於種種緣由沒有進行處理的元數據( 點擊這裏查看更多細節 )。所以,在某些狀況下,你的Spark可能會丟失數據。Tathagata Das指出驅動恢復問題會在Spark的1.2版本中解決,當下已經釋放。

  • 1.1版本中的Kafka鏈接器是基於Kafka的高等級消費者API。這樣就會形成一個問題,Spark Streaming不能夠依賴其自身的KafkaInputDStream將數據從Kafka中從新發送,從而沒法解決下游數據丟失問題(好比Spark服務器發生故障)。

  • 有些人甚至認爲這個版本中的Kafka鏈接器不該該投入生產環境使用,由於它是基於Kafka的高等級消費者API。取而代之,Spark應該使用簡單的消費者API(就像Storm中的Kafka spout),它將容許你控制便宜和分區分配肯定性。

  • 可是當下Spark社區已經在致力這些方面的改善,好比Dibyendu Bhattacharya的Kafka鏈接器。後者是Apache Storm Kafka spout的一個端口,它基於Kafka所謂的簡單消費者API,它包含了故障發生情景下一個更好的重放機制。

  • 即便擁有如此多志願者的努力,Spark團隊更願意非特殊狀況下的Kafka故障恢復策略,他們的目標是「在全部轉換中提供強保證,通用的策略」,這一點很是難以理解。從另外一個角度來講,這是浪費Kafka自己的故障恢復策略。這裏確實難以抉擇。

  • 這種狀況一樣也出如今寫入狀況中,極可能會形成數據丟失。

  • Spark的Kafka消費者參數auto.offset.reset的使用一樣與Kafka的策略不一樣。在Kafka中,將auto.offset.reset設置爲最小是消費者將自動的將offset設置爲最小offset,這一般會發生在兩個狀況:第一,在ZooKeeper中不存在已有offsets;第二,已存在offset,可是不在範圍內。而在Spark中,它會始終刪除全部的offsets,並從頭開始。這樣就表明着,當你使用auto.offset.reset = 「smallest」重啓你的應用程序時,你的應用程序將徹底從新處理你的Kafka應用程序。更多詳情能夠在下面的兩個討論中發現: 1  2 

  • Spark-1341:用於控制Spark Streaming中的數據傳輸速度。這個能力能夠用於不少狀況,當你已經受Kafka引發問題所煩惱時(好比auto.offset.reset所形成的),而後可能讓你的應用程序從新處理一些舊數據。可是鑑於這裏並無內置的傳輸速率控制,這個功能可能會致使你的應用程序過載或者內存不足。

在這些故障處理策略和Kafka聚焦的問題以外以外,擴展性和穩定性上的關注一樣不可忽視。再一次,仔細觀看 Bobby和Tom的視頻 以得到更多細節。在Spark使用經驗上,他們都永遠比我更豐富。

固然,我也有個人 評論 ,在 G1 garbage(在Java 1.7.0u4+中) 上可能也會存在問題。可是,我歷來都沒碰到這個問題。

Spark使用技巧和敲門

在我實現這個示例的代碼時,我作了一些重要的筆記。雖然這不是一個全面的指南,可是在你開始Kafka整合時可能發揮必定的做用。它包含了 Spark Streaming programming guide 中的一些信息,也有一些是來自Spark用戶的mailing list。

通用

  • 當你創建你的Spark環境時,對Spark使用的cores數量配置須要特別投入精力。你必須爲Spark配置receiver足夠使用的cores(見下文),固然實際數據處理所須要的cores的數量也要進行配置。在Spark中,每一個receiver都負責一個input DStream。同時,每一個receiver(以及每一個input DStream) occies一個core,這樣作是服務於每一個文件流中的讀取(詳見文檔)。舉個例子,你的做業須要從兩個input streams中讀取數據,可是隻訪問兩個cores,這樣一來,全部數據都只會被讀取而不會被處理。

  • 注意,在一個流應用程序中,你能夠創建多個input DStreams來並行接收多個數據流。在上文從Kafka並行讀取一節中,我曾演示過這個示例做業。

  • 你可使用 broadcast variables在不一樣主機上共享標準、只讀參數,相關細節見下文的優化指導。在示例做業中,我使用了broadcast variables共享了兩個參數:第一,Kafka生產者池(做業經過它將輸出寫入Kafka);第二,encoding/decoding Avro數據的注入(從Twitter Bijection中)。 Passing functions to Spark 

  • 你可使用累加器參數來跟蹤流做業上的全部全局「計數器」,這裏能夠對照Hadoop做業計數器。在示例做業中,我使用累加器分別計數全部消費的Kafka消息,以及全部對Kafka的寫入。若是你對累加器進行命名,它們一樣能夠在Spark UI上展現。

  • 不要忘記import Spark和Spark Streaming環境:

// Required to gain access to RDD transformations via implicits.import org.apache.spark.SparkContext._// Required when working on `PairDStreams` to gain access to e.g. `DStream.reduceByKey`// (versus `DStream.transform(rddBatch => rddBatch.reduceByKey()`) via implicits.//// See also http://spark.apache.org/docs/1.1.0/programming-guide.html#working-with-key-value-pairsimport org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

若是你是 Twitter Algebird的愛好者,你將會喜歡使用Count-Min Sketch和Spark中的一些特性,表明性的,你會使用reduce或者reduceByWindow這樣的操做(好比,DStreams上的轉換 )。Spark項目包含了 Count-Min Sketch  HyperLogLog 的示例介紹。

若是你須要肯定Algebird數據結構的內存介紹,好比Count-Min Sketch、HyperLogLog或者Bloom Filters,你可使用SparkContext日誌進行查看,更多細節參見 Determining Memory Consumption 

Kafka整合

我前文所述的一些增補:

  • 你可能須要修改Spark Streaming中的一些Kafka消費者配置。舉個例子,若是你須要從Kafka中讀取大型消息,你必須添加fetch.message.max.bytes消費設置。你可使用KafkaUtils.createStream(…)將這樣定製的Kafka參數給Spark Streaming傳送。

測試

  • 首先,肯定 已經 在一個finally bloc或者測試框架的teardown method中使用stop()關閉了StreamingContext 和/或 SparkContext,由於在同一個程序(或者JVM?)中Spark不支持並行運行兩種環境。

  • 根據個人經驗,在使用sbt時,你但願在測試中將你的創建配置到分支JVM中。最起碼在kafka-storm-starter中,測試必須並行運行多個線程,好比ZooKeeper、Kafka和Spark的內存實例。開始時,你能夠參考 build.sbt 

  • 一樣,若是你使用的是Mac OS X,你可能指望關閉JVM上的IPv6用以阻止DNS相關超時。這個問題與Spark無關,你能夠查看 .sbtopts 來得到關閉IPv6的方法。

性能調優

  • 肯定你理解做業中的運行時影響,若是你須要與外部系統通訊,好比Kafka。在使用foreachRDD時,你應該閱讀中 Spark Streaming programming guide 中的Design Patterns一節。舉個例子,個人用例中使用Kafka產生者池來優化 Spark Streaming到Kafka的寫入。在這裏,優化意味着在多個task中共享同一個生產者,這個操做能夠顯著地減小由Kafka集羣創建的新TCP鏈接數。

  • 使用Kryo作序列化,取代默認的Java serialization,詳情能夠訪問 Tuning Spark 。個人例子就使用了Kryo和註冊器,舉個例子,使用Kryo生成的Avro-generated Java類(見 KafkaSparkStreamingRegistrator )。除此以外,在Storm中相似的問題也可使用Kryo來解決。

  • 經過將spark.streaming.unpersist設置爲true將Spark Streaming 做業設置到明確持續的RDDs。這能夠顯著地減小Spark在RDD上的內存使用,同時也能夠改善GC行爲。(點擊訪問 來源 

  • 經過MEMORY_ONLY_SER開始你的儲存級別P&S測試(在這裏,RDD被存儲到序列化對象,每一個分區一個字節)。取代反序列化,這樣作更有空間效率,特別是使用Kryo這樣的高速序列化工具時,可是會增長讀取上的CPU密集操做。這個優化對 Spark Streaming做業也很是有效。對於本地測試來講,你可能並不想使用*_2派生(2=複製因子)。

總結

完整的Spark Streaming示例代碼能夠在 kafka-storm-starter 查看。這個應用包含了Kafka、Zookeeper、Spark,以及上文我講述的示例。

整體來講,我對個人初次Spark Streaming體驗很是滿意。固然,在Spark/Spark Streaming也存在一些須要特別指明的問題,可是我確定Spark社區終將解決這些問題。在這個過程當中,獲得了Spark社區積極和熱情的幫助,同時我也很是期待Spark 1.2版本的新特性。

在大型生產環境中,基於Spark還須要一些TLC才能達到Storm能力,這種狀況我可能將它投入生產環境中麼?大部分狀況下應該不會,更準確的說應該是如今不會。那麼在當下,我又會使用Spark Streaming作什麼樣的處理?這裏有兩個想法,我認爲確定存在更多:

  • 它能夠很是快的原型數據流。若是你由於數據流太大而遭遇擴展性問題,你能夠運行 Spark Streaming,在一些樣本數據或者一部分數據中。

  • 搭配使用Storm和Spark Streaming。舉個例子,你可使用Storm將原始、大規模輸入數據處理到易管理等級,而後使用Spark Streaming來作下一步的分析,由於後者能夠開箱即用大量有趣的算法、計算指令和用例。

感謝Spark社區對大數據領域所做出的貢獻!

 

翻譯/童陽

文章出處:推酷-CSDN

相關文章
相關標籤/搜索