使用flink ProcessWindowFunction出現Error type mismatch問題

1.報錯代碼java

val window = 
      .keyBy(0)
      .timeWindow(Time.seconds(5)) //簡寫,具體調用TumblingProcessingTimeWindows仍是TumblingEventTimeWindows取決於配置的時間特性?
      .process(new MyProcessWindowFunction)
      
class MyProcessWindowFunction
  extends ProcessWindowFunction[(Int,DenseVector),DenseVector,Int,TimeWindow] {
  override def process(key: Int,
                       context: Context,
                       elements: Iterable[(Int, DenseVector)],
                       out: Collector[DenseVector]): Unit = {
    val points: Iterable[DenseVector] = elements.map(_._2)
    val sumpoint: DenseVector = points.reduce((p1, p2)=>{
      BLAS.axpy(1.0,p1,p2)
      p2
    })
    out.collect(sumpoint)
  }
}

2.報錯提示apache

Error:(292, 16) type mismatch;
 found   : com.streamingkmeans.MyProcessWindowFunction
 required: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(Int, org.apache.flink.ml.math.DenseVector),?,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
      .process(new MyProcessWindowFunction)

3.使用case class替代tuplewindows

val window = centerPoints.map(centerPoint=>IdPoint(centerPoint._1,centerPoint._2))
      .keyBy(_.id)
      .timeWindow(Time.seconds(5)) //簡寫,具體調用TumblingProcessingTimeWindows仍是TumblingEventTimeWindows取決於配置的時間特性?
      .process(new MyProcessWindowFunction2)
      
class MyProcessWindowFunction2
  extends ProcessWindowFunction[IdPoint,DenseVector,Int,TimeWindow] {
  override def process(key: Int,
                       context: Context,
                       elements: Iterable[IdPoint],
                       out: Collector[DenseVector]): Unit = {
    val points: Iterable[DenseVector] = elements.map(_.point)
    val sumpoint: DenseVector = points.reduce((p1, p2)=>{
      BLAS.axpy(1.0,p1,p2)
      p2
    })
    out.collect(sumpoint)
  }
}
case class IdPoint(id:Int,point:DenseVector)

報錯消失
總結:要使用case class,使用keyBy(_.id)的方式,不能使用tuple類型keyBy(0)的形式,不然窗口不知道id的類型(推測)。api

相關文章
相關標籤/搜索