Kafka 0.10的Spark Streaming集成設計與0.8 Direct Stream方法相似。 它提供了簡單的並行性,Kafka分區和Spark分區之間的1:1對應關係,以及對偏移量和元數據的訪問。 可是,因爲較新的集成使用新的Kafka消費者API而不是簡單的API,因此在使用上存在顯着差別。 這個版本的集成被標記爲實驗,因此API可能會有變化。java
對於使用SBT / Maven項目定義的Scala / Java應用程序,請將您的流應用程序與如下工件連接起來。正則表達式
1 groupId = org.apache.spark 2 artifactId = spark-streaming-kafka-0-10_2.11 3 version = 2.2.0
不要在org.apache.kafka構件上手動添加依賴項(例如kafka-clients)。 spark-streaming-kafka-0-10已經具備適當的傳遞依賴性,不一樣的版本可能在診斷的方式上不兼容。apache
請注意,導入的名稱空間包含版本org.apache.spark.streaming.kafka010編程
1 import org.apache.kafka.clients.consumer.ConsumerRecord 2 import org.apache.kafka.common.serialization.StringDeserializer 3 import org.apache.spark.streaming.kafka010._ 4 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 5 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 6 7 val kafkaParams = Map[String, Object]( 8 "bootstrap.servers" -> "localhost:9092,anotherhost:9092", 9 "key.deserializer" -> classOf[StringDeserializer], 10 "value.deserializer" -> classOf[StringDeserializer], 11 "group.id" -> "use_a_separate_group_id_for_each_stream", 12 "auto.offset.reset" -> "latest", 13 "enable.auto.commit" -> (false: java.lang.Boolean) 14 ) 15 16 val topics = Array("topicA", "topicB") 17 val stream = KafkaUtils.createDirectStream[String, String]( 18 streamingContext, 19 PreferConsistent, 20 Subscribe[String, String](topics, kafkaParams) 21 ) 22 23 stream.map(record => (record.key, record.value))
流中的每一個項目都是一個ConsumerRecord(消費者記錄)bootstrap
有關可能的kafkaParams,請參閱Kafka使用者配置文檔。 若是您的Spark批處理持續時間大於默認Kafka心跳會話超時(30秒),請適當增長heartbeat.interval.ms和session.timeout.ms。 對於大於5分鐘的批次,這將須要更改代理上的group.max.session.timeout.ms。 請注意,該示例將enable.auto.commit設置爲false,有關討論,請參閱下面的「存儲偏移量」。緩存
新的Kafka消費者API將預取消息到緩衝區中。所以,性能方面的緣由是Spark集成將緩存的消費者保留在執行者上(而不是爲每一個批次從新建立它們),而且傾向於在具備相應使用者的主機位置上調度分區。安全
在大多數狀況下,您應該使用LocationStrategies.PreferConsistent,如上所示。這會將分區平均分配給可用的執行者。若是您的執行者(executors )與您的Kafka經紀人(brokers)位於同一個主機上,請使用PreferBrokers,它將優先爲該分區的Kafka領導安排分區。最後,若是分區間負載存在明顯偏移,請使用PreferFixed。這容許您指定分區到主機的明確映射(任何未指定的分區將使用一致的位置)。session
消費者緩存的默認最大大小爲64.若是您但願處理多於(執行者數量爲64 *)的Kafka分區,則能夠經過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。ide
若是您想禁用Kafka使用者的緩存,則能夠將spark.streaming.kafka.consumer.cache.enabled設置爲false。可能須要禁用緩存來解決SPARK-19185中描述的問題。 SPARK-19185解決後,該屬性可能會在更高版本的Spark中刪除。函數
緩存由topicpartition和group.id鍵入,所以每次調用createDirectStream時都要使用一個單獨的group.id。
新的Kafka消費者API有許多不一樣的方式來指定主題,其中一些須要大量的後對象實例化設置。 ConsumerStrategies提供了一個抽象,容許Spark從檢查點從新啓動後便可得到正確配置的使用者。
ConsumerStrategies.Subscribe,如上所示,容許您訂閱固定的主題集合。 SubscribePattern容許您使用正則表達式來指定感興趣的主題。 請注意,與0.8集成不一樣,使用Subscribe或SubscribePattern應該在正在運行的流中添加分區。 最後,Assign容許你指定一個固定的分區集合。 全部這三種策略都有重載的構造函數,容許您指定特定分區的起始偏移量。
若是您具備以上選項不能知足的特定消費者設置需求,則ConsumerStrategy是能夠擴展的公共類。
若是您有一個更適合批處理的用例,則能夠爲已定義的偏移範圍建立一個RDD。
1 // Import dependencies and create kafka params as in Create Direct Stream above 2 3 val offsetRanges = Array( 4 // topic, partition, inclusive starting offset, exclusive ending offset 5 OffsetRange("test", 0, 0, 100), 6 OffsetRange("test", 1, 0, 100) 7 ) 8 9 val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
請注意,您不能使用PreferBrokers,由於若是沒有流,則不存在驅動程序方面的消費者爲您自動查找代理元數據。 若有必要,請使用PreferFixed與您本身的元數據查找。
1 stream.foreachRDD { rdd => 2 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 3 rdd.foreachPartition { iter => 4 val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 5 println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 6 } 7 }
請注意,HasOffsetRanges的類型轉換隻會在createDirectStream的結果中調用的第一個方法中完成,而不是稍後向下的一系列方法。 請注意,RDD分區與Kafka分區之間的一對一映射在任何混洗或從新分區的方法(例如, reduceByKey()或window()。
Kafka交付語義在失敗的狀況下取決於如何以及什麼時候存儲偏移量。 spark輸出操做至少一次。 因此,若是你想要至關於一次的語義,你必須在冪等輸出以後存儲偏移量,或者在輸出中存儲原子事務的偏移量。 經過這種集成,您能夠選擇3個選項,以提升可靠性(和代碼複雜度),以便如何存儲偏移量。
若是啓用了Spark檢查點,偏移量將被存儲在檢查點中。 這很容易啓用,但也有缺點。 你的輸出操做必須是冪等的,由於你將獲得重複的輸出; 轉換不是一種選擇。 此外,若是應用程序代碼已更改,則沒法從檢查點恢復。 對於計劃升級,能夠經過在舊代碼的同時運行新代碼來緩解這種狀況(由於不管如何輸出須要是冪等的,它們不該該發生衝突)。 可是對於須要更改代碼的意外故障,除非您有另外的方法來識別已知的良好起始偏移量,不然將會丟失數據。
Kafka有一個偏移提交API,用於在特殊的Kafka主題中存儲偏移量。 默認狀況下,新的使用者將按期自動提交偏移量。 這幾乎確定不是你想要的,由於由消費者成功輪詢的消息可能尚未致使Spark輸出操做,致使未定義的語義。 這就是爲何上面的流示例將「enable.auto.commit」設置爲false的緣由。 可是,在知道輸出已存儲以後,可使用commitAsync API將偏移量提交給Kafka。 與檢查點相比,好處在於,不管應用程序代碼如何變化,Kafka都是耐用的商店。 然而,Kafka不是轉換型的,因此你的輸出仍然是冪等的。
1 stream.foreachRDD { rdd => 2 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 3 4 // some time later, after outputs have completed 5 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) 6 }
和HasOffsetRanges同樣,若是調用createDirectStream的結果,而不是在轉換以後,轉換爲CanCommitOffsets將會成功。 commitAsync調用是線程安全的,可是若是你想要有意義的語義,則必須在輸出以後進行。
對於支持事務的數據存儲,即便在出現故障的狀況下,也能夠在同一個事務中保存偏移量,以保持二者同步。 若是仔細檢測重複或跳過的偏移量範圍,回滾事務將防止重複或丟失的消息影響結果。 這給出了至關於一次的語義。 甚至可使用這種策略,即便對於一般難以產生冪等性的聚合產生的輸出也是如此。
1 // The details depend on your data store, but the general idea looks like this 2 3 // begin from the the offsets committed to the database 4 val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => 5 new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") 6 }.toMap 7 8 val stream = KafkaUtils.createDirectStream[String, String]( 9 streamingContext, 10 PreferConsistent, 11 Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) 12 ) 13 14 stream.foreachRDD { rdd => 15 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 16 17 val results = yourCalculation(rdd) 18 19 // begin your transaction 20 21 // update results 22 // update offsets where the end of existing offsets matches the beginning of this batch of offsets 23 // assert that offsets were updated correctly 24 25 // end your transaction 26 }
新的Kafka使用者支持SSL。 要啓用它,請在傳遞給createDirectStream / createRDD以前適當地設置kafkaParams。 請注意,這僅適用於Spark和Kafkabroker之間的溝通; 您仍負責單獨確保Spark節點間通訊。
1 val kafkaParams = Map[String, Object]( 2 // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS 3 "security.protocol" -> "SSL", 4 "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks", 5 "ssl.truststore.password" -> "test1234", 6 "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks", 7 "ssl.keystore.password" -> "test1234", 8 "ssl.key.password" -> "test1234" 9 )
與任何Spark應用程序同樣,spark-submit用於啓動您的應用程序。
對於Scala和Java應用程序,若是您使用SBT或Maven進行項目管理,請將spark-streaming-kafka-0-10_2.11及其依賴項打包到應用程序JAR中。 確保spark-core_2.11和spark-streaming_2.11被標記爲提供的依賴關係,由於這些已經存在於Spark安裝中。 而後使用spark-submit啓動您的應用程序(請參閱主編程指南中的部署)。