如何在 Kylin 中優雅地使用 Spark

前言

Kylin 用戶在使用 Spark的過程當中,常常會遇到任務提交緩慢、構建節點不穩定的問題。爲了更方便地向 Spark 提交、管理和監控任務,有些用戶會使用 Livy 做爲 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了經過 Apache Livy 遞交 Spark 任務的新功能[KYLIN-3795],特此感謝滴滴靳國衛同窗對此功能的貢獻。git

 

Livy 介紹

Apache Livy 是一個基於 Spark 的開源 REST 服務,是 Apache 基金會的一個孵化項目,它可以經過 REST 的方式將代碼片斷或是序列化的二進制代碼提交到 Spark 集羣中去執行。它提供了以下基本功能:github

  • 提交 Scala、Python 或是 R 代碼片斷到遠端的 Spark 集羣上執行。
  • 提交 Java、Scala、Python 所編寫的 Spark 做業到遠端的 Spark 集羣上執行。


Apache Livy 架構sql

 

爲何使用 Livy

1. 當前 Spark 存在的問題shell

Spark 當前支持兩種交互方式:apache

  • 交互式處理用戶使用 spark-shell 或 pyspark 腳本啓動 Spark 應用程序,伴隨應用程序啓動的同時,Spark 會在當前終端啓動 REPL(Read–Eval–Print Loop) 來接收用戶的代碼輸入,並將其編譯成 Spark 做業。
  • 批處理批處理的程序邏輯由用戶實現並編譯打包成 jar 包,spark-submit 腳本啓動 Spark 應用程序來執行用戶所編寫的邏輯,與交互式處理不一樣的是批處理程序在執行過程當中用戶沒有與 Spark 進行任何的交互。

兩種方式都須要用戶登陸到 Gateway 節點上經過腳本啓動 Spark 進程,可是會出現如下問題:服務器

  • 增長 Gateway 節點的資源使用負擔和故障發生的可能性。
  • 同時 Gateway 節點的故障會帶來單點問題,形成 Spark 程序的失敗。
  • 難以管理、審計以及與已有的權限管理工具的集成。因爲 Spark 採用腳本的方式啓動應用程序,所以相比於 WEB 方式少了許多管理、審計的便利性,同時也難以與已有的工具結合,如 Apache Knox 等。
  • 將 Gateway 節點上的部署細節以及配置不可避免地暴露給了登錄用戶。

2. Livy 優點session

一方面,接受並解析用戶的 REST 請求,轉換成相應的操做;另外一方面,它管理着用戶所啓動的全部的 Spark 集羣。架構

Livy 具備以下功能:ide

  • 經過 Livy session 實時提交代碼片斷與 Spark 的 REPL 進行交互。
  • 經過 Livy batch 提交 Scala、Java、Python 編寫的二進制包來提交批處理任務。
  • 多用戶可以使用同一個服務器(支持用戶模擬)。
  • 可以經過 REST 接口在任何設備上提交任務、查看任務執行狀態和結果。

 

Kylin with Livy

1. 引入 Livy 以前 Kylin 是如何使用 Spark 的工具

Spark 是在 Kylin v2.0 引入的,主要應用於 Cube 構建,構建過程介紹能夠查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

下面是 SparkExecutable 類的 doWork 方法關於提交 Spark job 的一段代碼,咱們能夠看到 Kylin 會從配置中獲取 Spark job 包的路徑(默認爲 $KYLIN_HOME/lib),經過本地指令的形式提交 Spark job,而後循環獲取 Spark job 的執行狀態和結果。咱們能夠看到 Kylin 單獨開了一個線程在本地向 Spark 客戶端發送來 job 請求而且循環獲取結果,額外增長了節點系統壓力。

@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
    //略...
    String jobJar = config.getKylinJobJarPath(); //獲取job jar的路徑
    //略...
    final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //構建本地command
    //略...
    //建立指令執行線程
    Callable callable = new Callable<Pair<Integer, String>>() {
        @Override
        public Pair<Integer, String> call() throws Exception {
            Pair<Integer, String> result;
            try {
                result = exec.execute(cmd, patternedLogger);
                } catch (Exception e) {
                logger.error("error run spark job:", e);
                result = new Pair<>(-1, e.getMessage());
            }
            return result;
        }
    };
    //略...
    try {
        Future<Pair<Integer, String>> future = executorService.submit(callable);
        Pair<Integer, String> result = null;
        while (!isDiscarded() && !isPaused()) {
            if (future.isDone()) {
                result = future.get(); //循環獲取指令執行結果
                break;
            } else {
                Thread.sleep(5000); //每隔5秒檢查一次job執行狀態
            }
        }
    //略...
    } catch (Exception e) {
        logger.error("Error run spark job:", e);
        return ExecuteResult.createError(e);
    }
    //略...
}

2. Livy for Kylin 詳細解析

Livy 向 Spark 提交 job 一共有兩種,分別是 Session 和 Batch,Kylin 是經過 Batch 的方式提交 job 的,須要提早構建好 Spark job 對應的 jar 包並上傳到 HDFS 中,而且將配置項 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。

Batch 一共具備以下九種狀態:

public enum LivyStateEnum {
    starting, running, success, dead, error, not_started, idle, busy, shutting_down;
}

下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 經過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,而後經過 restClient 向 Livy 發送 Http 請求。在提交 job 以後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變爲 shutting_down, error, dead, success 中的一種。每一次都是經過 Http 的方式發送請求,相比較於經過本地 Spark 客戶端提交任務,更加穩定並且減小了 Kylin 節點系統壓力。

@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
    //略...
    livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
    executor.execute(livyRestBuilder, patternedLogger); //調用LivyRestExecutor類的execute方法
    if (isDiscarded()) {
        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
    }
    if (isPaused()) {
        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
    }
    //略...
}

public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {
    LivyRestClient restClient = new LivyRestClient();
    String result = restClient.livySubmitJobBatches(dataJson); //向Livy發送http請求
    JSONObject resultJson = new JSONObject(result);
    String state = resultJson.getString("state"); //獲得Livy請求結果
    final String livyTaskId = resultJson.getString("id");
    while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.error.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.dead.toString().equalsIgnoreCase(state)
            && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
        String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //獲取Spark job執行狀態
        JSONObject stateJson = new JSONObject(statusResult);
        if (!state.equalsIgnoreCase(stateJson.getString("state"))) {
            logAppender.log("Livy status Result: " + stateJson.getString("state"));
        }
        state = stateJson.getString("state");
        Thread.sleep(10*1000); //每10秒檢查一次結果
    }
}

3. Livy 在 Kylin 中的應用

構建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 本來都是經過 Hive 客戶端(Cli 或 Beeline)進行構建的,引入 Livy 以後,Kylin 經過 Livy 來調用 SparkSQL 進行構建,提升了平表的構建速度。在引入 Livy 以後,Cube 的構建主要改變的是如下幾個步驟,對應的任務日誌輸出以下:

  • 構建 Intermediate Flat Hive Table

  • 構建 Redistribute Flat Hive Table

  • 使用 Spark-Submit 的地方都用 Livy 的 Batch API 進行替換

1)構建 Cube

2)轉換 Cuboid 爲 HFile

4. 引入 Livy 對 Kylin 的好處

  • 無需準備 Spark 的客戶端配置,Kylin 部署更加輕量化。
  • Kylin 節點系統壓力更低,無需在 Kylin 節點啓動 Spark 客戶端。
  • 構建 Flat Hive Table 更快,經過 Livy 可使用 Spark SQL 構建平表,而 Spark SQL 要快於 Hive。
  • 提交 job 更快,job 狀態獲取更方便。

5. 如何在 Kylin 中啓用 Livy

在 Kylin 啓用 Livy 前,請先確保 Livy 可以正常工做

1)在 Kylin.properties 中,加入以下配置,並重啓使之生效。

//此處爲CDH5.7環境下的配置
kylin.engine.livy-conf.livy-enabled=true
kylin.engine.livy-conf.livy-url=http://cdh-client:8998
kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar
//請根據我的環境替換對應版本的包
kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar

其中 livy-key.file 和 livy-arr.jars 地址之間不要有空格,不然可能會出不可預知的錯誤。

2)Cube 構建引擎選用 Spark。

 

常見問題

如下問題每每爲使用不當和配置錯誤的緣由,非 Kylin 自己存在的問題,此處僅爲友情提示。

1. Table or view not found

輸出日誌:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;

解決方法:

//將hive-site.xml拷貝到spark的配置文件目錄中
ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR

2. livy request 400 error
解決方法:

//kylin.properties Livy配置項jar包地址之間不要留空格
//此處爲CDH5.7環境下的依賴包,請根據我的環境替換對應版本的包
kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/

3. NoClassDefFoundError
輸出日誌:

NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos

解決方法:

find /opt -type f -name "hbase-protocol*.jar"
cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars

4. livy sql 執行錯誤
解決方法:

//kylin.properties中添加以下配置
kylin.source.hive.quote-enabled=false

 

總結

Livy 本質上是在 Spark 上的 REST 服務,對於 Kylin cube 的構建沒有本質上的性能提高,可是經過引入 Livy,Kylin 可以直接經過 Spark SQL 代替 Hive 構建 Flat Table,並且管理 Spark job 也更加方便。可是,Livy 當前也存在一些問題,好比使用較低或較高版本的 Spark 沒法正常工做以及單點故障等問題,用戶能夠考慮自身的實際場景選擇是否須要在 Kylin 中使用 Livy。

 

參考文章

  1. https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
  2. https://wiki.apache.org/incubator/LivyProposal
  3. https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

 

做者簡介:王汝鵬,Kyligence 大數據研發工程師,主要負責 Apache Kylin 社區維護和開發。GitHub:https://github.com/rupengwang。

 

瞭解更多大數據資訊,點擊進入Kyligence官網

相關文章
相關標籤/搜索