https://blog.csdn.net/rlnLo2pNEfx9c/article/details/79648890 css
SparkStreaming與kafka010整合html
讀本文以前。請先閱讀以前文章:java
必讀:再講Spark與kafka 0.8.2.1+整合
apache
Spark Streaming與kafka 0.10的整合,和0.8版本號的direct Stream方式很是像。Kafka的分區和spark的分區是一一相應的,可以獲取offsets和元數據。bootstrap
API使用起來沒有顯著的差異。這個整合版本號標記爲experimental。因此API有可能改變。api
project依賴緩存
首先,加入依賴。安全
groupId = org.apache.sparksession
artifactId = spark-streaming-kafka-0-10_2.11app
version = 2.2.1
不要手動加入org.apache.kafka相關的依賴。如kafka-clients。
spark-streaming-kafka-0-10已經包括相關的依賴了,不一樣的版本號會有不一樣程度的不兼容。
代碼案例
首先導入包正確的包org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
ssc = new StreamingContext(sparkConf, Milliseconds(1000))
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
kafka的參數,請參考kafka官網。假設。你的spark批次時間超過了kafka的心跳時間(30s),需要添加heartbeat.interval.ms和session.timeout.ms。好比。批處理時間是5min,那麼就需要調整group.max.session.timeout.ms。注意。樣例中是將enable.auto.commit設置爲了false。
LocationStrategies(本地策略)
新版本號的消費者API會預取消息入buffer。
所以,爲了提高性能,在Executor端緩存消費者(而不是每個批次又一次建立)是很是有必要的,優先調度那些分區到已經有了合適消費者主機上。
在很是多狀況下,你需要像上文同樣使用LocationStrategies.PreferConsistent,這個參數會將分區儘可能均勻地分配到所有的可以Executor上去。
假設。你的Executor和kafka broker在同一臺機器上,可以用PreferBrokers。這將優先將分區調度到kafka分區leader所在的主機上。最後,分區間負荷有明顯的傾斜,可以用PreferFixed。這個贊成你指定一個明白的分區到主機的映射(沒有指定的分區將會使用連續的地址)。
消費者緩存的數目默認最大值是64。假設你但願處理超過(64*excutor數目)kafka分區。spark.streaming.kafka.consumer.cache.maxCapacity這個參數可以幫助你改動這個值。
假設你想禁止kafka消費者緩存,可以將spark.streaming.kafka.consumer.cache.enabled改動爲false。
禁止緩存緩存可能需要解決SPARK-19185描寫敘述的問題。一旦這個bug解決。這個屬性將會在後期的spark版本號中移除。
Cache是依照topicpartition和groupid進行分組的,因此每次調用creaDirectStream的時候要單獨設置group.id。
ConsumerStrategies(消費策略)
新的kafka消費者api有多個不一樣的方法去指定消費者,當中有些方法需要考慮post-object-instantiation設置。
ConsumerStrategies提供了一個抽象,它贊成spark可以得到正確配置的消費者。即便從Checkpoint從新啓動以後。
ConsumerStrategies.Subscribe,如上面展現的同樣,贊成你訂閱一組固定的集合的主題。
SubscribePattern贊成你使用正則來指定本身感興趣的主題。注意,跟0.8整合不一樣的是,使用subscribe或者subscribepattern在執行stream期間應相應到加入分區。
事實上,Assign執行你指定固定分區的集合。這三種策略都有重載構造函數。贊成您指定特定分區的起始偏移量。
ConsumerStrategy是一個public類。贊成你進行本身定義策略。
建立kafkaRDD
類似於spark streaming的批處理,現在你可以經過指定本身定義偏移範圍本身建立kafkaRDD。
def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
val kp = new JHashMap[String, Object]()
kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
kp.put("key.deserializer", classOf[StringDeserializer])
kp.put("value.deserializer", classOf[StringDeserializer])
kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
extra.foreach(e => kp.put(e._1, e._2))
kp
}
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
// Import dependencies and create kafka params as in Create Direct Stream above
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
注意。在這裏是不能使用PreferBrokers的。因爲不是流處理的話就沒有driver端的消費者幫助你尋找元數據。必須使用PreferFixed,而後本身指定元數據
你們可以進入createRDD裏面。看其源代碼。事實上就是依據你的參數封裝成了RDD,跟流式批處理是一致的。
def createRDD[K, V](
sc: SparkContext,
kafkaParams: ju.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]] = {
val preferredHosts = locationStrategy match {
case PreferBrokers =>
throw new AssertionError(
"If you want to prefer brokers, you must provide a mapping using PreferFixed " +
"A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
case PreferFixed(hostMap) => hostMap
}
val kp = new ju.HashMap[String, Object](kafkaParams)
fixKafkaParams(kp)
val osr = offsetRanges.clone()
new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
}
獲取偏移
Spark Streaming與kafka整合是執行你獲取其消費的偏移的,詳細方法例如如下:
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
注意。HashOffsetRanges只在spark計算鏈條的開始才幹類型轉換成功。要知道kafka分區和spark分區的一一相應關係在Shuffle後就會喪失,比方reduceByKey()或者window()。
存儲偏移
Kafka在有可能存在任務失敗的狀況下的從消息傳輸語義(至少一次。最多一次,剛好一次)是取決於什麼時候存儲offset。Spark輸出操做是至少一次傳輸語義。因此,假設你想實現只一次的消費語義,你必需要麼在密等輸出後存儲offset,要麼就是offset的存儲和結果輸出是一次事務。
現在kafka有了3種方式,來提升可靠性(以及代碼複雜性),用於存儲偏移量。
1, Checkpoint
假設使能了Checkpoint,offset被存儲到Checkpoint。
這個儘管很是easy作到,但是也有一些缺點。因爲會屢次輸出結果,因此結果輸出必須是知足冪等性。
同一時候事務性不可選。另外,假設代碼變動,你是不可以從Checkpoint恢復的。針對代碼升級更新操做,你可以同一時候執行你的新任務和舊任務(因爲你的輸出結果是冪等性)。對於之外的故障,並且同一時候代碼變動了,確定會丟失數據的,除非另有方式來識別啓動消費的偏移。
2。 Kafka自身
Kafka提供的有api。可以將offset提交到指定的kafkatopic。默認狀況下,新的消費者會週期性的本身主動提交offset到kafka。但是有些狀況下,這也會有些問題,因爲消息可能已經被消費者從kafka拉去出來。但是spark還沒處理,這樣的狀況下會致使一些錯誤。
這也是爲何樣例中stream將enable.auto.commit設置爲了false。
然而在已經提交spark輸出結果以後。你可以手動提交偏移到kafka。
相對於Checkpoint,offset存儲到kafka的優勢是:kafka既是一個容錯的存儲系統,也是可以避免代碼變動帶來的麻煩。提交offset到kafka和結果輸出也不是一次事務,因此也要求你的輸出結果是知足冪等性。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
因爲帶有HasOffsetRanges。到CanCommitOffsets的轉換將會在剛執行createDirectStream以後成功,而不是通過各類操做算子後。
commitAsync是線程安全的。必須在結果提交後進行執行。
3。 本身定義存儲位置
對於輸出解僱支持事務的狀況,可以將offset和輸出結果在同一個事務內部提交,這樣即便在失敗的狀況下也可以保證二者同步。
假設您關心檢測反覆或跳過的偏移範圍。回滾事務可以防止反覆或丟失的消息。
這至關於一次語義。也可以使用這樣的策略,甚至是聚合所產生的輸出,聚合產生的輸出通常是很是難生成冪等的。代碼演示樣例
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
SSL/TLS配置使用
新的kafka消費者支持SSL。只需要在執行createDirectStream / createRDD以前設置kafkaParams。
注意。這隻應用與Spark和kafkabroker之間的通信。仍然負責分別確保節點間通訊的安全。
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Spark相關書籍,請進入浪尖微店。
推薦閱讀: