背景java
自定義聚合函數git
實例講解github
背景
在網站性能測試中,咱們常常會選擇 TP50、TP95 或者 TP99 等做爲性能指標。接下來咱們講講這些指標的含義、以及在flink中如何實時統計:sql
TP50,top percent 50,即 50% 的數據都知足某一條件;apache
TP95,top percent 95,即 95% 的數據都知足某一條件;微信
TP99,top percent 99,即 99% 的數據都知足某一條件;app
咱們舉一個例子,咱們要統計網站一分鐘以內的的響應時間的TP90,正常的處理邏輯就是把這一分鐘以內全部的網站的響應時間從小到大排序,而後計算出總條數count,而後計算出排名在90%的響應時間是多少(count*0.9),就是咱們要的值。ide
自定義聚合函數
這個需求很明顯就是一個使用聚合函數來作的案例,Flink中提供了大量的聚合函數,好比count,max,min等等,可是對於這個需求,卻沒法知足,因此咱們須要自定義一個聚合函數來實現咱們的需求。函數
在前段時間,咱們聊了聊flink的聚合算子,具體可參考: flink實戰-聊一聊flink中的聚合算子 , 聚合算子是咱們在寫代碼的時候用來實現一個聚合功能,聚合函數其實和聚合算子相似,只不過聚合函數用於在寫sql的時候使用。性能
自定義聚合函數須要繼承抽象類org.apache.flink.table.functions.AggregateFunction。並實現下面幾個方法。
createAccumulator():這個方法會在一次聚合操做的開始調用一次,主要用於構造一個Accumulator,用於存儲在聚合過程當中的臨時對象。
accumulate() 這個方法,每來一條數據會調用一次這個方法,咱們就在這個方法裏實現咱們的聚合函數的具體邏輯。
getValue() 這個方法是在聚合結束之後,對中間結果作處理,而後將結果返回,最終sql中獲得的結果數據就是這個值。
實例講解
對於TP指標,正常的思路咱們能夠先建立一個臨時變量,裏面有一個list,每來一個數據,就放到這個list裏面,在getValue方法裏,進行排序,取相應的TP值。
可是這種思路會有一個問題,就是若是要聚合的時間範圍內,數據過多的話。就會在list存儲大量的數據,會形成checkpoint過大,時間過長,最後致使程序失敗。得不到正確的結果。
因此咱們須要換一個思路,既然最後咱們想要的是一個有序列表,那麼咱們是否是能夠把這個list結構優化一下,使用Treemap來存儲,map的key就是指標,好比響應時間。value就是對應的指標出現的次數。這樣getValue方法裏,只須要將map的value值累加,就能獲得總數count,而後計算出來相應的tp值的位置position,最後咱們再從頭累加map的value,直到累加結果大於相應的位置position,則map的key即爲所求。
示例以下:咱們先構建一個source,只是隨機生成一個變量,網站的相應時間response_time。
String sql = "CREATE TABLE source (\n" +
" response_time INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1000',\n" +
" 'fields.response_time.min'='1',\n" +
" 'fields.response_time.max'='1000'" +
")";
定義一個聚合函數用的臨時變量:
public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}
實現自定義聚合函數類
public static class TP extends AggregateFunction<Integer,TPAccum>{
@Override
public TPAccum createAccumulator(){
return new TPAccum();
}
@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);
int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}
}
return responseTime;
}
}
public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}
}
實際的查詢sql以下:
String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";
完整代碼請參考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java
更多內容,歡迎關注個人公衆號【大數據技術與應用實戰】
本文分享自微信公衆號 - 大數據技術與應用實戰(bigdata_bigdata)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。