使用 Spark+Scala 進行數據預處理,最大的優點就是能夠處理大數據量並且速度還會很快。
Scala 不只擁有相似與R語言同樣的語法特色,還比R語言更加靈活,能夠開發本身想要的工具。
Spark 基於 Scala 開發,雖然在大數據處理的時候某些語法和 List 有些不一樣,可是使用起來也是很是順手。java
下載測試數據集合,解壓縮,數據來源是:UC Irvine Machine Learning Repository,這個裏面有不少好東西。shell
$ mkdir linkage $ cd linkage/ $ curl -o donation.zip http://bit.ly/1Aoywaq $ unzip donation.zip $ unzip 'block_*.zip'
若是有 Spark 集羣能夠將數據上傳到集羣上面,下面的測試代碼只須要修改兩個地方就能夠運行。app
首先數據集合有10個文件,每一個文件都是 Table 結果,列按照逗號分割,並且每一個文件都有 header,空值使用 ? 表示。下面要作以下幾件事:curl
def isHead(line: String): Boolean = { line.contains("id_1") }
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean) def toDouble(s: String) = { if ("?".equals(s)) Double.NaN else s.toDouble } def parse(line: String) = { val pieces = line.split(',') val id1 = pieces(0).toInt val id2 = pieces(1).toInt val scores = pieces.slice(2, 11).map(x => toDouble(x)) val matched = pieces(11).toBoolean MatchData(id1, id2, scores, matched) }
這個轉換的過程是比較有技術含量的,必須新建一個類,代替原來的 StatCounter 類,由於原來的 StatCounter 類不考慮空值。新建 DoubleNaNStatCounteride
class DoubleNaNStatCounter extends Serializable { val stats: StatCounter = new StatCounter() var nan: Long = 0 def add(x: Double): DoubleNaNStatCount = { if (java.lang.Double.isNaN(x)) nan += 1 else stats.merge(x) this } def merge(other: DoubleNaNStatCount): DoubleNaNStatCount = { stats.merge(other.stats) nan += other.nan this } override def toString = { "stats: " + stats.toString() + " NaN: " + nan } } object DoubleNaNStatCount extends Serializable { def apply(x: Double) = new DoubleNaNStatCount().add(x) }
再加一個輔助方法,這個方法很是重要,partition本地處理,減小數據傳輸,優化效率:工具
def statsWithMissing(rdd: RDD[Array[Double]]): Array[DoubleNaNStatCount] = { val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => { val nas: Array[DoubleNaNStatCount] = iter.next().map(d => DoubleNaNStatCount(d)) iter.foreach(arr => { nas.zip(arr).foreach { case (n, d) => n.add(d) } }) Iterator(nas) }) nastats.reduce((n1, n2) => { n1.zip(n2).map { case (a, b) => a.merge(b) } }) }
上面類和方法看懂了,基本對 Spark 工做原理就懂了。上面兩個操做,等於本身實現了一個更加通用的 Stat,能夠做爲之後的工做方法用。測試
def main(args: Array[String]) { //master指定爲本地,意味着這是測試 val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]") val sc = new SparkContext(conf) //若是不是測試這個路徑修改成對應的正確位置 val rdd = sc.textFile("F:\\clebeg\\spark\\donation") //去除每一個文件的頭信息 val noHeader = rdd.filter(!isHead(_)) val parsed = noHeader.map(parse) //如何轉換成Map,如何爲Map排序 val matchCount = parsed.map(md => md.matched).countByValue() matchCount.foreach(println) matchCount.toSeq.sortBy(_._1).foreach(println) matchCount.toSeq.sortBy(_._2).foreach(println) //RDD[Double] 經過隱式類型轉換具備 stats 方法 //下面查看匹配和不匹配的數據之間的差別 val nasm = statsWithMissing(parsed.filter(_.matched).map(_.scores)) val nasn = statsWithMissing(parsed.filter(!_.matched).map(_.scores)) val diff = nasm.zip(nasn).map{case (a, b) => (a.nan + b.nan, a.stats.mean - b.stats.mean)} diff.foreach(println) )