本系列共兩篇文章,會探討如何將Ignite和Spark進行集成。java
Ignite是一個分佈式的內存數據庫、緩存和處理平臺,爲事務型、分析型和流式負載而設計,在保證擴展性的前提下提供了內存級的性能。git
Spark是一個流式數據和計算引擎,一般從HDFS或者其餘存儲中獲取數據,一直以來,他都傾向於OLAP型業務,而且聚焦於MapReduce類型負載。github
所以,這兩種技術是能夠互補的。sql
整合這兩種技術會爲Spark用戶帶來若干明顯的好處:shell
下圖中顯示瞭如何整合這兩種技術,而且標註了顯著的優點: 在本系列的第一篇文章中會聚焦於Ignite RDD,在第二篇文章中會聚焦於Ignite DataFrame。數據庫
Ignite提供了一個SparkRDD的實現,叫作IgniteRDD,這個實現能夠在內存中跨Spark做業共享任何數據和狀態,IgniteRDD爲Ignite中相同的內存數據提供了一個共享的、可變的視圖,它能夠跨多個不一樣的Spark做業、工做節點或者應用,相反,原生的SparkRDD沒法在Spark做業或者應用之間進行共享。apache
IgniteRDD做爲Ignite分佈式緩存的視圖,既能夠在Spark做業執行進程中部署,也能夠在Spark工做節點中部署,也能夠在它本身的集羣中部署。所以,根據預配置的部署模型,狀態共享既能夠只存在於一個Spark應用的生命週期的內部(嵌入式模式),或者也能夠存在於Spark應用的外部(獨立模式)。編程
Ignite還能夠幫助Spark用戶提升SQL的性能,雖然SparkSQL支持豐富的SQL語法,可是它沒有實現索引。從結果上來講,即便在普通的較小的數據集上,Spark查詢也可能花費幾分鐘的時間,由於須要進行全表掃描。若是使用Ignite,Spark用戶能夠配置主索引和二級索引,這樣能夠帶來上千倍的性能提高。緩存
下面經過一些代碼以及建立若干應用的方式,演示如何使用IgniteRDD以及看到它的好處,代碼的完整版本,能夠從GitHub上進行下載。框架
代碼共包括兩個簡單的Scala應用和兩個Java應用。這是爲了說明可使用多種語言來訪問Ignite RDD,這在使用不一樣編程語言和框架的組織中可能存在這樣的場景。此外,會從兩個不一樣的環境運行應用:從終端運行Scala應用以及經過IDE運行Java應用。做爲一個花絮,還會在Java應用程序中運行一些SQL代碼。
對於Scala應用,一個應用會用於往IgniteRDD中寫入部分數據,而另外一個應用會執行部分過濾而後結果集。使用Maven將代碼構建爲一個jar文件後在終端窗口中執行這個程序,下面是詳細的代碼:
object RDDWriter extends App { val conf = new SparkConf().setAppName("RDDWriter") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i))) ic.close(true) sc.stop() } object RDDReader extends App { val conf = new SparkConf().setAppName("RDDReader") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500) println("The count is " + greaterThanFiveHundred.count()) ic.close(true) sc.stop() }
在這個Scala的RDDWriter中,首先建立了包含應用名的SparkConf
,以後基於這個配置建立了SparkContext
,最後,根據這個SparkContext
建立一個IgniteContext
。建立IgniteContext
有不少種方法,本例中會使用一個叫作example-shared-rdd.xml
的XML文件,該文件會結合Ignite發行版而後根據需求進行了預配置。顯然,須要根據本身的環境修改路徑(Ignite主目錄),以後指定IgniteRDD持有的整數值元組,最後,將從1到1000的整數值存入IgniteRDD,數值的存儲使用了10個parallel操做。
在這個Scala的RDDReader中,初始化和配置與Scala RDDWriter相同,也會使用同一個xml配置文件,應用會執行部分過濾,而後關注存儲了多少大於500的值,答案最後會輸出出來。
關於IgniteContext
和IgniteRDD
的更多信息,能夠看Ignite的文檔。
要構建jar文件,可使用下面的maven命令:
mvn clean install
接下來,看下Java代碼,先寫一個Java應用往IgniteRDD中寫入多個元組,而後另外一個應用會執行部分過濾而後返回結果集,下面是RDDWriter的代碼細節:
public class RDDWriter { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDWriter") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); List<Integer> data = new ArrayList<>(20); for (int i = 1001; i <= 1020; i++) { data.add(i); } JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { public Tuple2<Integer, Integer> call(Integer val) throws Exception { return new Tuple2<Integer, Integer>(val, val); } })); igniteContext.close(true); sparkContext.close(); } }
在這個Java的RDDWriter中,首先建立了包含應用名和執行器數量的SparkConf
,以後基於這個配置建立了SparkContext
,最後,根據這個SparkContext
建立一個IgniteContext
。建立IgniteContext
有不少種方法,本例中會使用一個叫作example-shared-rdd.xml
的XML文件,該文件會結合Ignite發行版而後根據需求進行了預配置。顯然,須要根據本身的環境修改路徑(Ignite主目錄),最後,往IgniteRDD中添加了額外的20個值。
在這個Java的RDDReader中,初始化和配置與Java RDDWriter相同,也會使用同一個xml配置文件,應用會執行部分過濾,而後關注存儲了多少大於500的值,答案最後會輸出出來,下面是Java RDDReader的代碼:
public class RDDReader { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDReader") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() { public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception { return tuple._2() > 500; } }); System.out.println("The count is " + greaterThanFiveHundred.count()); System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10"); df.show(); igniteContext.close(true); sparkContext.close(); } }
最後,立刻就能夠對代碼進行測試了。
在第一個終端窗口中,啓動Spark的主節點,以下:
$SPARK_HOME/sbin/start-master.sh
在第二個終端窗口中,啓動Spark工做節點,以下:
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port
根據本身的環境,修改IP地址和端口號(ip:port)。
在第三個終端窗口中,啓動一個Ignite節點,以下:
$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml
這裏使用了以前討論過的example-shared-rdd.xml
文件。
在第四個終端窗口中,能夠運行Scala版的RDDWriter應用,以下:
$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"
根據本身的環境修改IP地址和端口(ip:port),以及jar文件的路徑(/path_to_jar_file)。
會產生以下的輸出:
The count is 500
這是咱們指望的值。
接下來,殺掉Spark的主節點和工做節點,而Ignite節點仍然在運行中而且IgniteRDD對於其餘應用仍然可用,下面會使用IDE經過Java應用接入IgniteRDD。
運行Java版RDDWriter會擴展以前存儲於IgniteRDD中的元組列表,經過運行Java版RDDReader能夠進行測試,它會產生以下的輸出:
The count is 520
這也是咱們指望的。
最後,SQL查詢會在IgniteRDD中執行一個SELECT語句,返回範圍在10到100之間的最初10個值,輸出以下:
+----+ |_VAL| +----+ | 11| | 12| | 13| | 14| | 15| | 16| | 17| | 18| | 19| | 20| +----+
結果正確。
本文中,看到了如何從多個環境中使用多個編程語言輕鬆地訪問IgniteRDD。能夠對IgniteRDD進行數據的讀寫,而且即便Spark已經關閉狀態也經過Ignite得以保持,所以能夠看到,這爲Spark用戶帶來了很大的靈活性和好處。
在本系列的下一篇文章中,會看到Ignite和Spark整合以後的Ignite DataFrames及其優點。