Kylin 用戶在使用 Spark的過程當中,常常會遇到任務提交緩慢、構建節點不穩定的問題。爲了更方便地向 Spark 提交、管理和監控任務,有些用戶會使用 Livy 做爲 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了經過 Apache Livy 遞交 Spark 任務的新功能[KYLIN-3795],特此感謝滴滴靳國衛同窗對此功能的貢獻。git
Apache Livy 是一個基於 Spark 的開源 REST 服務,是 Apache 基金會的一個孵化項目,它可以經過 REST 的方式將代碼片斷或是序列化的二進制代碼提交到 Spark 集羣中去執行。它提供了以下基本功能:github
Apache Livy 架構sql
1. 當前 Spark 存在的問題shell
Spark 當前支持兩種交互方式:apache
兩種方式都須要用戶登陸到 Gateway 節點上經過腳本啓動 Spark 進程,可是會出現如下問題:服務器
2. Livy 優點session
一方面,接受並解析用戶的 REST 請求,轉換成相應的操做;另外一方面,它管理着用戶所啓動的全部的 Spark 集羣。架構
Livy 具備以下功能:ide
1. 引入 Livy 以前 Kylin 是如何使用 Spark 的工具
Spark 是在 Kylin v2.0 引入的,主要應用於 Cube 構建,構建過程介紹能夠查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/
下面是 SparkExecutable 類的 doWork 方法關於提交 Spark job 的一段代碼,咱們能夠看到 Kylin 會從配置中獲取 Spark job 包的路徑(默認爲 $KYLIN_HOME/lib),經過本地指令的形式提交 Spark job,而後循環獲取 Spark job 的執行狀態和結果。咱們能夠看到 Kylin 單獨開了一個線程在本地向 Spark 客戶端發送來 job 請求而且循環獲取結果,額外增長了節點系統壓力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... String jobJar = config.getKylinJobJarPath(); //獲取job jar的路徑 //略... final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //構建本地command //略... //建立指令執行線程 Callable callable = new Callable<Pair<Integer, String>>() { @Override public Pair<Integer, String> call() throws Exception { Pair<Integer, String> result; try { result = exec.execute(cmd, patternedLogger); } catch (Exception e) { logger.error("error run spark job:", e); result = new Pair<>(-1, e.getMessage()); } return result; } }; //略... try { Future<Pair<Integer, String>> future = executorService.submit(callable); Pair<Integer, String> result = null; while (!isDiscarded() && !isPaused()) { if (future.isDone()) { result = future.get(); //循環獲取指令執行結果 break; } else { Thread.sleep(5000); //每隔5秒檢查一次job執行狀態 } } //略... } catch (Exception e) { logger.error("Error run spark job:", e); return ExecuteResult.createError(e); } //略... }
2. Livy for Kylin 詳細解析
Livy 向 Spark 提交 job 一共有兩種,分別是 Session 和 Batch,Kylin 是經過 Batch 的方式提交 job 的,須要提早構建好 Spark job 對應的 jar 包並上傳到 HDFS 中,而且將配置項 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。
Batch 一共具備以下九種狀態:
public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down; }
下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 經過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,而後經過 restClient 向 Livy 發送 Http 請求。在提交 job 以後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變爲 shutting_down, error, dead, success 中的一種。每一次都是經過 Http 的方式發送請求,相比較於經過本地 Spark 客戶端提交任務,更加穩定並且減小了 Kylin 節點系統壓力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job); executor.execute(livyRestBuilder, patternedLogger); //調用LivyRestExecutor類的execute方法 if (isDiscarded()) { return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded"); } if (isPaused()) { return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped"); } //略... } public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) { LivyRestClient restClient = new LivyRestClient(); String result = restClient.livySubmitJobBatches(dataJson); //向Livy發送http請求 JSONObject resultJson = new JSONObject(result); String state = resultJson.getString("state"); //獲得Livy請求結果 final String livyTaskId = resultJson.getString("id"); while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state) && !LivyStateEnum.error.toString().equalsIgnoreCase(state) && !LivyStateEnum.dead.toString().equalsIgnoreCase(state) && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) { String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //獲取Spark job執行狀態 JSONObject stateJson = new JSONObject(statusResult); if (!state.equalsIgnoreCase(stateJson.getString("state"))) { logAppender.log("Livy status Result: " + stateJson.getString("state")); } state = stateJson.getString("state"); Thread.sleep(10*1000); //每10秒檢查一次結果 } }
3. Livy 在 Kylin 中的應用
構建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 本來都是經過 Hive 客戶端(Cli 或 Beeline)進行構建的,引入 Livy 以後,Kylin 經過 Livy 來調用 SparkSQL 進行構建,提升了平表的構建速度。在引入 Livy 以後,Cube 的構建主要改變的是如下幾個步驟,對應的任務日誌輸出以下:
1)構建 Cube
2)轉換 Cuboid 爲 HFile
4. 引入 Livy 對 Kylin 的好處
5. 如何在 Kylin 中啓用 Livy
在 Kylin 啓用 Livy 前,請先確保 Livy 可以正常工做
1)在 Kylin.properties 中,加入以下配置,並重啓使之生效。
//此處爲CDH5.7環境下的配置 kylin.engine.livy-conf.livy-enabled=true kylin.engine.livy-conf.livy-url=http://cdh-client:8998 kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar //請根據我的環境替換對應版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar
其中 livy-key.file 和 livy-arr.jars 地址之間不要有空格,不然可能會出不可預知的錯誤。
2)Cube 構建引擎選用 Spark。
如下問題每每爲使用不當和配置錯誤的緣由,非 Kylin 自己存在的問題,此處僅爲友情提示。
1. Table or view not found
輸出日誌:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;
解決方法:
//將hive-site.xml拷貝到spark的配置文件目錄中 ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR
2. livy request 400 error
解決方法:
//kylin.properties Livy配置項jar包地址之間不要留空格 //此處爲CDH5.7環境下的依賴包,請根據我的環境替換對應版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/
3. NoClassDefFoundError
輸出日誌:
NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos
解決方法:
find /opt -type f -name "hbase-protocol*.jar" cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars
4. livy sql 執行錯誤
解決方法:
//kylin.properties中添加以下配置 kylin.source.hive.quote-enabled=false
Livy 本質上是在 Spark 上的 REST 服務,對於 Kylin cube 的構建沒有本質上的性能提高,可是經過引入 Livy,Kylin 可以直接經過 Spark SQL 代替 Hive 構建 Flat Table,並且管理 Spark job 也更加方便。可是,Livy 當前也存在一些問題,好比使用較低或較高版本的 Spark 沒法正常工做以及單點故障等問題,用戶能夠考慮自身的實際場景選擇是否須要在 Kylin 中使用 Livy。
做者簡介:王汝鵬,Kyligence 大數據研發工程師,主要負責 Apache Kylin 社區維護和開發。GitHub:https://github.com/rupengwang。