Sparkstreaming and Kafka

簡介

Kafka 0.10的Spark Streaming集成設計與0.8 Direct Stream方法相似。 它提供了簡單的並行性,Kafka分區和Spark分區之間的1:1對應關係,以及對偏移量和元數據的訪問。 可是,因爲較新的集成使用新的Kafka消費者API而不是簡單的API,因此在使用上存在顯着差別。 這個版本的集成被標記爲實驗,因此API可能會有變化。java

LINK(依賴)

對於使用SBT / Maven項目定義的Scala / Java應用程序,請將您的流應用程序與如下工件連接起來。正則表達式

1 groupId = org.apache.spark
2 artifactId = spark-streaming-kafka-0-10_2.11
3 version = 2.2.0

不要在org.apache.kafka構件上手動添加依賴項(例如kafka-clients)。 spark-streaming-kafka-0-10已經具備適當的傳遞依賴性,不一樣的版本可能在診斷的方式上不兼容。apache

Creating a Direct Stream

請注意,導入的名稱空間包含版本org.apache.spark.streaming.kafka010編程

複製代碼

1 import org.apache.kafka.clients.consumer.ConsumerRecord
 2 import org.apache.kafka.common.serialization.StringDeserializer
 3 import org.apache.spark.streaming.kafka010._
 4 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
 5 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 6 
 7 val kafkaParams = Map[String, Object](
 8   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
 9   "key.deserializer" -> classOf[StringDeserializer],
10   "value.deserializer" -> classOf[StringDeserializer],
11   "group.id" -> "use_a_separate_group_id_for_each_stream",
12   "auto.offset.reset" -> "latest",
13   "enable.auto.commit" -> (false: java.lang.Boolean)
14 )
15 
16 val topics = Array("topicA", "topicB")
17 val stream = KafkaUtils.createDirectStream[String, String](
18   streamingContext,
19   PreferConsistent,
20   Subscribe[String, String](topics, kafkaParams)
21 )
22 
23 stream.map(record => (record.key, record.value))

複製代碼

流中的每一個項目都是一個ConsumerRecord(消費者記錄)bootstrap

有關可能的kafkaParams,請參閱Kafka使用者配置文檔。 若是您的Spark批處理持續時間大於默認Kafka心跳會話超時(30秒),請適當增長heartbeat.interval.ms和session.timeout.ms。 對於大於5分鐘的批次,這將須要更改代理上的group.max.session.timeout.ms。 請注意,該示例將enable.auto.commit設置爲false,有關討論,請參閱下面的「存儲偏移量」。緩存

LocationStrategies(位置策略)

新的Kafka消費者API將預取消息到緩衝區中。所以,性能方面的緣由是Spark集成將緩存的消費者保留在執行者上(而不是爲每一個批次從新建立它們),而且傾向於在具備相應使用者的主機位置上調度分區。安全

在大多數狀況下,您應該使用LocationStrategies.PreferConsistent,如上所示。這會將分區平均分配給可用的執行者。若是您的執行者(executors )與您的Kafka經紀人(brokers)位於同一個主機上,請使用PreferBrokers,它將優先爲該分區的Kafka領導安排分區。最後,若是分區間負載存在明顯偏移,請使用PreferFixed。這容許您指定分區到主機的明確映射(任何未指定的分區將使用一致的位置)。session

消費者緩存的默認最大大小爲64.若是您但願處理多於(執行者數量爲64 *)的Kafka分區,則能夠經過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。ide

若是您想禁用Kafka使用者的緩存,則能夠將spark.streaming.kafka.consumer.cache.enabled設置爲false。可能須要禁用緩存來解決SPARK-19185中描述的問題。 SPARK-19185解決後,該屬性可能會在更高版本的Spark中刪除。函數

緩存由topicpartition和group.id鍵入,所以每次調用createDirectStream時都要使用一個單獨的group.id。

ConsumerStrategies(消費者策略)

新的Kafka消費者API有許多不一樣的方式來指定主題,其中一些須要大量的後對象實例化設置。 ConsumerStrategies提供了一個抽象,容許Spark從檢查點從新啓動後便可得到正確配置的使用者。

ConsumerStrategies.Subscribe,如上所示,容許您訂閱固定的主題集合。 SubscribePattern容許您使用正則表達式來指定感興趣的主題。 請注意,與0.8集成不一樣,使用Subscribe或SubscribePattern應該在正在運行的流中添加分區。 最後,Assign容許你指定一個固定的分區集合。 全部這三種策略都有重載的構造函數,容許您指定特定分區的起始偏移量。

若是您具備以上選項不能知足的特定消費者設置需求,則ConsumerStrategy是能夠擴展的公共類。

Creating an RDD

若是您有一個更適合批處理的用例,則能夠爲已定義的偏移範圍建立一個RDD。

複製代碼

1 // Import dependencies and create kafka params as in Create Direct Stream above
2 
3 val offsetRanges = Array(
4   // topic, partition, inclusive starting offset, exclusive ending offset
5   OffsetRange("test", 0, 0, 100),
6   OffsetRange("test", 1, 0, 100)
7 )
8 
9 val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

複製代碼

請注意,您不能使用PreferBrokers,由於若是沒有流,則不存在驅動程序方面的消費者爲您自動查找代理元數據。 若有必要,請使用PreferFixed與您本身的元數據查找。

Obtaining Offsets

複製代碼

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3   rdd.foreachPartition { iter =>
4     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
5     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
6   }
7 }

複製代碼

請注意,HasOffsetRanges的類型轉換隻會在createDirectStream的結果中調用的第一個方法中完成,而不是稍後向下的一系列方法。 請注意,RDD分區與Kafka分區之間的一對一映射在任何混洗或從新分區的方法(例如, reduceByKey()或window()。

Storing Offsets

Kafka交付語義在失敗的狀況下取決於如何以及什麼時候存儲偏移量。 spark輸出操做至少一次。 因此,若是你想要至關於一次的語義,你必須在冪等輸出以後存儲偏移量,或者在輸出中存儲原子事務的偏移量。 經過這種集成,您能夠選擇3個選項,以提升可靠性(和代碼複雜度),以便如何存儲偏移量。

Checkpoints

若是啓用了Spark檢查點,偏移量將被存儲在檢查點中。 這很容易啓用,但也有缺點。 你的輸出操做必須是冪等的,由於你將獲得重複的輸出; 轉換不是一種選擇。 此外,若是應用程序代碼已更改,則沒法從檢查點恢復。 對於計劃升級,能夠經過在舊代碼的同時運行新代碼來緩解這種狀況(由於不管如何輸出須要是冪等的,它們不該該發生衝突)。 可是對於須要更改代碼的意外故障,除非您有另外的方法來識別已知的良好起始偏移量,不然將會丟失數據。

Kafka itself

Kafka有一個偏移提交API,用於在特殊的Kafka主題中存儲偏移量。 默認狀況下,新的使用者將按期自動提交偏移量。 這幾乎確定不是你想要的,由於由消費者成功輪詢的消息可能尚未致使Spark輸出操做,致使未定義的語義。 這就是爲何上面的流示例將「enable.auto.commit」設置爲false的緣由。 可是,在知道輸出已存儲以後,可使用commitAsync API將偏移量提交給Kafka。 與檢查點相比,好處在於,不管應用程序代碼如何變化,Kafka都是耐用的商店。 然而,Kafka不是轉換型的,因此你的輸出仍然是冪等的。

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3 
4   // some time later, after outputs have completed
5   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
6 }

和HasOffsetRanges同樣,若是調用createDirectStream的結果,而不是在轉換以後,轉換爲CanCommitOffsets將會成功。 commitAsync調用是線程安全的,可是若是你想要有意義的語義,則必須在輸出以後進行。

Your own data store

對於支持事務的數據存儲,即便在出現故障的狀況下,也能夠在同一個事務中保存偏移量,以保持二者同步。 若是仔細檢測重複或跳過的偏移量範圍,回滾事務將防止重複或丟失的消息影響結果。 這給出了至關於一次的語義。 甚至可使用這種策略,即便對於一般難以產生冪等性的聚合產生的輸出也是如此。

複製代碼

1 // The details depend on your data store, but the general idea looks like this
 2 
 3 // begin from the the offsets committed to the database
 4 val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
 5   new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
 6 }.toMap
 7 
 8 val stream = KafkaUtils.createDirectStream[String, String](
 9   streamingContext,
10   PreferConsistent,
11   Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
12 )
13 
14 stream.foreachRDD { rdd =>
15   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
16 
17   val results = yourCalculation(rdd)
18 
19   // begin your transaction
20 
21   // update results
22   // update offsets where the end of existing offsets matches the beginning of this batch of offsets
23   // assert that offsets were updated correctly
24 
25   // end your transaction
26 }

複製代碼

SSL / TLS

新的Kafka使用者支持SSL。 要啓用它,請在傳遞給createDirectStream / createRDD以前適當地設置kafkaParams。 請注意,這僅適用於Spark和Kafkabroker之間的溝通; 您仍負責單獨確保Spark節點間通訊。

複製代碼

1 val kafkaParams = Map[String, Object](
2   // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
3   "security.protocol" -> "SSL",
4   "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
5   "ssl.truststore.password" -> "test1234",
6   "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
7   "ssl.keystore.password" -> "test1234",
8   "ssl.key.password" -> "test1234"
9 )

複製代碼

Deploying

與任何Spark應用程序同樣,spark-submit用於啓動您的應用程序。

對於Scala和Java應用程序,若是您使用SBT或Maven進行項目管理,請將spark-streaming-kafka-0-10_2.11及其依賴項打包到應用程序JAR中。 確保spark-core_2.11和spark-streaming_2.11被標記爲提供的依賴關係,由於這些已經存在於Spark安裝中。 而後使用spark-submit啓動您的應用程序(請參閱主編程指南中的部署)。

相關文章
相關標籤/搜索