前文有提到過Spark Streaming事務是如何保證exactly once的語義的。app
從spark core程序來說,讀取固定數據來源好比hdfs中,spark只是作爲一個計算框架。負載均衡
而在流處理中,只是多了一個時間維度。框架
若在某一時刻,知道所需處理數據的來源,直接讀取,而不用被動的接收(Receiver),那就是和普通的Spark 程序沒什麼差異了。函數
本文將着重Kafka中direct方式的讀取,以案例切入,跟蹤源碼分析。源碼分析
入口是KafkaUtils,先建立了一個回調函數定義,再獲取到kafka集羣,並獲取到起始偏移量,最後建立一個DirectKafkaInputDStream,用於建立RDD。spa
// KafkaUtils.scala line 473 def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] ): InputDStream[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val fromOffsets = getFromOffsets(kc, kafkaParams, topics) new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }
KafkaCluster 在實例化時沒有任何動做,只是單純的建立對象。.net
接下來獲取每一個partition的偏移量,scala
DStream建立以後,整個DAG回溯的lineage以下:code
DirectKafkaInputDStream -> MappedDStream > FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream對象
當DAG回溯到DirectKafkaInputDStream時,會調用compute。建立KafkaRDD,而且將最新的偏移量保存,以便下次計算新的偏移量。
從當RDD進入計算時,會調用compute。,此處的offsetRanges就是Kafka的TopicAndPartition和對應的偏移量。最後結果就是kafka有多少個partition,spark就會有多少個partition與之對應。
直接抓Kafka數據的方式與Receiver的方式的對比: