flink DataStream BroadcastStream廣播流scala使用示例

1. Scala 代碼示例

import com.streamingkmeans.utils.EuclideanDistanceMeasure
import org.apache.flink.api.common.state.{BroadcastState, ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Broadcast-stream example: a small set of [[Center]] values is broadcast to every parallel
 * instance of [[UpdateCenter]], which is connected to a socket text stream.
 *
 * Author: ch
 * Date: 25/05/2020 2:55 PM
 * Version: 1.0
 */
object MyTest {

  /** Port used when the `port` argument is missing or cannot be parsed. */
  private val DefaultPort = 9000

  /**
   * Builds and runs the job: socket text stream connected to the broadcast centers.
   *
   * @param args command-line arguments; an optional `port` parameter selects the socket port
   */
  def main(args: Array[String]): Unit = {
    // Use the parsed value when present. The previous code discarded the parse result, so the
    // port stayed 0 whenever a port was actually supplied.
    val port: Int =
      try ParameterTool.fromArgs(args).getInt("port")
      catch { case _: Exception => DefaultPort }

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n')

    // Initial centers: one per seed vector, with ids 0 until k.
    val seedVectors = Array[DenseVector](
      DenseVector(Array[Double](1, 1, 1)),
      DenseVector(Array[Double](-1, -1, -1))
    )
    val k = seedVectors.length
    val centerDetail: Array[Center] = Array.tabulate(k)(i => Center(i, seedVectors(i), 1))
    val centers: DataStream[Center] = env.fromCollection(centerDetail)

    // Descriptor of the broadcast state (a map from center id to Center). UpdateCenter declares
    // a descriptor with the same name and types to access this state.
    val centersDescriptor =
      new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

    // Create the broadcast stream from the state descriptor.
    val centersBroadcast: BroadcastStream[Center] = centers.broadcast(centersDescriptor)

    val result: DataStream[String] = socketText
      .connect(centersBroadcast)
      .process(new UpdateCenter(k))

    result.print()
    env.execute()
  }
}
/**
 * Broadcast process function that stores the latest [[Center]] per id in broadcast state and,
 * for each element of the non-broadcast stream, emits a textual snapshot of the known centers.
 *
 * The type arguments of [[BroadcastProcessFunction]] are, in order: the non-broadcast input
 * type, the broadcast input type, and the output type.
 *
 * @param k number of center ids (0 until k) looked up when building a snapshot
 */
class UpdateCenter(k: Int) extends BroadcastProcessFunction[String, Center, String] {

  // Same name and types as the descriptor used in MyTest.main when broadcasting the centers.
  private lazy val centersDescriptor =
    new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

  /**
   * Handles an element of the broadcast stream by storing it in the writable broadcast state
   * under its id.
   *
   * @param value the broadcast center
   * @param ctx   context giving access to the writable broadcast state
   * @param out   output collector (unused here)
   */
  override def processBroadcastElement(
      value: Center,
      ctx: BroadcastProcessFunction[String, Center, String]#Context,
      out: Collector[String]): Unit = {
    // put replaces any value already stored for this id, so no explicit remove is needed.
    ctx.getBroadcastState(centersDescriptor).put(value.id, value)
  }

  /**
   * Handles an element of the non-broadcast stream by emitting the centers currently held in
   * the read-only broadcast state for ids 0 until k.
   *
   * Ids that have not been broadcast yet are skipped instead of causing a
   * NullPointerException, so the snapshot may hold fewer than k centers early in the job.
   *
   * @param value the incoming non-broadcast element (its content is not used)
   * @param ctx   context giving read-only access to the broadcast state
   * @param out   collector receiving the snapshot string
   */
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, Center, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val state: ReadOnlyBroadcastState[Integer, Center] = ctx.getBroadcastState(centersDescriptor)
    // Option(...) turns the null returned for a missing id into None.
    val known = (0 until k).flatMap(i => Option(state.get(i)))
    // mkString renders the elements; Array.toString would only print the JVM identity string.
    out.collect(known.mkString("[", ", ", "]"))
  }
}
相關文章
相關標籤/搜索