Spark是一個通用的大規模數據快速處理引擎。能夠簡單理解爲Spark就是一個大數據分佈式處理框架。基於內存計算的Spark的計算速度要比Hadoop的MapReduce快上100倍以上,基於磁盤的計算速度也快於10倍以上。Spark運行在Hadoop第二代的yarn集羣管理之上,能夠輕鬆讀取Hadoop的任何數據。可以讀取HBase、HDFS等Hadoop的數據源。java
從Spark 1.0版本起,Spark開始支持Spark SQL,它最主要的用途之一就是可以直接從Spark平臺上面獲取數據。而且Spark SQL提供比較流行的Parquet列式存儲格式以及從Hive表中直接讀取數據的支持。以後,Spark SQL還增長了對JSON等其餘格式的支持。到了Spark 1.3 版本Spark還可使用SQL的方式進行DataFrames的操做。咱們經過JDBC的方式經過前臺業務邏輯執行相關sql的增刪改查,經過遠程鏈接linux對文件進行導入處理,使項目可以初步支持Spark平臺,現現在已支持Spark1.4版本。mysql
SparkSQL具備內置的SQL擴展的基類實現Catalyst,提供了提供瞭解析(一個很是簡單的用Scala語言編寫的SQL解析器)、執行(Spark Planner,生成基於RDD的物理計劃)和綁定(數據徹底存放於內存中)。linux
前臺咱們使用ThriftServer鏈接後臺SparkSQL,它是一個JDBC/ODBC接口,經過配置Hive-site.xml,就可使前臺用JDBC/ODBC鏈接ThriftServer來訪問SparkSQL的數據。ThriftServer經過調用hive元數據信息找到表或文件信息在hdfs上的具體位置,並經過Spark的RDD實現了hive的接口。對於標籤、客戶羣探索的增、刪、改、查都是經過SparkSQL對HDFS上存儲的相應表文件進行操做,突破了傳統數據庫的瓶頸,同時爲之後的客戶羣智能分析做了鋪墊。sql
1.數據的存儲格式shell
咱們使用Parquet面向列存存儲的文件存儲結構,由於Parquet具備高壓縮比的特色且適合嵌套數據類型的存儲,可以避免沒必要要的IO性能。Parquet建表以下所示:數據庫
CREATE TABLE dw_coclbl_d01_20140512_lzo_256_parquet(op_time string,apache
join_id double, city_id int, product_no string, brand_id int, vip_level int, county_id int, l2_01_01_04 double, l2_01_01_04_01 double)服務器
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'STORED ASsession
INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'oracle
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat';
二、因爲壓縮文件佔用的空間較少,文件load的速度比較快。故使用壓縮文件進行數據的load.使用gzip進行壓縮時,單個文件只能在一個節點上進行load,加載時間很長。使用split命令將解壓後的csv文件分割成多個256M的小文件,機器上每一個block塊的大小爲128M,故將小文件分割爲128M或256M以保證效率。因爲Parquet存儲格式暫時只支持Gzip,項目中暫時使用Gzip壓縮格式。經過在控制檯輸入set mapreduce.output.fileoutputformat.compress=true指令命令設置壓縮格式爲true。再執行set mapreduce.output. fileoutput format.compress.codec = org.apache.hadoop.io.compress.GzipCodec;將文件的壓縮格式設置爲Gzip壓縮格式
三、數據的導入。使用的是Apache的一個項目,最先做爲Hadoop的一個第三方模塊存在,主要功能是在Hadoop(hive)與傳統的數據庫(mysql、oracle等)間進行數據的傳遞,能夠將一個關係型數據庫中的數據導入到Hadoop的HDFS中,也能夠將HDFS的數據導進到關係數據庫中。
因爲執行sqoop導入須要經過yarn的任務調度進行mapreduce,因爲spark開啓後即使在空閒狀態下也不釋放內存,故修改spark-env.sh配置,分配多餘內存以便sqoop執行。
Create job -f 3 -t 4
Creating job for links with from id 3 and to id 4
Please fill following values to create new job object
Name: Sqoopy
From database configuration
Schema name: hive
Table name: TBLS
Table SQL statement:
Table column names:
Partition column name:
Null value allowed for the partition column:
Boundary query:
ToJob configuration
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
successfully created with validation status OK and persistent id 2
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: hdfs://hadoop000:8020/sqoop2
Throttling resources
Extractors:
Loaders:
New job was
四、前臺與後臺交互工具類
工具類提供靜態的方法,能夠進行相應業務邏輯的調用,因爲Hadoop集羣存在於服務器端,前臺須要實現跨平臺服務器的鏈接,才能執行相應的Hadoop命令,實現對HDFS上文件的操做。這次設計的ShellUtils類,經過jsch鏈接Linux服務器執行shell命令.
private static JSch jsch;
private static Session session;
public static void connect(String user, String passwd, String host) throws JSchException {
jsch = new JSch();
session = jsch.getSession(user, host,22);
session.setPassword(passwd);
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
經過傳入的Linux命令、用戶名、密碼等參數對遠程linux服務器進行鏈接。因爲執行Hadoop命令根據不一樣文件的大小所需佔用的時間是不一樣的,在hadoop還沒有將文件徹底從hdfs上合併到本地時,本地會提早生成文件但文件內容爲空,至此這裏須要多傳入前臺客戶羣探索出來的客戶羣數目與文件條數進行對比,假若數目相同則說明執行完畢。
CodecUtil類,用來實現不一樣類型壓縮文件的解壓工做,經過傳入的壓縮類型,利用反射機制鎖定壓縮的類型,因爲存儲在hdfs上的文件都是以文件塊的形式存在的,因此首先須要獲取hdfs中文件的二級子目錄,遍歷查詢到每個文件塊的文件路徑,隨後經過輸入輸出流進行文件的解壓工做。而後將此類打包成jar包放入集羣中,經過前臺遠程鏈接服務端,執行hadoop命令操做執行,實現類部分代碼以下:
public class CodecUtil{
public static void main(String[] args) throws Exception {
//compress("org.apache.hadoop.io.compress.GzipCodec");
String listName = args[0];
String codecType = args[1];
String hdfsPath = args[2];
uncompress(listName,codecType,hdfsPath);
//解壓縮
public static void uncompress(String listName,String CodecType,String hdfsPath) throws Exception{
Class<?> codecClass = Class.forName(CodecType);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path listf =new Path(hdfsPath+listName);
//獲取根目錄下的全部2級子文件目錄
FileStatus stats[]=fs.listStatus(listf);
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
int i;
for ( i = 0; i < stats.length; i++){
//得到子文件塊的文件路徑
String Random = findRandom();
Path list = new Path(stats[i].getPath().toString());
InputStream in = codec.createInputStream(inputStream);
FSDataOutputStream output = fs.create(new Path(hdfsPath + listName+"/"+unListName));
IOUtils.copyBytes(in, output, conf);
IOUtils.closeStream(in);
}
}
}
五、導入生成客戶羣
因爲sparksql不支持insert into value語句,沒法經過jdbc方式鏈接後臺HDFS經過sparksql對文件進行導入數據的操做。因而將須要導入的csv文件經過ftp方式上傳到遠程服務器,再將文件經過load的方式導入表中,實現導入生成客戶羣的功能。
// 將文件上傳到ftp服務器
CiFtpInfo ftp = customFileRelService.getCiFtpInfoByFtpType(1);
FtpUtil.ftp(ftp.getFtpServerIp(),ftp.getFtpPort(),ftp.getFtpUser(),DES.decrypt
(ftp.getFtpPwd()), ftpFileName, ftp.getFtpPath());
// 將文件load到表中
String ftpPath = ftp.getFtpPath();
if (!ftpPath.endsWith("/")) {
ftpPath = ftpPath + "/";
}
String sql = " LOAD DATA LOCAL INPATH '" + ftpPath + fileName
+ "' OVERWRITE INTO TABLE " + tabName;
log.info("loadSql=" + sql);
customersService.executeInBackDataBase(sql);
log.info("load table=" + tabName + " successful");
六、數據表或文件下載的實現
因爲存儲在hdfs上的數據爲Gzip壓縮格式,首先經過執行事先編好的解壓代碼對文件塊進行解壓,這裏須要傳入須要解壓的文件名、解壓類型、hdfs的徹底路徑,解壓完畢後經過執行hadoop文件合併命令將文件從hdfs上合併到本地服務器,合併完畢後因爲解壓縮後的文件會佔用hdfs的空間,同時執行hadoop文件刪除命令將解壓後的文件刪除,再經過ftp傳到前臺服務器,完成客戶羣清單下載。
String command = "cd " + ftpPath + ";" + hadoopPath + "hadoop jar "+hadoopPath+"CodecTable.jar " + listRandomName +" "+ CodecType
+" " + " "+ hdfsWholePath + ";" + hadoopPath + "hadoop fs -cat '" + hdfsPath + listRandomName + "/*'>" + listName1+".csv;" + hadoopPath +"hadoop fs -rm -r " + hdfsPath + listRandomName + ";" + "wc -l " + listName1 +".csv;";
LOG.debug(command);
flag = ShellUtils.execCmd(command, user, passwd, host,num);
清單的推送也是經過文件合併傳輸的方式進行其餘平臺的推送,大大下降了讀取數據插入表數據所消耗的時間。