本文來自OPPO互聯網技術團隊,是《剖析Spark數據分區》系列文章的第三篇,本篇咱們將分析Spark streaming,TiSpark中的數據分區。segmentfault
系列一:剖析Spark數據分區之Hadoop分片緩存
系列二:剖析Spark數據分區之Spark RDD分區數據結構
系列三:剖析Spark數據分區之Spark streaming & TiSpark分區架構
Spark Streaming從Kafka接收數據,轉換爲Spark Streaming中的數據結構DStream即離散數據流。併發
數據接收方式有兩種:負載均衡
當前spark已經不支持該模式。分佈式
receiver模式的並行度由spark.streaming.blockInterval決定,默認是200ms。oop
receiver模式接收block.batch數據後會封裝到RDD中,這裏的block對應RDD中的partition。源碼分析
batchInterval必定的狀況下:性能
Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。
DirectKafkaInputDStream按期生成的RDD的類型是KafkaRDD。
咱們首先看看 KafkaRDD是如何劃分分區的:
它會根據從初始化時接收的offset信息參數,生成KafkaRDDPartition分區;每一個分區對應着Kafka的一個topic partition 的一段數據,這段數據的信息OffsetRange表示, 它保存了數據的位置。
下面咱們詳細分析DirectKafkaInputDStream的compute方法:
經過源碼分析可知:Partition的計算方法是爲topic的每個partition建立一個OffsetRange,全部的OffsetRange生成一個KafkaRDD。
下面咱們分析KafkaRDD的getPartitions方法:
每一個OffsetRange生成一個Partition。
如何增大RDD的分區數,讓每一個partition處理的數據量增大?
經過源碼分析,可經過調小Kafka消息中Topic的分區數目;想要增長RDD的並行度,可經過調大Kafka消息中Topic的分區數目。
TiDB集羣主要分爲3個組件:
TiDB Server負責接收SQL請求,處理SQL相關的邏輯,並經過PD找到存儲計算所須要的TiKV地址,與TiKV交互獲取數據,最終返回結構;
TiDB Server並不存儲數據,只負責計算,能夠無限水平擴展,能夠經過負載均衡組件如LVS, HAProxy,F5等對外提供的接入地址。
TiKV負責數據存儲,從外部看是一個分佈式的提供事物的Key-Value存儲引擎。
存儲數據的基本單位是Region,每一個Region負責存儲一個Key Range (從StartKey到EndKey的左閉右開區間) 區間的數據,每一個TiKV節點會負責多個Region,數據在多個TiKV之間的負載均衡由PD調度,也是以Region爲單位進行調度。
TiDB 的數據分佈是以 Region 爲單位的。一個 Region 包含了一個範圍內的數據,一般是 96MB 的大小,Region 的 meta 信息包含了 StartKey 和 EndKey 這兩個屬性。
當某個 key >= StartKey && key < EndKey 的時候:咱們就知道了這個 key 所在的 Region,而後咱們就能夠經過查找該 Region 所在的 TiKV 地址,去這個地址讀取這個 key 的數據。
獲取 key 所在的 Region, 是經過向 PD 發送請求完成的。
GetRegion(ctx context.Context, key []byte) (metapb.Region, metapb.Peer, error)
經過調用這個接口,咱們就能夠定位這個 key 所在的 Region 了。
若是須要獲取一個範圍內的多個 Region:咱們會從這個範圍的 StartKey 開始,屢次調用 GetRegion 這個接口,每次返回的 Region 的 EndKey 作爲下次請求的 StartKey,直到返回的 Region 的 EndKey 大於請求範圍的 EndKey。
以上執行過程有一個很明顯的問題:就是咱們每次讀取數據的時候,都須要先去訪問 PD,這樣會給 PD 帶來巨大壓力,同時影響請求的性能。
爲了解決這個問題:tikv-client 實現了一個 RegionCache 的組件,緩存 Region 信息。
當須要定位 key 所在的 Region 的時候:若是 RegionCache 命中,就不須要訪問 PD 了。
RegionCache 內部有兩種數據結構保存 Region信息:
嚴格來講:PD 上保存的 Region 信息,也是一層 cache;真正最新的 Region 信息是存儲在 tikv-server 上的,每一個 tikv-server 會本身決定何時進行 Region 分裂。
在 Region 變化的時候,把信息上報給 PD,PD 用上報上來的 Region 信息,知足 tidb-server 的查詢需求。
當咱們從 cache 獲取了 Region 信息,併發送請求之後, tikv-server 會對 Region 信息進行校驗,確保請求的 Region 信息是正確的。
若是由於 Region 分裂,Region 遷移致使了 Region 信息變化。請求的 Region 信息就會過時,這時 tikv-server 就會返回 Region 錯誤。
遇到了 Region 錯誤,咱們就須要清理 RegionCache,從新獲取最新的 Region 信息,並從新發送請求。
TiSpark深度整合了Spark Catalyst引擎,使得Spark可以高效的讀取TiKV中存儲的數據進行分佈式計算。
下面分析TiRDD中的getPartitions方法:
經過源碼分析:首先經過splitRangeByRegion獲取keyWithRegionTasks, 對於每個RegionTask建立一個TiPartition。
可見TiSpark的Partition分區數目與TiKV的region數目一致,若是想要提升TiSpark任務的並行度,可修改以下兩個參數值:
經過以上種種狀況的分析,只要咱們能正確的認識在各類場景下分區與task的關係,進而加以實際的對影響分區的參數調優,也可讓數據量大的任務也能快起來,同時能清楚的解答數據分析師的問題。