Apache Flink 漫談系列 - JOIN 算子

聊什麼

在《Apache Flink 漫談系列 - SQL概覽》中咱們介紹了JOIN算子的語義和基本的使用方式,介紹過程當中你們發現Apache Flink在語法語義上是遵循ANSI-SQL標準的,那麼再深思一下傳統數據庫爲啥須要有JOIN算子呢?在實現原理上面Apache Flink內部實現和傳統數據庫有什麼區別呢?本篇將詳盡的爲你們介紹傳統數據庫爲何須要JOIN算子,以及JOIN算子在Apache Flink中的底層實現原理和在實際使用中的優化!php

什麼是JOIN

在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這裏咱們以具體實例的方式讓你們對JOIN算子加深印象。JOIN的本質是分別從N(N>=1)張表中獲取不一樣的字段,進而獲得最完整的記錄行。好比咱們有一個查詢需求:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢全部學生的姓名,課程名和考試分數。以下:css

 

爲啥須要JOIN

JOIN的本質是數據拼接,那麼若是咱們將全部數據列存儲在一張大表中,是否是就不須要JOIN了呢?若是真的能將所需的數據都在一張表存儲,我想就真的不須要JOIN的算子了,但現實業務中真的能作到將所需數據放到同一張大表裏面嗎?答案是否認的,核心緣由有2個:mysql

  • 產生數據的源頭可能不是一個系統;sql

  • 產生數據的源頭是同一個系統,可是數據冗餘的沉重代價,迫使咱們會遵循數據庫範式,進行表的設計。簡說NF以下:數據庫

    • 1NF - 列不可再分;
    • 2NF - 符合1NF,而且非主鍵屬性所有依賴於主鍵屬性;
    • 3NF - 符合2NF,而且消除傳遞依賴,即:任何字段不能由其餘字段派生出來;
    • BCNF - 符合3NF,而且主鍵屬性之間無依賴關係。

固然還有 4NF,5NF,不過在實際的數據庫設計過程當中作到BCNF已經足夠了!(並不是否認4NF,5NF存在的意義,只是我的尚未遇到必定要用4NF,5NF的場景,設計每每會按存儲成本,查詢性能等綜合因素考量)ruby

JOIN種類

JOIN 在傳統數據庫中有以下分類:數據結構

  • CROSS JOIN - 交叉鏈接,計算笛卡兒積;併發

  • INNER JOIN - 內鏈接,返回知足條件的記錄;數據庫設計

  • OUTER JOIN分佈式

    • LEFT - 返回左表全部行,右表不存在補NULL;
    • RIGHT - 返回右表全部行,左邊不存在補NULL;
    • FULL - 返回左表和右表的並集,不存在一邊補NULL;
  • SELF JOIN - 自鏈接,將表查詢時候命名不一樣的別名。

JOIN語法

JOIN 在SQL89和SQL92中有不一樣的語法,以INNER JOIN爲例說明:

  • SQL89 - 表之間用「,」逗號分割,連接條件和過濾條件都在Where子句指定:
SELECT 
  a.colA, 
  b.colA
FROM  
  tab1 AS a , tab2 AS b WHERE a.id = b.id and a.other > b.other 
  • SQL92 - SQL92將連接條件在ON子句指定,過濾條件在WHERE子句指定,邏輯更爲清晰:
SELECT 
  a.colA, 
  b.colA
FROM 
  tab1 AS a JOIN tab2 AS b ON a.id = b.id WHERE a.other > b.other 

本篇中的後續示例將應用SQL92語法進行SQL的編寫,語法以下:

tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition] 

語義示例說明

在《Apache Flink 漫談系列 - SQL概覽》中對JOIN語義有過簡單介紹,這裏會進行展開介紹。 咱們以開篇示例中的三張表:學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)來介紹各類JOIN的語義。

CROSS JOIN

交叉鏈接會對兩個表進行笛卡爾積,也就是LEFT表的每一行和RIGHT表的全部行進行聯接,所以生成結果表的行數是兩個錶行數的乘積,如student和course表的CROSS JOIN結果以下:

mysql> SELECT * FROM student JOIN course;
+------+-------+------+-----+-------+--------+
| no | name | sex | no | name | credit | +------+-------+------+-----+-------+--------+ | S001 | Sunny | M | C01 | Java | 2 | | S002 | Tom | F | C01 | Java | 2 | | S003 | Kevin | M | C01 | Java | 2 | | S001 | Sunny | M | C02 | Blink | 3 | | S002 | Tom | F | C02 | Blink | 3 | | S003 | Kevin | M | C02 | Blink | 3 | | S001 | Sunny | M | C03 | Spark | 3 | | S002 | Tom | F | C03 | Spark | 3 | | S003 | Kevin | M | C03 | Spark | 3 | +------+-------+------+-----+-------+--------+ 9 rows in set (0.00 sec) 

如上結果咱們獲得9行=student(3) x course(3)。交叉聯接通常會消耗較大的資源,也被不少用戶質疑交叉聯接存在的意義?(任什麼時候候咱們都有質疑的權利,同時也建議咱們養成本身質疑本身「質疑」的習慣,就像小時候不理解父母的「廢話」同樣)。
咱們以開篇的示例說明交叉聯接的巧妙之一,開篇中咱們的查詢需求是:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢全部學生的姓名,課程名和考試分數。開篇中的SQL語句獲得的結果以下:

mysql> SELECT 
    ->   student.name, course.name, score 
    -> FROM student JOIN  score ON student.no = score.s_no 
    ->              JOIN course ON score.c_no = course.no;
+-------+-------+-------+
| name | name | score | +-------+-------+-------+ | Sunny | Java | 80 | | Sunny | Blink | 98 | | Sunny | Spark | 76 | | Kevin | Java | 78 | | Kevin | Blink | 88 | | Kevin | Spark | 68 | +-------+-------+-------+ 6 rows in set (0.00 sec) 

如上INNER JOIN的結果咱們發現少了Tom同窗的成績,緣由是Tom同窗沒有參加考試,在score表中沒有Tom的成績,可是咱們可能但願雖然Tom沒有參加考試但仍然但願Tom的成績可以在查詢結果中顯示(成績 0 分),面對這樣的需求,咱們怎麼處理呢?交叉聯接能夠幫助咱們:

  • 第一步 student和course 進行交叉聯接:
mysql> SELECT 
    ->   stu.no, c.no, stu.name, c.name
    -> FROM student stu JOIN course c  笛卡爾積
    -> ORDER BY stu.no; -- 排序只是方便你們查看:) +------+-----+-------+-------+ | no | no | name | name | +------+-----+-------+-------+ | S001 | C03 | Sunny | Spark | | S001 | C01 | Sunny | Java | | S001 | C02 | Sunny | Blink | | S002 | C03 | Tom | Spark | | S002 | C01 | Tom | Java | | S002 | C02 | Tom | Blink | | S003 | C02 | Kevin | Blink | | S003 | C03 | Kevin | Spark | | S003 | C01 | Kevin | Java | +------+-----+-------+-------+ 9 rows in set (0.00 sec) 
  • 第二步 將交叉聯接的結果與score表進行左外聯接,以下:
mysql> SELECT 
    ->   stu.no, c.no, stu.name, c.name,
    ->    CASE 
    ->     WHEN s.score IS NULL THEN 0 -> ELSE s.score -> END AS score -> FROM student stu JOIN course c -- 迪卡爾積 -> LEFT JOIN score s ON stu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN -> ORDER BY stu.no; -- 排序只是爲了你們好看一點:) +------+-----+-------+-------+-------+ | no | no | name | name | score | +------+-----+-------+-------+-------+ | S001 | C03 | Sunny | Spark | 76 | | S001 | C01 | Sunny | Java | 80 | | S001 | C02 | Sunny | Blink | 98 | | S002 | C02 | Tom | Blink | 0 | -- TOM 雖然沒有參加考試,可是仍然看到他的信息 | S002 | C03 | Tom | Spark | 0 | | S002 | C01 | Tom | Java | 0 | | S003 | C02 | Kevin | Blink | 88 | | S003 | C03 | Kevin | Spark | 68 | | S003 | C01 | Kevin | Java | 78 | +------+-----+-------+-------+-------+ 9 rows in set (0.00 sec) 

通過CROSS JOIN幫咱們將Tom的信息也查詢出來了!(TOM 雖然沒有參加考試,可是仍然看到他的信息)

INNER JOIN

內聯接在SQL92中 ON 表示聯接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個多表的內聯接,咱們在看一個簡單的示例: 查詢成績大於80分的學生學號,學生姓名和成績:

mysql> SELECT 
    ->   stu.no, stu.name , s.score
    -> FROM student stu JOIN score s ON  stu.no = s.s_no 
    -> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec) 

上面按語義的邏輯是:

  • 第一步:先進行student和score的內鏈接,以下:
mysql> SELECT 
    ->   stu.no, stu.name , s.score
    -> FROM student stu JOIN score s ON  stu.no = s.s_no ;
+------+-------+-------+
| no | name | score | +------+-------+-------+ | S001 | Sunny | 80 | | S001 | Sunny | 98 | | S001 | Sunny | 76 | | S003 | Kevin | 78 | | S003 | Kevin | 88 | | S003 | Kevin | 68 | +------+-------+-------+ 6 rows in set (0.00 sec) 
  • 第二步:對內聯結果進行過濾, score > 80 獲得,以下最終結果:
-> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec) 

上面的查詢過程符合語義,可是若是在filter條件能過濾不少數據的時候,先進行數據的過濾,在進行內聯接會獲取更好的性能,好比咱們手工寫一下:

mysql> SELECT 
    ->   no, name , score
    -> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec) 

上面寫法語義和第一種寫法語義一致,獲得相同的查詢結果,上面查詢過程是:

  • 第一步:執行過濾子查詢
mysql> SELECT s_no, score FROM score s WHERE s.score >80; +------+-------+ | s_no | score | +------+-------+ | S001 | 98 | | S003 | 88 | +------+-------+ 2 rows in set (0.00 sec) 
  • 第二步:執行內鏈接
-> ON no = s_no;
+------+-------+-------+
| no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec) 

如上兩種寫法在語義上一致,但查詢性能在數量很大的狀況下會有很大差距。上面爲了和你們演示相同的查詢語義,能夠有不一樣的查詢方式,不一樣的執行計劃。實際上數據庫自己的優化器會自動進行查詢優化,在內聯接中ON的聯接條件和WHERE的過濾條件具備相同的優先級,具體的執行順序能夠由數據庫的優化器根據性能消耗決定。也就是說物理執行計劃能夠先執行過濾條件進行查詢優化,若是細心的讀者可能發現,在第二個寫法中,子查詢咱們不但有行的過濾,也進行了列的裁剪(去除了對查詢結果沒有用的c_no列),這兩個變化實際上對應了數據庫中兩個優化規則:

  • filter push down
  • project push down

如上優化規則以filter push down 爲例,示意優化器對執行plan的優化變更:

LEFT OUTER JOIN

左外聯接語義是返回左表全部行,右表不存在補NULL,爲了演示做用,咱們查詢沒有參加考試的全部學生的成績單:

mysql> SELECT 
    ->   no, name , s.c_no, s.score
    -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
    -> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec) 

上面查詢的執行邏輯上也是分紅兩步:

  • 第一步:左外聯接查詢
mysql> SELECT 
    ->   no, name , s.c_no, s.score
    -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | C01 | 80 | | S001 | Sunny | C02 | 98 | | S001 | Sunny | C03 | 76 | | S002 | Tom | NULL | NULL | -- 右表不存在的補NULL | S003 | Kevin | C01 | 78 | | S003 | Kevin | C02 | 88 | | S003 | Kevin | C03 | 68 | +------+-------+------+-------+ 7 rows in set (0.00 sec) 
  • 第二步:過濾查詢
mysql> SELECT 
    ->   no, name , s.c_no, s.score
    -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
    -> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec) 

這兩個過程和上面分析的INNER JOIN同樣,可是這時候可否利用上面說的 filter push down的優化呢?根據LEFT OUTER JOIN的語義來說,答案是否認的。咱們手工操做看一下:

  • 第一步:先進行過濾查詢(得到一個空表)
mysql> SELECT * FROM score s WHERE s.score is NULL; Empty set (0.00 sec) 
  • 第二步: 進行左外連接
mysql> SELECT 
    ->   no, name , s.c_no, s.score
    -> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | NULL | NULL | | S002 | Tom | NULL | NULL | | S003 | Kevin | NULL | NULL | +------+-------+------+-------+ 3 rows in set (0.00 sec) 

咱們發現兩種寫法的結果不一致,第一種寫法只返回Tom沒有參加考試,是咱們預期的。第二種寫法返回了Sunny,Tom和Kevin三名同窗都沒有參加考試,這明顯是非預期的查詢結果。全部LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優化。

RIGHT OUTER JOIN

右外連接語義是返回右表全部行,左邊不存在補NULL,以下:

mysql> SELECT 
    ->   s.c_no, s.score, no, name
    -> FROM score s RIGHT JOIN student stu ON stu.no = s.s_no;
+------+-------+------+-------+
| c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C02 | 98 | S001 | Sunny | | C03 | 76 | S001 | Sunny | | NULL | NULL | S002 | Tom | -- 左邊沒有的進行補 NULL | C01 | 78 | S003 | Kevin | | C02 | 88 | S003 | Kevin | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 7 rows in set (0.00 sec) 

上面右外連接我只是將上面左外連接查詢的左右表交換了一下:)。

FULL OUTER JOIN

全外連接語義返回左表和右表的並集,不存在一邊補NULL,用於演示的MySQL數據庫不支持FULL OUTER JOIN。這裏不作演示了。

SELF JOIN

上面介紹的INNER JOIN、OUTER JOIN都是不一樣表之間的聯接查詢,自聯接是一張表以不一樣的別名作爲左右兩個表,能夠進行如上的INNER JOIN和OUTER JOIN。以下看一個INNER 自聯接:

mysql> SELECT * FROM student l JOIN student r where l.no = r.no;
+------+-------+------+------+-------+------+
| no | name | sex | no | name | sex | +------+-------+------+------+-------+------+ | S001 | Sunny | M | S001 | Sunny | M | | S002 | Tom | F | S002 | Tom | F | | S003 | Kevin | M | S003 | Kevin | M | +------+-------+------+------+-------+------+ 3 rows in set (0.00 sec) 
不等值聯接

這裏說的不等值聯接是SQL92語法裏面的ON子句裏面只有不等值聯接,好比:

mysql> SELECT 
    ->   s.c_no, s.score, no, name
    -> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no;
+------+-------+------+-------+
| c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C01 | 80 | S002 | Tom | | C01 | 80 | S003 | Kevin | | C02 | 98 | S001 | Sunny | | C02 | 98 | S002 | Tom | | C02 | 98 | S003 | Kevin | | C03 | 76 | S001 | Sunny | | C03 | 76 | S002 | Tom | | C03 | 76 | S003 | Kevin | | C01 | 78 | S001 | Sunny | | C01 | 78 | S002 | Tom | | C01 | 78 | S003 | Kevin | | C02 | 88 | S001 | Sunny | | C02 | 88 | S002 | Tom | | C02 | 88 | S003 | Kevin | | C03 | 68 | S001 | Sunny | | C03 | 68 | S002 | Tom | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 18 rows in set (0.00 sec) 

上面這示例,其實沒有什麼實際業務價值,在實際的使用場景中,不等值聯接每每是結合等值聯接,將不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯接。

Apache Flink雙流JOIN

  CROSS INNER OUTER SELF ON WHERE
Apache Flink N Y Y Y 必選 可選

Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 能夠轉換爲普通的INNER和OUTER)。在語義上面Apache Flink嚴格遵照標準SQL的語義,與上面演示的語義一致。下面我重點介紹Apache Flink中JOIN的實現原理。

雙流JOIN與傳統數據庫表JOIN的區別

傳統數據庫表的JOIN是兩張靜態表的數據聯接,在流上面是 動態表(關於流與動態表的關係請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的數據不斷流入與傳統數據庫表的JOIN有以下3個核心區別:

  • 左右兩邊的數據集合無窮 - 傳統數據庫左右兩個表的數據集合是有限的,雙流JOIN的數據會源源不斷的流入;
  • JOIN的結果不斷產生/更新 - 傳統數據庫表JOIN是一次執行產生最終結果後退出,雙流JOIN會持續不斷的產生新的結果。在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇也有相關介紹。
  • 查詢計算的雙邊驅動 - 雙流JOIN因爲左右兩邊的流的速度不同,會致使左邊數據到來的時候右邊數據尚未到來,或者右邊數據到來的時候左邊數據沒有到來,因此在實現中要將左右兩邊的流數據進行保存,以保證JOIN的語義。在Blink中會以State的方式進行數據的存儲。State相關請查看《Apache Flink 漫談系列 - State》篇。

數據Shuffle

分佈式流計算全部數據會進行Shuffle,怎麼才能保障左右兩邊流的要JOIN的數據會在相同的節點進行處理呢?在雙流JOIN的場景,咱們會利用JOIN中ON的聯接key進行partition,確保兩個流相同的聯接key會在同一個節點處理。

數據的保存

不管是INNER JOIN仍是OUTER JOIN 都須要對左右兩邊的流的數據進行保存,JOIN算子會開闢左右兩個State進行數據存儲,左右兩邊的數據到來時候,進行以下操做:

  • LeftEvent到來存儲到LState,RightEvent到來的時候存儲到RState;
  • LeftEvent會去RightState進行JOIN,併發出全部JOIN以後的Event到下游;
  • RightEvent會去LeftState進行JOIN,併發出全部JOIN以後的Event到下游。
簡單場景介紹實現原理

INNER JOIN 實現

JOIN有不少複雜的場景,咱們先以最簡單的場景進行實現原理的介紹,好比:最直接的兩個進行INNER JOIN,好比查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行INNER JOIN,JION條件是產品ID,具體以下:

 

雙流JOIN兩邊事件都會存儲到State裏面,如上,事件流按照標號前後流入到join節點,咱們假設右邊流比較快,先流入了3個事件,3個事件會存儲到state中,但由於左邊尚未數據,全部右邊前3個事件流入時候,沒有join結果流出,當左邊第一個事件序號爲4的流入時候,先存儲左邊state,再與右邊已經流入的3個事件進行join,join的結果如圖 三行結果會流入到下游節點sink。當第5號事件流入時候,也會和左邊第4號事件進行join,流出一條jion結果到下游節點。這裏關於INNER JOIN的語義和你們強調兩點:

  • INNER JOIN只有符合JOIN條件時候纔會有JOIN結果流出到下游,好比右邊最早來的1,2,3個事件,流入時候沒有任何輸出,由於左邊尚未能夠JOIN的事件;
  • INNER JOIN兩邊的數據不論如何亂序,都可以保證和傳統數據庫語義一致,由於咱們保存了左右兩個流的全部事件到state中。

LEFT OUTER JOIN 實現

LEFT OUTER JOIN 能夠簡寫 LEFT JOIN,語義上和INNER JOIN的區別是不論右流是否有JOIN的事件,左流的事件都須要流入下游節點,但右流沒有能夠JION的事件時候,右邊的事件補NULL。一樣咱們以最簡單的場景說明LEFT JOIN的實現,好比查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行LEFT JOIN,JION條件是產品ID,具體以下:

 

下圖也是表達LEFT JOIN的語義,只是展示方式不一樣:

 

上圖主要關注點是當左邊先流入1,2事件時候,右邊沒有能夠join的事件時候會向下遊發送左邊事件並補NULL向下遊發出,當右邊第一個相同的Join key到來的時候會將左邊先來的事件發出的帶有NULL的事件撤回(對應上面command的-記錄,+表明正向記錄,-表明撤回記錄)。這裏強調三點:

  • 左流的事件當右邊沒有JOIN的事件時候,將右邊事件列補NULL後流向下游;* 當右邊事件流入發現左邊已經有能夠JOIN的key的時候,而且是第一個能夠JOIN上的右邊事件(好比上面的3事件是第一個能夠和左邊JOIN key P001進行JOIN的事件)須要撤回左邊下發的NULL記錄,並下發JOIN完整(帶有右邊事件列)的事件到下游。後續來的4,5,6,8等待後續P001的事件是不會產生撤回記錄的。
  • 在Apache Flink系統內部事件類型分爲正向事件標記爲「+」和撤回事件標記爲「-」。
RIGHT OUTER JOIN 和 FULL OUTER JOIN

RIGHT JOIN內部實現與LEFT JOIN相似, FULL JOIN和LEFT JOIN的區別是左右兩邊都會產生補NULL和撤回的操做。對於State的使用都是類似的,這裏再也不重複說明了。

複雜場景介紹State結構

上面咱們介紹了雙流JOIN會使用State記錄左右兩邊流的事件,同時咱們示例數據的場景也是比較簡單,好比流上沒有更新事件(沒有撤回事件),同時流上沒有重複行事件。那麼咱們嘗試思考下面的事件流在雙流JOIN時候是怎麼處理的?

 

上圖示例是連續產生了2筆銷售數量同樣的訂單,同時在產生一筆銷售數量爲5的訂單以後,又將該訂單取消了(或者退貨了),這樣在事件流上面就會是上圖的示意,這種狀況Blink內部如何支撐呢?
根據JOIN的語義以INNER JOIN爲例,右邊有兩條相同的訂單流入,咱們就應該向下游輸出兩條JOIN結果,當有撤回的事件流入時候,咱們也須要將已經下發下游的JOIN事件撤回,以下:

 

上面的場景以及LEFT JOIN部分介紹的撤回狀況,Apache Flink內部須要處理以下幾個核心點:

  • 記錄重複記錄(完整記錄重複記錄或者記錄相同記錄的個數)
  • 記錄正向記錄和撤回記錄(完整記錄正向和撤回記錄或者記錄個數)
  • 記錄哪一條事件是第一個能夠與左邊事件進行JOIN的事件

雙流JOIN的State數據結構

在Apache Flink內部對不一樣的場景有特殊的數據結構優化,本篇咱們只針對上面說的狀況(通用設計)介紹一下雙流JOIN的State的數據結構和用途:

數據結構

  • Map<JoinKey, Map<rowData, count>>;

    • 第一級MAP的key是Join key,好比示例中的P001, value是流上面的全部完整事件;
    • 第二級MAP的key是行數據,好比示例中的P001, 2,value是相同事件值的個數

數據結構的利用

  • 記錄重複記錄 - 利用第二級MAP的value記錄重複記錄的個數,這樣大大減小存儲和讀取
  • 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當count=0時候刪除該元素
  • 判斷右邊是否產生撤回記錄 - 根據第一級MAP的value的size來判斷是否產生撤回,只有size由0變成1的時候(第一條和左能夠JOIN的事件)才產生撤回

雙流JOIN的應用優化

構造更新流

咱們在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇中以雙流JOIN爲例介紹瞭如何構造業務上的PK source,構造PK source本質上在保證業務語義的同時也是對雙流JOIN的一種優化,好比多級LEFT JOIN會讓流上的數據不斷膨脹,形成JOIN節點性能較慢,JOIN以後的下游節點邊堵(數據量大致使,非熱點)。那麼嫌少流入JOIN的數據,好比構造PK source就會大大減小JOIN數據的膨脹。這裏再也不重複舉例,你們能夠查閱 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》 的雙流JOIN示例部分。

NULL形成的熱點

好比咱們有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的業務,JOB的DAG以下:

 

假設在實際業務中有這樣的特色,大部分時候當A事件流入的時候,B尚未能夠JOIN的數據,可是B來的時候,A已經有能夠JOIN的數據了,這特色就會致使,A LEFT JOIN B 會產生大量的 (A, NULL),其中包括B裏面的 cCol 列也是NULL,這時候當與C進行LEFT JOIN的時候,首先Blink內部會利用cCol對AB的JOIN產生的事件流進行Shuffle, cCol是NULL進而是下游節點大量的NULL事件流入,形成熱點。那麼這問題如何解決呢?
咱們能夠改變JOIN的前後順序,來保證A LEFT JOIN B 不會產生NULL的熱點問題,以下:

 

JOIN ReOrder

對於JOIN算子的實現咱們知道左右兩邊的事件都會存儲到State中,在流入事件時候在從另外一邊讀取全部事件進行JOIN計算,這樣的實現邏輯在數據量很大的場景會有必定的state操做瓶頸,咱們某些場景能夠經過業務角度調整JOIN的順序,來消除性能瓶頸,好比:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 這樣的場景,若是 A與B進行JOIN產生數據量很大,可是B與C進行JOIN產生的數據量很小,那麼咱們能夠強制調整JOIN的聯接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 以下示意圖:

 

小結

本篇向你們介紹了數據庫設計範式的要求和實際業務的查詢須要是傳統數據庫JOIN算子存在的緣由,並以具體示例的方式向你們介紹JOIN在數據庫的查詢過程,以及潛在的查詢優化,再以實際的例子介紹Apache Flink上面的雙流JOIN的實現原理和State數據結構設計,最後向你們介紹兩個雙流JOIN的使用優化。

 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索