浪尖 浪尖聊大數據java
spark streaming消費kafka,你們都知道有兩種方式,也是面試考基本功常問的:面試
a.基於receiver的機制。這個是spark streaming最基本的方式,spark streaming的receiver會定時生成block,默認是200ms,而後每一個批次生成blockrdd,分區數就是block數。架構以下:apache
b.direct API。這種api就是spark streaming會每一個批次生成一個kafkardd,而後kafkardd的分區數,由spark streaming消費的kafkatopic分區數決定。過程以下:
kafkardd與消費的kafka分區數的關係以下:bootstrap
2.常見積壓問題api
kafka的producer生產數據到kafka,正常狀況下,企業中應該是輪詢或者隨機,以保證kafka分區之間數據是均衡的。數組
在這個前提之下,通常狀況下,假如針對你的數據量,kafka分區數設計合理。實時任務,如spark streaming或者flink,有沒有長時間的停掉,那麼通常不會有有積壓。架構
消息積壓的場景:框架
a.任務掛掉。好比,週五任務掛了,有沒有寫自動拉起腳本,週一早上才處理。那麼spark streaming消費的數據至關於滯後兩天。這個確實新手會遇到。ide
週末不加班,估計會被罵。函數
b.kafka分區數設少了。其實,kafka單分區生產消息的速度qps仍是很高的,可是消費者因爲業務邏輯複雜度的不一樣,會有不一樣的時間消耗,就會出現消費滯後的狀況。
c.kafka消息的key不均勻,致使分區間數據不均衡。kafka生產消息支持指定key,用key攜帶寫信息,可是key要均勻,不然會出現kafka的分區間數據不均衡。
上面三種積壓狀況,企業中很常見,那麼如何處理數據積壓呢?
通常解決辦法,針對性的有如下幾種:
a.任務掛掉致使的消費滯後。
任務啓動從最新的消費,歷史數據採用離線修補。
最重要的是故障拉起腳本要有,還要就是實時框架異常處理能力要強,避免數據不規範致使的不能拉起。
b.任務掛掉致使的消費滯後。
任務啓動從上次提交處消費處理,可是要增長任務的處理能力,好比增長資源,讓任務能儘量的遇上消費最新數據。
c.kafka分區少了。
假設數據量大,直接增長kafka分區是根本,可是也能夠對kafkardd進行repartition,增長一次shuffle。
d.個別分區不均衡。
能夠生產者處能夠給key加隨機後綴,使其均衡。也能夠對kafkardd進行repartition。
3.浪尖的騷操做
其實,以上都不是你們想要的,由於spark streaming生產的kafkardd的分區數,徹底能夠是大於kakfa分區數的。
其實,常常閱讀源碼或者星球的看過浪尖的源碼視頻的朋友應該瞭解,rdd的分區數,是由rdd的getPartitions函數決定。好比kafkardd的getPartitions方法實現以下:
override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) }.toArray }
offsetRanges其實就是一個數組:
val offsetRanges: Array[OffsetRange],
OffsetRange存儲一個kafka分區元數據及其offset範圍,而後進行map操做,轉化爲KafkaRDDPartition。實際上,咱們能夠在這裏下手,將map改成flatmap,而後對offsetrange的範圍進行拆分,可是這個會引起一個問題,浪尖在這裏就不贅述了,你能夠測測。
其實,咱們能夠在offsetRange生成的時候作下轉換。位置是DirectKafkaInputDstream的compute方法。具體實現:
首先,浪尖實現中增長了三個配置,分別是:
是否開啓自動重分區分區 sparkConf.set("enable.auto.repartition","true") 避免沒必要要的重分區操做,增長個閾值,只有該批次要消費的kafka的分區內數據大於該閾值才進行拆分 sparkConf.set("per.partition.offsetrange.threshold","300") 拆分後,每一個kafkardd 的分區數據量。 sparkConf.set("per.partition.after.partition.size","100")
而後,在DirectKafkaInputDstream裏獲取着三個配置,方法以下:
val repartitionStep = _ssc.conf.getInt("per.partition.offsetrange.size",1000) val repartitionThreshold = _ssc.conf.getLong("per.partition.offsetrange.threshold",1000) val enableRepartition = _ssc.conf.getBoolean("enable.auto.repartition",false) 對offsetRanges生成的過程進行改造,只須要增長7行源碼便可。 val offsetRanges = untilOffsets.flatMap{ case (tp, uo) => val fo = currentOffsets(tp) val delta = uo -fo if(enableRepartition&&(repartitionThreshold < delta)){ val offsets = fo to uo by repartitionStep offsets.map(each =>{ val tmpOffset = each + repartitionStep OffsetRange(tp.topic, tp.partition, each, Math.min(tmpOffset,uo)) }).toList }else{ Array(OffsetRange(tp.topic, tp.partition, fo, uo)) } }
測試的主函數以下:
import bigdata.spark.config.Config import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} /* 1. 直接消費新數據,數據離線修補。 2. repartition(10---->100),給足夠多的資源,以便任務逐漸消除滯後的數據。 3. directDstream api 生成的是kafkardd,該rdd與kafka分區一一對應。 */ object kafka010Repartition { def main(args: Array[String]) { // 建立一個批處理時間是2s的context 要增長環境變量 val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]") sparkConf.set("enable.auto.repartition","true") sparkConf.set("per.partition.offsetrange.threshold","300") sparkConf.set("per.partition.offsetrange.step","100") val ssc = new StreamingContext(sparkConf, Seconds(5)) // 使用broker和topic建立DirectStream val topicsSet = "test1".split(",").toSet val kafkaParams = Map[String, Object]("bootstrap.servers" -> Config.kafkaHost, "key.deserializer"->classOf[StringDeserializer], "value.deserializer"-> classOf[StringDeserializer], "group.id"->"test1", "auto.offset.reset" -> "earliest", "enable.auto.commit"->(false: java.lang.Boolean)) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) messages.transform(rdd=>{ println("partition.size : "+rdd.getNumPartitions) rdd }).foreachRDD(rdd=>{ // rdd.foreachPartition(each=>println(111)) val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetRanges.foreach(o=>{ println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") }) }) ssc.start() ssc.awaitTermination() } }
結果以下:
partition.size : 67 test1 0 447 547 test1 0 547 647 test1 0 647 747 test1 0 747 847 test1 0 847 947 test1 0 947 1047 test1 0 1047 1147 test1 0 1147 1247 test1 0 1247 1347 test1 0 1347 1447 test1 0 1447 1547 test1 0 1547 1647 test1 0 1647 1747 test1 0 1747 1847 test1 0 1847 1947 test1 0 1947 2047 test1 0 2047 2147 test1 0 2147 2247 test1 0 2247 2347 test1 0 2347 2447 test1 0 2447 2547 test1 0 2547 2647 test1 0 2647 2747 test1 0 2747 2847 test1 0 2847 2947 test1 0 2947 3047 test1 0 3047 3147 test1 0 3147 3247 test1 0 3247 3347 test1 0 3347 3447 test1 0 3447 3547 test1 0 3547 3647 test1 0 3647 3747 test1 0 3747 3847 test1 0 3847 3947 test1 0 3947 4047 test1 0 4047 4147 test1 0 4147 4247 test1 0 4247 4347 test1 0 4347 4447 test1 0 4447 4547 test1 0 4547 4647 test1 0 4647 4747 test1 0 4747 4847 test1 0 4847 4947 test1 0 4947 5047 test1 0 5047 5147 test1 0 5147 5247 test1 0 5247 5347 test1 0 5347 5447 test1 0 5447 5547 test1 0 5547 5647 test1 0 5647 5747 test1 0 5747 5847 test1 0 5847 5947 test1 0 5947 6047 test1 0 6047 6147 test1 0 6147 6247 test1 0 6247 6347 test1 0 6347 6447 test1 0 6447 6547 test1 0 6547 6647 test1 0 6647 6747 test1 0 6747 6847 test1 0 6847 6947 test1 0 6947 7047 test1 0 7047 7124
【完】