Flink簡介及使用

1、Flink概述
    官網:https://flink.apache.org/
    
    mapreduce-->maxcompute
    HBase-->部門
    quickBI
    DataV
    Hive-->高德地圖
    Storm-->Jstorm
    ......
    
    2019年1月,阿里正式開源flink-->blink
    
    Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。
    Flink設 計爲在全部常見的集羣環境中運行,之內存速度和任何規模執行計算。

    大數據計算框架
    
2、Flink特色
    1)mapreduce
    2)storm
    3)spark
    
    適用於全部企業,不一樣企業有不一樣的業務場景。處理數據量,模型都不同。
    
    1)隨機部署應用
    以其餘組件集成!
    flink是分佈式系統,須要計算資源纔可執行程序。flink能夠與常見的集羣資源管理器進行集成(Hadoop Yarn,Apache Mesos...)。
    能夠單獨做爲獨立集羣運行。
    經過不一樣部署模式實現。
    這些模式容許flink以其慣有的方式進行交互。
    當咱們部署flink應用程序時,Flink會根據應用程序配置的並行性自動識別所需資源。從資源管理器中請求它們。
    若是發生故障,flink會請求新的資源來替換髮生故障的容器。
    提交或控制程序都經過REST調用進行,簡化Flink在許多環境的集成。孵化...
    
    2)以任何比例應用程序(小集羣、無限集羣)
    Flink旨在以任何規模運行有狀態流應用程序。應用程序能夠並行化在集羣中分佈和同時執行程序。
    所以,咱們的應用集羣能夠利用無限的cpu和磁盤與網絡IO。
    Flink能夠輕鬆的維護很是大的應用程序狀態。
    用戶可拓展性報告:
        1)應用程序天天能夠處理萬億個事件
        2)應用程序天天能夠維護多個TB的狀態
        3)應用程序能夠在數千個內核運行
        
    3)利用內存中的性能
    有狀態Flink應用程序針對於對本地狀態訪問進行了優化。任務狀態始終的保留在內存中,或者若是
    大小超過了可用內存,則保存在訪問高效的磁盤數據結構中(SSD 機械/固態)。
    任務能夠經過訪問本地來執行全部計算。歷來產生極小的延遲。
    Flink按期和異步檢查本地狀態持久存儲來保持出現故障時一次狀態的一致性。

3、有界無界
    1)無界
    有開始,沒有結束...
    處理實時數據。
    2)有界
    有開始,有結束...
    處理批量數據。
    
4、無界數據集應用場景(實時計算)
    1)源源不斷的日誌數據
    2)web應用,指標分析
    3)移動設備終端(分析app情況)
    4)應用在任何數據源不斷產生的項目中
    
5、Flink運行模型
    1)流計算
    數據源源不斷產生,咱們的需求是源源不斷的處理。程序須要一直保持在計算的狀態。
    2)批處理
    計算一段完整的數據集,計算成功後釋放資源,那麼此時工做結束。

6、Flink的使用
    1)處理結果準確:不管是有序數據仍是延遲到達的數據。
    2)容錯機制:
    有狀態:保持每次的結果往下傳遞,實現累加。DAG(有向無環圖)。
    3)有很強大的吞吐量和低延遲。
    計算速度快,吞吐量處理的量級大。
    4)精準的維護一次的應用狀態。
    storm:會發生要麼多計算一次,要麼漏計算。
    5)支持大規模的計算
    能夠運行在數千臺節點上。
    6)支持流處理和窗口化操做
    7)版本化處理
    8)檢查點機制實現精準的一次性計算保證
    checkpoint
    9)支持yarn與mesos資源管理器

7、flink單節點安裝部署
    1)下載安裝包
    https://archive.apache.org/dist/flink/flink-1.6.2/flink-1.6.2-bin-hadoop28-scala_2.11.tgz
    2)上傳安裝包到/root下
    
    3)解壓
    cd /root
    tar -zxvf flink-1.6.2-bin-hadoop28-scala_2.11.tgz -C hd
    
    4)啓動
    cd /root/hd/flink-1.6.2
    bin/start-cluster.sh
    
    5)啓動
    cd /root/hd/flink-1.6.2
    bin/stop-cluster.sh
    
    6)訪問ui界面
    http://192.168.146.132:8081
    
8、flink集羣安裝部署
    1)下載安裝包
    https://archive.apache.org/dist/flink/flink-1.6.2/flink-1.6.2-bin-hadoop28-scala_2.11.tgz
    2)上傳安裝包到/root下
    
    3)解壓
    cd /root
    tar -zxvf flink-1.6.2-bin-hadoop28-scala_2.11.tgz -C hd
    
    4)修改配置文件
    vi flink-conf.yaml
    第33行修改成:
    jobmanager.rpc.address: hd09-1
    
    5)修改slaves
    vi slaves
    hd09-2
    hd09-3
    
    6)分發flink到其餘機器
    cd /root/hd
    scp -r flink-1.6.2/ hd09-2:$PWD
    scp -r flink-1.6.2/ hd09-3:$PWD

    7)啓動集羣
    cd /root/hd/flink-1.6.2
    bin/start-cluster.sh
    
    8)關閉集羣
    cd /root/hd/flink-1.6.2
    bin/stop-cluster.sh
    
    9)訪問ui界面
    http://192.168.146.132:8081  

9、flink結構java

10、WordCount簡單實現web

  需求:實時的wordcount
  往端口中發送數據,實時的計算數據apache

一、SocketWordCount類api

package com.demo.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 需求:實時的wordcount
 * 往端口中發送數據,實時的計算數據
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        //1.定義鏈接端口
        final int port = 9999;
        //2.建立執行環境對象
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.獲得套接字對象(指定:主機、端口、分隔符)
        DataStreamSource<String> text = env.socketTextStream("192.168.146.132", port, "\n");

        //4.解析數據,統計數據-單詞計數 hello lz hello world
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String s, Collector<WordWithCount> collector){
                //按照空白符進行切割
                for (String word : s.split("\\s")) {
                    //<單詞,1>
                    collector.collect(new WordWithCount(word, 1L));
                }
            }
        })
        //按照key進行分組
        .keyBy("word")
        //設置窗口的時間長度 5秒一次窗口 1秒計算一次
        .timeWindow(Time.seconds(5), Time.seconds(1))
        //聚合,聚合函數
        .reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                //按照key聚合
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

        //5.打印能夠設置併發度
        windowCounts.print().setParallelism(1);

        //6.執行程序
        env.execute("Socket window WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {

        }

        public WordWithCount(String word, long count){
            this.word = word;
            this.count = count;
        }

        public String toString(){
            return word + " : " + count;
        }
    }
}

二、flink的maven依賴服務器

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

三、運行SocketWordCount類的main方法網絡

四、服務器安裝netcat數據結構

// 安裝netcat
yum install -y nc

// 使用nc,其中9999是SocketWordCount類中定義的端口號 nc
-lk -p 9999

五、此時在服務器的nc下輸入單詞後,SocketWordCount的main方法會時時監控到該單詞並進行計算處理。併發

六、也能夠把SocketWordCount程序打成jar包放置到服務器上,執行app

[root@hd09-1 flink-1.6.2]# bin/flink run -c com.demo.flink.SocketWordCount /root/FlinkTest-1.0-SNAPSHOT.jar

啓動WordCount計算程序,此時結果會寫到/root/hd/flink-1.6.2/log下的flink-root-taskexecutor-0-hd09-1.out文件中。框架

相關文章
相關標籤/搜索