一、spark是什麼?java
Spark是基於內存計算的大數據並行計算框架。sql
1.1 Spark基於內存計算apache
相比於MapReduce基於IO計算,提升了在大數據環境下數據處理的實時性。編程
1.2 高容錯性和高可伸縮性api
與mapreduce框架相同,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。緩存
二、spark編程session
每個spark應用程序都包含一個驅動程序(driver program ),他會運行用戶的main函數,並在集羣上執行各類並行操做(parallel operations)app
spark提供的最主要的抽象概念有兩種:
彈性分佈式數據集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分區地分佈到集羣的不一樣節點上,能夠被並行操做,RDDS能夠從hdfs(或者任意其餘的支持Hadoop的文件系統)上的一個文件開始建立,或者經過轉換驅動程序中已經存在的Scala集合獲得,用戶也可讓spark將一個RDD持久化到內存中,使其能再並行操做中被有效地重複使用,最後RDD能自動從節點故障中恢復框架
spark的第二個抽象概念是共享變量(shared variables),它能夠在並行操做中使用,在默認狀況下,當spark將一個函數以任務集的形式在不一樣的節點上並行運行時,會將該函數所使用的每一個變量拷貝傳遞給每個任務中,有時候,一個變量須要在任務之間,或者驅動程序之間進行共享,spark支持兩種共享變量:
廣播變量(broadcast variables),它能夠在全部節點的內存中緩存一個值。
累加器(accumulators):只能用於作加法的變量,例如計算器或求和器分佈式
三、spark-sql
spark-sql是將hive sql跑在spark引擎上的一種方式,提供了基於schema處理數據的方式。
四、代碼詳解
java spark和spark-sql依賴。
pom.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency>
基於spark1.6建立HiveContext客戶端。在spark2.1已經開始使用sparksession了。請注意。
package com.xiaoju.dqa.fireman.driver; import com.xiaoju.dqa.fireman.exception.SparkInitException; import com.xiaoju.dqa.fireman.utils.PropertiesUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SQLContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.hive.HiveContext; import java.io.IOException; import java.util.Properties; public class SparkClient { private SparkConf sparkConf; private JavaSparkContext javaSparkContext; public SparkClient() { initSparkConf(); javaSparkContext = new JavaSparkContext(sparkConf); } public SQLContext getSQLContext() throws SparkInitException { return new SQLContext(javaSparkContext); } public HiveContext getHiveContext() throws SparkInitException { return new HiveContext(javaSparkContext); } private void initSparkConf() { try { PropertiesUtil propUtil = new PropertiesUtil("fireman.properties"); Properties prop = propUtil.getProperties(); String warehouseLocation = System.getProperty("user.dir"); sparkConf = new SparkConf() .setAppName(prop.getProperty("spark.appname")) .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster(prop.getProperty("spark.master")); } catch (IOException ex) { ex.printStackTrace(); } } }
驅動程序driver
一、這裏要實現可序列化接口,不然spark並不會識別這個類。
二、這裏在經過spark-sql讀取到row數據以後,將schema解析出來,而且映射爲hashmap。
public class FiremanDriver implements Serializable { private String db; private String table; private HiveContext hiveContext;public FiremanDriver(String db, String table) { try { this.db = db; this.table = table; SparkClient sparkClient = new SparkClient(); hiveContext = sparkClient.getHiveContext(); } catch (SparkInitException ex) { ex.printStackTrace(); } }
public void check() { HashMap<String, Object> result = null; try { String query = String.format("select * from %s.%s", db ,table); System.out.println(query); DataFrame rows = hiveContext.sql(query); JavaRDD<Row> rdd = rows.toJavaRDD(); result = rdd.map(new Function<Row, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(Row row) throws Exception { HashMap<String, Object> fuseResult = new HashMap<String, Object>(); HashMap<String, Object> rowMap = formatRowMap(row); // 實際map過程 return mapResult; } }).reduce(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(HashMap<String, Object> map1, HashMap<String, Object> map2) throws Exception { // reduce merge過程
return mergeResult; } }); } catch (Exception ex) { ex.printStackTrace(); } } // 讀取shema,這裏在經過spark-sql讀取到row數據以後,將schema解析出來,而且映射爲hashmap private HashMap<String, Object> formatRowMap(Row row){ HashMap<String, Object> rowMap = new HashMap<String, Object>(); try { for (int i=0; i<row.schema().fields().length; i++) { String colName = row.schema().fields()[i].name(); Object colValue = row.get(i); rowMap.put(colName, colValue); }catch (Exception ex) { ex.printStackTrace(); } return rowMap; } public static void main(String[] args) { String db = args[0]; String table = args[1]; FiremanDriver firemanDriver = new FiremanDriver(db, table); firemanDriver.check(); } }