Code:
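val linkPairSum = F.udf(
  // Fails at runtime: Spark passes the collect_list result as a WrappedArray, not a List
  (list: List[Map[Long, Int]]) => {
    var map = Map[Long, Int]()
    for (m <- list) {
      if (m != null) {
        println("----")
        println(m)
        // add each count in m to the running total for its key
        map = map ++ m.map(t => t._1 -> (t._2 + map.getOrElse(t._1, 0)))
        println("====")
        println(map)
      }
    }
    map
  }
)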
val sum = all.groupBy("window", "mapVersion", "linkId")
  .agg(
    F.sum("passthrough").alias("passthrough"),
    F.sum("resident").alias("resident"),
    F.first("driverId").alias("driverId"),
    linkPairSum(F.collect_list("inLink")).alias("inLink"),
    linkPairSum(F.collect_list("outLink")).alias("outLink")
  ).as[PassThroughFeature]
Error:
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.collection.immutable.Map;
Cause:
Spark passes an ArrayType column (here, the output of F.collect_list) to a Scala UDF as a scala.collection.mutable.WrappedArray, which is a Seq but neither an Array nor a List. A UDF whose parameter is declared as Array[...] or List[...] therefore fails with the ClassCastException above. Since Scala 2.8, a WrappedArray/Seq is also not implicitly converted back to an Array.
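A minimal plain-Scala sketch of the cause, assuming Scala 2.12 or 2.13 and no Spark dependency. The array-backed Seq built with Array(...).toSeq (a WrappedArray on Scala 2.12) stands in for what Spark hands the UDF; the names fromSpark and countMaps are hypothetical. Casting it to List fails at runtime, while a Seq parameter accepts it directly.

```scala
import scala.util.Try

// Hypothetical stand-in for the value Spark passes to a Scala UDF for an
// ArrayType column: an array-backed Seq, not a List.
val fromSpark: Seq[Map[Long, Int]] = Array(Map(1L -> 2), Map(1L -> 3, 2L -> 1)).toSeq

// Roughly what a List[...] parameter forces: a runtime cast to List,
// which throws ClassCastException (captured here as a Failure).
val asList = Try(fromSpark.asInstanceOf[List[Map[Long, Int]]].length)

// A Seq[...] parameter accepts the same value with no cast at all.
def countMaps(maps: Seq[Map[Long, Int]]): Int = maps.size

println(fromSpark.isInstanceOf[List[_]])
println(asList.isFailure)
println(countMaps(fromSpark))
```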
Fix:
Declare the UDF parameter as Seq[Map[Long, Int]] instead of List[Map[Long, Int]]:
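import org.apache.spark.sql.{functions => F}

val linkPairSum = F.udf(
  // Seq matches the WrappedArray that Spark passes for an ArrayType column
  (list: Seq[Map[Long, Int]]) => {
    var map = Map[Long, Int]()
    for (m <- list) {
      if (m != null) {
        println("----") // debug output, written to executor stdout
        println(m)
        // add each count in m to the running total for its key
        map = map ++ m.map(t => t._1 -> (t._2 + map.getOrElse(t._1, 0)))
        println("====")
        println(map)
      }
    }
    map
  }
)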
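The same merge logic can also be written without a var and tested without a SparkSession. A sketch, assuming a hypothetical helper named mergeLinkCounts that takes a Seq[Map[Long, Int]] and sums the Int values per Long key, using foldLeft in place of the mutable loop:

```scala
// Hypothetical pure-Scala version of the linkPairSum logic.
// Sums the Int values for each Long key across all maps, skipping null maps.
def mergeLinkCounts(maps: Seq[Map[Long, Int]]): Map[Long, Int] =
  maps
    .filter(_ != null)
    .foldLeft(Map.empty[Long, Int]) { (acc, m) =>
      // add every (key, count) pair of m onto the accumulated totals
      m.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0) + v) }
    }

// Works for an array-backed Seq (what Spark passes) and for a List alike.
val merged = mergeLinkCounts(Array(Map(1L -> 2, 2L -> 5), Map(1L -> 3)).toSeq)
println(merged)
```

Under this assumption the UDF could be defined as F.udf((list: Seq[Map[Long, Int]]) => mergeLinkCounts(list)); keeping the parameter type as Seq is what avoids the cast error.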