病人記錄spark建立完整代碼

相似於spark statCounter類的東西,處理缺失值java

import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable {
  val stats:StatCounter =new StatCounter()
  var missing:Long=0
  def add(x:Double):NAStatCounter={
    if(java.lang.Double.isNaN(x)){
      missing += 1
    }else{
      stats.merge(x)
    }
    this
  }
  def merge(other:NAStatCounter):NAStatCounter={
    stats.merge(other.stats)
    missing+=other.missing
    this
  }

  override def toString: String = {
    "stats:"+stats.toString()+" NaN "+missing
  }
}
object NAStatCounter extends Serializable{
  def apply(x:Double) = new NAStatCounter().add(x)
}

編譯:數據庫

調用以前的方法parse:apache

val nas1=Array(1.0,Double.NaN).map(d=>NAStatCounter(d))
val nas2=Array(Double.NaN,2.0).map(d=>NAStatCounter(d))
//經過拉鍊進行聚合
val mergerd = nas1.zip(nas2).map(p=>p._1.merge(p._2))

集合之間作reduce數組

val nas=List(nas1,nas2)
val mergerd=mas.reduce((n1,n2)=>{
 n1.zip(n2).map{
  case (a,b)=>a.merge(b)
 }
})
/*
與map方法相似,map是對rdd中的每個元素進行操做,而mapPartitions(foreachPartition)則是對rdd中的每一個分區的迭代器進行操做。若是在map過程當中須要頻繁建立額外的對象(例如將rdd中的數據經過jdbc寫入數據庫,map須要爲每一個元素建立一個連接而mapPartition爲每一個partition建立一個連接),則mapPartitions效率比map高的多。
*/
def statswith(rdd:RDD[Array[Double]]):Array[NAStatCounter]={
    val nastats = rdd.mapPartitions((iter:Iterator[Array[Double]])=>{
      val nas=Array[NAStatCounter]=iter.next().map(d=>NAStatCounter(d))
      iter.foreach(arr=>{
        nas.zip(arr).foreach{case (n,d) => n.add(d)}
      })
      Iterator(nas)
    })
}

變量的選擇和評分簡介app

val statsm = statswith(parsed.filter(_.matched).map(_.scores))
val statsn = statswith(parsed.filter(!_.matched).map(_.scores))
/*
statsm和statsn這兩個數組結構相同,但對應不一樣的數據子集:statsm包含匹配記錄匹配分值數組的概要統計信息,而statsn對應不匹配記錄分值數組的概要統計信息。對匹配和不匹配記錄列的值作簡單差別分析
*/
statsm.zip(statsn).map{
 case(m,n)=>{
  (m.missing+n.missing,m.stats.mean - n.stats.mean)
 }.foreach(println)
}

定義一個簡單的評分模型:ide

def naz(d:Double) = if (Double.NaN.equals(d)) 0.0 else d
  case class Scored(md:MatchData,score:Double)
  val ct = parsed.map(md=>{
    val score=Array(2,5,6,7,8).map(i=>naz(md.scores(i))).sum
    Scored(md,score)
  })

過濾一個閾值 4.0this

ct.filter(s=>s.score>=4.0).map(s=>s.md.matched).countByValue()
相關文章
相關標籤/搜索