剖析Spark數據分區之Spark streaming & TiSpark

本文來自OPPO互聯網技術團隊,是《剖析Spark數據分區》系列文章的第三篇,本篇咱們將分析Spark streaming,TiSpark中的數據分區。segmentfault

系列一:剖析Spark數據分區之Hadoop分片緩存

系列二:剖析Spark數據分區之Spark RDD分區數據結構

系列三:剖析Spark數據分區之Spark streaming & TiSpark分區架構

1. Kafka +Spark Streaming

Spark Streaming從Kafka接收數據,轉換爲Spark Streaming中的數據結構DStream即離散數據流。併發

數據接收方式有兩種:負載均衡

  1. 使用Receiver接收的舊方法;
  2. 使用Direct拉取的新方法(Spark 1.3引入);

1.1 Receiver方式

當前spark已經不支持該模式。分佈式

receiver模式的並行度由spark.streaming.blockInterval決定,默認是200ms。oop

receiver模式接收block.batch數據後會封裝到RDD中,這裏的block對應RDD中的partition。源碼分析

batchInterval必定的狀況下:性能

  • 減小spark.streaming.Interval參數值,會增大DStream中的partition個數。
  • 建議spark.streaming.Interval最低不能低於50ms。

1.2 Direct方式

Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。

DirectKafkaInputDStream按期生成的RDD的類型是KafkaRDD。

咱們首先看看 KafkaRDD是如何劃分分區的:

它會根據從初始化時接收的offset信息參數,生成KafkaRDDPartition分區;每一個分區對應着Kafka的一個topic partition 的一段數據,這段數據的信息OffsetRange表示, 它保存了數據的位置。

下面咱們詳細分析DirectKafkaInputDStream的compute方法:

經過源碼分析可知:Partition的計算方法是爲topic的每個partition建立一個OffsetRange,全部的OffsetRange生成一個KafkaRDD。

下面咱們分析KafkaRDD的getPartitions方法:

每一個OffsetRange生成一個Partition。

如何增大RDD的分區數,讓每一個partition處理的數據量增大?

經過源碼分析,可經過調小Kafka消息中Topic的分區數目;想要增長RDD的並行度,可經過調大Kafka消息中Topic的分區數目。

2. TiSpark

2.1 TiDB架構

TiDB集羣主要分爲3個組件:

TiDB Server負責接收SQL請求,處理SQL相關的邏輯,並經過PD找到存儲計算所須要的TiKV地址,與TiKV交互獲取數據,最終返回結構;

TiDB Server並不存儲數據,只負責計算,能夠無限水平擴展,能夠經過負載均衡組件如LVS, HAProxy,F5等對外提供的接入地址。

2.2 TiKV Server

TiKV負責數據存儲,從外部看是一個分佈式的提供事物的Key-Value存儲引擎。

存儲數據的基本單位是Region,每一個Region負責存儲一個Key Range (從StartKey到EndKey的左閉右開區間) 區間的數據,每一個TiKV節點會負責多個Region,數據在多個TiKV之間的負載均衡由PD調度,也是以Region爲單位進行調度。

2.3 Region

TiDB 的數據分佈是以 Region 爲單位的。一個 Region 包含了一個範圍內的數據,一般是 96MB 的大小,Region 的 meta 信息包含了 StartKey 和 EndKey 這兩個屬性。

當某個 key >= StartKey && key < EndKey 的時候:咱們就知道了這個 key 所在的 Region,而後咱們就能夠經過查找該 Region 所在的 TiKV 地址,去這個地址讀取這個 key 的數據。

獲取 key 所在的 Region, 是經過向 PD 發送請求完成的。
GetRegion(ctx context.Context, key []byte) (metapb.Region, metapb.Peer, error)

經過調用這個接口,咱們就能夠定位這個 key 所在的 Region 了。

若是須要獲取一個範圍內的多個 Region:咱們會從這個範圍的 StartKey 開始,屢次調用 GetRegion 這個接口,每次返回的 Region 的 EndKey 作爲下次請求的 StartKey,直到返回的 Region 的 EndKey 大於請求範圍的 EndKey。

以上執行過程有一個很明顯的問題:就是咱們每次讀取數據的時候,都須要先去訪問 PD,這樣會給 PD 帶來巨大壓力,同時影響請求的性能。

爲了解決這個問題:tikv-client 實現了一個 RegionCache 的組件,緩存 Region 信息。

當須要定位 key 所在的 Region 的時候:若是 RegionCache 命中,就不須要訪問 PD 了。

RegionCache 內部有兩種數據結構保存 Region信息:

  1. map;用 map 能夠快速根據 region ID 查找到 Region,
  2. b-tree;用 b-tree 能夠根據一個 key 找到包含該 key 的 Region。

嚴格來講:PD 上保存的 Region 信息,也是一層 cache;真正最新的 Region 信息是存儲在 tikv-server 上的,每一個 tikv-server 會本身決定何時進行 Region 分裂。

在 Region 變化的時候,把信息上報給 PD,PD 用上報上來的 Region 信息,知足 tidb-server 的查詢需求。

當咱們從 cache 獲取了 Region 信息,併發送請求之後, tikv-server 會對 Region 信息進行校驗,確保請求的 Region 信息是正確的。

若是由於 Region 分裂,Region 遷移致使了 Region 信息變化。請求的 Region 信息就會過時,這時 tikv-server 就會返回 Region 錯誤。

遇到了 Region 錯誤,咱們就須要清理 RegionCache,從新獲取最新的 Region 信息,並從新發送請求。

2.4 TiSpark架構圖

TiSpark深度整合了Spark Catalyst引擎,使得Spark可以高效的讀取TiKV中存儲的數據進行分佈式計算。

下面分析TiRDD中的getPartitions方法:

經過源碼分析:首先經過splitRangeByRegion獲取keyWithRegionTasks, 對於每個RegionTask建立一個TiPartition。

可見TiSpark的Partition分區數目與TiKV的region數目一致,若是想要提升TiSpark任務的並行度,可修改以下兩個參數值:

  1. region-max-size;
  2. region-split-size;

3. 結尾

經過以上種種狀況的分析,只要咱們能正確的認識在各類場景下分區與task的關係,進而加以實際的對影響分區的參數調優,也可讓數據量大的任務也能快起來,同時能清楚的解答數據分析師的問題。

相關文章
相關標籤/搜索