Spark 實踐——用 Scala 和 Spark 進行數據分析

本文基於《Spark 高級數據分析》第2章 用Scala和Spark進行數據分析。
完整代碼見 https://github.com/libaoquan95/aasPractice/tree/master/c2/Intogit

1.獲取數據集

數據集來自加州大學歐文分校機器學習資料庫(UC Irvine Machine Learning Repository),這個資料庫爲研究和教學提供了大量很是好的數據源, 這些數據源很是有意義,而且是免費的。 咱們要分析的數據集來源於一項記錄關聯研究,這項研究是德國一家醫院在 2010 年完成的。這個數據集包含數百萬對病人記錄,每對記錄都根據不一樣標準來匹配,好比病人姓名(名字和姓氏)、地址、生日。每一個匹配字段都被賦予一個數值評分,範圍爲 0.0 到 1.0, 分值根據字符串類似度得出。而後這些數據交由人工處理,標記出哪些表明同一我的哪些表明不一樣的人。 爲了保護病人隱私,建立數據集的每一個字段原始值被刪除了。病人的 ID、 字段匹配分數、匹配對標示(包括匹配的和不匹配的)等信息是公開的,可用於記錄關聯研究github

下載地址:sql

  1. http://bit.ly/1Aoywaq (需FQ)
  2. https://github.com/libaoquan95/aasPractice/tree/master/c2/linkage(已解壓,block_1.csv 到 block_10.csv)

2.設置Spark運行環境,讀取數據

val sc = SparkSession.builder().appName("Into").master("local").getOrCreate()
import sc.implicits._

讀取數據集app

// 數據地址
val dataDir = "inkage/block_*.csv"
// 讀取有頭部標題的CSV文件,並設置空值
val parsed = sc.read .option("header", "true") .option("nullValue", "?") .option("inferSchema", "true") .csv(dataDir)
// 查看錶
parsed.show() 
// 查看錶結構
parsed.printSchema() 
parsed.cache()

3.處理數據

首先按 is_match 字段聚合數據,有兩種方式能夠進行數據聚合,一是使用 groupby 函數,二是使用 Spark Sql機器學習

// 聚合
parsed.groupBy("is_match").count().orderBy($"count".desc).show()

// 先註冊爲臨時表
parsed.createOrReplaceTempView("linkage")
// 使用sql查詢,效果同上
sc.sql("""
  SELECT is_match, COUNT(*) cnt
  FROM linkage
  GROUP BY is_match
  ORDER BY cnt DESC
""").show()

以後使用 describe 函數獲取每一個字段的最值,均值等信息函數

// 獲取每一列的最值,平均值信息
val summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

按此方式獲取匹配記錄和不匹配記錄的 describe學習

// 獲取匹配和不匹配的信息
val matches = parsed.where("is_match = true")
val misses = parsed.filter($"is_match" === false)
val matchSummary = matches.describe()
val missSummary = misses.describe()
matchSummary .show()
missSummary .show()


能夠看到這個數據不方便進行操做,能夠考慮將其轉置,方便使用sql對數據進行分析ui

def longForm(desc: DataFrame): DataFrame = {
  import desc.sparkSession.implicits._ // For toDF RDD -> DataFrame conversion
  val schema = desc.schema
  desc.flatMap(row => {
    val metric = row.getString(0)
    (1 until row.size).map(i => (metric, schema(i).name, row.getString(i).toDouble))
  })
    .toDF("metric", "field", "value")
}
def pivotSummary(desc: DataFrame): DataFrame = {
  val lf = longForm(desc)
  lf.groupBy("field").
    pivot("metric", Seq("count", "mean", "stddev", "min", "max")).
    agg(first("value"))
}

// 轉置,重塑數據
val matchSummaryT = pivotSummary(matchSummary)
val missSummaryT = pivotSummary(missSummary)
matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
sc.sql("""
  SELECT a.field, a.count + b.count total, a.mean - b.mean delta
  FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
  ORDER BY delta DESC, total DESC
""").show()


相關文章
相關標籤/搜索