Operators transform one or more DataStreams into a new DataStream.
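import org.apache.flink.streaming.api.scala._

object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    // CustomNonParallelSourceFunction is the custom source defined in the previous section
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received:" + x) // log every element that reaches map
      x                        // pass the element through unchanged
    }).filter(_ % 2 == 0)      // keep only even numbers
      .print()
      .setParallelism(1)
  }
}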
Any of the data sources defined earlier can be used here.
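The map here does no real work: it only prints each element and passes it through unchanged. The filter takes every number modulo 2 and keeps those with remainder 0, i.e. the even numbers. The output is: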
received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8
This shows that map receives every element, while filter drops the odd ones before they reach print().
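The interleaved output comes from each element travelling through map, filter and the sink before the next element arrives. Below is a minimal plain-Java sketch of that per-element flow. It is not Flink code: the class `MapFilterSketch` and its `pipeline`/`run` methods are hypothetical, and the finite input 1..8 stands in for the unbounded custom source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Plain-Java model of source -> map -> filter -> print; NOT the Flink API.
class MapFilterSketch {

    // Pushes each element through map, then filter, then the sink, one element at a time,
    // which is why "received:N" lines and the surviving values interleave in the output.
    static void pipeline(Iterable<Long> source, UnaryOperator<Long> map,
                         Predicate<Long> filter, Consumer<Long> sink) {
        for (Long element : source) {
            Long mapped = map.apply(element);
            if (filter.test(mapped)) {
                sink.accept(mapped);
            }
        }
    }

    // Mirrors the filterFunction example and returns the lines it would print.
    static List<String> run(List<Long> source) {
        List<String> lines = new ArrayList<>();
        pipeline(source,
                x -> { lines.add("received:" + x); return x; }, // map: log and pass through
                x -> x % 2 == 0,                                 // filter: keep even numbers
                x -> lines.add(String.valueOf(x)));              // sink: stands in for print()
        return lines;
    }

    public static void main(String[] args) {
        List<Long> source = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); // assumed finite input
        run(source).forEach(System.out::println);
    }
}
```

Running `main` prints the same sequence of lines as the Flink job above. The difference is that the sketch stops after 8 elements.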
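import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Java version of filterFunction
public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    // Reduce the parallel source to a single instance so each number is emitted once
    data.setParallelism(1).map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            System.out.println("received:" + value); // log every element
            return value;                             // pass it through unchanged
        }
    }).filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value % 2 == 0; // keep only even numbers
        }
    }).print().setParallelism(1);
}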
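You need to call data.setParallelism(1) before the map. Otherwise every number is printed several times, because we are using JavaCustomParallelSourceFunction. A parallel source runs one instance per parallel subtask, and each instance emits its own sequence. With JavaCustomNonParallelSourceFunction the source already runs with parallelism 1, so the call can be omitted.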
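The following is a plain-Java sketch of why the output repeats, not Flink code. It assumes, as the duplicated output suggests, that each parallel instance of JavaCustomParallelSourceFunction counts from 1 on its own. The class `ParallelSourceSketch` and its `emit` method are hypothetical. Real subtasks run concurrently, so their elements interleave instead of arriving one subtask after another as they do here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of a parallel source whose instances do not split the work; NOT the Flink API.
class ParallelSourceSketch {

    // Each of the `parallelism` subtasks emits the full sequence 1..count on its own,
    // so downstream operators see every value `parallelism` times.
    static List<Long> emit(int parallelism, long count) {
        List<Long> emitted = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (long value = 1; value <= count; value++) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> fourWay = emit(4, 3); // e.g. a local run with default parallelism 4
        List<Long> oneWay = emit(1, 3);  // after data.setParallelism(1)
        System.out.println("value 2 seen " + Collections.frequency(fourWay, 2L) + " times");
        System.out.println("value 2 seen " + Collections.frequency(oneWay, 2L) + " time");
    }
}
```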
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  // Merge both streams into one and print the combined result
  data01.union(data02).print().setParallelism(1)
}
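The union operation merges two data streams of the same element type into one stream so they can be processed together. The code above prints the following. The order in which elements from the two sources interleave is not guaranteed.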
1
1
2
2
3
3
4
4
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    // Merge both streams into one and print the combined result
    data1.union(data2).print().setParallelism(1);
}
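Union keeps every element from every input, without de-duplication, and requires one shared element type. Below is a plain-Java sketch of that behavior, not Flink code. The class `UnionSketch` is hypothetical, and its round-robin merge is only one possible interleaving. Flink does not promise any particular order across inputs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of DataStream.union; NOT the Flink API.
class UnionSketch {

    // Merges inputs of the SAME element type T into one list, keeping every element
    // (no de-duplication). Round-robin is just one possible interleaving.
    @SafeVarargs
    static <T> List<T> union(List<T>... inputs) {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (List<T> input : inputs) {
            iterators.add(input.iterator());
        }
        List<T> merged = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // Take at most one element from each input per round
            for (Iterator<T> it : iterators) {
                if (it.hasNext()) {
                    merged.add(it.next());
                    progressed = true;
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Long> data1 = List.of(1L, 2L, 3L, 4L);
        List<Long> data2 = List.of(1L, 2L, 3L, 4L);
        System.out.println(union(data1, data2));
    }
}
```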
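split divides one stream into several named streams, and select chooses which of those named streams to process. Note that split/select was later deprecated and removed in Flink 1.12, where side outputs replace it.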
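import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  // Tag every element as "even" or "odd"
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  // Select both named streams, so every element is printed
  split.select("odd", "even").print().setParallelism(1)
}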
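Downstream processing is then chosen by name. Above, select("odd", "even") takes both streams, so every number is printed. Selecting a single name keeps only that subset, as in the Java version below, which prints only the odd numbers.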
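import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Java version of splitSelectFunction
public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            // Even numbers are tagged "even", odd numbers "odd"
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    // Only the "odd" stream is printed
    split.select("odd").print().setParallelism(1);
}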
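The tag-then-select mechanism can be modeled without Flink. Below is a plain-Java sketch, not the Flink API. The class `SplitSelectSketch`, its `parityTag` function (playing the role of the OutputSelector above) and its `select` method are hypothetical. `select` keeps the elements whose tags match any requested name, in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of split(OutputSelector) followed by select(names...); NOT the Flink API.
class SplitSelectSketch {

    // Like OutputSelector.select: names the output(s) an element belongs to.
    static Iterable<String> parityTag(Long value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    // Keeps the elements whose tags intersect the requested names, preserving input order.
    static List<Long> select(List<Long> input, Function<Long, Iterable<String>> selector,
                             String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<Long> out = new ArrayList<>();
        for (Long value : input) {
            for (String tag : selector.apply(value)) {
                if (wanted.contains(tag)) {
                    out.add(value);
                    break; // this sketch emits each element at most once
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> data = List.of(1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(select(data, SplitSelectSketch::parityTag, "odd"));
        System.out.println(select(data, SplitSelectSketch::parityTag, "odd", "even"));
    }
}
```

The first call corresponds to the Java example (odd numbers only), and the second to the Scala example (every number).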