本案例根據某電網公司的真實業務需求,經過Blink SQL+UDAF實現實時流上的差值聚合計算,經過本案例,讓讀者熟悉UDAF編寫,並理解UDAF中的方法調用關係和順序。
感謝@軍長在實現過程當中的指導。筆者水平有限,如有紕漏,請批評指出。
html
電網公司天天採集各個用戶的電錶數據(格式以下表),其中data_date爲電錶數據上報時間,cons_id爲電錶id,r1爲電錶度數,其餘字段與計算邏輯無關,可忽略。爲了後續演示方便,僅輸入cons_id=100000002的數據。java
no(string) | data_date(string) | cons_id(string) | org_no(string) | r1(double) |
---|---|---|---|---|
101 | 20190716 | 100000002 | 35401 | 13.76 |
101 | 20190717 | 100000002 | 35401 | 14.12 |
101 | 20190718 | 100000002 | 35401 | 16.59 |
101 | 20190719 | 100000002 | 35401 | 18.89 |
表1:輸入數據
電網公司但願經過實時計算(Blink)對電錶數據處理後,天天獲得每一個電錶最近兩天(當天和前一天)的差值數據,結果相似以下表:sql
cons_id(string) | data_date(string) | subDegreeR1(double) |
---|---|---|
100000002 | 20190717 | 0.36 |
100000002 | 20190718 | 2.47 |
100000002 | 20190719 | 2.3 |
根據客戶的需求,比較容易獲得兩種解決方案:一、經過over窗口(2 rows over window)開窗進行差值聚合;二、經過hop窗口(sliding=1天,size=2天)進行差值聚合。
over窗口和hop窗口均是Blink支持的標準窗口,使用起來很是簡單。本需求的最大難點在於差值聚合,Blink支持SUM、MAX、MIN、AVG等內置的聚合函數,但沒有知足業務需求的差值聚合函數,所以須要經過自定義聚合函數(UDAF)來實現。
api
實時計算自定義函數開發搭建環境請參考UDX概述(https://help.aliyun.com/docum...,在此再也不贅述。本案例使用Blink2.2.7版本,下面簡要描述關鍵代碼的編寫。
完整代碼(爲了方便上傳,使用了txt格式):SubtractionUdaf.txt
一、在com.alibaba.blink.sql.udx.SubtractionUdaf包中建立一個繼承AggregateFunction類的SubtractionUdaf類。緩存
public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum>
其中Double是UDAF輸出的類型,在本案例中爲相鄰兩天的電錶差值度數。SubtractionUdaf.Accum是內部自定義的accumulator數據結構。
二、定義accumulator數據結構,用戶保存UDAF的狀態。網絡
public static class Accum { private long currentTime;//最新度數的上報時間 private double oldDegree;//前一次度數 private double newDegree;//當前最新度數 private long num; //accumulator中已經計算的record數量,主要用於merge private List<Tuple2<Double, Long>> listInput;//緩存全部的輸入,主要用於retract }
三、實現createAccumulator方法,初始化UDAF的accumulator數據結構
//初始化udaf的accumulator public SubtractionUdaf.Accum createAccumulator() { SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum(); acc.currentTime = 0; acc.oldDegree = 0.0; acc.newDegree = 0.0; acc.num = 0; acc.listInput = new ArrayList<Tuple2<Double, Long>>(); return acc; }
四、實現getValue方法,用於經過存放狀態的accumulator計算UDAF的結果,本案例需求是計算新舊數據二者的差值。函數
public Double getValue(SubtractionUdaf.Accum accumulator) { return accumulator.newDegree - accumulator.oldDegree; }
五、實現accumulate方法,用於根據輸入數據更新UDAF存放狀態的accumulator。考慮到數據可能亂序以及可能的retract,數據數據包括了對應的度數iValue,還包括上報度數的時間(構造的事件時間ts)。性能
public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) { System.out.println("method : accumulate" ); accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts))); Collections.sort(accumulator.listInput,this.comparator);//按照時間排序 accumulator.num ++; if(accumulator.listInput.size() == 1){ accumulator.newDegree = iValue; accumulator.oldDegree = 0.0; accumulator.currentTime = ts; }else {//處理可能存在的數據亂序問題 accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.oldDegree = accumulator.listInput.get(1).f0; } }
其中accumulator爲UDAF的狀態,iValue和ts爲實際的輸入數據。
注意須要處理可能存在的輸入數據亂序問題。
六、實現retract方法,用於在某些優化場景下(如使用over窗口)對retract的數據進行處理。
public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{ if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){ if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值 accumulator.listInput.remove(0); accumulator.num--; if(accumulator.listInput.isEmpty()){ accumulator.currentTime = 0; accumulator.oldDegree = 0.0; accumulator.newDegree = 0.0; }else if(accumulator.listInput.size() == 1) { accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.oldDegree = 0.0; }else{ accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.oldDegree = accumulator.listInput.get(1).f0; } } else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值 accumulator.listInput.remove(1); accumulator.num--; if(accumulator.listInput.size() == 1){ accumulator.oldDegree = 0.0; }else { accumulator.oldDegree = accumulator.listInput.get(1).f0; } }else {//retract的是其餘值 accumulator.listInput.remove(Tuple2.of(iValue, ts)); accumulator.num--; } }else { throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts); } }
須要考慮retract的是最新的數據仍是次新的數據,須要不一樣的邏輯處理。
七、實現merge方法,用於某些優化場景(如使用hop窗口)。
public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) { int i = 0; System.out.println("method : merge" ); System.out.println("accumulator : "+ accumulator.newDegree); System.out.println("accumulator : "+ accumulator.currentTime); for (SubtractionUdaf.Accum entry : its) { if(accumulator.currentTime < entry.currentTime){ if(entry.num > 1){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = entry.oldDegree; accumulator.newDegree = entry.newDegree; accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(entry.num == 1){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = accumulator.newDegree; accumulator.newDegree = entry.newDegree; accumulator.num ++; accumulator.listInput.addAll(entry.listInput); } }else{ if(accumulator.num > 1){ accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(accumulator.num == 1){ accumulator.oldDegree = entry.newDegree; accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(accumulator.num == 0){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = entry.oldDegree; accumulator.newDegree = entry.newDegree; accumulator.num = entry.num; accumulator.listInput.addAll(entry.listInput); } } Collections.sort(accumulator.listInput,this.comparator); System.out.println("merge : "+i); System.out.println("newDegree : "+entry.newDegree); System.out.println("oldDegree = "+entry.oldDegree); System.out.println("currentTime : "+entry.currentTime); } }
須要考慮merge的是不是比當前新的數據,須要不一樣的處理邏輯。
八、其餘方面,考慮到須要對輸入度數按照事件時間排序,在open方法中實例化了自定義的Comparator類,對accumulator數據結構中的inputList按事件時間的降序排序。
public void open(FunctionContext context) throws Exception { //定義record的前後順序,用於listInput的排序,時間越新的record在list中越前面 this.comparator = new Comparator<Tuple2<Double, Long>>() { public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) { if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) { return 1; } else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) { return -1; }else { return 0; } } }; }
請參考[使用IntelliJ IDEA開發自定義函數]()完成UDAF編譯、打包,並參考UDX概述完成資源的上傳和引用。
SQL代碼以下,語法檢查、上線、啓動做業(選擇當前啓動位點)。並將表1數據上傳至datahub。
CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf'; CREATE TABLE input_dh_e_mp_read_curve ( `no` VARCHAR, data_date VARCHAR, cons_id VARCHAR, org_no VARCHAR, r1 DOUBLE, ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss') ,WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH ( type = 'datahub', endPoint = 'http://dh-cn-shanghai.aliyun-inc.com', roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole', project = 'jszc_datahub', topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out( cons_id varchar ,data_date varchar ,subDegreeR1 DOUBLE )with( type = 'print' ); INSERT into data_out SELECT cons_id ,last_value(data_date) OVER ( PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date ,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER ( PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date FROM input_dh_e_mp_read_curve
因爲使用了print connector,從對應的sink的taskmanager.out日誌中能夠查看到輸出以下(已忽略其餘debug日誌):
task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006
對比指望輸出(表2),20190717和20190718兩個窗口的數據均正確,代表業務邏輯正確,但此輸出與指望輸出有少量差別:
(1)20190716輸出爲13.76,這是由於第一個over窗口只有一條數據致使的,這種數據能夠在業務層過濾掉;
(2)20190719的數據沒有輸出,這是由於咱們設置了watermark,測試環境下20190719以後沒有數據進來觸發20190719對應的窗口的結束。
SQL代碼以下:語法檢查、上線、啓動做業(選擇當前啓動位點)。並將表1數據上傳至datahub。
CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf'; CREATE TABLE input_dh_e_mp_read_curve ( `no` VARCHAR, data_date VARCHAR, cons_id VARCHAR, org_no VARCHAR, r1 DOUBLE, ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss') ,WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH ( type = 'datahub', endPoint = 'http://dh-cn-shanghai.aliyun-inc.com', roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole', project = 'jszc_datahub', topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out( cons_id varchar ,data_date varchar ,subDegreeR1 DOUBLE )with( type = 'print' ); INSERT into data_out SELECT cons_id ,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd') ,HopWindowSubtractionUdaf(r1,unix_timestamp(ts)) FROM input_dh_e_mp_read_curve group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;
因爲使用了print connector,從對應的sink的taskmanager.out日誌中能夠查看到輸出以下(已忽略其餘debug日誌):
task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006
對比指望輸出(表2),20190717和20190718兩個窗口的數據均正確,代表業務邏輯正確,但此輸出與指望輸出有少量差別:
(1)20190716輸出爲13.76,這是由於第一個hop窗口只有一條數據致使的,這種數據能夠在業務層過濾掉;
(2)20190719的數據沒有輸出,這是由於咱們設置了watermark,測試環境下20190719以後沒有數據進來觸發20190719對應的窗口的結束。
UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其調用關係和順序並非徹底肯定,而是與Blink底層優化、Blink版本、開窗類型(如hop仍是over窗口)等相關。
比較肯定的是一次正常(沒有failover)的做業,createAccumulator方法只在做業啓動時調用一次,accumulate方法在每條數據輸入時調用一次,在觸發數據輸出時會調用一次getValue(並不表明只調用一次)。
而retract方法和merge方法則跟具體的優化方式或開窗類型有關,本案例中over窗口調用retract方法而不調用merge方法,hop窗口調用merge方法而不調用retract方法。
你們能夠增長日誌,觀察這幾個方法的調用順序,仍是蠻有意思的。
UDAF中必須實現createAccumulator、getValue、accumulate方法,可選擇實現retract和merge方法。
通常狀況下,可先實現createAccumulator、getValue、accumulate三個方法,而後編寫SQL後進行語法檢查,SQL編譯器會提示是否須要retract或merge方法。
好比,若是沒有實現retract方法,在使用over窗口時,語法檢查會報相似以下錯誤:
org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.
好比,若是沒有實現merge方法,在使用over窗口時,語法檢查會報相似以下錯誤:
org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
(1)本案例沒有考慮數據缺失的問題,好比由於某種緣由(網絡問題、數據採集問題等)缺乏20190717的數據。這種狀況下會是什麼樣的結果?你們能夠自行測試下;(2)本案例使用了一個List,而後經過Collections.sort方法進行排序,這不是很優的方法,若是用優先級隊列(priority queue)性能應該會更好;