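Parameters can be passed to a function either through its constructor or through the withParameters(Configuration) method. The parameters are serialized as part of the function object and shipped to the task instances. Reference: the official documentation (batch).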
1 Via the constructor
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

    private final int limit;

    public MyFilter(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean filter(Integer value) throws Exception {
        return value > limit;
    }
}
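The constructor approach works because the argument is stored in a field, and that field travels with the serialized function object. The following is a minimal sketch of that idea using plain Java serialization only. It does not use Flink; the class `SerializedParameterDemo`, the nested `LimitFilter`, and the helper `roundTrip` are hypothetical names introduced for illustration, and Flink's actual shipping mechanism may differ in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedParameterDemo {

    // Hypothetical stand-in for a filter function; this is NOT Flink's FilterFunction.
    static class LimitFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        // The constructor argument lives in a field, so it is part of the serialized state.
        private final int limit;

        LimitFilter(int limit) {
            this.limit = limit;
        }

        boolean filter(Integer value) {
            return value > limit;
        }
    }

    // Serialize the object to bytes and read it back, roughly like shipping it to a worker.
    static LimitFilter roundTrip(LimitFilter original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LimitFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LimitFilter copy = roundTrip(new LimitFilter(2));
        System.out.println(copy.filter(3)); // prints "true"
        System.out.println(copy.filter(1)); // prints "false"
    }
}
```

The deserialized copy still filters against the limit of 2, which is why any field set in the constructor has to be serializable itself.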
2 Via withParameters(Configuration)
This method takes a Configuration object as its argument, and that object is then passed to the rich function's open() method (for rich functions, see: rich function). A Configuration object is a map that stores key/value pairs.
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {

    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
        limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
        return value > limit;
    }
}).withParameters(config);
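The order of calls matters here: open() receives the configuration once, before any element is filtered, and a missing key falls back to the default given to the getter. The sketch below imitates that lifecycle in plain Java, without Flink. `OpenLifecycleDemo`, `SimpleConfig`, `LimitFilter`, and `run` are hypothetical stand-ins, not Flink classes, and they only mimic the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpenLifecycleDemo {

    // Hypothetical minimal key/value configuration; NOT Flink's Configuration class.
    static class SimpleConfig {
        private final Map<String, Integer> values = new HashMap<>();

        void setInteger(String key, int value) {
            values.put(key, value);
        }

        // Returns the stored value, or defaultValue when the key is absent.
        int getInteger(String key, int defaultValue) {
            return values.getOrDefault(key, defaultValue);
        }
    }

    // Hypothetical stand-in for a rich filter function with an open() hook.
    static class LimitFilter {
        private int limit;

        void open(SimpleConfig parameters) {
            limit = parameters.getInteger("limit", 0);
        }

        boolean filter(Integer value) {
            return value > limit;
        }
    }

    // Imitates the runtime: call open() once, then filter() for each element.
    static List<Integer> run(List<Integer> input, LimitFilter function, SimpleConfig config) {
        function.open(config);
        List<Integer> result = new ArrayList<>();
        for (Integer value : input) {
            if (function.filter(value)) {
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleConfig config = new SimpleConfig();
        config.setInteger("limit", 2);
        List<Integer> input = Arrays.asList(1, 2, 3);

        System.out.println(run(input, new LimitFilter(), config));             // prints "[3]"
        System.out.println(run(input, new LimitFilter(), new SimpleConfig())); // prints "[1, 2, 3]"
    }
}
```

With an empty configuration the limit falls back to 0, so every element passes. A wrong or misspelled key therefore tends to go unnoticed unless the default is chosen carefully.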
3 Globally via the ExecutionConfig
Parameters set this way are accessible to all rich functions.
Configuration conf = new Configuration();
conf.setString("mykey", "myvalue");

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ExecutionConfig.GlobalJobParameters globalParams =
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Configuration globConf = (Configuration) globalParams;
        mykey = globConf.getString("mykey", null);
    }
    // ... more here ...