Flink DataSet API Programming Guide

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html

 

Example Program

The programming style is very similar to Spark:

ExecutionEnvironment -- SparkContext

DataSet -- RDD

Transformations

This example uses the Java API, so the function passed in must be an object of a class that implements FlatMapFunction.

 

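import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {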
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
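To see what flatMap, groupBy(0) and sum(1) compute together without starting Flink, here is a rough plain-Java sketch of the same steps on an in-memory list. It uses only the JDK. The names WordCountSketch and countWords are made up for illustration and are not part of Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Mirrors LineSplitter + groupBy(0) + sum(1):
    // every token contributes a 1, and the 1s are added up per word.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> text = Arrays.asList(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        // Print one (word,count) pair per line, similar in shape to a Tuple2.
        countWords(text).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Because the split is on a single space, punctuation stays attached to the word, so "there?" is counted as its own key, just as in the Flink example.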

 

Specifying Keys

How to define keys:

1. Use tuple field indexes. The example below uses the first and second tuple fields as a composite key.

DataSet<Tuple3<Integer,String,Long>> input = // [...]
DataSet<Tuple3<Integer,String,Long>> grouped = input
    .groupBy(0,1)
    .reduce(/*do something*/);

 

2. For POJOs, use field expressions.

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word; 
  public int count;
}
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);

 

3. Use key selector functions.

// some ordinary POJO
public class WC {public String word; public int count;}
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
                         .groupBy(
                           new KeySelector<WC, String>() {
                             public String getKey(WC wc) { return wc.word; }
                           })
                         .reduce(/*do something*/);

 

Passing Functions to Flink

1. Implement the function interface.

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());

Or use an anonymous class:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

 

2. Use rich functions.

Rich functions provide, in addition to the user-defined function (map, reduce, etc), four methods: open, close, getRuntimeContext, and setRuntimeContext.

These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), and for accessing runtime information such as accumulators and counters (see Accumulators and Counters) and information on iterations (see Iterations).

Rich functions are used the same way as ordinary functions. The difference is the four extra methods, which serve special cases such as passing parameters to a function or accessing broadcast variables, accumulators and counters, because those cases require calling getRuntimeContext first.

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}

 

Execution Configuration

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
  • enableClosureCleaner() / disableClosureCleaner(). The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. This will lead to exceptions by the serializer.
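    In Java, passing a function means creating a function object, and an anonymous function object holds a reference to the enclosing object. Usually only the function logic is needed, so the closure cleaner drops that reference.
    The benefit: an object that is shipped around must be serializable. Implementing the Serializable interface on every enclosing class is tedious, and leaving it out causes errors, so the reference is simply cleaned away.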

  • getParallelism() / setParallelism(int parallelism) Set the default parallelism for the job.
    Sets the job-wide default parallelism.

  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more.
    This setting relates to retrying after a failure.
  • getExecutionMode() / setExecutionMode(). The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
  • enableObjectReuse() / disableObjectReuse() By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
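    Because Java allocates an object for almost everything, a job can create a large number of repeated objects. Enabling object reuse avoids this and improves performance.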
  • enableSysoutLogging() / disableSysoutLogging() JobManager status updates are printed to System.out by default. This setting allows to disable this behavior.
    Turns the printing of status updates to System.out on or off.
  • getGlobalJobParameters() / setGlobalJobParameters() This method allows users to set custom objects as a global configuration for the job. Since the ExecutionConfig is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.
    Sets job-wide global parameters.

  • The remaining options are all serialization-related and are not listed here.
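The warning attached to enableObjectReuse() in the list above (bugs when user code is not aware of reuse) is easier to see in a small example. The following is a plain-Java sketch, not Flink code: Record, collectByReference and collectByCopy are hypothetical names, and the loop only imitates a runtime that overwrites one object for every element.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    // A mutable record, standing in for an object that a runtime might recycle.
    static final class Record {
        int value;
    }

    // Unsafe user code: keeps references to the object it is handed.
    public static List<Integer> collectByReference(int[] input) {
        Record reused = new Record();            // one instance, overwritten per element
        List<Record> kept = new ArrayList<>();
        for (int v : input) {
            reused.value = v;
            kept.add(reused);                    // the same object is stored every time
        }
        List<Integer> out = new ArrayList<>();
        for (Record r : kept) {
            out.add(r.value);                    // every entry shows the last value written
        }
        return out;
    }

    // Safe user code: copies the value out before the object is overwritten.
    public static List<Integer> collectByCopy(int[] input) {
        Record reused = new Record();
        List<Integer> out = new ArrayList<>();
        for (int v : input) {
            reused.value = v;
            out.add(reused.value);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3};
        System.out.println(collectByReference(input)); // [3, 3, 3]
        System.out.println(collectByCopy(input));      // [1, 2, 3]
    }
}
```

The takeaway, under these assumptions: with reuse switched on, a function should not hold on to input objects across calls unless it copies what it needs.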

 

Data Sinks

Data sinks consume DataSets and are used to store or return them. Data sink operations are described using an OutputFormat.

Custom output formats are possible, for example to write to a database:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

 

Another feature is locally sorted output:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.print().sortLocalOutput(1, Order.ASCENDING);

// sort output on Double field in descending and Integer field in ascending order
tData.print().sortLocalOutput(2, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING);

 

Debugging

Local execution with a LocalEnvironment:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();

 

Data sources that are convenient for debugging:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

 

A data sink that is convenient for inspecting output:

DataSet<Tuple2<String, Integer>> myResult = ...

List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat(outData));

 

Iteration Operators

Iterations implement loops in Flink programs. The iteration operators encapsulate a part of the program and execute it repeatedly, feeding back the result of one iteration (the partial solution) into the next iteration. There are two types of iterations in Flink: BulkIteration and DeltaIteration.

Reference: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/iterations.html

BulkIteration is the ordinary kind of iteration: every round processes the full data set.


DeltaIteration processes only part of the data in each round and applies delta updates, which is more efficient.
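The difference between the two styles can be sketched in plain Java with a small connected-components example, where every vertex ends up with the smallest vertex id in its component. This is only an illustration under simplifying assumptions: it does not use the Flink iteration API, the names IterationSketch, bulk and delta are made up, and the delta variant handles its workset one element at a time rather than in synchronized rounds.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class IterationSketch {

    // Bulk style: each round recomputes the label of every vertex
    // from the complete result of the previous round.
    public static int[] bulk(int[][] neighbors, int maxRounds) {
        int n = neighbors.length;
        int[] labels = new int[n];
        for (int v = 0; v < n; v++) labels[v] = v;
        for (int round = 0; round < maxRounds; round++) {
            int[] next = labels.clone();
            boolean changed = false;
            for (int v = 0; v < n; v++) {
                for (int u : neighbors[v]) {
                    if (labels[u] < next[v]) {
                        next[v] = labels[u];
                        changed = true;
                    }
                }
            }
            labels = next;
            if (!changed) break;   // nothing moved, so the result is final
        }
        return labels;
    }

    // Delta style: the workset holds only vertices whose label just changed;
    // the solution array is updated in place.
    public static int[] delta(int[][] neighbors) {
        int n = neighbors.length;
        int[] solution = new int[n];
        Deque<Integer> workset = new ArrayDeque<>();
        for (int v = 0; v < n; v++) {
            solution[v] = v;
            workset.add(v);
        }
        while (!workset.isEmpty()) {
            int v = workset.poll();
            for (int u : neighbors[v]) {
                if (solution[v] < solution[u]) {
                    solution[u] = solution[v];   // delta update to the solution
                    workset.add(u);              // only changed vertices are revisited
                }
            }
        }
        return solution;
    }

    public static void main(String[] args) {
        // Two components: 0-1-2 and 3-4 (adjacency lists, undirected).
        int[][] graph = {{1}, {0, 2}, {1}, {4}, {3}};
        System.out.println(Arrays.toString(bulk(graph, 10))); // [0, 0, 0, 3, 3]
        System.out.println(Arrays.toString(delta(graph)));    // [0, 0, 0, 3, 3]
    }
}
```

Both variants reach the same labels. The delta variant stops touching a vertex once its label is stable, which is where the efficiency gain comes from.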


 

Semantic Annotations

Semantic annotations can be used to give Flink hints about the behavior of a function.

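The goal is performance optimization. When the optimizer knows exactly how a function uses its input, for example that certain fields are only forwarded, it can preserve their sorting or partitioning information.

There are three kinds of semantic annotations: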

Forwarded Fields Annotation

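Declares that an input field is copied unchanged to an output field.

In the example below, the first input field is copied to the third output field.
As the code shows, the third field of the output tuple is val.f0.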

@ForwardedFields("f0->f2")
public class MyMap implements 
              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
  @Override
  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
  }
}

 

Non-Forwarded Fields

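The opposite of the above: every field except the specified ones is copied to the same position.

In the example, every field except the second input field is copied to the same position.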

@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements 
              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
  }
}

 

Read Fields

Declares which fields the function reads or evaluates.

Here the first and fourth input fields are read or evaluated.

@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function. 
public class MyMap implements 
              MapFunction<Tuple4<Integer, Integer, Integer, Integer>, 
                          Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
    if(val.f0 == 42) {
      return new Tuple2<Integer, Integer>(val.f0, val.f1);
    } else {
      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
    }
  }
}

 

Broadcast Variables

Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.

  • Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String), and
  • Access: accessible via getRuntimeContext().getBroadcastVariable(String) at the target operator.
// 1. The DataSet to be broadcasted
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcasted DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

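The use case is shared data of modest size that every instance needs to access, such as lookup tables.

In the example above, toBroadcast is registered as the broadcast variable broadcastSetName, so at runtime it can be fetched with getRuntimeContext().getBroadcastVariable.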

 

Passing Parameters to Functions

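This section is about how to pass parameters to a function class, a need that comes entirely from Java's verbosity.

First, parameters can of course be passed through the class constructor:

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);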

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

  private final int limit;

  public MyFilter(int limit) {
    this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}

MyFilter is a custom class whose constructor takes limit.

 

Alternatively, use withParameters(Configuration):

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
      limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
      return value > limit;
    }
}).withParameters(config);

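withParameters passes the prepared config into the function.

The open method of the rich function then reads the parameter out for use.

What is the advantage over the constructor approach? The constructor version looks more convenient to me. Perhaps the point is that this one works with anonymous classes?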

 

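Of course, global parameters can also be used. How do they differ from broadcast variables? Both are visible globally, but global parameters can only carry parameter values, while a broadcast variable can be any DataSet.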

Setting a custom global configuration

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);

Accessing values from the global configuration

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

 

Accumulators & Counters

Used for distributed counting. When the job ends, all partial results are aggregated.

Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.

  • IntCounter, LongCounter and DoubleCounter: See below for an example using a counter.
  • Histogram: A histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g. the distribution of words-per-line for a word count program.
// define and register the counter
private IntCounter numLines = new IntCounter();
getRuntimeContext().addAccumulator("num-lines", this.numLines);

// count from anywhere in the function
this.numLines.add(1);

// retrieve the final result
myJobExecutionResult.getAccumulatorResult("num-lines")
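The "count locally, aggregate at the end" behaviour can be pictured with a plain-Java sketch. It is not the Flink Accumulator interface: LocalCounter and countLines are hypothetical names, and each inner list simply plays the role of the lines seen by one parallel task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AccumulatorSketch {

    // Minimal stand-in for a counter accumulator: add locally, merge at the end.
    static final class LocalCounter {
        private int count;

        void add(int n) { count += n; }

        void merge(LocalCounter other) { count += other.count; }

        int get() { return count; }
    }

    // Counts lines across partitions; every partition gets its own counter.
    public static int countLines(List<List<String>> partitions) {
        LocalCounter total = new LocalCounter();
        for (List<String> partition : partitions) {
            LocalCounter local = new LocalCounter();   // one counter per task
            for (String line : partition) {
                local.add(1);
            }
            total.merge(local);                        // combined after the task is done
        }
        return total.get();
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList("line a", "line b"));
        partitions.add(Arrays.asList("line c"));
        System.out.println(countLines(partitions)); // 3
    }
}
```

Each task only ever touches its own counter, so no coordination is needed while the job runs; the merged total becomes meaningful once all tasks have finished.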

 

Execution Plans

First, the execution plan can be printed in JSON format:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

...

System.out.println(env.getExecutionPlan());

 

Open this page:

The HTML document containing the visualizer is located under tools/planVisualizer.html.

 

Paste the JSON in to see the execution plan.

 

Web Interface

Flink offers a web interface for submitting and executing jobs. If you choose to use this interface to submit your packaged program, you have the option to also see the plan visualization.

The script to start the web interface is located under bin/start-webclient.sh. After starting the webclient (by default on port 8080), your program can be uploaded and will be added to the list of available programs on the left side of the interface.

Jobs can also be submitted, and their execution plans viewed, through the web interface.
