必讀:Spark與kafka010整合

版權聲明:本文爲博主原創文章,未經博主贊成不得轉載。

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/79648890 css

SparkStreaming與kafka010整合html

讀本文以前。請先閱讀以前文章:java

必讀:再講Spark與kafka 0.8.2.1+整合
apache

Spark Streaming與kafka 0.10的整合,和0.8版本號的direct Stream方式很是像。Kafka的分區和spark的分區是一一相應的,可以獲取offsets和元數據。bootstrap

API使用起來沒有顯著的差異。這個整合版本號標記爲experimental。因此API有可能改變。api

project依賴緩存

首先,加入依賴。安全

groupId = org.apache.sparksession

artifactId = spark-streaming-kafka-0-10_2.11app

version = 2.2.1

不要手動加入org.apache.kafka相關的依賴。如kafka-clients。

spark-streaming-kafka-0-10已經包括相關的依賴了,不一樣的版本號會有不一樣程度的不兼容。

代碼案例

首先導入包正確的包org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
ssc = new StreamingContext(sparkConf, Milliseconds(1000))
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map[String, Object](
 "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
 "key.deserializer" -> classOf[StringDeserializer],
 "value.deserializer" -> classOf[StringDeserializer],
 "group.id" -> "use_a_separate_group_id_for_each_stream",
 "auto.offset.reset" -> "latest",
 "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
 ssc,
 preferredHosts,
 Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))


kafka的參數,請參考kafka官網。假設。你的spark批次時間超過了kafka的心跳時間(30s),需要添加heartbeat.interval.ms和session.timeout.ms。好比。批處理時間是5min,那麼就需要調整group.max.session.timeout.ms。注意。樣例中是將enable.auto.commit設置爲了false。

LocationStrategies(本地策略)

新版本號的消費者API會預取消息入buffer。

所以,爲了提高性能,在Executor端緩存消費者(而不是每個批次又一次建立)是很是有必要的,優先調度那些分區到已經有了合適消費者主機上。

在很是多狀況下,你需要像上文同樣使用LocationStrategies.PreferConsistent,這個參數會將分區儘可能均勻地分配到所有的可以Executor上去。

假設。你的Executor和kafka broker在同一臺機器上,可以用PreferBrokers。這將優先將分區調度到kafka分區leader所在的主機上。最後,分區間負荷有明顯的傾斜,可以用PreferFixed。這個贊成你指定一個明白的分區到主機的映射(沒有指定的分區將會使用連續的地址)。

消費者緩存的數目默認最大值是64。假設你但願處理超過(64*excutor數目)kafka分區。spark.streaming.kafka.consumer.cache.maxCapacity這個參數可以幫助你改動這個值。

假設你想禁止kafka消費者緩存,可以將spark.streaming.kafka.consumer.cache.enabled改動爲false。

禁止緩存緩存可能需要解決SPARK-19185描寫敘述的問題。一旦這個bug解決。這個屬性將會在後期的spark版本號中移除。

Cache是依照topicpartition和groupid進行分組的,因此每次調用creaDirectStream的時候要單獨設置group.id。

ConsumerStrategies(消費策略)

新的kafka消費者api有多個不一樣的方法去指定消費者,當中有些方法需要考慮post-object-instantiation設置。

ConsumerStrategies提供了一個抽象,它贊成spark可以得到正確配置的消費者。即便從Checkpoint從新啓動以後。

ConsumerStrategies.Subscribe,如上面展現的同樣,贊成你訂閱一組固定的集合的主題。

SubscribePattern贊成你使用正則來指定本身感興趣的主題。注意,跟0.8整合不一樣的是,使用subscribe或者subscribepattern在執行stream期間應相應到加入分區。

事實上,Assign執行你指定固定分區的集合。這三種策略都有重載構造函數。贊成您指定特定分區的起始偏移量。

ConsumerStrategy是一個public類。贊成你進行本身定義策略。

建立kafkaRDD

類似於spark streaming的批處理,現在你可以經過指定本身定義偏移範圍本身建立kafkaRDD。

def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
 val kp = new JHashMap[String, Object]()
 kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
 kp.put("key.deserializer", classOf[StringDeserializer])
 kp.put("value.deserializer", classOf[StringDeserializer])
 kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
 extra.foreach(e => kp.put(e._1, e._2))
 kp
}

val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
 // topic, partition, inclusive starting offset, exclusive ending offset
 OffsetRange("test", 0, 0, 100),
 OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)


注意。在這裏是不能使用PreferBrokers的。因爲不是流處理的話就沒有driver端的消費者幫助你尋找元數據。必須使用PreferFixed,而後本身指定元數據

你們可以進入createRDD裏面。看其源代碼。事實上就是依據你的參數封裝成了RDD,跟流式批處理是一致的。

def createRDD[K, V](
   sc: SparkContext,
   kafkaParams: ju.Map[String, Object],
   offsetRanges: Array[OffsetRange],
   locationStrategy: LocationStrategy
 ): RDD[ConsumerRecord[K, V]] = {
 val preferredHosts = locationStrategy match {
   case PreferBrokers =>
     throw new AssertionError(
       "If you want to prefer brokers, you must provide a mapping using PreferFixed " +
       "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
   case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
   case PreferFixed(hostMap) => hostMap
 }
 val kp = new ju.HashMap[String, Object](kafkaParams)
 fixKafkaParams(kp)
 val osr = offsetRanges.clone()

 new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
}

獲取偏移

Spark Streaming與kafka整合是執行你獲取其消費的偏移的,詳細方法例如如下:

stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 rdd.foreachPartition { iter =>
   val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
   println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
 }
}

注意。HashOffsetRanges只在spark計算鏈條的開始才幹類型轉換成功。要知道kafka分區和spark分區的一一相應關係在Shuffle後就會喪失,比方reduceByKey()或者window()。

存儲偏移

Kafka在有可能存在任務失敗的狀況下的從消息傳輸語義(至少一次。最多一次,剛好一次)是取決於什麼時候存儲offset。Spark輸出操做是至少一次傳輸語義。因此,假設你想實現只一次的消費語義,你必需要麼在密等輸出後存儲offset,要麼就是offset的存儲和結果輸出是一次事務。

現在kafka有了3種方式,來提升可靠性(以及代碼複雜性),用於存儲偏移量。

1, Checkpoint

假設使能了Checkpoint,offset被存儲到Checkpoint。

這個儘管很是easy作到,但是也有一些缺點。因爲會屢次輸出結果,因此結果輸出必須是知足冪等性。

同一時候事務性不可選。另外,假設代碼變動,你是不可以從Checkpoint恢復的。針對代碼升級更新操做,你可以同一時候執行你的新任務和舊任務(因爲你的輸出結果是冪等性)。對於之外的故障,並且同一時候代碼變動了,確定會丟失數據的,除非另有方式來識別啓動消費的偏移。

2。 Kafka自身

Kafka提供的有api。可以將offset提交到指定的kafkatopic。默認狀況下,新的消費者會週期性的本身主動提交offset到kafka。但是有些狀況下,這也會有些問題,因爲消息可能已經被消費者從kafka拉去出來。但是spark還沒處理,這樣的狀況下會致使一些錯誤。

這也是爲何樣例中stream將enable.auto.commit設置爲了false。

然而在已經提交spark輸出結果以後。你可以手動提交偏移到kafka。

相對於Checkpoint,offset存儲到kafka的優勢是kafka既是一個容錯的存儲系統,也是可以避免代碼變動帶來的麻煩。提交offsetkafka和結果輸出也不是一次事務,因此也要求你的輸出結果是知足冪等性。

stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

 // some time later, after outputs have completed
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

因爲帶有HasOffsetRanges。到CanCommitOffsets的轉換將會在剛執行createDirectStream以後成功,而不是通過各類操做算子後。

commitAsync是線程安全的。必須在結果提交後進行執行。

3。 本身定義存儲位置

對於輸出解僱支持事務的狀況,可以將offset和輸出結果在同一個事務內部提交,這樣即便在失敗的狀況下也可以保證二者同步。

假設您關心檢測反覆或跳過的偏移範圍。回滾事務可以防止反覆或丟失的消息。

這至關於一次語義。也可以使用這樣的策略,甚至是聚合所產生的輸出,聚合產生的輸出通常是很是難生成冪等的。代碼演示樣例

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
 new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
 streamingContext,
 PreferConsistent,
 Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

 val results = yourCalculation(rdd)

 // begin your transaction

 // update results
 // update offsets where the end of existing offsets matches the beginning of this batch of offsets
 // assert that offsets were updated correctly

 // end your transaction
}

SSL/TLS配置使用

新的kafka消費者支持SSL。只需要在執行createDirectStream / createRDD以前設置kafkaParams。

注意。這隻應用與Spark和kafkabroker之間的通信。仍然負責分別確保節點間通訊的安全。

val kafkaParams = Map[String, Object](
 // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
 "security.protocol" -> "SSL",
 "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
 "ssl.truststore.password" -> "test1234",
 "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
 "ssl.keystore.password" -> "test1234",
 "ssl.key.password" -> "test1234"
)

Spark相關書籍,請進入浪尖微店。

推薦閱讀:

1。Hdfs的數據磁盤大小不均衡怎樣處理

2。數據科學的工做流程

3,大數據基礎系列之spark的監控體系介紹

4,金融反欺詐場景下的Spark實踐

640?wx_fmt=png

相關文章
相關標籤/搜索