flink學習

flink介紹:html

  Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. 是一個用於分佈式流和批處理數據的開源平臺。java

 

flink 1.7 官方文檔:web

  https://ci.apache.org/projects/flink/flink-docs-release-1.7/數據庫

flink Java項目🌰:apache

  https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.htmlbootstrap

 

flink 分爲三層,能夠處理流數據,批數據,能夠運行在本地也能夠運行在雲主機或者yarn。flink的核心爲runtime 數據處理引擎api

數據源: 流數據,批數據服務器

處理方法 runtime的各類接口ssh

數據結果:sink分佈式

 

1、本地安裝使用flink

https://www.jianshu.com/p/17676d34dd35 

一、flink 默認安裝目錄(mac):

  

/usr/local/Cellar/apache-flink/1.7.2/libexec/bin

二、flink 啓動中止

$ ./start-cluster.sh
$ ./stop-cluster.sh

 

三、訪問方式

http://localhost:8081

 

四、flink log文件地址

/usr/local/Cellar/apache-flink/1.7.2/libexec/log

 

五、Java調用flink

  flink調用程序都有如下基礎部分組成

  一、獲取一個運行環境 ExecutionEnvironment

  二、拉取或者建立一個初始的數據集

  三、對數據進行運算處理,轉換操做

  四、指定計算結果的存儲方式

  五、觸發程序執行,(單個運行或者在flink集羣上運行)  

 

  一、先建立flink環境 

  The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

    getExecutionEnvironment()     createLocalEnvironment()     createRemoteEnvironment(String host, int port, String... jarFiles)

  ExecutionEnvironment, StreamExecutionEnvironment,TableEnvironment 三種env

    對應三種環境獲取方式(本地已有環境 getExecutionEnvironment,建立本地環境 createLocalEnvironment,建立遠端環境createRemoteEnvironment)

public static void main(String[] args) {
        ExecutionEnvironment env =  ExecutionEnvironment.getExecutionEnvironment(); // 可根據上下文肯定是批處理仍是流數據
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); // 獲取一個已有的flink環境
//        StreamExecutionEnvironment scenv = StreamExecutionEnvironment.createLocalEnvironment();  // 新建一個local環境
//        StreamExecutionEnvironment reenv = StreamExecutionEnvironment.createRemoteEnvironment("ss",9000); // 建立一個遠端環境


// https://www.jianshu.com/p/742272e9a347 關係型數據庫流式輸入處理

TableEnvironment tenv = TableEnvironment.getTableEnvironment(env); // 是流處理和批處理通用的關係型API,具體用法 https://www.jianshu.com/p/742272e9a347 }

  

  二、數據源接入

    文件接入:

    kafka:

    dataset(數據庫接入 批數據)  

Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "tens.gss.com:9092"); # 設置
//        public static classSimpleStringGenerator implements SourceFunction<SinkApp>
        System.out.println(properties);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("ad_test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest(); //從最先的數據開始讀取

        DataStream<String> data = senv.addSource(consumer); # 添加源

  三、算子計算

  

data.timeWindowAll(Time.seconds(1)).process(new ProcessAllWindowFunction<String, String, TimeWindow>() {

            @Override
            public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                System.out.println("test");
                for (String i : elements)

                    out.collect(i);
                System.out.println(out);
            }
        }).print();

   四、數據保存

  

data.addSink();
data.writeUsingOutputFormat();

  用過以上兩種,

  第一種,繼承 RichSinkFunction

doneData.addSink(new WriteHbaseRich()).setParallelism(1); // 重寫RichSinkFunction方法

  第二種,實現接口 OutputFormat

 

dataStream.writeUsingOutputFormat(new WriteHbase()); // 實現OutputFormat接口

 

  具體的實現方法在下一篇 

  

六、分佈式服務運行栗子

  命令添加運行

 ./bin/flink run -c testflink.SocketTextStreamWordCount /your path/IdeaProjects/testFlink/target/original-d-1.0-SNAPSHOT.jar 127.0.0.1 9000

  -c 後面跟的是要具體運行的類,從java包下一層開始,後面跟的是項目jar包,再接 運行參數(void main(String[] args)中須要的參數)

  管理頁面添加運行

    若是運行在服務器上,須要建隧道訪問 flink

ssh -i id_rsa.pub -L 127.0.0.1:7878:flink集羣masterIP:flink集羣master web端口默認8081 跳板機用戶名@跳板機地址

    而後訪問 http://localhost:7878 

    

   若是運行在本地,直接訪問 http://localhost:8081 

相關文章
相關標籤/搜索