Spark是一個通用的大規模數據快速處理引擎。能夠簡單理解爲Spark就是一個大數據分佈式處理框架。基於內存計算的Spark的計算速度要比Hadoop的MapReduce快上50倍以上,基於磁盤的計算速度也快於10倍以上。Spark運行在Hadoop第二代的yarn集羣管理之上,能夠輕鬆讀取Hadoop的任何數據。可以讀取HBase、HDFS等Hadoop的數據源。java
從Spark 1.0版本起,Spark開始支持Spark SQL,它最主要的用途之一就是可以直接從Spark平臺上面獲取數據。而且Spark SQL提供比較流行的Parquet列式存儲格式以及從Hive表中直接讀取數據的支持。以後,Spark SQL還增長了對JSON等其餘格式的支持。到了Spark 1.3 版本Spark還可使用SQL的方式進行DataFrames的操做。咱們經過JDBC的方式經過前臺業務邏輯執行相關sql的增刪改查,經過遠程鏈接linux對文件進行導入處理,使項目可以初步支持Spark平臺,現現在已支持Spark1.6版本。那麼從應用的前臺與後臺兩個部分來簡介基於Spark的項目開發實踐。mysql
一、 JDBC鏈接方式。linux
前臺咱們使用ThriftServer鏈接後臺SparkSQL,它是一個JDBC/ODBC接口,經過配置Hive-site.xml,就可使前臺用JDBC/ODBC鏈接ThriftServer來訪問HDFS的數據。ThriftServer經過調用hive元數據信息找到表或文件信息在hdfs上的具體位置,並經過Spark的RDD實現了hive的接口。對於業務的增、刪、改、查都是經過SparkSQL對HDFS上存儲的相應表文件進行操做。項目前臺中須要引入相應hive-jdbc等的jar包。sql
<dependency>shell
<groupId>org.apache.hadoop</groupId>數據庫
<artifactId>hadoop-common</artifactId>apache
<version>${hadoop-common.version}</version>api
<exclusions>緩存
<exclusion>tomcat
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive-common.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive-exec.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive-jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive-metastore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive-service.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>${libfb303.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
二、Parquet列式文件存儲格式
咱們使用Parquet面向列存存儲的文件存儲結構現現在的Spark版本已經支持了列式存儲格式parquet,由於Parquet具備高壓縮比的特色且適合嵌套數據類型的存儲,可以避免沒必要要的IO性能。但在Spark1.3時並無默認支持,這裏就再也不對該文件格式進行過多的說明,建立parquet格式表結構建表語句以下:
Create table yangsy as select * from table name
三、數據的導入。
使用的是Apache的一個項目,最先做爲Hadoop的一個第三方模塊存在,主要功能是在Hadoop(hive)與傳統的數據庫(mysql、oracle等)間進行數據的傳遞,能夠將一個關係型數據庫中的數據導入到Hadoop的HDFS中,也能夠將HDFS的數據導進到關係數據庫中。
這裏注意,執行sqoop導入須要佔用yarn的資源進行mapreduce,因爲spark開啓後即使在空閒狀態下也不釋放內存,故修改spark-env.sh配置下降memory或暫時停用thfitserver,分以便運行sqoop。
Create job -f 3 -t 4
Creating job for links with from id 3 and to id 4
Please fill following values to create new job object
Name: Sqoopy
From database configuration
Schema name: hive
Table name: TBLS
Table SQL statement:
Table column names:
Partition column name:
Null value allowed for the partition column:
Boundary query:
ToJob configuration
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
successfully created with validation status OK and persistent id 2
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: hdfs://hadoop000:8020/sqoop2
Throttling resources
Extractors:
Loaders:
New job was
四、先後臺的交互實現工具類。
工具類提供靜態的方法,能夠進行相應業務邏輯的調用,因爲Hadoop集羣存在於服務器端,前臺須要實現跨平臺服務器的鏈接,才能執行相應的Hadoop命令,實現對HDFS上文件的操做。這次設計的ShellUtils類,經過jsch鏈接Linux服務器執行shell命令.須要引入jsch的jar包:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
private static JSch jsch;
private static Session session;
public static void connect(String user, String passwd, String host) throws JSchException {
jsch = new JSch();
session = jsch.getSession(user, host,22);
session.setPassword(passwd);
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
五、數據的下載:
經過傳入的Linux命令、用戶名、密碼等參數對遠程linux服務器進行鏈接。調用hadoop的cat命令直接將文件從HDFS上合併下來經過ftp方式傳入tomcat所在服務器,拿到相應的清單文件,大大減小了讀取生成文件所須要的時間。命令以下:
String command = "cd " + ftpPath + " && " + hadoopPath + "hadoop fs -cat '" + hdfsPath+ listRandomName + "/*'>" + listName1+".csv;"+ "sed -i '1i"+ title +"' " + listName1+".csv;"
CodecUtil類,用來實現不一樣類型壓縮文件的解壓工做,經過傳入的壓縮類型,利用反射機制鎖定壓縮的類型,因爲存儲在hdfs上的文件都是以文件塊的形式存在的,因此首先須要獲取hdfs中文件的二級子目錄,遍歷查詢到每個文件塊的文件路徑,隨後經過輸入輸出流進行文件的解壓工做。而後將此類打包成jar包放入集羣中,經過前臺遠程鏈接服務端,執行hadoop命令操做執行,實現類部分代碼以下:
public class CodecUtil{
public static void main(String[] args) throws Exception {
//compress("org.apache.hadoop.io.compress.GzipCodec");
String listName = args[0];
String codecType = args[1];
String hdfsPath = args[2];
uncompress(listName,codecType,hdfsPath);
//解壓縮
public static void uncompress(String listName,String CodecType,String hdfsPath) throws Exception{
Class<?> codecClass = Class.forName(CodecType);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path listf =new Path(hdfsPath+listName);
//獲取根目錄下的全部2級子文件目錄
FileStatus stats[]=fs.listStatus(listf);
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
int i;
for ( i = 0; i < stats.length; i++){
//得到子文件塊的文件路徑
String Random = findRandom();
Path list = new Path(stats[i].getPath().toString());
InputStream in = codec.createInputStream(inputStream);
FSDataOutputStream output = fs.create(new Path(hdfsPath + listName+"/"+unListName));
IOUtils.copyBytes(in, output, conf);
IOUtils.closeStream(in);
}
}
}
六、功能性導入文件
經過功能選擇,將須要導入的CSV文件經過ftp方式上傳到Spark所在服務器,再將文件經過load的方式導入表中,實現導入文件的業務導入。執行sql以下:
String sql = " LOAD DATA LOCAL INPATH '" + Path + fileName
+ "' OVERWRITE INTO TABLE " + tabName;
七、獲取表頭信息。
可使用describe table,從而獲取HDFS上對應的表頭信息,從而根據業務進行相應的業務邏輯處理。
八、JDBC鏈接問題
這裏簡要說一下執行的性能問題,咱們經過JDBC方式提交SQL給spark,假若SQL中含有大量的窗口函數像row_number over()一類的,在大數據量的狀況下會形成任務執行完畢,但前臺jdbc卡死,程序沒法繼續進行的狀況。這是因爲像窗口函數以及聚合函數都是至關於MapReduce的Shuffle操做。在提交至Spark運行過程當中, DAGScheduler會把Shuffle的過程切分紅map和reduce兩個Stage(以前一直被我叫作shuffle前和shuffle後),map的中間結果是寫入到本地硬盤的,而不是內存,因此對磁盤的讀寫要求很是高,(最好是固態硬盤比較快,本人親自嘗試,一樣的性能參數下,固態硬盤會比普通磁盤快10倍。還有,經過調大shuffle partition的數目從而減小每一個task所運算的時間,能夠減小運行的時間,不至於前臺卡死。但不建議前臺使用窗口函數進行業務邏輯處理,前臺卡死的概率仍是很大。
九、性能調優部分參數
Spark默認序列化方式爲Java的ObjectOutputStream序列化一個對象,速度較慢,序列化產生的結果有時也比較大。因此項目中咱們使用kryo序列化方式,經過kryo序列化,使產生的結果更爲緊湊,減小內存的佔用空間,同時減小了對象自己的元數據信息與基本數據類型的開銷,從而更好地提升了性能。
Spark默認用於緩存RDD的空間爲一個executor的60%,項目中因爲考慮到標籤數量爲成百個,使用一樣規則與數量的標籤進行客戶羣探索及客戶羣生成的機率很小。因此修改spark.storage.memoryFaction=0.4,這樣使百分之60%的內存空間能夠在task執行過程當中緩存建立新對象,從而加大task的任務執行效率,以及spark.shuffle.memoryFraction參數。不過從至今Spark1.6已經動態的調整計算內存與緩存內存的大小,這個參數也可不比手動配置,具體要根據項目是緩存的數據仍是計算數據的比例來決定。
十、decimal數據類型改成double數據類型
Decimal數據類型在spark1.3及spark1.4版本沒法更好的支持parquet文件格式,生成文件時會報沒法識別該類型,現現在的版本已經更加優化了decimal,但具體是否支持暫時還沒有嘗試。
至此,前臺的相關方法就介紹完畢,開始後臺
所謂的後臺,就是進行真正的數據處理,用Scala編寫處理邏輯生成jar包提交於spark-submit,生成從而服務於上層應用的數據表。
一、 環境變量的加載
val sparkConf = new SparkConf()
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
這裏能夠經過直接set參數從而告訴spark要申請多少內存,多少個核,啓動多少個executer例如:
val sparkConf = new SparkConf().setMaster("yarn").setAppName("app")
.set("spark.executor.memory", "4g")
不過不建議在代碼中寫死,能夠寫個配置文件加載類往裏面傳入參數,也能夠經過在提交spark-submit的時候指定參數:
./bin/spark-submit --conf spark.ui.port=4444 --name "app" --master yarn-client --num-executors(num) --executor-cores (num) --executor-memory (num) --class main.asiainfo.coc.CocDss $CocBackHome/jar_name.jar
加載mysql中的配置信息表,從而進行相應的業務邏輯處理:
val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
val table_name = sqlContext.jdbc(mySQLUrl,"table_name").cache()
二、多表關聯:
val table_join = table1.join(table2, table1 ("id") === table2 ("id"),"inner") 或
Val table_join = table1.join(table2,」id」)
這裏要注意一點,多表進行join的時候很容易形成ID衝突,因爲應用於生產環境的依舊是Spark1.4版本(Spark1.5,1.6是否穩定有待測試,因此暫時沒有用),因此仍是使用第一種方法穩妥,該方法爲Spark1.3的使用方法,畢竟穩定第一。
二、 讀取本地數據文件,根據某個字段排序並註冊成表:
case class test(column1 : String,column2: String,column3: String)
(這裏要注意,case class必定要寫在業務處理方法以外)
val loadData = sc.textFile(path)
val data = loadData.map(_.split(",")).sortBy(line => line.indexOf(1))
val dataTable = sqlContext.repartition(400).createDataFrame(data).registerTempTable("test")
sqlContext.sql("drop table if exists asiainfo_yangsy")
sqlContext.sql("select * from test").toDF().saveAsTable("asiainfo_yangsy ")
這裏要注意的是,要調用registerTempTable函數,必須調用createDataFrame,通過資料查閱,讀取文件生成的RDD只是個普通的RDD,而registerTempTable並不屬於RDD類,因此經過建立SchemaRDD實例進行調用。隨後註冊成表後,轉化爲DataFrame,保存表至HDFS。,
順便提一下repartition函數,經過此函數來設置patition的數量。由於一個partition對應的就是stage的一個task,那麼根據真實的數據量進行設置,從而減小OOM的可能性。不過現現在Spark1.6版本已經支持自行調整parition數量,代碼中可不比添加。
四、讀取HDFS中的表或數據文件:
val loadData2 = sqlContext.read.table("asiainfo_")
五、describe函數
val select_table_cache = sqlContext.table(save_table_name)
al describeTable1 = select_table_cache.describe().show()
這裏要強調下,describe函數的調用並非前臺那樣,獲取表頭信息。而是獲取相應列數據的count、mean、stddev、min以及max值。用於作一些簡單的統計。
六、根據join後的DF生成須要業務數據的DF,並根據某個table某一字段進行排序
val select_table= table.select(table1("id"),
table("update_cycle"), table1("column_id"),
table2("table_id")).repartition(400).sort(table1("label_id"))
七、數據的MapReduce
val loadData = sc.textFile(path)
val map_reduce = loadData.
flatMap(line =>line.split(",")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)
剩下還有不少使用的函數就不一一說明了,具體應用查官網API便可。