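```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter()) // split each line into (word, 1) pairs
            .groupBy(0)                  // group by the word (tuple field 0)
            .sum(1);                     // sum the counts (tuple field 1)

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```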
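LineSplitter splits on single spaces only, so punctuation stays attached to the words ("there?" and "them." are counted exactly as written). The JDK-only sketch below reproduces the same flatMap/groupBy/sum logic with a HashMap. You can use it to work out what print() should show for the two sample lines. WordCountSketch is a hypothetical helper and not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of flatMap(LineSplitter) -> groupBy(0) -> sum(1); no Flink involved.
class WordCountSketch {
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // Like LineSplitter: split on single spaces only, so punctuation stays attached.
            for (String word : line.split(" ")) {
                // Like emitting (word, 1) and then summing field 1 per word.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For the two sample lines this yields 2 for "Who's", "there?" and "I", and 1 for "think", "hear", "them.", "Stand," and "ho!". Flink's printed order may differ.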
1. By tuple field index. The following uses the first and second tuple fields as a composite key:
```java
DataSet<Tuple3<Integer, String, Long>> input = // [...]
DataSet<Tuple3<Integer, String, Long>> grouped = input
    .groupBy(0, 1) // composite key: fields 0 and 1
    .reduce(/* do something */);
```
2. For POJOs, use field expressions:
```java
// some ordinary POJO (Plain Old Java Object)
public class WC {
    public String word;
    public int count;
}

DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words.groupBy("word").reduce(/* do something */);
```
3. Use key selector functions:
```java
// some ordinary POJO
public class WC { public String word; public int count; }

DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
    .groupBy(new KeySelector<WC, String>() {
        public String getKey(WC wc) { return wc.word; }
    })
    .reduce(/* do something */);
```
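All three styles answer the same question: which value decides the group a record belongs to. The JDK-only sketch below models groupBy(key).reduce(sum) as "group by whatever a key-extraction function returns, then sum each group". This is an assumption-level illustration and not Flink's implementation. KeyGroupingSketch and sumByKey are hypothetical names. A composite key, analogous to groupBy(0, 1), can be modeled by returning a List of field values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of groupBy(key).reduce(sum of counts); not Flink API.
class KeyGroupingSketch {
    // Same fields as the WC POJO above.
    static class WC {
        final String word;
        final int count;
        WC(String word, int count) { this.word = word; this.count = count; }
    }

    // The key selector plays the role of KeySelector.getKey: records whose keys are
    // equal (by equals/hashCode) land in the same group, and each group's counts are summed.
    static <K> Map<K, Integer> sumByKey(List<WC> input, Function<WC, K> keySelector) {
        Map<K, Integer> sums = new HashMap<>();
        for (WC wc : input) {
            sums.merge(keySelector.apply(wc), wc.count, Integer::sum);
        }
        return sums;
    }
}
```

For example, `sumByKey(words, wc -> wc.word)` corresponds to grouping on the word field. `sumByKey(words, wc -> Arrays.asList(wc.word, wc.count))` corresponds to a two-field composite key.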
1. Implement the function interface:
```java
class MyMapFunction implements MapFunction<String, Integer> {
    public Integer map(String value) { return Integer.parseInt(value); }
}

data.map(new MyMapFunction()); // data is a DataSet<String>
```
Or use an anonymous class:

```java
data.map(new MapFunction<String, Integer>() {
    public Integer map(String value) { return Integer.parseInt(value); }
});
```
2. Use rich functions
Rich functions provide four methods in addition to the user-defined function (map, reduce, etc.): open, close, getRuntimeContext, and setRuntimeContext.
These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), and accessing runtime information such as accumulators and counters (see Accumulators and Counters) and information on iterations (see Iterations).
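Rich functions are used the same way as ordinary functions. The difference is that they have four extra methods, which are needed in special cases. Examples include passing parameters to a function and accessing broadcast variables, accumulators, and counters. These cases go through the runtime context via getRuntimeContext(), or, for parameters, through open().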
```java
class MyMapFunction extends RichMapFunction<String, Integer> {
    public Integer map(String value) { return Integer.parseInt(value); }
}

data.map(new MyMapFunction());
```
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
```
ExecutionConfig offers the following options:

enableClosureCleaner() / disableClosureCleaner(): The closure cleaner is enabled by default. It removes unneeded references to the surrounding class of anonymous functions inside Flink programs. With the closure cleaner disabled, an anonymous user function may end up referencing its surrounding class, which is usually not Serializable. This leads to exceptions in the serializer.

getParallelism() / setParallelism(int parallelism): Sets the default parallelism for the job.

getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay): Sets the delay in milliseconds that the system waits after a job has failed before re-executing it. The delay starts once all tasks have been successfully stopped on the TaskManagers, and the tasks are restarted after the delay has passed. This lets certain time-out related failures surface fully (such as broken connections that have not fully timed out). Without the delay, a re-execution might fail again immediately due to the same problem. This parameter only has an effect if the number of execution retries is one or more.

getExecutionMode() / setExecutionMode(): Sets the execution mode used to run the program. The default execution mode is PIPELINED. The execution mode defines whether data exchanges are performed in batch or pipelined manner.

enableObjectReuse() / disableObjectReuse(): By default, objects are not reused in Flink. Enabling object reuse instructs the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

enableSysoutLogging() / disableSysoutLogging(): JobManager status updates are printed to System.out by default. This setting allows disabling this behavior.

getGlobalJobParameters() / setGlobalJobParameters(): Allows users to set custom objects as a global configuration for the job. Since the ExecutionConfig is accessible in all user-defined functions, this is an easy way to make configuration globally available in a job.
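The object-reuse warning can be made concrete in plain Java. This is a simplified model and not Flink's actual reuse mechanism. A hypothetical "runtime" loop either feeds every input through one shared mutable record (reuse on) or creates a fresh record per input (reuse off). A naive user function keeps references to the records it receives, so with reuse on it ends up holding the same object several times. Record and collectValues are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the object-reuse pitfall; not Flink API.
class ObjectReuseSketch {
    // A mutable record, standing in for a reused Tuple or POJO instance.
    static class Record {
        int value;
    }

    // Feeds each input through the same Record (reuse = true) or a fresh one (reuse = false).
    // The "user function" stores references instead of copies, which is unsafe under reuse.
    static List<Integer> collectValues(int[] inputs, boolean reuse) {
        List<Record> kept = new ArrayList<>();
        Record shared = new Record();
        for (int v : inputs) {
            Record r = reuse ? shared : new Record();
            r.value = v;
            kept.add(r); // keeps a reference, not a copy
        }
        List<Integer> out = new ArrayList<>();
        for (Record r : kept) {
            out.add(r.value);
        }
        return out;
    }
}
```

With inputs 1, 2, 3, reuse off yields [1, 2, 3], while reuse on yields [3, 3, 3]. Copying the needed values before storing them avoids the problem.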
Data sinks consume DataSets and are used to store or return them. Data sink operations are described using an OutputFormat.
A custom output format can also be used, for example to write to a database:
```java
DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:persons")
        .setQuery("insert into persons (name, age, height) values (?,?,?)")
        .finish()
);
```
```java
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.print().sortLocalOutput(1, Order.ASCENDING);

// sort output on Double field in descending and Integer field in ascending order
tData.print().sortLocalOutput(2, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING);
```
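Going by its comment, the last chained call defines a lexicographic order: field 2 (Double) descending first, with ties broken by field 0 (Integer) ascending. As the name suggests, sortLocalOutput orders each local partition emitted by the sink, not the data set globally. The JDK-only sketch below expresses one partition's order as a Comparator. Row is a hypothetical class mirroring tData's Tuple3<Integer, String, Double>, and the code is not Flink's sorting implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java model of sortLocalOutput(2, DESCENDING).sortLocalOutput(0, ASCENDING).
class LocalSortSketch {
    // Mirrors tData's Tuple3<Integer, String, Double>.
    static class Row {
        final int f0;
        final String f1;
        final double f2;
        Row(int f0, String f1, double f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
    }

    // Field 2 descending is the primary key; ties are broken by field 0 ascending.
    static final Comparator<Row> ORDER =
        Comparator.comparingDouble((Row r) -> r.f2).reversed()
                  .thenComparingInt(r -> r.f0);

    // Sorts one local partition, leaving the input list untouched.
    static List<Row> sortLocal(List<Row> partition) {
        List<Row> copy = new ArrayList<>(partition);
        copy.sort(ORDER);
        return copy;
    }
}
```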
```java
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();
```
```java
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
```
```java
DataSet<Tuple2<String, Integer>> myResult = ...

// collect the result into a local List
List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat<Tuple2<String, Integer>>(outData));
```
Iterations implement loops in Flink programs. The iteration operators encapsulate a part of the program and execute it repeatedly, feeding back the result of one iteration (the partial solution) into the next iteration. There are two types of iterations in Flink: BulkIteration and DeltaIteration.
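A bulk iteration can be pictured as a loop that feeds the partial solution back into a step function for a bounded number of rounds. The JDK-only sketch below models only that idea and does not cover delta iterations. BulkIterationSketch is a hypothetical name, not Flink's IterativeDataSet API. The early exit when a round changes nothing is an assumed, simplified stand-in for an optional termination criterion.

```java
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of a bulk iteration; not Flink's IterativeDataSet API.
class BulkIterationSketch {
    // Feeds the partial solution back into `step` for at most maxIterations rounds.
    // Stopping early when a round changes nothing is a simplified, assumed stand-in
    // for an optional termination criterion.
    static <T> T iterate(T initial, UnaryOperator<T> step, int maxIterations) {
        T partial = initial;
        for (int i = 0; i < maxIterations; i++) {
            T next = step.apply(partial);
            if (next.equals(partial)) {
                break;
            }
            partial = next;
        }
        return partial;
    }
}
```

For example, `iterate(0, x -> x + 1, 10)` runs all 10 rounds and returns 10. `iterate(0, x -> Math.min(x + 3, 7), 100)` stops as soon as the value settles at 7.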
Semantic annotations can be used to give Flink hints about the behavior of a function.
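The goal is performance optimization. When the optimizer knows exactly how a function uses its input fields, it can exploit that knowledge. For example, if it knows that certain fields are only forwarded unchanged, it can preserve their sorting or partitioning properties.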
Forwarded Fields Annotation
```java
@ForwardedFields("f0->f2")
public class MyMap implements
        MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
    @Override
    public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
        return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
    }
}
```
Non-Forwarded Fields
```java
@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements
        MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
        return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
    }
}
```
Read Fields
```java
@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
public class MyMap implements
        MapFunction<Tuple4<Integer, Integer, Integer, Integer>, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
        if (val.f0 == 42) {
            return new Tuple2<Integer, Integer>(val.f0, val.f1);
        } else {
            return new Tuple2<Integer, Integer>(val.f3 + 10, val.f1);
        }
    }
}
```
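In the @ForwardedFields("f0->f2") example, the annotation promises that output field 2 is always an unchanged copy of input field 0. The JDK-only sketch below spot-checks that promise on sample inputs. It also shows that field 1 is not forwarded, because it is halved. Tuples are modeled as arrays, and ForwardedFieldsSketch is a hypothetical helper. A sample check cannot prove an annotation correct. The optimizer relies on what is declared, so an incorrect annotation can lead to wrong results.

```java
// Hypothetical plain-Java check of what @ForwardedFields("f0->f2") promises; not Flink API.
class ForwardedFieldsSketch {
    // Mirrors MyMap from the @ForwardedFields("f0->f2") example, with tuples as arrays.
    static Object[] myMap(int[] val) {
        return new Object[] {"foo", val[1] / 2, val[0]};
    }

    // True if, for every sample, input field `from` reappears unchanged as output field `to`.
    static boolean forwards(int from, int to, int[][] samples) {
        for (int[] sample : samples) {
            Object out = myMap(sample)[to];
            if (!Integer.valueOf(sample[from]).equals(out)) {
                return false;
            }
        }
        return true;
    }
}
```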
Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.
Broadcast sets are registered by name via withBroadcastSet(DataSet, String) and accessed via getRuntimeContext().getBroadcastVariable(String) at the target operator:

```java
// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        // 3. Access the broadcast DataSet as a Collection
        Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }

    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet
```
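The key point is that each parallel instance sees only its own slice of the regular input but the whole broadcast collection. The JDK-only sketch below models this by splitting the input round-robin over simulated instances. Each instance builds its own lookup set from the full broadcast collection, as a rich function might do in open(), and keeps only matching elements. BroadcastSketch and the round-robin split are hypothetical simplifications, not Flink's data distribution.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model of a broadcast variable; not Flink API.
class BroadcastSketch {
    // Splits `data` round-robin over `parallelism` simulated operator instances.
    // Each instance sees only its own slice of the regular input, but the whole
    // broadcast collection, which it uses to keep elements contained in that set.
    static List<List<Integer>> run(List<Integer> data, Collection<Integer> broadcast, int parallelism) {
        List<List<Integer>> outputs = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            // Per-instance copy of the full broadcast set, as if built once in open().
            Set<Integer> lookup = new HashSet<>(broadcast);
            List<Integer> out = new ArrayList<>();
            for (int j = instance; j < data.size(); j += parallelism) {
                Integer value = data.get(j);
                if (lookup.contains(value)) {
                    out.add(value);
                }
            }
            outputs.add(out);
        }
        return outputs;
    }
}
```

With data 1 to 6, broadcast set {2, 3, 5} and two instances, instance 0 (seeing 1, 3, 5) emits [3, 5] and instance 1 (seeing 2, 4, 6) emits [2].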
Parameters can be passed to a function through its constructor:

```java
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

    private final int limit;

    public MyFilter(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean filter(Integer value) throws Exception {
        return value > limit;
    }
}
```
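The constructor approach works because Flink ships user functions to the workers by serializing them (its function interfaces extend java.io.Serializable), so a field like limit travels with the object. The same mechanism explains the closure cleaner note in the ExecutionConfig options. An anonymous function that holds a reference to a non-serializable enclosing class cannot be serialized. The JDK-only sketch below shows both cases with plain Java serialization. SerializableFilter, LimitFilter and Driver are hypothetical stand-ins. Here the anonymous filter actually reads a field of Driver, so this reference is needed; the closure cleaner only removes references that are not needed and could not help in this case.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.IntPredicate;

// Plain-Java model of shipping a function object via Java serialization; not Flink API.
class SerializationSketch {
    // Hypothetical stand-in for a (Serializable) Flink function interface.
    interface SerializableFilter extends IntPredicate, Serializable {}

    // Like MyFilter: the parameter is an ordinary field, serialized with the object.
    static class LimitFilter implements SerializableFilter {
        private final int limit;
        LimitFilter(int limit) { this.limit = limit; }
        @Override public boolean test(int value) { return value > limit; }
    }

    // A driver class that is NOT Serializable, like many main-program classes.
    static class Driver {
        int limit = 2;

        SerializableFilter anonymousFilter() {
            // Reads Driver.this.limit, so the anonymous class holds a reference to Driver.
            return new SerializableFilter() {
                @Override public boolean test(int value) { return value > limit; }
            };
        }
    }

    // True if `f` can be written with Java serialization, false on NotSerializableException.
    static boolean serializes(Object f) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(f);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```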
Alternatively, parameters can be passed as a Configuration via withParameters(Configuration). The rich function receives it in its open() method:

```java
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
        limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
        return value > limit;
    }
}).withParameters(config);
```
Setting a custom global configuration
```java
Configuration conf = new Configuration();
conf.setString("mykey", "myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
```
Accessing values from the global configuration
```java
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;

    @Override
    public void open(Configuration parameters) throws Exception {
        ExecutionConfig.GlobalJobParameters globalParams =
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Configuration globConf = (Configuration) globalParams;
        mykey = globConf.getString("mykey", null);
    }

    // ... more here ...
}
```
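Flink currently has the following built-in accumulators, each of which implements the Accumulator interface: the counters IntCounter, LongCounter, and DoubleCounter, and Histogram (a histogram over a discrete number of integer bins).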
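```java
// Define and register the counter (registration typically happens in open() of a rich function)
private IntCounter numLines = new IntCounter();
getRuntimeContext().addAccumulator("num-lines", this.numLines);

// Count anywhere in the function
this.numLines.add(1);

// Finally, retrieve the result from the JobExecutionResult returned by env.execute()
myJobExecutionResult.getAccumulatorResult("num-lines");
```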
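Each parallel instance of the function has its own IntCounter. Flink combines the per-instance values into the job result that getAccumulatorResult returns; for IntCounter, this means adding them up. The JDK-only sketch below models that with maps: each simulated instance counts its own lines under the registered name, and the values are then merged by name. AccumulatorSketch is a hypothetical helper, and the merge step is a simplification of what the runtime does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of an IntCounter-style accumulator; not Flink API.
class AccumulatorSketch {
    // Each inner list is the input of one simulated parallel instance.
    static Map<String, Integer> run(List<List<String>> partitions) {
        Map<String, Integer> jobResult = new HashMap<>();
        for (List<String> partition : partitions) {
            // Local counter registered under a name, like addAccumulator("num-lines", ...).
            Map<String, Integer> local = new HashMap<>();
            local.put("num-lines", 0);
            for (String line : partition) {
                local.merge("num-lines", 1, Integer::sum); // like numLines.add(1)
            }
            // When the instance finishes, its values are merged into the job result by name.
            local.forEach((name, value) -> jobResult.merge(name, value, Integer::sum));
        }
        return jobResult;
    }
}
```

Three instances that see two lines, one line, and no lines report a job-wide "num-lines" of 3.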
```java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

...

System.out.println(env.getExecutionPlan());
```
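getExecutionPlan() returns the execution plan as a JSON string, which can be pasted into the plan visualizer. The HTML document containing the visualizer is located under tools/planVisualizer.html.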
Web Interface
Flink offers a web interface for submitting and executing jobs. If you choose to use this interface to submit your packaged program, you have the option to also see the plan visualization.
The script to start the web interface is located under bin/. After starting the web client (by default on port 8080), you can upload your program. It is then added to the list of available programs on the left side of the interface.
Jobs can also be submitted, and their execution plans viewed, through the web interface.