Spark SQL 中 UDF 和 UDAF 的使用

Spark SQL 支持 Hive 的 UDF(User defined functions) 和 UDAF(User defined aggregation functions)java

UDF 傳入參數只能是表中的 1 行數據(能夠是多列字段),傳出參數也是 1 行,具體使用以下:sql

/** * 拼接一行中兩列字段,數據類型一個爲長整型,一個爲字符串 * Created by zhulei on 2017/6/20. */
public class ConcatLongStringUDF implements UDF3<Long, String, String, String> {
    @Override
    public String call(Long v1, String v2, String split) throws Exception {
        return String.valueOf(v1) + split + v2;
    }
}

//而後在 main 方法中註冊
sqlContext.udf().register("concat_long_string", new ConcatLongStringUDF(), DataTypes.StringType);

複製代碼

UDAF 傳入參數是多行的數據,而後經過聚合運算輸出一行數據,具體使用以下:ide

/** * <p> * 組內拼接去重函數 * 多行輸入,聚合成一行輸出 * Created by zhulei on 2017/6/20. */
public class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {

    /** * 定義輸入數據的 schema * 好比你要將多行多列的數據合併,能夠理解成輸入多行多列的數據所對應的 schema * 這裏輸入的只有一列數據,因此 schema 也就只有一個字段 */
    @Override
    public StructType inputSchema() {
        return DataTypes.createStructType(Collections.singletonList(
                DataTypes.createStructField("cityInfo", DataTypes.StringType, true)));
    }

    /** * 定義用來存儲中間計算結果的 buffer 對應的 schema * 這個值是根據你的計算過程來定的 */
    @Override
    public StructType bufferSchema() {
        return DataTypes.createStructType(Collections.singletonList(
                DataTypes.createStructField("bufferCityInfo", DataTypes.StringType, true)
        ));
    }

    /** * 輸出值的數據類型 */
    @Override
    public DataType dataType() {
        return DataTypes.StringType;
    }

    /** * 輸入值和輸出值是否是肯定的 */
    @Override
    public boolean deterministic() {
        return true;
    }

    /** * 初始化中間計算結果變量 */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, "");
    }

    /** * 更新計算結果 * 不斷的將每一個輸入值經過你的計算方法去計算 */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        String bufferCityIno = buffer.getString(0);
        String inputCityInfo = input.getString(0);

        if (!bufferCityIno.contains(inputCityInfo)) {
            if ("".equals(bufferCityIno)) {
                bufferCityIno += inputCityInfo;
            } else {
                bufferCityIno += "," + inputCityInfo;
            }
            buffer.update(0, bufferCityIno);
        }
    }

    /** * update 操做是某個節點上的計算 * merge 是將多個節點的結果進行合併 */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        String aggBuffer1 = buffer1.getString(0);
        String aggBuffer2 = buffer2.getString(0);

        for (String ele : aggBuffer2.split(",")) {
            if (!aggBuffer1.contains(ele)) {
                if ("".equals(aggBuffer1)) {
                    aggBuffer1 += ele;
                } else {
                    aggBuffer1 += "," + ele;
                }
            }
        }

        buffer1.update(0, aggBuffer1);
    }

    /** * 輸出最終計算結果 */
    @Override
    public Object evaluate(Row buffer) {
        return buffer.getString(0);
    }

}

//而後在 main 方法中註冊
sqlContext.udf().register("group_concat_distinct", new GroupConcatDistinctUDAF());
複製代碼
相關文章
相關標籤/搜索