原文出處:大數據最佳實踐
連接:https://mp.weixin.qq.com/s/VlYyzLvTECM5XSRLklGrOg算法
目前的數倉大概分爲離線數倉和實時數倉。離線數倉通常是T+1的數據ETL方案;實時數倉通常是分鐘級別甚至更短的時間內的ETL方案。實時數倉通常是將上游業務庫的數據經過binlog等形式,實時抽取到Kafka,進行實時ETL。但目前主流的實時數倉也會細分爲兩類,一類是標準的實時數倉,全部的ETL過程都經過Spark或Flink等實時計算、落地,也就是說數據從binlog抽取到kafka,後續全部的ETL都是讀取kafka、計算、寫入kafka的形式串聯起來的,這種符合完整的數倉定義;還有一類是簡化的實時數倉,ETL簡化爲有限的兩層,binlog落地到kafka以後,Spark或Flink讀取kafka計算完指標後落地HBase等存儲供外部查詢分析,固然也有經過Kylin或Druid來完成指標計算的。sql
那麼「準實時數倉」又是一種什麼方案呢?數據庫
其實「準實時數倉」是離線數倉的一種簡單的升級,它將離線的、天級別的ETL過程,縮短爲小時或半小時級,但同時又對外提供實時的ODS層數據查詢。縮短離線數倉的計算頻率比較簡單,就是每小時或半小時增量抽取數據,MERGE到ODS層,後續的ETL過程與離線數倉徹底一致。緩存
對外提供實時的ODS層數據查詢有什麼使用場景呢?架構
互聯網公司的業務庫通常都是MySQL,數據量比較大的狀況下,會進行分庫分表;每一個業務庫又會有不一樣的MySQL實例。若是想跨產品查詢數據就會很是麻煩,那誰會跨產品查詢數據呢?客服系統。客服系統通常能夠查詢到某用戶全部的信息,若是用戶的信息分佈在不一樣的MySQL實例、不一樣的庫、不一樣的表,查詢起來必定會涉及到sharding-jdbc,若是各個產品的sharding字段不一樣、算法不一樣,查詢必定會比較慢且很是複雜。此時就會須要有一個數據庫把這些數據彙總到一塊,而數倉的ODS層比較適合作這個工做。併發
準實時數倉」的兩個功能會涉及兩個技術難點:1)數據的增量抽取和增量MERGE;2)提供實時查詢接口。下面針對這兩個技術難點分別介紹對應的解決方案。性能
數據的增量抽取和MERGE。大數據
其實增量抽取還算簡單,就是根據數據的某個字段進行增量抽取,這個字段多是自增的ID或者更新時間。二者有什麼不一樣呢?優化
能夠按照ID抽取的表,其中的數據通常不作更新,只是簡單的追加。那是否是隻須要記錄上次抽取的最大ID,下一次從整個ID開始抽取就能夠了呢?固然不是。ui
如今有如下場景:USER_LOGIN_HISTORY表有10個分表,分別爲USER_LOGIN_HISTORY_0~ USER_LOGIN_HISTORY_9,其中ID是自增的,好比是auto_increment類型。但假如如今有3個併發事務,分配了3個ID值,好比是1/2/3。但這3個併發事務還沒提交的狀況下,又來了3個併發事務,他們的ID應該是4/5/6。假如後面3個事務,提早提交,那麼在進行增量抽數據的時候,當前ID的最大值是6,,很不巧,此時正值進行增量抽取數據,ID爲1/2/3的數據並無抽取進來。那麼這3個事務的數據就再也抽取不過來了!由於下次抽取時,ID的最大值是6,應該會從7開始抽取!
此時應該從上次抽取數據ID的最小值開始抽取,雖然有重複數據,但卻能夠保證抽取的數據不會丟失。也就是說,若是當前批次是3,則應該從批次1抽取數據的最小ID開始抽取。這是爲何呢?請看下圖。
Batch1/batch2/batch3抽取時,MySQL表當前的最大ID分別是1001/2001/4002。Batch2抽取時應該是抽取1001~2001的數據,但很不巧的是,此時ID範圍在1990~1999的10條數據尚未提交,抽取時就會漏掉。Batch3抽取時應該從哪裏開始抽取呢?1001,1990,仍是2001?
理想的狀況應該從1990開始抽取,由於只是漏了1990~1999的數據。但怎麼才能肯定漏了哪些數據呢?答案是不知道。由於你根本不知道抽數時,哪些事務尚未提交。很顯然應該從1001開始抽取。其實簡單來講就是每次都要抽取兩個batch的數據,來避免事務的影響。那必定就是兩個batch嗎?其實此處設置爲兩個batch,實際上是假設事務的最大持續時間小於每一個batch的間隔時間的。對於準實時倉庫來講,每一個batch通常都是小時或半小時,都會比事務最大持續時間大,因此兩個batch就夠了。若是batch時間間隔很小,那麼久多向前推幾個batch就好了。
數據增量抽取後,MERGE就比較簡單了,其實就是用增量數據表與ODS全量表進行FULL JOIN,以增量數據爲準就好了。但還須要考慮的是業務庫的表是否容許物理刪除,好比咱們是不容許物理刪除的,因此FULL JOIN就好了。容許物理刪除就比較麻煩了,增量抽取是沒法查詢到已經刪除的數據的!怎麼辦呢?可使用binlog把業務庫的Delete數據抽取到另一張表,再用它來清洗ODS全量表就行了。
按照數據更新時間抽取,與上面的方案差很少,但有一點須要注意。抽取的時候,只能限制時間的最小值,而不能限制最大值。好比某個batch抽取時,當前時間是「2019年2月22日18:00:00.153」,由於從計算當前時間到實際抽取可能仍是會相差幾毫秒或者幾秒,那麼這期間更新的數據就可能會丟失,由於這些數據可能每次都會在這個間隔內被更新掉!
實時查詢接口。
實時且跨產品、跨庫、跨表查詢的系統通常都是中後臺業務系統,這類系統的特色就是查詢數據源多、查詢結果數據量比較小。通常都是查詢某個或某些用戶的數據。這能夠經過sharding-jdbc或ElasticSearch全文檢索來實現。sharding-jdbc雖然可能會有問題,但實施起來比較簡單,配置好sharding規則、寫好sql就能夠了。
ElasticSearch全文檢索就比較麻煩了,因爲ES沒有完善的sql接口,因此只能先將所需的數據彙總好,這又涉及到多表實時關聯彙總的問題。假設某個查詢結果涉及上游3張表,他們之間的關聯條件又不一樣,在實時彙總時,若是其中一張表的數據沒有到,另外兩張表就沒法入庫,只能是先緩存數據等全部數據到達時再次彙總,實施難度仍是比較大的。
固然也能夠將所需的全部業務庫,抽取到某一個MySQL庫,進行查詢。數據量比較大時,這種方案就會很糟糕。
那比較好的方案是什麼呢?咱們能夠把數據按照邏輯表(分庫分表整合後的表)經過binlog實時抽取到Phoenix,前臺業務系統經過Phoenix的JDBC接口實時查詢。因爲Phoenix支持索引,咱們能夠像使用MySQL同樣查詢Phoenix,固然了SQL可能須要優化。
因爲Phoenix底層是基於HBase作的,能夠承載海量數據的讀寫;並且HIVE也能夠映射Phoenix進行離線查詢。這樣咱們就把實時查詢和離線分析的需求進行了統一!那麼這個方案有沒有什麼問題呢?仍是有一點須要考慮的:實時抽取的準確性如何保證呢?也就是說,binlog到Phoenix過程當中,若是某一條更新日誌丟失了該怎麼辦呢?
很顯然能夠用增量抽取的數據,補充到Phoenix中。那按照上面的增量抽取、補數邏輯是否是就沒事了呢?
其實仍是有問題的,仍然是事務的問題,只不過此次是補數時的事務問題。增量數據通常比較大,那麼耗時就比較久,假設爲3分鐘,那麼這個時間段內,實時更新的數據會不會被覆蓋掉呢?很顯然,必定會。既然會覆蓋,補數時就判斷一下主鍵相同的數據的更新時間嘍,以時間最大的爲準。這仍是有問題的,由於Phoenix默認是不開啓事務的,也就是說,判斷的時候,增量數據是最新的,但更新到Phoenix時,增量數據就不必定是最新的了,由於這個時間差內,實時數據進來了。
那就開啓Phoenix的事務唄,開啓應該能解決這個問題,但目前Phoenix的事務機制仍是Beta版本,並且這還可能帶來性能問題和死鎖問題。
那怎麼解決實時數據和離線增量數據相互覆蓋的問題了?有沒有一箭雙鵰的方案呢?
熟悉HBase的同窗必定知道,HBase有時間戳的概念,經過時間戳又支持多版本的查詢。經過Phoenix插入HBase時,全部列的時間戳都是RegionServer的當前時間,也就是說同一ID插入時,時間戳是遞增的,查詢時只能查詢到最新的數據。那這個跟上面的問題有啥關係呢?
若是咱們把數據的更新時間映射到Phoenix底層HBase表的時間戳,是否是就完美解決事務的問題了呢?很簡單,數據的更新時間映射到HBase的時間戳,實時數據和增量數據,只須要簡單的插入Phoenix就行了,Phoenix查詢時只會查詢最新的數據!
然而,理想是完美的,現實是殘酷的,目前Phoenix不支持不一樣字段映射到HBase的時間戳!
沒辦法,只能改源碼。經過修改Phoenix源碼,咱們使Phoenix支持了ROW_TS這一特殊的字段類型,這個類型的值會寫入HBase的時間戳,也就是說Phoenix插入數據時能夠自由指定時間戳!下面是改造後的結果,很顯然,符合預期。
至此,「準實時數倉」的方案就介紹完了,下面經過架構圖簡單總結一下。
1) 上游MySQL的binlog經過Debezium (或Canal)和flume實時寫入Phoenix。新增字段時,實時修改Phoenix表結構
2) 按照數據更新時間,每小時抽取MySQL增量數據,將該部分數據批量MERGE到Phoenix
3) 天天自動建立Hive到Phoenix表的外部表(也能夠建立Hive到Phoenix底層HBase表的外部表),進行後續的ETL過程。
4) 實時查詢平臺經過JDBC鏈接Phoenix,按照主鍵或索引實時查詢數據