在《Apache Flink 漫談系列 - SQL概覽》中咱們介紹了JOIN算子的語義和基本的使用方式,介紹過程當中你們發現Apache Flink在語法語義上是遵循ANSI-SQL標準的,那麼再深思一下傳統數據庫爲啥須要有JOIN算子呢?在實現原理上面Apache Flink內部實現和傳統數據庫有什麼區別呢?本篇將詳盡的爲你們介紹傳統數據庫爲何須要JOIN算子,以及JOIN算子在Apache Flink中的底層實現原理和在實際使用中的優化!php
在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這裏咱們以具體實例的方式讓你們對JOIN算子加深印象。JOIN的本質是分別從N(N>=1)張表中獲取不一樣的字段,進而獲得最完整的記錄行。好比咱們有一個查詢需求:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢全部學生的姓名,課程名和考試分數。以下:css
JOIN的本質是數據拼接,那麼若是咱們將全部數據列存儲在一張大表中,是否是就不須要JOIN了呢?若是真的能將所需的數據都在一張表存儲,我想就真的不須要JOIN的算子了,但現實業務中真的能作到將所需數據放到同一張大表裏面嗎?答案是否認的,核心緣由有2個:mysql
產生數據的源頭可能不是一個系統;sql
產生數據的源頭是同一個系統,可是數據冗餘的沉重代價,迫使咱們會遵循數據庫範式,進行表的設計。簡說NF以下:數據庫
固然還有 4NF,5NF,不過在實際的數據庫設計過程當中作到BCNF已經足夠了!(並不是否認4NF,5NF存在的意義,只是我的尚未遇到必定要用4NF,5NF的場景,設計每每會按存儲成本,查詢性能等綜合因素考量)ruby
JOIN 在傳統數據庫中有以下分類:數據結構
CROSS JOIN - 交叉鏈接,計算笛卡兒積;併發
INNER JOIN - 內鏈接,返回知足條件的記錄;數據庫設計
OUTER JOIN分佈式
SELF JOIN - 自鏈接,將表查詢時候命名不一樣的別名。
JOIN 在SQL89和SQL92中有不一樣的語法,以INNER JOIN爲例說明:
SELECT
a.colA,
b.colA
FROM
tab1 AS a , tab2 AS b WHERE a.id = b.id and a.other > b.other
SELECT
a.colA,
b.colA
FROM
tab1 AS a JOIN tab2 AS b ON a.id = b.id WHERE a.other > b.other
本篇中的後續示例將應用SQL92語法進行SQL的編寫,語法以下:
tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition]
在《Apache Flink 漫談系列 - SQL概覽》中對JOIN語義有過簡單介紹,這裏會進行展開介紹。 咱們以開篇示例中的三張表:學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)來介紹各類JOIN的語義。
交叉鏈接會對兩個表進行笛卡爾積,也就是LEFT表的每一行和RIGHT表的全部行進行聯接,所以生成結果表的行數是兩個錶行數的乘積,如student和course表的CROSS JOIN結果以下:
mysql> SELECT * FROM student JOIN course;
+------+-------+------+-----+-------+--------+
| no | name | sex | no | name | credit | +------+-------+------+-----+-------+--------+ | S001 | Sunny | M | C01 | Java | 2 | | S002 | Tom | F | C01 | Java | 2 | | S003 | Kevin | M | C01 | Java | 2 | | S001 | Sunny | M | C02 | Blink | 3 | | S002 | Tom | F | C02 | Blink | 3 | | S003 | Kevin | M | C02 | Blink | 3 | | S001 | Sunny | M | C03 | Spark | 3 | | S002 | Tom | F | C03 | Spark | 3 | | S003 | Kevin | M | C03 | Spark | 3 | +------+-------+------+-----+-------+--------+ 9 rows in set (0.00 sec)
如上結果咱們獲得9行=student(3) x course(3)。交叉聯接通常會消耗較大的資源,也被不少用戶質疑交叉聯接存在的意義?(任什麼時候候咱們都有質疑的權利,同時也建議咱們養成本身質疑本身「質疑」的習慣,就像小時候不理解父母的「廢話」同樣)。
咱們以開篇的示例說明交叉聯接的巧妙之一,開篇中咱們的查詢需求是:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢全部學生的姓名,課程名和考試分數。開篇中的SQL語句獲得的結果以下:
mysql> SELECT
-> student.name, course.name, score
-> FROM student JOIN score ON student.no = score.s_no
-> JOIN course ON score.c_no = course.no;
+-------+-------+-------+
| name | name | score | +-------+-------+-------+ | Sunny | Java | 80 | | Sunny | Blink | 98 | | Sunny | Spark | 76 | | Kevin | Java | 78 | | Kevin | Blink | 88 | | Kevin | Spark | 68 | +-------+-------+-------+ 6 rows in set (0.00 sec)
如上INNER JOIN的結果咱們發現少了Tom同窗的成績,緣由是Tom同窗沒有參加考試,在score表中沒有Tom的成績,可是咱們可能但願雖然Tom沒有參加考試但仍然但願Tom的成績可以在查詢結果中顯示(成績 0 分),面對這樣的需求,咱們怎麼處理呢?交叉聯接能夠幫助咱們:
mysql> SELECT
-> stu.no, c.no, stu.name, c.name
-> FROM student stu JOIN course c 笛卡爾積
-> ORDER BY stu.no; -- 排序只是方便你們查看:) +------+-----+-------+-------+ | no | no | name | name | +------+-----+-------+-------+ | S001 | C03 | Sunny | Spark | | S001 | C01 | Sunny | Java | | S001 | C02 | Sunny | Blink | | S002 | C03 | Tom | Spark | | S002 | C01 | Tom | Java | | S002 | C02 | Tom | Blink | | S003 | C02 | Kevin | Blink | | S003 | C03 | Kevin | Spark | | S003 | C01 | Kevin | Java | +------+-----+-------+-------+ 9 rows in set (0.00 sec)
mysql> SELECT
-> stu.no, c.no, stu.name, c.name,
-> CASE
-> WHEN s.score IS NULL THEN 0 -> ELSE s.score -> END AS score -> FROM student stu JOIN course c -- 迪卡爾積 -> LEFT JOIN score s ON stu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN -> ORDER BY stu.no; -- 排序只是爲了你們好看一點:) +------+-----+-------+-------+-------+ | no | no | name | name | score | +------+-----+-------+-------+-------+ | S001 | C03 | Sunny | Spark | 76 | | S001 | C01 | Sunny | Java | 80 | | S001 | C02 | Sunny | Blink | 98 | | S002 | C02 | Tom | Blink | 0 | -- TOM 雖然沒有參加考試,可是仍然看到他的信息 | S002 | C03 | Tom | Spark | 0 | | S002 | C01 | Tom | Java | 0 | | S003 | C02 | Kevin | Blink | 88 | | S003 | C03 | Kevin | Spark | 68 | | S003 | C01 | Kevin | Java | 78 | +------+-----+-------+-------+-------+ 9 rows in set (0.00 sec)
通過CROSS JOIN幫咱們將Tom的信息也查詢出來了!(TOM 雖然沒有參加考試,可是仍然看到他的信息)
內聯接在SQL92中 ON 表示聯接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個多表的內聯接,咱們在看一個簡單的示例: 查詢成績大於80分的學生學號,學生姓名和成績:
mysql> SELECT
-> stu.no, stu.name , s.score
-> FROM student stu JOIN score s ON stu.no = s.s_no
-> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面按語義的邏輯是:
mysql> SELECT
-> stu.no, stu.name , s.score
-> FROM student stu JOIN score s ON stu.no = s.s_no ;
+------+-------+-------+
| no | name | score | +------+-------+-------+ | S001 | Sunny | 80 | | S001 | Sunny | 98 | | S001 | Sunny | 76 | | S003 | Kevin | 78 | | S003 | Kevin | 88 | | S003 | Kevin | 68 | +------+-------+-------+ 6 rows in set (0.00 sec)
-> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面的查詢過程符合語義,可是若是在filter條件能過濾不少數據的時候,先進行數據的過濾,在進行內聯接會獲取更好的性能,好比咱們手工寫一下:
mysql> SELECT
-> no, name , score
-> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面寫法語義和第一種寫法語義一致,獲得相同的查詢結果,上面查詢過程是:
mysql> SELECT s_no, score FROM score s WHERE s.score >80; +------+-------+ | s_no | score | +------+-------+ | S001 | 98 | | S003 | 88 | +------+-------+ 2 rows in set (0.00 sec)
-> ON no = s_no;
+------+-------+-------+
| no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
如上兩種寫法在語義上一致,但查詢性能在數量很大的狀況下會有很大差距。上面爲了和你們演示相同的查詢語義,能夠有不一樣的查詢方式,不一樣的執行計劃。實際上數據庫自己的優化器會自動進行查詢優化,在內聯接中ON的聯接條件和WHERE的過濾條件具備相同的優先級,具體的執行順序能夠由數據庫的優化器根據性能消耗決定。也就是說物理執行計劃能夠先執行過濾條件進行查詢優化,若是細心的讀者可能發現,在第二個寫法中,子查詢咱們不但有行的過濾,也進行了列的裁剪(去除了對查詢結果沒有用的c_no列),這兩個變化實際上對應了數據庫中兩個優化規則:
如上優化規則以filter push down 爲例,示意優化器對執行plan的優化變更:
左外聯接語義是返回左表全部行,右表不存在補NULL,爲了演示做用,咱們查詢沒有參加考試的全部學生的成績單:
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
-> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec)
上面查詢的執行邏輯上也是分紅兩步:
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | C01 | 80 | | S001 | Sunny | C02 | 98 | | S001 | Sunny | C03 | 76 | | S002 | Tom | NULL | NULL | -- 右表不存在的補NULL | S003 | Kevin | C01 | 78 | | S003 | Kevin | C02 | 88 | | S003 | Kevin | C03 | 68 | +------+-------+------+-------+ 7 rows in set (0.00 sec)
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
-> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec)
這兩個過程和上面分析的INNER JOIN同樣,可是這時候可否利用上面說的 filter push down的優化呢?根據LEFT OUTER JOIN的語義來說,答案是否認的。咱們手工操做看一下:
mysql> SELECT * FROM score s WHERE s.score is NULL; Empty set (0.00 sec)
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | NULL | NULL | | S002 | Tom | NULL | NULL | | S003 | Kevin | NULL | NULL | +------+-------+------+-------+ 3 rows in set (0.00 sec)
咱們發現兩種寫法的結果不一致,第一種寫法只返回Tom沒有參加考試,是咱們預期的。第二種寫法返回了Sunny,Tom和Kevin三名同窗都沒有參加考試,這明顯是非預期的查詢結果。全部LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優化。
右外連接語義是返回右表全部行,左邊不存在補NULL,以下:
mysql> SELECT
-> s.c_no, s.score, no, name
-> FROM score s RIGHT JOIN student stu ON stu.no = s.s_no;
+------+-------+------+-------+
| c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C02 | 98 | S001 | Sunny | | C03 | 76 | S001 | Sunny | | NULL | NULL | S002 | Tom | -- 左邊沒有的進行補 NULL | C01 | 78 | S003 | Kevin | | C02 | 88 | S003 | Kevin | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 7 rows in set (0.00 sec)
上面右外連接我只是將上面左外連接查詢的左右表交換了一下:)。
全外連接語義返回左表和右表的並集,不存在一邊補NULL,用於演示的MySQL數據庫不支持FULL OUTER JOIN。這裏不作演示了。
上面介紹的INNER JOIN、OUTER JOIN都是不一樣表之間的聯接查詢,自聯接是一張表以不一樣的別名作爲左右兩個表,能夠進行如上的INNER JOIN和OUTER JOIN。以下看一個INNER 自聯接:
mysql> SELECT * FROM student l JOIN student r where l.no = r.no;
+------+-------+------+------+-------+------+
| no | name | sex | no | name | sex | +------+-------+------+------+-------+------+ | S001 | Sunny | M | S001 | Sunny | M | | S002 | Tom | F | S002 | Tom | F | | S003 | Kevin | M | S003 | Kevin | M | +------+-------+------+------+-------+------+ 3 rows in set (0.00 sec)
這裏說的不等值聯接是SQL92語法裏面的ON子句裏面只有不等值聯接,好比:
mysql> SELECT
-> s.c_no, s.score, no, name
-> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no;
+------+-------+------+-------+
| c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C01 | 80 | S002 | Tom | | C01 | 80 | S003 | Kevin | | C02 | 98 | S001 | Sunny | | C02 | 98 | S002 | Tom | | C02 | 98 | S003 | Kevin | | C03 | 76 | S001 | Sunny | | C03 | 76 | S002 | Tom | | C03 | 76 | S003 | Kevin | | C01 | 78 | S001 | Sunny | | C01 | 78 | S002 | Tom | | C01 | 78 | S003 | Kevin | | C02 | 88 | S001 | Sunny | | C02 | 88 | S002 | Tom | | C02 | 88 | S003 | Kevin | | C03 | 68 | S001 | Sunny | | C03 | 68 | S002 | Tom | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 18 rows in set (0.00 sec)
上面這示例,其實沒有什麼實際業務價值,在實際的使用場景中,不等值聯接每每是結合等值聯接,將不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯接。
CROSS | INNER | OUTER | SELF | ON | WHERE | |
---|---|---|---|---|---|---|
Apache Flink | N | Y | Y | Y | 必選 | 可選 |
Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 能夠轉換爲普通的INNER和OUTER)。在語義上面Apache Flink嚴格遵照標準SQL的語義,與上面演示的語義一致。下面我重點介紹Apache Flink中JOIN的實現原理。
傳統數據庫表的JOIN是兩張靜態表的數據聯接,在流上面是 動態表(關於流與動態表的關係請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的數據不斷流入與傳統數據庫表的JOIN有以下3個核心區別:
數據Shuffle
分佈式流計算全部數據會進行Shuffle,怎麼才能保障左右兩邊流的要JOIN的數據會在相同的節點進行處理呢?在雙流JOIN的場景,咱們會利用JOIN中ON的聯接key進行partition,確保兩個流相同的聯接key會在同一個節點處理。
數據的保存
不管是INNER JOIN仍是OUTER JOIN 都須要對左右兩邊的流的數據進行保存,JOIN算子會開闢左右兩個State進行數據存儲,左右兩邊的數據到來時候,進行以下操做:
INNER JOIN 實現
JOIN有不少複雜的場景,咱們先以最簡單的場景進行實現原理的介紹,好比:最直接的兩個進行INNER JOIN,好比查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行INNER JOIN,JION條件是產品ID,具體以下:
雙流JOIN兩邊事件都會存儲到State裏面,如上,事件流按照標號前後流入到join節點,咱們假設右邊流比較快,先流入了3個事件,3個事件會存儲到state中,但由於左邊尚未數據,全部右邊前3個事件流入時候,沒有join結果流出,當左邊第一個事件序號爲4的流入時候,先存儲左邊state,再與右邊已經流入的3個事件進行join,join的結果如圖 三行結果會流入到下游節點sink。當第5號事件流入時候,也會和左邊第4號事件進行join,流出一條jion結果到下游節點。這裏關於INNER JOIN的語義和你們強調兩點:
LEFT OUTER JOIN 實現
LEFT OUTER JOIN 能夠簡寫 LEFT JOIN,語義上和INNER JOIN的區別是不論右流是否有JOIN的事件,左流的事件都須要流入下游節點,但右流沒有能夠JION的事件時候,右邊的事件補NULL。一樣咱們以最簡單的場景說明LEFT JOIN的實現,好比查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行LEFT JOIN,JION條件是產品ID,具體以下:
下圖也是表達LEFT JOIN的語義,只是展示方式不一樣:
上圖主要關注點是當左邊先流入1,2事件時候,右邊沒有能夠join的事件時候會向下遊發送左邊事件並補NULL向下遊發出,當右邊第一個相同的Join key到來的時候會將左邊先來的事件發出的帶有NULL的事件撤回(對應上面command的-記錄,+表明正向記錄,-表明撤回記錄)。這裏強調三點:
RIGHT JOIN內部實現與LEFT JOIN相似, FULL JOIN和LEFT JOIN的區別是左右兩邊都會產生補NULL和撤回的操做。對於State的使用都是類似的,這裏再也不重複說明了。
上面咱們介紹了雙流JOIN會使用State記錄左右兩邊流的事件,同時咱們示例數據的場景也是比較簡單,好比流上沒有更新事件(沒有撤回事件),同時流上沒有重複行事件。那麼咱們嘗試思考下面的事件流在雙流JOIN時候是怎麼處理的?
上圖示例是連續產生了2筆銷售數量同樣的訂單,同時在產生一筆銷售數量爲5的訂單以後,又將該訂單取消了(或者退貨了),這樣在事件流上面就會是上圖的示意,這種狀況Blink內部如何支撐呢?
根據JOIN的語義以INNER JOIN爲例,右邊有兩條相同的訂單流入,咱們就應該向下游輸出兩條JOIN結果,當有撤回的事件流入時候,咱們也須要將已經下發下游的JOIN事件撤回,以下:
上面的場景以及LEFT JOIN部分介紹的撤回狀況,Apache Flink內部須要處理以下幾個核心點:
雙流JOIN的State數據結構
在Apache Flink內部對不一樣的場景有特殊的數據結構優化,本篇咱們只針對上面說的狀況(通用設計)介紹一下雙流JOIN的State的數據結構和用途:
數據結構
Map<JoinKey, Map<rowData, count>>;
咱們在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇中以雙流JOIN爲例介紹瞭如何構造業務上的PK source,構造PK source本質上在保證業務語義的同時也是對雙流JOIN的一種優化,好比多級LEFT JOIN會讓流上的數據不斷膨脹,形成JOIN節點性能較慢,JOIN以後的下游節點邊堵(數據量大致使,非熱點)。那麼嫌少流入JOIN的數據,好比構造PK source就會大大減小JOIN數據的膨脹。這裏再也不重複舉例,你們能夠查閱 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》 的雙流JOIN示例部分。
好比咱們有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的業務,JOB的DAG以下:
假設在實際業務中有這樣的特色,大部分時候當A事件流入的時候,B尚未能夠JOIN的數據,可是B來的時候,A已經有能夠JOIN的數據了,這特色就會致使,A LEFT JOIN B 會產生大量的 (A, NULL),其中包括B裏面的 cCol 列也是NULL,這時候當與C進行LEFT JOIN的時候,首先Blink內部會利用cCol對AB的JOIN產生的事件流進行Shuffle, cCol是NULL進而是下游節點大量的NULL事件流入,形成熱點。那麼這問題如何解決呢?
咱們能夠改變JOIN的前後順序,來保證A LEFT JOIN B 不會產生NULL的熱點問題,以下:
對於JOIN算子的實現咱們知道左右兩邊的事件都會存儲到State中,在流入事件時候在從另外一邊讀取全部事件進行JOIN計算,這樣的實現邏輯在數據量很大的場景會有必定的state操做瓶頸,咱們某些場景能夠經過業務角度調整JOIN的順序,來消除性能瓶頸,好比:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 這樣的場景,若是 A與B進行JOIN產生數據量很大,可是B與C進行JOIN產生的數據量很小,那麼咱們能夠強制調整JOIN的聯接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 以下示意圖:
本篇向你們介紹了數據庫設計範式的要求和實際業務的查詢須要是傳統數據庫JOIN算子存在的緣由,並以具體示例的方式向你們介紹JOIN在數據庫的查詢過程,以及潛在的查詢優化,再以實際的例子介紹Apache Flink上面的雙流JOIN的實現原理和State數據結構設計,最後向你們介紹兩個雙流JOIN的使用優化。
本文爲雲棲社區原創內容,未經容許不得轉載。