Flink DataStream API

1.  Basic API Concepts

Flink programs can apply transformations to distributed collections (e.g. filtering, mapping, updating state, joining, grouping, defining windows, aggregating).

Collections are initially created from sources (for example, by reading from files, Kafka topics, or local in-memory collections).

Results are returned via sinks, which can, for example, write the data to (distributed) files or to standard output (e.g. the command line terminal).

Depending on the type of data source (bounded or unbounded), you write either a batch program or a streaming program: the DataSet API is used for batch processing and the DataStream API for stream processing.

Flink has the special classes DataSet and DataStream to represent data in a program. In the case of a DataSet the data is finite, while for a DataStream the number of elements can be unbounded.

Flink programs look like regular programs that transform collections of data. Each program consists of the same basic parts, as the skeleton after this list shows:

  • Obtain an execution environment
  • Load/create the initial data
  • Specify transformations on the data
  • Specify where to put the results of the computation
  • Trigger the program execution
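
A minimal sketch that puts these five parts together (the input path is a placeholder):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Skeleton {
    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Load/create the initial data (placeholder path)
        DataStream<String> text = env.readTextFile("file:///path/to/file");

        // 3. Specify transformations on the data
        DataStream<Integer> lengths = text.map(line -> line.length());

        // 4. Specify where to put the results of the computation
        lengths.print();

        // 5. Trigger the program execution
        env.execute("Skeleton Job");
    }
}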


To make the walkthrough easier to follow, let's first create a project. One option is the Maven archetype, for example:

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.10.0 \
      -DgroupId=com.cjs.example \
      -DartifactId=flink-quickstart \
      -Dversion=1.0.0-SNAPSHOT \
      -Dpackage=com.cjs.example.flink \
      -DinteractiveMode=false

Alternatively, create a Spring Boot project directly and add the dependencies yourself:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

The StreamExecutionEnvironment is the basis of all Flink programs. You can obtain one using the following static methods on StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

Usually, getExecutionEnvironment() is all you need, since this method figures out the right execution environment based on the context.
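
If you do need to target a specific cluster from code, createRemoteEnvironment can be used; a sketch (host, port, and jar path below are placeholders):

// Host, port, and jar path are placeholders for illustration
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
        .createRemoteEnvironment("jobmanager-host", 8081, "/path/to/your-job.jar");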

Reading data from a file, for example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
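
Since the POM above already pulls in the Kafka 0.10 connector, reading from a Kafka topic looks roughly like this (broker address, consumer group, and topic name are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
props.setProperty("group.id", "demo-group");              // placeholder consumer group

DataStream<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer010<>("demo-topic", new SimpleStringSchema(), props));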

Applying transformations on a DataStream, for example:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

Writing the results out by creating a sink, for example:

writeAsText(String path)

print()
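
Applied to the parsed stream from the map example above (the output path is a placeholder):

parsed.print();                               // write the stream to standard output
parsed.writeAsText("file:///path/to/output"); // write the stream to a text file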

Finally, call execute() on the StreamExecutionEnvironment to run the program:

//  Triggers the program execution
env.execute();

//  Triggers the program execution asynchronously
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();

The word-count example below should make this flow concrete. The WordCount program is to big data what HelloWorld is to Java, haha.

package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Map-Reduce idea:
 * group first, then sum
 * @author ChengJianSheng
 * @date 2020-05-26
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("/Users/asdf/Desktop/input.txt");
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);
        
        counts.writeAsCsv("/Users/asdf/Desktop/aaa", "\n", " ");
        env.execute();
    }

    static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
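
For example, if input.txt contains the line "to be or not to be", the output directory will hold the pairs be 2, not 1, or 1, to 2 (one pair per line, in no guaranteed order).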

Defining keys for Tuples

(Python also has tuples.)

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>, Tuple> keyed = input.keyBy(0);

Here the tuples are grouped on the first field (the field of Integer type).
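
keyBy also accepts several positions at once, which defines a composite key, for example:

// group on a composite key made of the first and the second field
KeyedStream<Tuple3<Integer,String,Long>, Tuple> keyed2 = input.keyBy(0, 1);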

You can also use the properties of a POJO to define keys, for example:

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

keyBy produces a KeyedStream; the key can also be defined with a custom KeySelector function, for example:

// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC, String> keyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });
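
Since KeySelector has a single method, the same key can also be written as a lambda (a sketch; the String key type here is non-generic, so Flink can infer it):

KeyedStream<WC, String> keyedByLambda = words.keyBy(wc -> wc.word);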

How do you specify these transformation functions?

Option 1: an anonymous inner class

data.map(new MapFunction<String, Integer> () {
    public Integer map(String value) { return Integer.parseInt(value); }
});

Option 2: a lambda expression

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
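
One caveat with lambdas: when the return type is generic (e.g. a Tuple2), Java type erasure can keep Flink from inferring it, and a type hint via returns(...) is needed. A sketch, using org.apache.flink.api.common.typeinfo.Types:

data.map(s -> Tuple2.of(s, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT));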

2.  DataStream API

The following example counts the words coming from a web socket in 10-second windows:

package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split("\\W+");
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

To run this program, first start a listener in a terminal with netcat:

nc -lk 9999
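
Then type words into the netcat session; every 10 seconds the program prints the counts of the words received in that window. Typing hello hello world within one window, for example, prints something like (hello,2) and (world,1).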


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html 
