相似於spark statCounter類的東西,處理缺失值java
import org.apache.spark.util.StatCounter class NAStatCounter extends Serializable { val stats:StatCounter =new StatCounter() var missing:Long=0 def add(x:Double):NAStatCounter={ if(java.lang.Double.isNaN(x)){ missing += 1 }else{ stats.merge(x) } this } def merge(other:NAStatCounter):NAStatCounter={ stats.merge(other.stats) missing+=other.missing this } override def toString: String = { "stats:"+stats.toString()+" NaN "+missing } } object NAStatCounter extends Serializable{ def apply(x:Double) = new NAStatCounter().add(x) }
編譯:數據庫
調用以前的方法parse:apache
val nas1=Array(1.0,Double.NaN).map(d=>NAStatCounter(d)) val nas2=Array(Double.NaN,2.0).map(d=>NAStatCounter(d)) //經過拉鍊進行聚合 val mergerd = nas1.zip(nas2).map(p=>p._1.merge(p._2))
集合之間作reduce數組
val nas=List(nas1,nas2) val mergerd=mas.reduce((n1,n2)=>{ n1.zip(n2).map{ case (a,b)=>a.merge(b) } })
/* 與map方法相似,map是對rdd中的每個元素進行操做,而mapPartitions(foreachPartition)則是對rdd中的每一個分區的迭代器進行操做。若是在map過程當中須要頻繁建立額外的對象(例如將rdd中的數據經過jdbc寫入數據庫,map須要爲每一個元素建立一個連接而mapPartition爲每一個partition建立一個連接),則mapPartitions效率比map高的多。 */ def statswith(rdd:RDD[Array[Double]]):Array[NAStatCounter]={ val nastats = rdd.mapPartitions((iter:Iterator[Array[Double]])=>{ val nas=Array[NAStatCounter]=iter.next().map(d=>NAStatCounter(d)) iter.foreach(arr=>{ nas.zip(arr).foreach{case (n,d) => n.add(d)} }) Iterator(nas) }) }
變量的選擇和評分簡介app
val statsm = statswith(parsed.filter(_.matched).map(_.scores)) val statsn = statswith(parsed.filter(!_.matched).map(_.scores)) /* statsm和statsn這兩個數組結構相同,但對應不一樣的數據子集:statsm包含匹配記錄匹配分值數組的概要統計信息,而statsn對應不匹配記錄分值數組的概要統計信息。對匹配和不匹配記錄列的值作簡單差別分析 */ statsm.zip(statsn).map{ case(m,n)=>{ (m.missing+n.missing,m.stats.mean - n.stats.mean) }.foreach(println) }
定義一個簡單的評分模型:ide
def naz(d:Double) = if (Double.NaN.equals(d)) 0.0 else d case class Scored(md:MatchData,score:Double) val ct = parsed.map(md=>{ val score=Array(2,5,6,7,8).map(i=>naz(md.scores(i))).sum Scored(md,score) })
過濾一個閾值 4.0this
ct.filter(s=>s.score>=4.0).map(s=>s.md.matched).countByValue()