其餘更多java基礎文章:
java基礎學習(目錄)html
SparkSQL中的UDF至關因而1進1出,UDAF至關因而多進一出,相似於聚合函數。sql
開窗函數通常分組取topn時經常使用。express
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");
/**
* 根據UDF函數參數的個數來決定是實現哪個UDF UDF1,UDF2。。。。UDF1xxx
*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();
複製代碼
new UDF1<String,Integer>()
這些參數須要對應,UDF2就是表示傳兩個參數,UDF3就是傳三個參數。例如new UDF2<String, Integer, Integer>()
,表示傳入兩個參數,第一個爲String類型,第二個爲Integer類型,返回Integer類型的數據。apache
UDAF: 用戶自定義聚合函數,user defined aggreagatefunction
實現UDAF函數若是要自定義類要繼承UserDefinedAggregateFunction類api
package com.spark.sparksql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* UDAF 用戶自定義聚合函數
* @author root
*
*/
public class UDAF {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(
Arrays.asList("zhangsan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 註冊一個UDAF函數,實現統計相同值得個數
* 注意:這裏能夠自定義一個類繼承UserDefinedAggregateFunction類也是能夠的
*/
sqlContext.udf().register("StringCount",new UserDefinedAggregateFunction() {
/**
*
*/
private static final long serialVersionUID = 1L;
/**
* 初始化一個內部的本身定義的值,在Aggregate以前每組數據的初始化結果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
}
/**
* 更新 能夠認爲一個一個地將組內的字段值傳遞進來 實現拼接的邏輯
* buffer.getInt(0)獲取的是上一次聚合後的值
* 至關於map端的combiner,combiner就是對每個map task的處理結果進行一次小聚合
* 大聚和發生在reduce端.
* 這裏便是:在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
buffer.update(0, buffer.getInt(0)+1);
}
/**
* 合併 update操做,多是針對一個分組內的部分數據,在某個節點上發生的 可是可能一個分組內的數據,會分佈在多個節點上處理
* 此時就要用merge操做,將各個節點上分佈式拼接好的串,合併起來
* buffer1.getInt(0) : 大聚合的時候 上一次聚合後的值
* buffer2.getInt(0) : 此次計算傳入進來的update的結果
* 這裏便是:最後在分佈式節點完成後須要進行全局級別的Merge操做
* 也能夠是一個節點裏面的多個executor合併
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
}
/**
* 在進行聚合操做的時候所要處理的數據的結果的類型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer111", DataTypes.IntegerType, true)));
}
/**
* 最後返回一個和DataType的類型要一致的類型,返回UDAF最後的計算結果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}
/**
* 指定UDAF函數計算後返回的結果類型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
/**
* 指定輸入字段的字段及類型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("nameeee", DataTypes.StringType, true)));
}
/**
* 確保一致性 通常用true,用以標記針對給定的一組輸入,UDAF是否老是生成相同的結果。
*/
@Override
public boolean deterministic() {
return true;
}
});
sqlContext.sql("select name ,StringCount(name) as strCount from user group by name").show();
sc.stop();
}
}
複製代碼
row_number() 開窗函數是按照某個字段分組,而後取另外一字段的前幾個的值,至關於分組取topN
開窗函數格式:row_number() over (partitin by XXX order by XXX)bash
package com.spark.sparksql.windowfun;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
/**是hive的函數,必須在集羣中運行。
* row_number()開窗函數:
* 主要是按照某個字段分組,而後取另外一字段的前幾個的值,至關於 分組取topN
* row_number() over (partition by xxx order by xxx desc) xxx
* 注意:
* 若是SQL語句裏面使用到了開窗函數,那麼這個SQL語句必須使用HiveContext來執行,HiveContext默認狀況下在本地沒法建立
* @author root
*
*/
public class RowNumberWindowFun {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("windowfun");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
/**
* 開窗函數格式:
* 【 row_number() over (partition by XXX order by XXX) as rank】//起個別名
* 注意:rank 從1開始
*/
/**
* 以類別分組,按每種類別金額降序排序,顯示 【日期,種類,金額】 結果,如:
*
* 1 A 100
* 2 B 200
* 3 A 300
* 4 B 400
* 5 A 500
* 6 B 600
* 排序後:
* 5 A 500 --rank 1
* 3 A 300 --rank 2
* 1 A 100 --rank 3
* 6 B 600 --rank 1
* 4 B 400 --rank 2
* 2 B 200 --rank 3
*
*/
DataFrame result = hiveContext.sql("select riqi,leibie,jine "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show(100);
/**
* 將結果保存到hive表sales_result
*/
result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
sc.stop();
}
}
複製代碼