1. Code that fails to compile
val window = centerPoints
  .keyBy(0)
  // Shorthand: whether this resolves to TumblingProcessingTimeWindows or
  // TumblingEventTimeWindows presumably depends on the configured time characteristic.
  .timeWindow(Time.seconds(5))
  .process(new MyProcessWindowFunction)

class MyProcessWindowFunction extends ProcessWindowFunction[(Int, DenseVector), DenseVector, Int, TimeWindow] {
  override def process(key: Int, context: Context, elements: Iterable[(Int, DenseVector)], out: Collector[DenseVector]): Unit = {
    val points: Iterable[DenseVector] = elements.map(_._2)
    // BLAS.axpy(1.0, p1, p2) computes p2 += 1.0 * p1 in place, so p2 carries the running sum.
    val sumpoint: DenseVector = points.reduce((p1, p2) => {
      BLAS.axpy(1.0, p1, p2)
      p2
    })
    out.collect(sumpoint)
  }
}
2. Error message
Error:(292, 16) type mismatch;
 found   : com.streamingkmeans.MyProcessWindowFunction
 required: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(Int, org.apache.flink.ml.math.DenseVector),?,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
      .process(new MyProcessWindowFunction)
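The "required" line shows that after keyBy(0) the key type is org.apache.flink.api.java.tuple.Tuple, not Int. One way to make the positional form compile is to declare that key type in the window function. The following is only a minimal sketch under assumptions: it targets an older Flink release whose Scala API still has timeWindow and positional keyBy, it uses plain Double values in place of DenseVector, and the names SumByTupleKey and TupleKeySketch are hypothetical.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical window function: the third type parameter is the Java Tuple,
// which is the key type the compiler asks for after keyBy(0).
class SumByTupleKey extends ProcessWindowFunction[(Int, Double), Double, Tuple, TimeWindow] {
  override def process(key: Tuple, context: Context,
                       elements: Iterable[(Int, Double)],
                       out: Collector[Double]): Unit = {
    // Sum the second field of every element in this key's window.
    out.collect(elements.map(_._2).sum)
  }
}

object TupleKeySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: Double stands in for the post's DenseVector.
    val points: DataStream[(Int, Double)] = env.fromElements((1, 1.0), (1, 2.0), (2, 5.0))

    points
      .keyBy(0)                     // positional key: key type becomes Tuple
      .timeWindow(Time.seconds(5))
      .process(new SumByTupleKey)
      .print()

    env.execute("tuple-key-sketch")
  }
}
```

With this variant the id is wrapped inside the Tuple key and has to be unpacked before it can be used as an Int, which is one reason a typed key is more convenient.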
3. Use a case class instead of a tuple
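import org.apache.flink.ml.math.{BLAS, DenseVector}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val window = centerPoints
  .map(centerPoint => IdPoint(centerPoint._1, centerPoint._2))
  .keyBy(_.id)
  // Shorthand: whether this resolves to TumblingProcessingTimeWindows or
  // TumblingEventTimeWindows presumably depends on the configured time characteristic.
  .timeWindow(Time.seconds(5))
  .process(new MyProcessWindowFunction2)

class MyProcessWindowFunction2 extends ProcessWindowFunction[IdPoint, DenseVector, Int, TimeWindow] {
  override def process(key: Int, context: Context, elements: Iterable[IdPoint], out: Collector[DenseVector]): Unit = {
    val points: Iterable[DenseVector] = elements.map(_.point)
    // BLAS.axpy(1.0, p1, p2) computes p2 += 1.0 * p1 in place, so p2 carries the running sum.
    val sumpoint: DenseVector = points.reduce((p1, p2) => {
      BLAS.axpy(1.0, p1, p2)
      p2
    })
    out.collect(sumpoint)
  }
}

case class IdPoint(id: Int, point: DenseVector)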
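The error disappears.
Summary: use a case class with keyBy(_.id) rather than the positional tuple form keyBy(0). Otherwise the window does not know the type of the id (my guess). The error message is consistent with this: with keyBy(0) the compiler expects the key type org.apache.flink.api.java.tuple.Tuple instead of Int.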
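It may be the selector style, rather than the case class itself, that makes the difference: a key-selector function gives keyBy a statically known key type. As a hedged sketch under assumptions (an older Flink release that still provides timeWindow, plain Double values in place of DenseVector, and the hypothetical names SumByKey and KeySelectorSketch), the tuple stream can be kept and keyed with keyBy(_._1):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical window function with key type Int, as in the original attempt.
class SumByKey extends ProcessWindowFunction[(Int, Double), Double, Int, TimeWindow] {
  override def process(key: Int, context: Context,
                       elements: Iterable[(Int, Double)],
                       out: Collector[Double]): Unit = {
    // Sum the second field of every element in this key's window.
    out.collect(elements.map(_._2).sum)
  }
}

object KeySelectorSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: Double stands in for the post's DenseVector.
    val points: DataStream[(Int, Double)] = env.fromElements((1, 1.0), (1, 2.0), (2, 5.0))

    points
      .keyBy(_._1)                  // function selector: key type is Int
      .timeWindow(Time.seconds(5))
      .process(new SumByKey)
      .print()

    env.execute("key-selector-sketch")
  }
}
```

With a bounded fromElements source the job may finish before a 5-second processing-time window fires, so this sketch mainly demonstrates that the types line up, not any particular output.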