接着 http://www.javashuo.com/article/p-ydgpuhny-nz.html ,這裏用Flink來實現對APP在每一個渠道的推廣情況(包括下載、查看、卸載等行爲)的分析。
由於以前的文章都是用scala寫的,這篇用純java來實現一波,
分別演示下用aggregate 聚合方式和process 方式的實現和效果。
總體思路
1、準備好數據源:這裏用 `SimulatedSource` 來自己隨機造一批數據
2、準備數據輸入樣例 `MarketUserBehavior` 和輸出樣例 `MarketViewCountResult`
3、準備環境並設置watermark時間,並指定事件時間字段爲timestamp
4、進行過濾:把uninstall 的行爲過濾掉(根據實際情況來改)
5、根據行爲和渠道進行KeyBy統計
6、設置滑動窗口1小時,每10s輸出一次
7、進行聚合輸出
/** * @author mafei * @date 2021/1/9 */ package com.mafei.market; import cn.hutool.core.util.RandomUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import static java.lang.Thread.sleep; /** * APP市場推廣分析 */ /** * 定義一個輸入數據的樣例類 */ class MarketUserBehavior { String userId; String behavior; String channel; Long timestamp; public MarketUserBehavior(String userId, String behavior, String channel, Long timestamp) { this.userId = userId; this.behavior = behavior; this.channel = channel; this.timestamp = timestamp; } } /** * 定義一個輸出數據的類 */ class MarketViewCountResult { Long windowStart; Long windowEnd; String channel; String behavior; Long count; public MarketViewCountResult(Long windowStart, Long windowEnd, String channel, String behavior, Long count) { this.windowStart = windowStart; this.windowEnd = windowEnd; this.channel = channel; this.behavior = behavior; this.count = count; getOutput(); } public void getOutput() { /** * 爲了驗證效果加的 */ StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("windowsStart: " + 
windowStart); stringBuffer.append(" windowEnd: " + windowEnd); stringBuffer.append(" channel: " + channel); stringBuffer.append(" behavior: " + behavior); stringBuffer.append(" count: " + count); //爲了驗證效果,追加打印的 System.out.println(stringBuffer.toString()); } } /** * 定義一個產生隨機數據源的類 */ class SimulatedSource extends RichSourceFunction<MarketUserBehavior> { /** * 是否運行的標誌位,主要在cancel 方法中調用 */ Boolean running = true; /** * 定義用戶行爲和渠道的集合 */ String[] userBeahviors = {"view", "download", "install", "uninstall"}; String[] channels = {"dingding", "wexin", "appstore"}; Long maxRunning = 64 * 10000L; Long currentRunningCount = 0L; @Override public void run(SourceContext<MarketUserBehavior> sourceContext) throws Exception { while (running && currentRunningCount < maxRunning) { String channel = RandomUtil.randomEle(channels); String beahvior = RandomUtil.randomEle(userBeahviors); Long timestamp = System.currentTimeMillis() * 1000; String userId = RandomUtil.randomString(20); sourceContext.collect(new MarketUserBehavior(userId, beahvior, channel, timestamp)); currentRunningCount += 1; sleep(100L); } } @Override public void cancel() { running = false; } } public class MarketChannelAnalysis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); SingleOutputStreamOperator<MarketUserBehavior> dataStream = environment.addSource(new SimulatedSource()) //設置watermark時間爲5秒,而且指定事件時間字段爲timestamp .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MarketUserBehavior>(Time.seconds(5)) { @Override public long extractTimestamp(MarketUserBehavior marketUserBehavior) { return marketUserBehavior.timestamp; } }); DataStreamSink<MarketViewCountResult> result = dataStream .filter(new FilterFunction<MarketUserBehavior>() { @Override public boolean filter(MarketUserBehavior 
marketUserBehavior) throws Exception { return !marketUserBehavior.behavior.equals("uninstall"); } }) // .keyBy("channel", "behavior") // scala的實現方式 .keyBy(new KeySelector<MarketUserBehavior, Tuple2<String, String>>() { @Override public Tuple2<String, String> getKey(MarketUserBehavior marketUserBehavior) throws Exception { // return new String[]{marketUserBehavior.behavior, marketUserBehavior.channel}; return Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel); } }) .timeWindow(Time.hours(1), Time.seconds(10)) //窗口大小是1小時,每10秒輸出一次 .aggregate(new MyMarketChannelAnalysis(), new MyMarketChannelResult()) // .process(new MarkCountByChannel()) //用process方法也能夠實現 .print(); environment.execute(); } } /** * 2種實現思路,用process的時候能夠用這個方法 * process不用每來一條數據都定義怎麼作,而是把對應的數據會放到內存裏面,當窗口結束後進行統一處理,比較耗內存,看實際使用場景 */ class MarkCountByChannel extends ProcessWindowFunction<MarketUserBehavior, MarketViewCountResult, Tuple2<String, String>, TimeWindow> { @Override public void process(Tuple2<String, String> key, Context context, Iterable<MarketUserBehavior> iterable, Collector<MarketViewCountResult> collector) throws Exception { Long startTime = context.window().getStart(); Long endTime = context.window().getEnd(); String channel = key.f1; String behavior = key.f0; Long count = iterable.spliterator().estimateSize(); collector.collect(new MarketViewCountResult(startTime, endTime, channel, behavior, count)); } } /** * 定義聚合函數的具體操做,AggregateFunction 的3個參數: * IN,輸入的數據類型: 輸入已經在源頭定義爲 MarketUserBehavior * ACC,中間狀態的數據類型:由於每次要算count數,因此是Long類型 * OUT,輸出的數據類型:輸出的是統計的次數,因此也是Long類型 */ class MyMarketChannelAnalysis implements AggregateFunction<MarketUserBehavior, Long, Long> { @Override public Long createAccumulator() { /** * 初始化的操做,定義次數爲0 */ return 0L; } @Override public Long add(MarketUserBehavior marketUserBehavior, Long aLong) { /** * 每來一條數據作的操做,這裏直接加1就好了 */ return aLong + 1; } @Override public Long getResult(Long aLong) { /** * 最終輸出時調用的方法 */ return aLong; } @Override public Long merge(Long 
aLong, Long acc1) { /** * 這裏是多個的時候用到,主要是session window時會使用 */ return aLong + acc1; } } /** * 定義輸出的WindowFunction,要的參數能夠點進去看 * IN:這裏輸入是上一步的輸出窗口內add的數量,因此是Long類型 * OUT:自定義的輸出結構,這裏定義的是一個類,能夠直接改 * KEY:分組的Key,就是keyBy 裏頭定義的Tuple2.of(marketUserBehavior.behavior, marketUserBehavior.channel); * W extends Window:TimeWindow * */ class MyMarketChannelResult implements WindowFunction<Long, MarketViewCountResult, Tuple2<String, String>, TimeWindow> { @Override public void apply(Tuple2<String, String> stringStringTuple2, TimeWindow window, Iterable<Long> input, Collector<MarketViewCountResult> out) { out.collect(new MarketViewCountResult(window.getStart(), window.getEnd(), stringStringTuple2.f1, stringStringTuple2.f0, input.iterator().next())); } }
代碼結構及運行的效果,如果要輸出到es、mysql、kafka之類的,直接把print換成addSink就可以了。