本文基於《Spark 高級數據分析》第2章 用Scala和Spark進行數據分析。
完整代碼見 https://github.com/libaoquan95/aasPractice/tree/master/c2/Intogit
數據集來自加州大學歐文分校機器學習資料庫(UC Irvine Machine Learning Repository),這個資料庫爲研究和教學提供了大量很是好的數據源, 這些數據源很是有意義,而且是免費的。 咱們要分析的數據集來源於一項記錄關聯研究,這項研究是德國一家醫院在 2010 年完成的。這個數據集包含數百萬對病人記錄,每對記錄都根據不一樣標準來匹配,好比病人姓名(名字和姓氏)、地址、生日。每一個匹配字段都被賦予一個數值評分,範圍爲 0.0 到 1.0, 分值根據字符串類似度得出。而後這些數據交由人工處理,標記出哪些表明同一我的哪些表明不一樣的人。 爲了保護病人隱私,建立數據集的每一個字段原始值被刪除了。病人的 ID、 字段匹配分數、匹配對標示(包括匹配的和不匹配的)等信息是公開的,可用於記錄關聯研究github
下載地址:sql
val sc = SparkSession.builder().appName("Into").master("local").getOrCreate() import sc.implicits._
讀取數據集app
// 數據地址 val dataDir = "inkage/block_*.csv" // 讀取有頭部標題的CSV文件,並設置空值 val parsed = sc.read .option("header", "true") .option("nullValue", "?") .option("inferSchema", "true") .csv(dataDir) // 查看錶 parsed.show() // 查看錶結構 parsed.printSchema() parsed.cache()
首先按 is_match 字段聚合數據,有兩種方式能夠進行數據聚合,一是使用 groupby 函數,二是使用 Spark Sql機器學習
// 聚合 parsed.groupBy("is_match").count().orderBy($"count".desc).show() // 先註冊爲臨時表 parsed.createOrReplaceTempView("linkage") // 使用sql查詢,效果同上 sc.sql(""" SELECT is_match, COUNT(*) cnt FROM linkage GROUP BY is_match ORDER BY cnt DESC """).show()
以後使用 describe 函數獲取每一個字段的最值,均值等信息函數
// 獲取每一列的最值,平均值信息 val summary = parsed.describe() summary.show() summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
按此方式獲取匹配記錄和不匹配記錄的 describe學習
// 獲取匹配和不匹配的信息 val matches = parsed.where("is_match = true") val misses = parsed.filter($"is_match" === false) val matchSummary = matches.describe() val missSummary = misses.describe() matchSummary .show() missSummary .show()
能夠看到這個數據不方便進行操做,能夠考慮將其轉置,方便使用sql對數據進行分析ui
def longForm(desc: DataFrame): DataFrame = { import desc.sparkSession.implicits._ // For toDF RDD -> DataFrame conversion val schema = desc.schema desc.flatMap(row => { val metric = row.getString(0) (1 until row.size).map(i => (metric, schema(i).name, row.getString(i).toDouble)) }) .toDF("metric", "field", "value") } def pivotSummary(desc: DataFrame): DataFrame = { val lf = longForm(desc) lf.groupBy("field"). pivot("metric", Seq("count", "mean", "stddev", "min", "max")). agg(first("value")) } // 轉置,重塑數據 val matchSummaryT = pivotSummary(matchSummary) val missSummaryT = pivotSummary(missSummary) matchSummaryT.createOrReplaceTempView("match_desc") missSummaryT.createOrReplaceTempView("miss_desc") sc.sql(""" SELECT a.field, a.count + b.count total, a.mean - b.mean delta FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field ORDER BY delta DESC, total DESC """).show()