Ignite 與 Spark 都很強,那若是把它們整合起來會怎樣?

前面的文章中,咱們分別介紹了 Ignite 和 Spark 這兩種技術,從功能上對二者進行了全面深刻的對比。通過分析,能夠得出這樣一個結論:二者都很強大,可是差異很大,定位不一樣,所以會有不一樣的適用領域。前端

可是,這兩種技術也是能夠互補的,那麼它們互補適用於場景是什麼呢?主要是這麼幾個方面:若是以爲 Spark 中的 SQL 等運行速度較慢,那麼 Ignite 經過本身的方式提供了對 Spark 應用進行進一步加速的解決方案,這方面可選的解決方案並很少,推薦開發者考慮,另外就是數據和狀態的共享,固然這方面的解決方案也有不少,並非必定要用 Ignite 實現。java

Ignite 原生提供了對 Spark 的支持,本文主要探討爲什麼如何將 Ignite 和 Spark 進行集成。git

1.將 Ignite 與 Spark 整合

整合這兩種技術會爲 Spark 應用帶來若干明顯的好處:github

  • 經過避免大量的數據移動,得到真正可擴展的內存級性能;
  • 提升 RDD、DataFrame 和 SQL 的性能;
  • 在 Spark 做業之間更方便地共享狀態和數據。

下圖顯示瞭如何整合這兩種技術,而且標註了顯著的優點: sql

經過該圖,能夠從總體架構的角度看到 Ignite 在整個 Spark 應用中的位置和做用。數據庫

Ignite 對 Spark 的支持主要體現爲兩個方面,一個是 Ignite RDD,一個是 Ignite DataFrame。本文會首先聚焦於 Ignite RDD,以後再講講 Ignite DataFrame。apache

2.Ignite RDD

Ignite 提供了一個SparkRDD的實現,叫作IgniteRDD,這個實現能夠在內存中跨 Spark 做業共享任何數據和狀態,IgniteRDD爲 Ignite 中相同的內存數據提供了一個共享的、可變的視圖,它能夠跨多個不一樣的 Spark 做業、工做節點或者應用,相反,原生的 SparkRDD 沒法在 Spark 做業或者應用之間進行共享。編程

IgniteRDD做爲 Ignite 分佈式緩存的視圖,既能夠在 Spark 做業執行進程中部署,也能夠在 Spark 工做節點中部署,也能夠在它本身的集羣中部署。所以,根據預配置的部署模型,狀態共享既能夠只存在於一個 Spark 應用的生命週期內部(嵌入式模式),也能夠存在於 Spark 應用的外部(獨立模式)。json

Ignite 還能夠幫助 Spark 應用提升 SQL 的性能,雖然 SparkSQL 支持豐富的 SQL 語法,可是它沒有實現索引。從結果上來講,即便在普通較小的數據集上,Spark 查詢也可能花費幾分鐘的時間,由於須要進行全表掃描。若是使用 Ignite,Spark 用戶能夠配置主索引和二級索引,這樣能夠帶來上千倍的性能提高。小程序

2.1.IgniteRDD 示例

下面經過一些代碼以及建立若干應用的方式,展現 IgniteRDD 帶來的好處。

可使用多種語言來訪問 Ignite RDD,這對於有跨語言需求的團隊來講有友好的,下邊代碼共包括兩個簡單的 Scala 應用和兩個 Java 應用。此外,會從兩個不一樣的環境運行應用:從終端運行 Scala 應用以及經過 IDE 運行 Java 應用。另外還會在 Java 應用中運行一些 SQL 查詢。

對於 Scala 應用,一個應用會用於往 IgniteRDD 中寫入數據,而另外一個應用會執行部分過濾而後返回結果集。使用 Maven 將代碼構建爲一個 jar 文件後在終端窗口中執行這個程序,下面是詳細的代碼:

object RDDWriter extends App {

    val conf = new SparkConf().setAppName("RDDWriter")

    val sc = new SparkContext(conf)

    val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")

    val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")

    sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))

    ic.close(true)

    sc.stop()

}

object RDDReader extends App {

    val conf = new SparkConf().setAppName("RDDReader")

    val sc = new SparkContext(conf)

    val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")

    val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")

    val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)

    println("The count is " + greaterThanFiveHundred.count())

    ic.close(true)

    sc.stop()

}

在這個 Scala 的RDDWriter中,首先建立了包含應用名的SparkConf,以後基於這個配置建立了SparkContext,最後,根據這個SparkContext建立一個IgniteContext。建立IgniteContext有不少種方法,本例中使用一個叫作example-shared-rdd.xml的 XML 文件,該文件會結合 Ignite 發行版而後根據需求進行預配置。顯然,須要根據本身的環境修改路徑(Ignite 主目錄),以後指定 IgniteRDD 持有的整數值元組,最後,將從 1 到 1000 的整數值存入 IgniteRDD,數值的存儲使用了 10 個 parallel 操做。

在這個 Scala 的RDDReader中,初始化和配置與 Scala RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,而後關注存儲了多少大於 500 的值,答案最後會輸出。

關於IgniteContextIgniteRDD的更多信息,能夠看 Ignite 的文檔

要構建 jar 文件,可使用下面的 maven 命令:

mvn clean install

接下來,看下 Java 代碼,先寫一個 Java 應用往IgniteRDD中寫入多個記錄,而後另外一個應用會執行部分過濾而後返回結果集,下面是RDDWriter的代碼細節:

public class RDDWriter {

    public static void main(String args[]) {

        SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(

            sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        List<Integer> data = new ArrayList<>(20);

        for (int i = 1001; i <= 1020; i++) {

            data.add(i);

        }

        JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

        sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {

            public Tuple2<Integer, Integer> call(Integer val) throws Exception {

                return new Tuple2<Integer, Integer>(val, val);

            }

        }));

        igniteContext.close(true);

        sparkContext.close();

    }

}

在這個 Java 的RDDWriter中,首先建立了包含應用名和執行器數量的SparkConf,以後基於這個配置建立了SparkContext,最後,根據這個SparkContext建立一個IgniteContext。最後,往 IgniteRDD 中添加了額外的 20 個值。

在這個 Java 的RDDReader中,初始化和配置與 Java RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,而後關注存儲了多少大於 500 的值,答案最後會輸出,下面是 Java RDDReader的代碼:

public class RDDReader {

    public static void main(String args[]) {

        SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(

            sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {

            public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {

                return tuple._2() > 500;

            }

        });

        System.out.println("The count is " + greaterThanFiveHundred.count());

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");

        df.show();

        igniteContext.close(true);

        sparkContext.close();

    }

}

到這裏就能夠對代碼進行測試了。

2.2.運行應用

在第一個終端窗口中,啓動 Spark 的主節點,以下:

$SPARK_HOME/sbin/start-master.sh

在第二個終端窗口中,啓動 Spark 工做節點,以下:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

根據本身的環境,修改 IP 地址和端口號(ip:port)。

在第三個終端窗口中,啓動一個 Ignite 節點,以下:

$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

這裏使用了以前討論過的example-shared-rdd.xml文件。

在第四個終端窗口中,能夠運行 Scala 版的 RDDWriter 應用,以下:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

根據本身的環境修改 IP 地址和端口(ip:port),以及 jar 文件的路徑(/path_to_jar_file)。

會產生以下的輸出:

The count is 500

這是指望的輸出。

接下來,殺掉 Spark 的主節點和工做節點,而 Ignite 節點仍然在運行中而且IgniteRDD對於其它應用仍然可用,下面會使用 IDE 經過 Java 應用接入IgniteRDD

運行 Java 版RDDWriter會擴展以前存儲於 IgniteRDD 中的記錄列表,經過運行 Java 版RDDReader能夠進行測試,它會產生以下的輸出:

The count is 520

這也是指望的輸出。

最後,SQL 查詢會在IgniteRDD中執行一個 SELECT 語句,返回範圍在 10 到 100 之間的最初 10 個值,輸出以下:

結果正確。

3.IgniteDataframes

Spark 的 DataFrame API 爲描述數據引入了模式的概念,Spark 經過表格的形式進行模式的管理和數據的組織。

DataFrame 是一個組織爲命名列形式的分佈式數據集,從概念上講,DataFrame 等同於關係數據庫中的表,並容許 Spark 使用 Catalyst 查詢優化器來生成高效的查詢執行計劃。而 RDD 只是跨集羣節點分區化的元素集合。

Ignite 擴展了 DataFrames,簡化了開發,改進了將 Ignite 做爲 Spark 的內存存儲時的數據訪問時間,好處包括:

  • 經過 Ignite 讀寫 DataFrames 時,能夠在 Spark 做業之間共享數據和狀態;
  • 經過優化 Spark 的查詢執行計劃加快 SparkSQL 查詢,這些主要是經過 IgniteSQL 引擎的高級索引以及避免了 Ignite 和 Spark 之間的網絡數據移動實現的。

3.1.IgniteDataframes 示例

下面經過一些代碼以及搭建幾個小程序的方式,瞭解如何經過 Ignite DataFrames 整合 Ignite 與 Spark。

一共會寫兩個 Java 的小應用,而後在 IDE 中運行,還會在這些 Java 應用中執行一些 SQL 查詢。

一個 Java 應用會從 JSON 文件中讀取一些數據,而後建立一個存儲於 Ignite 的 DataFrame,這個 JSON 文件 Ignite 的發行版中已經提供,另外一個 Java 應用會從 Ignite 的 DataFrame 中讀取數據而後使用 SQL 進行查詢。

下面是寫應用的代碼:

public class DFWriter {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {

        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        Dataset<Row> peopleDF = spark.read().json(

            resolveIgnitePath("resources/people.json").getAbsolutePath());

        System.out.println("JSON file contents:");

        peopleDF.show();

        System.out.println("Writing DataFrame to Ignite.");

        peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();

        System.out.println("Done!");

        Ignition.stop(false);

    }

}

DFWriter中,首先建立了SparkSession,它包含了應用名,以後會使用spark.read().json()讀取 JSON 文件而且輸出文件內容,下一步是將數據寫入 Ignite 存儲。下面是DFReader的代碼:

public class DFReader {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {

        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        System.out.println("Reading data from Ignite table.");

        Dataset<Row> peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();

        peopleDF.createOrReplaceTempView("people");

        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");

        sqlDF.show();

        System.out.println("Done!");

        Ignition.stop(false);

    }

}

DFReader中,初始化和配置與DFWriter相同,這個應用會執行一些過濾,需求是查找全部的 id > 0 以及 < 6 的人,而後輸出結果。

在 IDE 中,經過下面的代碼能夠啓動一個 Ignite 節點:

public class ExampleNodeStartup {

    public static void main(String[] args) throws IgniteException {

        Ignition.start("config/example-ignite.xml");

    }

}

到此,就能夠對代碼進行測試了。

3.2.運行應用

首先在 IDE 中啓動一個 Ignite 節點,而後運行DFWriter應用,輸出以下:

若是將上面的結果與 JSON 文件的內容進行對比,會顯示二者是一致的,這也是指望的結果。

下一步運行DFReader,輸出以下:

這也是指望的輸出。

4.總結

經過本文,會發現 Ignite 與 Spark 的集成很簡單,也能夠看到如何從多個環境中使用多個編程語言輕鬆地訪問IgniteRDD。能夠對IgniteRDD進行數據的讀寫,而且即便 Spark 已經關閉狀態也能經過 Ignite 得以保持,也看到了經過 Ignite 進行 DataFrame 的讀寫。讀者能夠輕鬆嘗試一下。

若是想要這些示例的源代碼,能夠從這裏下載。

做者

李玉珏,架構師,有豐富的架構設計和技術研發團隊管理經驗,社區技術翻譯做者以及撰稿人,開源技術貢獻者。Apache Ignite 技術中文文檔翻譯做者,長期在國內進行 Ignite 技術的推廣/技術支持/諮詢工做。

本文系做者投稿文章。歡迎投稿。

投稿內容要求

  • 互聯網技術相關,包括但不限於開發語言、網絡、數據庫、架構、運維、前端、DevOps(DevXXX)、AI、區塊鏈、存儲、移動、安全、技術團隊管理等內容。
  • 文章不須要首發,能夠是已經在開源中國博客或網上其它平臺發佈過的。可是鼓勵首發,首發內容被收錄可能性較大。
  • 若是你是記錄某一次解決了某一個問題(這在博客中佔絕大比例),那麼須要將問題的來龍去脈描述清楚,最直接的就是結合圖文等方式將問題復現,同時完整地說明解決思路與最終成功的方案。
  • 若是你是分析某一技術理論知識,請從定義、應用場景、實際案例、關鍵技術細節、觀點等方面,對其進行較爲全面地介紹。
  • 若是你是以實際案例分享本身或者公司對諸如某一架構模型、通用技術、編程語言、運維工具的實踐,那麼請將事件相關背景、具體技術細節、演進過程、思考、應用效果等方面描述清楚
  • 其它未盡 case 具體狀況具體分析,不虛的,文章投過來試試先,好比咱們並不拒絕就某個熱點事件對其進行的報導、深刻解析。

投稿方式

重要說明

  • 做者須要擁有所投文章的全部權,不能將別人的文章拿過來投遞。
  • 投遞的文章須要通過審覈,若是開源中國編輯以爲須要的話,將與做者一塊兒進一步完善文章,意在使文章更佳、傳播更廣。
  • 文章版權歸做者全部,開源中國得到文章的傳播權,可在開源中國各個平臺進行文章傳播,同時保留文章原始出處和做者信息,可在官方博客中標原創標籤。
相關文章
相關標籤/搜索