1.scala代碼示例java
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.ml.math.DenseVector /** * @Author: ch * @Date: 25/05/2020 2:55 PM * @Version 1.0 * @Describe: */ object DataStreamIterateTest { /** * 測試迭代流 * @param args */ def main(args: Array[String]): Unit = { // the port to connect to var port = 0 try { ParameterTool.fromArgs(args).getInt("port") } catch { case e: Exception => { port = 9000 } } val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n') val iterate = socketText .iterate( //stepfunction: initialStream => (feedback, output) stepFunction =>{ val feedback: DataStream[String] = stepFunction.filter(s=>s=="haha").setParallelism(1) feedback.print() val output: DataStream[String] = stepFunction.filter(s=>s!="haha") (feedback,output) } ) .print() env.execute() } }
2.運行socket進行測試,輸入"hehe" "haha",其中"hehe"輸出到output流中,「haha」輸出到feedback流中並不斷迭代。
apache