ETL實踐--Spark作數據清洗

 

ETL實踐--Spark作數據清洗java

 

 

上篇博客,說的是用hive代替kettle的表關聯。是爲了提升效率。mysql

本文要說的spark就不光是爲了效率的問題。sql

 

一、用spark的緣由app

(若是是一個sql能搞定的關聯操做,能夠直接用kettle導原始數據到hive,用hive視圖作關聯直接給kylin提供數據)函數

(1)、場景一以前用kettle須要多個轉換、關聯才能實現數據清洗的操做。學習

        用hive不知道如何進行,就算能進行也感受繁瑣,同時多個步驟必然下降數據時效性。用mr的話也是一樣道理太多步驟繁瑣不堪。測試

(2)、一些不能用sql來處理的數據清洗邏輯。好比循環類的,或者是更復雜的處理邏輯。用hive和kettle都不方便解決。大數據

一些其餘的緣由ui

(3)、支持的語言多,容易上手,而且以前也學習過一些。url

(4)、我公司用的大數據平臺上,提供了spark的支持,能夠方便的安裝和維護,而且能夠和現有平臺很好的融合(yarn部署方式)。

(5)、效率高。

(6)、恰好公司有須要用到spark streaming。

 

 

二、下面是我學習用spark處理業務問題的一個例子。有註釋和一些方法的測試。

 

public class EtlSpark5sp2Demo {

//三、解碼器
//static final Encoder<EcardAccessOutTime> outTimeEncoder = Encoders.bean(EcardAccessOutTime.class);
static final Encoder<EcardAccessInout> inoutEncoder = Encoders.bean(EcardAccessInout.class);

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate();

runJdbcDatasetExample(spark);
spark.stop();
}

private static void runJdbcDatasetExample(SparkSession spark) {

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root123");
connectionProperties.put("password", "123");
String dbUrl="jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";

//對比java中鏈接mysql的字符串(標紅的是區別,分割字符串是spark識別不了的,要去掉)
//  mysql.url=jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&amp;useUnicode=true&amp;characterEncoding=UTF-8

//一、查詢原始數據
final Dataset<Row> allRecord = spark.read()
.jdbc(dbUrl,
"( select a.kid,a.outid,a.ioflag,a.OpDT as 'indate',a.OpDT as 'outdate',a.school_code ,a.faculty_code,a.major_code,a.class_code,a.sex from access_record_inout_temp2 a limit 3000 ) t", connectionProperties);

final Dataset<EcardAccessInout> allRecordInout=allRecord.as(inoutEncoder);
 spark提供了把數據集註冊成視圖,而後用sql的方式對數據集進行處理的功能:以下二、3所示

//二、將原始數據註冊成視圖
allRecord.createOrReplaceTempView("view_access_record_inout_temp2");
allRecord.printSchema();//打印數據集結構

 //三、在上面註冊的視圖上執行sql測試:查詢出進入宿舍的記錄
 final Dataset<EcardAccessInout> inRecord = spark.sql(
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate " +
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 ").as(inoutEncoder);
inRecord.printSchema();
//打印數據集結構
 inRecord.show();
 
 
 spark不光提供了針對註冊的視圖的sql查詢。也提供了經過方法來查詢數據集的方式:下面是2種方式


//四、filter用法測試:
 String outid="45723107";
long kid=7516452;
//filter方法可以正常使用
Dataset<Row> list1 = allRecord.filter(allRecord.col("outid").equalTo(outid).and(allRecord.col("kid").gt(kid))).orderBy(allRecord.col("indate"));//.take(1);//
list1.show();
//打印前20條記錄


 五、where用法測試

//where方法可以正常使用
 Dataset<Row> list2 = allRecord.where("outid = '"+ outid +"'").where("kid > "+ kid +"").orderBy("indate");
list2.show();
//打印前20條記錄




//六、分組取topN:測試::mysql中能夠group by 2個字段查詢所有的字段。實際返回值是取的分組後的第一條記錄。
(對應實際業務就按照學號和時間去重,數據當中有重複數據)
 6.一、mysql中的原始sql
   select a.id,a.outid,g.school_code,g.faculty_code,g.major_code,g.class_code,g.sex,a.OpDT,a.ioflag

                            from access_record a inner join own_org_student g on a.OutId=g.outid
                               where a.id > ?   group by a.OutId,a.OpDT

 
 6.二、hive和spark都不支持這種操做。他們的作法是同樣的。就是經過下面這個sql,用row_number()函數分組,取第一
 final Dataset<Row> topRecord = spark.sql(
" select t.kid,t.outid,t.ioflag,t.school_code,t.faculty_code,t.major_code,t.class_code,t.sex,t.indate,t.outdate from (" +
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate, " +
" row_number() over(partition by outid order by indate) as rowNumber " + //根據行號top
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 " +
" ) t where rowNumber =1 ");

topRecord.show();//打印前20條記錄




//七、循環:合併進入記錄和出去的記錄
7.一、這裏先進行先按照學號分區,再按照時間排序。
Dataset<EcardAccessInout> allRecordSort= allRecordInout.repartition(allRecordInout.col("outid")).sortWithinPartitions(allRecordInout.col("indate"));
 七、循環:合併進入記錄和出去的記錄(由於已經排序了,本條記錄的下一條,若是是正常記錄就是出去的記錄)
 Iterator<EcardAccessInout> iterator = allRecordSort.toLocalIterator(); List<EcardAccessInout> result=new ArrayList<EcardAccessInout>(); while(iterator.hasNext()){ EcardAccessInout first= iterator.next(); if("0".endsWith(first.getIoflag())){//第一條記錄是:進入宿舍的記錄 if(iterator.hasNext()){ EcardAccessInout second= iterator.next(); //取比入記錄大的最小的一條出記錄的時間,做爲入記錄的出時間。(排序後,後面一條就是最小記錄) if(first.getOutid().endsWith(second.getOutid())&&("1".endsWith(second.getIoflag()))&&first.getIndate().before(second.getIndate())){ first.setOutdate(second.getIndate()); result.add(first); } } } } //八、處理後的數據寫入mysql。這裏只是個例子,實際數據應該是寫到hdfs,變成hive表 inRecord.write().mode(SaveMode.Append) .jdbc(dbUrl, "datacenter.access_record_inout_temp10", connectionProperties); }}
相關文章
相關標籤/搜索