你是否面對海量數據查詢緩慢而備受煎熬 ?受限於查詢速度而只能屈服於數據量?想在同一個分析工具下分析不一樣來源的數據?想在不一樣的 BI 工具下仍具備敏捷的查詢響應速度?在 Apache Kylin 面前這些都不是問題,一圖勝千言。前端
好了,你可能想說:「前面的問題你卻是快給我解決啊,這圖我表示看不懂」。別急,「工欲善其事,必先利其器」,對於基本的概念咱們得了解,方能壓住這「麒麟」神獸。java
根據文章開頭提出的系列問題能瞭解到最基本的一點,Apache Kylin 可以解決海量數據查詢的問題。既然要查詢,首先咱們須要有個查詢的入口,Kylin 提供了標準 SQL 做爲對外查詢的接口,你只須要和平時寫 SQL 同樣進行數據查詢便可,只是換了個查詢的平臺,不用關心如何解決查詢緩慢的問題,由於 Kylin 在背後幫你解決了這個問題。算法
「既然不用我瞭解如何作到海量數據亞秒級查詢的,那總得讓我明白怎麼把須要查詢的數據給 Kylin 吧?」sql
這個好說,最先期 Kylin 只支持 Hive 做爲數據源,數據源理解爲你須要查詢的真實數據,現在 Kylin 不只支持 Kafka 消息流做爲數據源,還支持 RDBMS 等數據源,好比 Mysql,下面拿 Hive 舉例講一講數據源。json
Hive 自己的數據源不強制使用特定文件格式,你能夠將 csv 文件做爲 Hive 數據的來源,將 csv 文件中的數據存儲到 Hive 中,儘管此時你能夠直接經過 Hive 查詢數據,但面對海量數據效率是很低的。因而在 Kylin 中咱們僅僅將 Hive 做爲數據存儲的介質,提供數據的來源,隨後咱們會採用一種叫預計算的方式將要查詢的數據存儲到 Cube 中,Kylin 查詢 Cube 中已經計算好的數據。api
以上對於整個 Kylin 有個最淺的認識,一句話歸納就是隻要咱們提供數據源,Kylin 即可以實現海量數據的秒級甚至亞秒級查詢,飛通常的感受。那麼它到底是如何作到的呢?架構
Kylin 可以實現亞秒級查詢,是運用了預計算這個概念,預計算字面上容易理解,就是提早計算好,查詢時直接返回結果。 對於分析師而言,明細數據基本上用不着,而是須要根據業務邏輯聚合以後的數據。好比商家須要根據某個月份商品的銷量調整庫存,此時只須要知道該月份該商品的總銷量便可,對於天天的銷量數據在這種業務場景下沒有利用價值。此時即可以利用預計算的思想,提早將該月份該商品總銷量計算出來,當業務分析師進行查詢時,直接返回計算好的結果便可,這就是預計算的簡單理解。app
在 Kylin 中,應對的不是這種簡單的計算場景,而是對海量數據,多維度的計算。既然想要很好的利用預計算的思想,那如何讓 Kylin 提早準備好這些計算後的結果呢?此時就須要建模,經過建模告訴 Kylin 你想要根據哪些維度計算出什麼值。Kylin 根據定義好的模型,按照不一樣的維度組合進行計算,至關於業務分析師從數據的不一樣角度提出了不一樣的計算需求,Kylin 將各類維度組合計算出來,每個計算結果保存成一個物化視圖,稱爲 Cuboid,全部維度計算的結果組合在一塊兒,也就是全部 Cuboid 做爲一個總體,稱爲 Cube。ide
根據各類維度聚合以後計算的值已經存在 Cube 中了,那麼分析師輸入的 SQL 便不須要查數據源中的數據,直接查詢 Cube 中計算好的數據便可,固然查詢 Cube 還須要將 SQL 查詢轉化成 Cube 的查詢,這個過程稱爲後計算。此時能夠再看一遍文章開頭的架構圖,用戶輸入 SQL,經過各類 Rest API、JDBC/ODBC 接口,來到 Rest 服務層,再轉交給查詢引擎,查詢引擎會解析 SQL,生成基於關係表的邏輯執行計劃,而後將其轉譯爲基於 Cube 的物理執行計劃,最後查詢預計算生成的 Cube 併產生結果。關係表是數據的輸入形式,符合星型模型或雪花模型,簡單理解爲一種關係模型就行了。函數
你可能會產生疑問,一直提到 Cube,感受它只是一個抽象的概念,那它到底存在哪呢?Cube 的存儲實際上由存儲引擎實現,Kylin 將其存儲在 HBase 中。Kylin 將存儲引擎進行了抽象,可使用不一樣的存儲。就如上文提到的數據源,它也通過抽象,能夠實現不一樣的數據源,簡單類比爲 Java 中的接口能夠有不一樣的實現類。
「Cube 是如何生成的呢?」
上面咱們提到 Cube 是 Cuboid 的組合,Cuboid 經過維度組合計算出來,重點是如何計算每一個 Cuboid 維度組合的值,當沒有值時,Cube 其實就至關因而一個骨架,沒有肉。而 Cube 構建引擎正是要填充這個骨架,也就是將數據源中的數據根據維度聚合,計算出相應的值,填充到對應的 Cuboid,Cube 採用分層構建,先計算出 Base Cuboid,也就是包含全部維度的一個 Cuboid,再逐層構建,最後存儲到 HBase 中。實際上 Cube 構建遠不止這麼簡單,中間還有不少原理和細節,而且 Cube 構建算法不只有逐層構建,還有 Fast Cubing 算法,須要進一步探索。
「若是我又新產生了數據該怎麼辦呢?再重複上一次的過程,加載全部數據進行 Cube 構建嗎?」
Cube 有兩種構建方式,全量構建和增量構建。從字面也比較容易理解,全量就是讀取數據源中的全部數據,增量就是讀取數據源中相對於某一個時間段新產生的數據,也就是每次構建只讀取一個時間範圍的數據。這裏又產生一個新的概念,叫 Segment,能夠理解爲根據時間段進行構建的小 Cube,Segment 能夠根據配置以必定的方式進行合併,好比每滿兩個月的數據進行一次合併,全量構建實際上就是隻有一個 Segment,它沒有根據時間進行分割。由此能夠看出全量構建一般適用於事實表的數據不隨時間增加或事實表的數據比較小、更新頻率很低的場景,只有在這樣的場景下,全量構建纔不會形成大開銷。而現實業務中,數據會不斷增長,增量構建纔是應用最普遍構建方式。
「Cube 中必定能知足我全部查詢的狀況嗎?」
固然不是,Cube 根據維度覆蓋到的是大多數查詢狀況,但並不是全部,因此對於 Cube 中查詢不到的時候,Kylin 也可以查詢數據源中的數據,這個概念就叫查詢下壓,此時查詢的速度會較慢。
「爲何預計算有這麼明顯的優點,MPP 大規模並行處理和列式存儲難道不能解決這些問題嗎?」
還真不能。MPP 並行處理是經過增長物理資源,提升並行計算能力,在機器必定的狀況下,當數據量不斷增長,計算時間也將線性增長,爲了計算海量數據不斷增長大量機器那我豈不是得虧本。列式存儲是將數據記錄按列存放,能夠提升讀取的效率,但查詢性能與數據量呈線性相關仍然沒法改變。經過上面的瞭解你已經知道,Kylin 經過預計算的方式打破查詢時間隨數據量呈線性增加的規律,這就是 Kylin 關鍵的思想,經過預計算,用空間換時間。你或許聽過交互式分析這個詞,若是隻有 MPP 和列式存儲,面對海量數據沒法實現交互式,Kylin 則解決了這個問題,你不再用點擊查詢再去喝一杯咖啡了。
經過以上對 Kylin 的基本瞭解,再看看文章開頭的圖,相信你會有新的理解。咱們來總結一下對 Apache Kylin 的認識,Kylin 經過預計算思想,利用大數據計算引擎,將數據源中的海量數據根據維度進行計算,並按必定格式存儲到 HBase 中,解決海量數據場景下,查詢速度不隨數據量增加而線性增加的問題。
接下來咱們着重瞭解下 Cube 構建的過程。經過以上內容的學習,咱們瞭解到,Cube 構建其實是將 Hive 表中的數據轉化成 HBase 的存儲結構,也就是 Key-Value 鍵值對的形式,HBase 中存儲的數據是依據維度計算好的結果。下面學習下 Cube 構建的詳細流程。
當咱們提交 Cube 構建任務時,Kylin 會建立一張臨時的 Hive 平表,根據模型中設置的列,從 Hive 數據源中抽取出來並插入到平表中,後續的構建就基於這張中間表進行。經過 hive cli 查詢 hive 表能夠看到,當咱們點擊構建後,多了一張名爲 kylin_intermediate 開頭的中間表。
在第一步會將抽出來造成中間表的數據存儲在 hdfs 上,這些數據文件大小不均勻,後續 Map 任務處理時間長短會不一致,經過 Hive 的 INSERT INTO … DISTRIBUTE BY 重分佈語句將處理數據的任務均衡。
Extract Fact Table Distinct Columns(獲取不一樣維度的值)
Build Dimension Dictionary(構建維度字典)
經過上一步的維度值構建字典,就是將維度值映射成編碼,能夠節約存儲資源,HBase 的 RowKey 中就不須要使用維度值了,直接使用其對應的編碼就能夠。
保存 Cuboid 的相關統計信息。
建立 HBase 表用於保存以後要生成的 Cube 數據。
計算 Base Cuboid(全部維度的組合)
計算除 Base Cuboid 以外的 Cuboid。
上面兩步是逐層構建 Cuboid,這一步是 In-Mem 快速構建,對內存消耗較高,構建速度更快,在具體執行時會自動選擇一種構建方式,沒被選中的方式會自動跳過。
經過查看任務詳情,我在本例中該步驟被跳過,使用的是逐層構建的方式。
HFile 是 HBase 持久化的存儲文件,也就是 HBase 存儲數據的文件形式。這一步會將構建好的 Cuboid 轉換成 HFile。
將 HFile 轉成 HBase 中的數據。
更新 Cube 相關信息,此時 Cube 已經準備好,更改 Cube 狀態後可進行查詢。
清理 Hive 中間表,經過查看 Hive 的表,發現第一步生成的中間表已經被刪除了。
清理元數據信息中的臨時數據,好比第 3 步中統計的不一樣維度,已經構建好字典了,這些數據再也不須要使用,從 output 中能夠看到刪除掉的相關元數據。
經過頁面點擊進行構建,查看請求 api,查看 Kylin 源碼,能夠看到從前端頁面點擊構建請求,最終調用了 CubeController
的 rebuild
函數,返回一個 JobInstance
實例
/** * Build/Rebuild a cube segment */
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
}
複製代碼
上一步僅僅是接收請求,獲取部分參數。
private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force, Integer priorityOffset) {
try {
//Returns the name of this principal,獲取提交該任務的用戶
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
// 根據 Cube 名稱獲取 Cube 實例
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
// 檢測 Cube 構建的任務數量,默認是 10 個,若是超過了則拋出異常,最終調用瞭如下配置得到最大構建任務數
//public int getMaxBuildingSegments() {
// return Integer.parseInt(getOptional("kylin.cube.max-building-segments", "10"));
// }
checkBuildingSegment(cube);
//提交 Cube 構建任務
return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage(), e);
}
}
複製代碼
查看 submitJob
方法。
public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
//經過 Cube 實例進一步獲取 Project 實例,經過封裝好的權限檢查工具 AclUtil 進行權限檢查。
aclEvaluate.checkProjectOperationPermission(cube);
// 提交任務
JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart,
sourcePartitionOffsetEnd, buildType, force, submitter, priorityOffset);
return jobInstance;
}
複製代碼
查看 submitJobInternal
方法。
public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, // CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
Message msg = MsgPicker.getMsg();
// 檢測 Cube 的狀態
if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
throw new BadRequestException(String.format(Locale.ROOT, msg.getBUILD_BROKEN_CUBE(), cube.getName()));
}
// 根據 Cube 的元信息,進行簽名校驗,將已有簽名與計算出來的進行比較,判斷 Cube 信息的正確性
checkCubeDescSignature(cube);
// 檢測 Cube 是否符合構建狀態,進一步查看源碼可知若是已經有處於 pending 狀態的 segment,則該 Cube 不能進行構建,拋出異常
checkAllowBuilding(cube);
// 檢測是否能並行構建
if (buildType == CubeBuildTypeEnum.BUILD || buildType == CubeBuildTypeEnum.REFRESH) {
checkAllowParallelBuilding(cube);
}
DefaultChainedExecutable job;
CubeSegment newSeg = null;
try {
if (buildType == CubeBuildTypeEnum.BUILD) {
// 獲取數據源類型,目前支持 Hive,JDBC,Kafka
ISource source = SourceManager.getSource(cube);
// 得到構建範圍
SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
sourcePartitionOffsetEnd);
// 在此填充數據,我的理解爲對於實時數據,在執行完前面的步驟後又產生了時間間隔,將這部分數據加載進來,並添加一個 segment
src = source.enrichSourcePartitionBeforeBuild(cube, src);
newSeg = getCubeManager().appendSegment(cube, src);
// 經過構建引擎生成新的 job 任務
job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
job = EngineFactory.createBatchMergeJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.REFRESH) {
newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
} else {
throw new BadRequestException(String.format(Locale.ROOT, msg.getINVALID_BUILD_TYPE(), buildType));
}
// 將任務添加到任務調度系統
getExecutableManager().addJob(job);
} catch (Exception e) {
if (newSeg != null) {
logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube",
newSeg.getName());
try {
// Remove this segment
getCubeManager().updateCubeDropSegments(cube, newSeg);
} catch (Exception ee) {
// swallow the exception
logger.error("Clean New segment failed, ignoring it", e);
}
}
throw e;
}
// 返回任務實例信息
JobInstance jobInstance = getSingleJobInstance(job);
return jobInstance;
}
複製代碼
查看建立 job 任務 createBatchCubingJob
方法,最終會根據構建引擎,建立相應的 builder 實例並調用 build 方法。
// 該方法用於返回構建引擎,根據下圖中可看出支持 MR 和 Spark
public static IBatchCubingEngine batchEngine(IEngineAware aware) {
ImplementationSwitch<IBatchCubingEngine> current = engines.get();
if (current == null) {
current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getJobEngines(),
IBatchCubingEngine.class);
engines.set(current);
}
return current.get(aware.getEngineType());
}
// 這裏是先根據配置得到構建引擎,建立對應的 builder 實例並調用 build 方法。
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset);
}
複製代碼
先來看看 MR 構建引擎的 build 方法
public CubingJob build() {
logger.info("MR_V2 new job to BUILD segment " + seg);
// 得到一個初始化的 Job 實例
final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
// 獲取 cuboid 的數據路徑,以配置的 working-dir 開頭
final String cuboidRootPath = getCuboidRootPath(jobId);
// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(jobId));
// 判斷是不是高基維(UHC),若是是則添加新的任務對高基維進行處理
if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
}
// 構建字典
result.addTask(createBuildDictionaryStep(jobId));
// 保存 cuboid 統計數據
result.addTask(createSaveStatisticsStep(jobId));
// add materialize lookup tables if needed
LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
// 建立 HTable
outputSide.addStepPhase2_BuildDictionary(result);
if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
result.addTask(createExtractDictionaryFromGlobalJob(jobId));
}
// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
outputSide.addStepPhase3_BuildCube(result);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
// Set the task priority if specified
result.setPriorityBasedOnPriorityOffset(priorityOffset);
result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
return result;
}
複製代碼
從源碼中能夠看到,該流程與頁面上 Cube 構建的任務流程基本一致,而且在 Phase 3:Build Cube 處有兩種構建算法,只有被選中的算法纔會被最終執行。
再看看 Spark 的 build 方法實現,Spark 的 build
方法流程和 MR 的基本上一致,再也不作註釋,但構建算法處有一些不一樣,Spark 構建引擎中只使用了分層構建算法,至於這兩種算法的具體原理以及在兩種構建引擎中選用的區別,將在以後的文章中作進一步探討。
public CubingJob build() {
logger.info("Spark new job to BUILD segment " + seg);
final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
KylinConfig config = KylinConfig.getInstanceFromEnv();
if (config.isSparkFactDistinctEnable()) {
result.addTask(createFactDistinctColumnsSparkStep(jobId));
} else {
result.addTask(createFactDistinctColumnsStep(jobId));
}
if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
}
result.addTask(createBuildDictionaryStep(jobId));
result.addTask(createSaveStatisticsStep(jobId));
// add materialize lookup tables if needed
LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
outputSide.addStepPhase2_BuildDictionary(result);
// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
outputSide.addStepPhase3_BuildCube(result);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
// Set the task priority if specified
result.setPriorityBasedOnPriorityOffset(priorityOffset);
result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
return result;
}
複製代碼
本文從 Apache Kylin 最基本的認識到源碼分析,一步步探索了 Kylin 關鍵部分的流程,可是本文還處於比較表層的理解,還有很是多能夠深刻學習的點以及算法有待進一步學習。
文章內容可能不嚴謹或有錯誤,歡迎指出。
參考來源:
《Apache Kylin 權威指南》