病人spark處理-元組和case class 對數據進行結構化

//39932,40902,1,?,1,?,1,1,1,1,1,TRUE

/*
前兩個字段是整數型ID,表明記錄中匹配的兩個病人;
後面9個值,表明病人記錄中不一樣字段(姓名,生日,地址)的匹配值
最後一個字段:布爾。表明該行病人記錄是否匹配。
咱們用‘,’切割一下
*/

val p = head(5).split(',')
//p: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE)

/*隱士類型轉換:當調用scala對象方法時,若是定義該對象的類型中找不到方法定義,Scala編譯器就將該對象轉換成響應的方法定義的類的實例*/
val id1 = p(0).toInt

須要對9個字段值進行轉換,能夠先用Scala Array 類的slice方法提取一部分元素,而後調用map函數,將slice中每一個元素的類型從String轉成Doublejava

val raws=p.slice(2,11)
//raws: Array[String] = Array(1, ?, 1, 1, 1, 1, 1, 1, 1)

raws.map(s=>s.toDouble)
/*
出錯,主要是由於遇到了?,因此咱們寫一個函數來對它進行處理
java.lang.NumberFormatException: For input string: "?"
*/

def toDouble(s:String)={
      if("?".equals(s)) Double.NaN else s.toDouble
      }
//toDouble: (s: String)Double

val sorce = raws.map(toDouble)
//sorce: Array[Double] = Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)

結合全部,咱們寫一個方法,將以前的總結起來緩存

def parse(line:String)={
 val pieces = line.split(',')
 val id1 = pieces(0).toInt
 val id2 = pieces(1).toInt
 val scores = pieces.slice(2,11).map(toDouble)
 val matched = pieces(11).toBoolean
 (id1,id2,scores,matched)
}
val tup = parse(line)

咱們建立一個case class,方便取值:函數

case class MatchData(id1:Int,id2:Int,scores:Array[Double],matched:Boolean)

而後以後返回的時候,就不是元祖類型,而是MatchData類型;spa

def parse(line:String)={
 val pieces = line.split(',')
 val id1 = pieces(0).toInt
 val id2 = pieces(1).toInt
 val scores = pieces.slice(2,11).map(toDouble)
 val matched = pieces(11).toBoolean
 MatchData(id1,id2,scores,matched)
}
val tup = parse(line)

//這塊的tup就已是MatchData類型
//能夠直接用過tup.id1 拿值了

接下來咱們就能夠調用函數。scala

val mds=head.filter(x=>!isHeader(x)).map(x=>parse(x))

//解析集羣數據,再noheader上調用map函數:

val parsed = noheader.map(line=>parse(line))

//若是須要緩存,能夠直接使用緩存,spark有本身的緩存機制
parsed.cache()

可是數據不必定在一臺機器上,因此咱們須要聚合,對其聚合時,數據傳輸的效率確定是擔憂的一個問題。code

val group = mds.groupBy(md=>md.matched)

/*
獲得grouped變量中的值之後,就能夠經過在grouped上調用mapValues方法獲得計數。
*/
group.mapValues(x=>x.size).foreach(println)

建立直方圖orm

連續變量的概要統計,如如下代碼:對象

val stats =(0 until 9).map(i=>{
 parsed.map(md=>md.scores(i)).filter(!isNaN(_)).stats()
})
相關文章
相關標籤/搜索