15-Flink實戰項目之實時熱銷排行

戳更多文章:

1-Flink入門java

2-本地環境搭建&構建第一個Flink應用程序員

3-DataSet API面試

4-DataSteam APIredis

5-集羣部署sql

6-分佈式緩存bootstrap

7-重啓策略c#

8-Flink中的窗口緩存

9-Flink中的Timeapp

Flink時間戳和水印框架

Broadcast廣播變量

FlinkTable&SQL

Flink實戰項目實時熱銷排行

Flink寫入RedisSink

17-Flink消費Kafka寫入Mysql

需求

某個圖書網站,但願看到雙十一秒殺期間實時的熱銷排行榜單。咱們能夠將「實時熱門商品」翻譯成程序員更好理解的需求:每隔5秒鐘輸出最近一小時內點擊量最多的前 N 個商品/圖書.

需求分解

將這個需求進行分解咱們大概要作這麼幾件事情:

  • 告訴 Flink 框架基於時間作窗口,咱們這裏用processingTime,不用自帶時間戳
  • 過濾出圖書點擊行爲數據
  • 按一小時的窗口大小,每5秒鐘統計一次,作滑動窗口聚合(Sliding Window)
  • 聚合,輸出窗口中點擊量前N名的商品

代碼實現

向Kafka發消息模擬購買事件

public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties); /* //event-timestamp事件的發生時間 producer.setWriteTimestampToKafka(true); */ text.addSink(producer); env.execute(); } }// 

其中的:MyNoParalleSource 是做者本身實現的一個並行度爲1的發送器,用來向kafka發送數據:

public class MyNoParalleSource implements SourceFunction<String> {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 啓動一個source * 大部分狀況下,都須要在這個run方法中實現一個循環,這樣就能夠循環產生數據了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<String> ctx) throws Exception { while(isRunning){ //圖書的排行榜 List<String> books = new ArrayList<>(); books.add("Pyhton從入門到放棄");//10 books.add("Java從入門到放棄");//8 books.add("Php從入門到放棄");//5 books.add("C++從入門到放棄");//3 books.add("Scala從入門到放棄");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每1秒產生一條數據 Thread.sleep(1000); } } //取消一個cancel的時候會調用的方法 @Override public void cancel() { isRunning = false; } } 

可見,咱們每過1秒向Kafka的topn這個topic隨機發送一本書的名字用來模擬購買行爲。

總體實現代碼以下:

public class TopN { public static void main(String[] args) throws Exception{ /** * * 書1 書2 書3 * (書1,1) (書2,1) (書3,1) * * */ //每隔5秒鐘 計算過去1小時 的 Top 3 商品 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime做爲時間語義 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties); //從最先開始消費 位點 input.setStartFromEarliest(); DataStream<String> stream = env .addSource(input); DataStream<Tuple2<String, Integer>> ds = stream .flatMap(new LineSplitter()); //將輸入語句split成一個一個單詞並初始化count值爲1的Tuple2<String, Integer>類型 DataStream<Tuple2<String, Integer>> wcount = ds .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5))) //key以後的元素進入一個總時間長度爲600s,每5s向後滑動一次的滑動窗口 .sum(1);// 將相同的key的元素第二個count值相加 wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx).... //全部key元素進入一個5s長的窗口(選5秒是由於上游窗口每5s計算一輪數據,topN窗口一次計算只統計一個窗口時間內的變化) .process(new TopNAllFunction(3)) .print(); //redis sink redis -> 接口 env.execute(); }// private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line //String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs /*for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } }*/ //(書1,1) (書2,1) (書3,1) out.collect(new Tuple2<String, Integer>(value, 1)); } } private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> { private int topSize = 3; public TopNAllFunction(int topSize) { this.topSize = topSize; } public void process( ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆蓋 for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN個元素 treemap.pollLastEntry(); } } for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect("=================\n熱銷圖書列表:\n"+ new Timestamp(System.currentTimeMillis()) + treemap.toString() + "\n===============\n"); } } } }// 

查看輸出:

=================
熱銷圖書列表:
2019-03-05 22:32:40.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============
=================
熱銷圖書列表:
2019-03-05 22:32:45.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============

全部代碼,我放在了個人公衆號,回覆Flink能夠下載

  • 海量【java和大數據的面試題+視頻資料】整理在公衆號,關注後能夠下載~
  • 更多大數據技術歡迎和做者一塊兒探討~
 
image
相關文章
相關標籤/搜索