前言
最近因公司業務需求,須要使用到大數據分析。選擇了flink,第一次據說flink我也是很懵逼的狀態,不過一段時間下來有了一點心得,在這裏和你們分享分享。有不少描述不許確的,你們多提提意見。java
1.flink是什麼,爲何要flink?
其實大數據框架有不少,好比Hadoop(批處理),Storm(流處理),Samza(流處理),Spark...可是咱們選擇的是flink,爲何呢?由於flink是「流式批處理」,flink將每一項視做真正的數據流。Flink提供的DataStream API可用於處理無盡的數據流。Flink可配合使用的基本組件包括:git
說了這麼多,咱們作一個簡單的demo來體驗一下flink:
假設咱們在電商平臺,須要近實時(5min)統計(1h內)商品點擊量的前三名。而後實時展現出來。若是使用java,咱們須要作一個定時任務,監聽商品點擊事件,而後每5min使用sql計算一下...若是數據量小,間隔時間比較長,還比較好,若是數據量大,間隔時間比較短...那服務器的壓力就會賊大...可是使用flink會怎麼樣呢?先看下代碼(40幾W條數據從阿里淘寶獲取,github上):github
/**web
*/
public class HotItems {sql
public static void main(String[] args) throws Exception { // 建立 execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 告訴系統按照 EventTime 處理 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 爲了打印到控制檯的結果不亂序,配置全局的併發爲1,改變併發對結果正確性沒有影響 env.setParallelism(1); // URL fileUrl = HotItems.class.getClassLoader().getResource("D:\\mft\\codes\\flink-learnning\\src\\main\\java\\cn\\crawler\\mft\\UserBehavior.csv"); Path filePath = Path.fromLocalFile(new File("D:\\mft\\codes\\flink-learnning\\src\\main\\java\\cn\\crawler\\mft\\UserBehavior.csv")); // 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class); // 因爲 Java 反射抽取出的字段順序是不肯定的,須要顯式指定下文件中字段的順序 String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"}; // 建立 PojoCsvInputFormat PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder); env // 建立數據源,獲得 UserBehavior 類型的 DataStream .createInput(csvInput, pojoType) // 抽取出時間和生成 watermark .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() { @Override public long extractAscendingTimestamp(UserBehavior userBehavior) { // 原始數據單位秒,將其轉成毫秒 return userBehavior.timestamp * 1000; } }) // 過濾出只有點擊的數據 .filter(new FilterFunction<UserBehavior>() { @Override public boolean filter(UserBehavior userBehavior) throws Exception { // 過濾出只有點擊的數據 return userBehavior.behavior.equals("pv"); } }) .keyBy("itemId") .timeWindow(Time.minutes(60), Time.minutes(5)) .aggregate(new CountAgg(), new WindowResultFunction()) .keyBy("windowEnd") .process(new TopNHotItems(3)) .print(); env.execute("Hot Items Job"); } /** 求某個窗口中前 N 名的熱門點擊商品,key 爲窗口時間戳,輸出爲 TopN 的結果字符串 */ public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> { private final int topSize; public TopNHotItems(int topSize) { this.topSize = topSize; } // 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據後,再觸發 TopN 計算 private ListState<ItemViewCount> itemState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>( "itemState-state", ItemViewCount.class); itemState = getRuntimeContext().getListState(itemsStateDesc); } @Override public void processElement( ItemViewCount input, Context context, Collector<String> collector) throws Exception { // 每條數據都保存到狀態中 itemState.add(input); // 註冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的全部商品數據 context.timerService().registerEventTimeTimer(input.windowEnd + 1); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 獲取收到的全部商品點擊量 List<ItemViewCount> allItems = new ArrayList<>(); for (ItemViewCount item : itemState.get()) { allItems.add(item); } // 提早清除狀態中的數據,釋放空間 itemState.clear(); // 按照點擊量從大到小排序 allItems.sort(new Comparator<ItemViewCount>() { @Override public int compare(ItemViewCount o1, ItemViewCount o2) { return (int) (o2.viewCount - o1.viewCount); } }); // 將排名信息格式化成 String, 便於打印 StringBuilder result = new StringBuilder(); result.append("====================================\n"); result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n"); for (int i=0; i<allItems.size() && i < topSize; i++) { ItemViewCount currentItem = allItems.get(i); // No1: 商品ID=12224 瀏覽量=2413 result.append("No").append(i).append(":") .append(" 商品ID=").append(currentItem.itemId) .append(" 瀏覽量=").append(currentItem.viewCount) .append("\n"); } result.append("====================================\n\n"); // 控制輸出頻率,模擬實時滾動結果 Thread.sleep(1000); out.collect(result.toString()); } } /** 用於輸出窗口的結果 */ public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> { @Override public void apply( Tuple key, // 窗口的主鍵,即 itemId TimeWindow window, // 窗口 Iterable<Long> aggregateResult, // 聚合函數的結果,即 count 值 Collector<ItemViewCount> collector // 輸出類型爲 ItemViewCount ) throws Exception { Long itemId = ((Tuple1<Long>) key).f0; Long count = aggregateResult.iterator().next(); collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); } } /** COUNT 統計的聚合函數實現,每出現一條記錄加一 */ public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior userBehavior, Long acc) { return acc + 1; } @Override public Long getResult(Long acc) { return acc; } @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; } } /** 商品點擊量(窗口操做的輸出類型) */ public static class ItemViewCount { public long itemId; // 商品ID public long windowEnd; // 窗口結束時間戳 public long viewCount; // 商品的點擊量 public static ItemViewCount of(long itemId, long windowEnd, long viewCount) { ItemViewCount result = new ItemViewCount(); result.itemId = itemId; result.windowEnd = windowEnd; result.viewCount = viewCount; return result; } } /** 用戶行爲數據結構 **/ public static class UserBehavior { public long userId; // 用戶ID public long itemId; // 商品ID public int categoryId; // 商品類目ID public String behavior; // 用戶行爲, 包括("pv", "buy", "cart", "fav") public long timestamp; // 行爲發生的時間戳,單位秒 }
}數據庫
實時模擬的結果:api
==================================== 時間: 2017-11-26 09:05:00.0 No0: 商品ID=5051027 瀏覽量=3 No1: 商品ID=3493253 瀏覽量=3 No2: 商品ID=4261030 瀏覽量=3 ==================================== ==================================== 時間: 2017-11-26 09:10:00.0 No0: 商品ID=812879 瀏覽量=5 No1: 商品ID=2600165 瀏覽量=4 No2: 商品ID=2828948 瀏覽量=4 ==================================== ==================================== 時間: 2017-11-26 09:15:00.0 No0: 商品ID=812879 瀏覽量=7 No1: 商品ID=138964 瀏覽量=5 No2: 商品ID=4568476 瀏覽量=5 ==================================== ==================================== 時間: 2017-11-26 09:20:00.0 No0: 商品ID=812879 瀏覽量=8 No1: 商品ID=2338453 瀏覽量=8 No2: 商品ID=2563440 瀏覽量=7 ====================================
能夠看到,咱們用比較簡單的代碼,就實現了熱點TOP n的問題.可見flink使用起來仍是很方便的(至少比java方便很多)。數組
2.flink這麼強大?爲甚?
從上一個例子裏面,咱們已經初步體會到了flink的方便之處。我想從一下幾個方面解釋一下:服務器
1. 支持多種窗口
1.1 關於flink窗口我手動畫了一個簡單的圖:網絡
1.2flink窗口函數
窗口函數就是這四個:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction.(固然也能夠自定義window)
3.flink工做流程?
dataSource -> DataTransformation(*) ->dataSink
3.1 登錄監控demo瞭解 dataSource和dataSink
dataSource: 基於本地集合的 source、基於文件的 source、基於網絡套接字的 source、自定義的 source 自定義source: a:flink提供了不少定義好的sourceFunction 好比Kafka,RabbitMq,Mysql... b:StreamExecutionEnvironment.addSource(sourceFunction) 本身寫sourceFunction (實現ParallelSourceFunction / RichParallelSourceFunction ) dataSink: 寫入文件、打印出來、寫入 socket 、自定義的 sink 自定義的sink a:同理,dataSink提供了不少定義好的dataSink... b:自定義dataSink
3.2 DataTransformation(*)
簡單的Transformation示意圖【圖2】 Transformation:數據轉換的各類操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等, 操做不少,能夠將數據轉換計算成你想要的數據。 hello-demo 注【1】
4.flink在咱們測試環境上集成的demo
1:登錄異地監控 (講清楚架構關係) 2:代理樹
5.flink怎麼發佈?web操做界面簡單介紹。
打jar包,設置參數(併發度,main函數等),上傳
注:
【1】
map就是作一些映射,好比咱們把兩個字符串合併成一個字符串,把一個字符串拆成兩個或者三個字符串。
flatMap相似於把一個記錄拆分紅兩條、三條、甚至是四條記錄,例如把一個字符串分割成一個字符數組。
Filter就相似於過濾。
keyBy就等效於SQL裏的group by。
aggregate是一個聚合操做,如計數、求和、求平均等。
reduce就相似於MapReduce裏的reduce。
join操做就有點相似於咱們數據庫裏面的join。
connect實現把兩個流連成一個流。
repartition是一個從新分區操做(還沒研究)。
project操做就相似於SQL裏面的snacks(還沒研究)
【以上涉及到的代碼,我已經上傳到github上面:https://github.com/iamcrawler...】