Ignite集成Spark之IgniteRDD

本系列共兩篇文章,會探討如何將Ignite和Spark進行集成。java

Ignite是一個分佈式的內存數據庫、緩存和處理平臺,爲事務型、分析型和流式負載而設計,在保證擴展性的前提下提供了內存級的性能。git

Spark是一個流式數據和計算引擎,一般從HDFS或者其餘存儲中獲取數據,一直以來,他都傾向於OLAP型業務,而且聚焦於MapReduce類型負載。github

所以,這兩種技術是能夠互補的。sql

將Ignite與Spark整合

整合這兩種技術會爲Spark用戶帶來若干明顯的好處:shell

  • 經過避免大量的數據移動,得到真正可擴展的內存級性能;
  • 提升RDD、DataFrame和SQL的性能;
  • 在Spark做業之間更方便地共享狀態和數據。

下圖中顯示瞭如何整合這兩種技術,而且標註了顯著的優點: 在本系列的第一篇文章中會聚焦於Ignite RDD,在第二篇文章中會聚焦於Ignite DataFrame。數據庫

Ignite RDD

Ignite提供了一個SparkRDD的實現,叫作IgniteRDD,這個實現能夠在內存中跨Spark做業共享任何數據和狀態,IgniteRDD爲Ignite中相同的內存數據提供了一個共享的、可變的視圖,它能夠跨多個不一樣的Spark做業、工做節點或者應用,相反,原生的SparkRDD沒法在Spark做業或者應用之間進行共享。apache

IgniteRDD做爲Ignite分佈式緩存的視圖,既能夠在Spark做業執行進程中部署,也能夠在Spark工做節點中部署,也能夠在它本身的集羣中部署。所以,根據預配置的部署模型,狀態共享既能夠只存在於一個Spark應用的生命週期的內部(嵌入式模式),或者也能夠存在於Spark應用的外部(獨立模式)。編程

Ignite還能夠幫助Spark用戶提升SQL的性能,雖然SparkSQL支持豐富的SQL語法,可是它沒有實現索引。從結果上來講,即便在普通的較小的數據集上,Spark查詢也可能花費幾分鐘的時間,由於須要進行全表掃描。若是使用Ignite,Spark用戶能夠配置主索引和二級索引,這樣能夠帶來上千倍的性能提高。緩存

IgniteRDD示例

下面經過一些代碼以及建立若干應用的方式,演示如何使用IgniteRDD以及看到它的好處,代碼的完整版本,能夠從GitHub上進行下載。框架

代碼共包括兩個簡單的Scala應用和兩個Java應用。這是爲了說明可使用多種語言來訪問Ignite RDD,這在使用不一樣編程語言和框架的組織中可能存在這樣的場景。此外,會從兩個不一樣的環境運行應用:從終端運行Scala應用以及經過IDE運行Java應用。做爲一個花絮,還會在Java應用程序中運行一些SQL代碼。

對於Scala應用,一個應用會用於往IgniteRDD中寫入部分數據,而另外一個應用會執行部分過濾而後結果集。使用Maven將代碼構建爲一個jar文件後在終端窗口中執行這個程序,下面是詳細的代碼:

object RDDWriter extends App {
  val conf = new SparkConf().setAppName("RDDWriter")
  val sc = new SparkContext(conf)
  val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
  sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))
  ic.close(true)
  sc.stop()
}

object RDDReader extends App {
  val conf = new SparkConf().setAppName("RDDReader")
  val sc = new SparkContext(conf)
  val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
  val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)
  println("The count is " + greaterThanFiveHundred.count())
  ic.close(true)
  sc.stop()
}

在這個Scala的RDDWriter中,首先建立了包含應用名的SparkConf,以後基於這個配置建立了SparkContext,最後,根據這個SparkContext建立一個IgniteContext。建立IgniteContext有不少種方法,本例中會使用一個叫作example-shared-rdd.xml的XML文件,該文件會結合Ignite發行版而後根據需求進行了預配置。顯然,須要根據本身的環境修改路徑(Ignite主目錄),以後指定IgniteRDD持有的整數值元組,最後,將從1到1000的整數值存入IgniteRDD,數值的存儲使用了10個parallel操做。

在這個Scala的RDDReader中,初始化和配置與Scala RDDWriter相同,也會使用同一個xml配置文件,應用會執行部分過濾,而後關注存儲了多少大於500的值,答案最後會輸出出來。

關於IgniteContextIgniteRDD的更多信息,能夠看Ignite的文檔

要構建jar文件,可使用下面的maven命令:

mvn clean install

接下來,看下Java代碼,先寫一個Java應用往IgniteRDD中寫入多個元組,而後另外一個應用會執行部分過濾而後返回結果集,下面是RDDWriter的代碼細節:

public class RDDWriter {
    public static void main(String args[]) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("RDDWriter")
                .setMaster("local")
                .set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        List<Integer> data = new ArrayList<>(20);

        for (int i = 1001; i <= 1020; i++) {
            data.add(i);
        }

        JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

        sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
            public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                return new Tuple2<Integer, Integer>(val, val);
            }
        }));

        igniteContext.close(true);

        sparkContext.close();
    }
}

在這個Java的RDDWriter中,首先建立了包含應用名和執行器數量的SparkConf,以後基於這個配置建立了SparkContext,最後,根據這個SparkContext建立一個IgniteContext。建立IgniteContext有不少種方法,本例中會使用一個叫作example-shared-rdd.xml的XML文件,該文件會結合Ignite發行版而後根據需求進行了預配置。顯然,須要根據本身的環境修改路徑(Ignite主目錄),最後,往IgniteRDD中添加了額外的20個值。

在這個Java的RDDReader中,初始化和配置與Java RDDWriter相同,也會使用同一個xml配置文件,應用會執行部分過濾,而後關注存儲了多少大於500的值,答案最後會輸出出來,下面是Java RDDReader的代碼:

public class RDDReader {
    public static void main(String args[]) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("RDDReader")
                .setMaster("local")
                .set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        JavaPairRDD<Integer, Integer> greaterThanFiveHundred =
                sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
                    public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {
                        return tuple._2() > 500;
                    }
                });

        System.out.println("The count is " + greaterThanFiveHundred.count());

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");

        df.show();

        igniteContext.close(true);

        sparkContext.close();
    }
}

最後,立刻就能夠對代碼進行測試了。

運行這個應用

在第一個終端窗口中,啓動Spark的主節點,以下:

$SPARK_HOME/sbin/start-master.sh

在第二個終端窗口中,啓動Spark工做節點,以下:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

根據本身的環境,修改IP地址和端口號(ip:port)。

在第三個終端窗口中,啓動一個Ignite節點,以下:

$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

這裏使用了以前討論過的example-shared-rdd.xml文件。

在第四個終端窗口中,能夠運行Scala版的RDDWriter應用,以下:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

根據本身的環境修改IP地址和端口(ip:port),以及jar文件的路徑(/path_to_jar_file)。

會產生以下的輸出:

The count is 500

這是咱們指望的值。

接下來,殺掉Spark的主節點和工做節點,而Ignite節點仍然在運行中而且IgniteRDD對於其餘應用仍然可用,下面會使用IDE經過Java應用接入IgniteRDD。

運行Java版RDDWriter會擴展以前存儲於IgniteRDD中的元組列表,經過運行Java版RDDReader能夠進行測試,它會產生以下的輸出:

The count is 520

這也是咱們指望的。

最後,SQL查詢會在IgniteRDD中執行一個SELECT語句,返回範圍在10到100之間的最初10個值,輸出以下:

+----+
|_VAL|
+----+
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
|  20|
+----+

結果正確。

總結

本文中,看到了如何從多個環境中使用多個編程語言輕鬆地訪問IgniteRDD。能夠對IgniteRDD進行數據的讀寫,而且即便Spark已經關閉狀態也經過Ignite得以保持,所以能夠看到,這爲Spark用戶帶來了很大的靈活性和好處。

在本系列的下一篇文章中,會看到Ignite和Spark整合以後的Ignite DataFrames及其優點。

相關文章
相關標籤/搜索