本文來自官網: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#physical-partitioningjava
Flink還經過如下函數對轉換後的數據精確流分區進行低級控制(若是須要)。apache
一、自定義分區json
使用用戶定義的分區程序爲每一個元素選擇目標任務。性能優化
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
如簡單的hash 分區(下面的實例不是官網):網絡
val input = env.addSource(source) .map(json => { // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"} val id = json.get("id").asText() val createTime = json.get("createTime").asText() val amt = json.get("amt").asText() LateDataEvent("key", id, createTime, amt) }) .setParallelism(1) .partitionCustom(new Partitioner[String] { override def partition(key: String, numPartitions: Int): Int = { // numPartitions 是下游算子的併發數 key.hashCode % numPartitions } }, "id") .map(l => { LateDataEvent(l.key, l.id, l.amt, l.createTime) }) .setParallelism(3)
注:key 是傳入的field 的類型併發
二、隨機分區dom
根據均勻分佈隨機分配元素(相似於: random.nextInt(5),0 - 5 在機率上是均勻的)ide
dataStream.shuffle()
源碼:函數
@Internal public class ShufflePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private Random random = new Random(); @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// 傳入下游分區數 return random.nextInt(numberOfChannels); } @Override public StreamPartitioner<T> copy() { return new ShufflePartitioner<T>(); } @Override public String toString() { return "SHUFFLE"; } }
三、均勻分區 rebalance
性能
分區元素循環,每一個分區建立相等的負載。在存在數據偏斜時用於性能優化。
dataStream.rebalance()
源碼:
public class RebalancePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private int nextChannelToSendTo; @Override public void setup(int numberOfChannels) { super.setup(numberOfChannels); nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { // 輪訓的發往下游分區 nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; } public StreamPartitioner<T> copy() { return this; } @Override public String toString() { return "REBALANCE"; } }
四、rescale
分區元素循環到下游操做的子集。若是您但願擁有管道,例如,從源的每一個並行實例扇出到多個映射器的子集以分配負載但又不但願發生rebalance()會產生徹底從新平衡,那麼這很是有用。這將僅須要本地數據傳輸而不是經過網絡傳輸數據,具體取決於其餘配置值,例如TaskManagers的插槽數。
上游操做發送元素的下游操做的子集取決於上游和下游操做的並行度。例如,若是上游操做具備並行性2而且下游操做具備並行性4,則一個上游操做將元素分配給兩個下游操做,而另外一個上游操做將分配給另外兩個下游操做。另外一方面,若是下游操做具備並行性2而上游操做具備並行性4,那麼兩個上游操做將分配到一個下游操做,而另外兩個上游操做將分配到其餘下游操做。在不一樣並行度不是彼此的倍數的狀況下,一個或多個下游操做將具備來自上游操做的不一樣數量的輸入。
dataStream.rescale()
源碼:
public class RescalePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private int nextChannelToSendTo = -1; @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; } public StreamPartitioner<T> copy() { return this; } @Override public String toString() { return "RESCALE"; } }
很遺憾這段代碼只能看出,上游分區往下游分區發的時候,每一個上游分區內部的數據是輪訓發到下游分區的(沒找到具體分配的地方,從這段代碼debug,一直往上,找到分區出如今 RuntimeEnvironment 的對象裏面,找不具體分配的地方)。
五、廣播
向每一個分區廣播元素。
dataStream.broadcast()