// 1. Scala code example
import com.streamingkmeans.utils.EuclideanDistanceMeasure
import org.apache.flink.api.common.state.{BroadcastState, ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Small driver that exercises Flink broadcast state: a fixed set of centers is
 * broadcast to every parallel instance of [[UpdateCenter]], and each line read
 * from a text socket triggers an output listing the centers currently known.
 *
 * @Author: ch
 * @Date: 25/05/2020 2:55 PM
 * @Version 1.0
 */
object MyTest {

  /** Port used when `--port` is missing or cannot be parsed. */
  private val DefaultPort = 9000

  /**
   * Tests the broadcast stream.
   *
   * @param args command-line arguments; `--port <n>` selects the socket port
   *             to connect to on 127.0.0.1 (falls back to 9000)
   */
  def main(args: Array[String]): Unit = {
    // The port to connect to. `try` is used as an expression so that a
    // successfully parsed value is actually kept; previously the parsed value
    // was discarded and the port stayed 0 whenever `--port` was supplied.
    val port: Int =
      try {
        ParameterTool.fromArgs(args).getInt("port")
      } catch {
        case _: Exception => DefaultPort
      }

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n')

    // Seed vectors for the centers; the number of centers follows from this
    // array so the two can never disagree.
    val initialVectors = Array[DenseVector](
      DenseVector(Array[Double](1, 1, 1)),
      DenseVector(Array[Double](-1, -1, -1))
    )
    val k = initialVectors.length

    // One Center per seed vector, with the array index as its id. The meaning
    // of the third constructor argument is defined by Center (declared elsewhere).
    val centerDetail: Array[Center] = Array.tabulate(k)(i => Center(i, initialVectors(i), 1))
    // centerDetail.foreach(println)

    val centers: DataStream[Center] = env.fromCollection(centerDetail)

    // Build the broadcast stream from the broadcast state descriptor
    // (broadcast streams only support a MapState structure). The descriptor is
    // shared with UpdateCenter so both sides address the same state.
    val centersBroadcast: BroadcastStream[Center] =
      centers.broadcast(UpdateCenter.CentersStateDescriptor)

    val result: DataStream[String] = socketText
      .connect(centersBroadcast)
      .process(new UpdateCenter(k))

    result.print()
    env.execute()
  }
}

/**
 * Broadcast process function; parameters can be passed in through the
 * constructor.
 *
 * Type parameters of the parent are IN1, IN2, OUT, i.e. the non-broadcast
 * stream type, the broadcast stream type and the output stream type.
 *
 * @param k number of center ids (0 until k) looked up for every non-broadcast element
 */
class UpdateCenter(k: Int) extends BroadcastProcessFunction[String, Center, String] {

  import UpdateCenter.CentersStateDescriptor

  /**
   * Handles an element of the broadcast stream.
   *
   * @param value the broadcast element that arrived
   * @param ctx   context giving access to the writable broadcast state
   * @param out   output collector (unused here)
   */
  override def processBroadcastElement(
      value: Center,
      ctx: BroadcastProcessFunction[String, Center, String]#Context,
      out: Collector[String]): Unit = {
    val centers: BroadcastState[Integer, Center] = ctx.getBroadcastState(CentersStateDescriptor)
    // Store the broadcast element in the broadcast state, keyed by its id.
    // `put` replaces any existing entry, so no explicit remove is needed first.
    centers.put(value.id, value)
  }

  /**
   * Handles an element of the non-broadcast stream by emitting the centers
   * with ids `0 until k` that are currently present in the broadcast state.
   *
   * @param value the non-broadcast element that arrived
   * @param ctx   context giving access to the read-only broadcast state
   * @param out   collector receiving the rendered list of centers
   */
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, Center, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // Read the broadcast state.
    val centers: ReadOnlyBroadcastState[Integer, Center] =
      ctx.getBroadcastState(CentersStateDescriptor)

    // An id may have no entry yet if this element arrives before the matching
    // broadcast element; `get` then returns null, so wrap it in Option instead
    // of dereferencing it.
    val known: Seq[Center] = (0 until k).flatMap(i => Option(centers.get(Int.box(i))))

    // Emit the result. mkString renders the elements themselves; calling
    // toString on an Array would only print its identity hash.
    out.collect(known.mkString("[", ", ", "]"))
  }
}

object UpdateCenter {

  /**
   * Descriptor of the broadcast state holding the centers keyed by id. It is
   * defined once here so the job wiring and the process function cannot drift
   * apart.
   */
  val CentersStateDescriptor: MapStateDescriptor[Integer, Center] =
    new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])
}