爲何Spark將成爲數據科學家的統一平臺

翻譯自:Why Apache Spark is a Crossover Hit for Data Scientists,有刪減。java

Spark是一個超有潛力的通用數據計算平臺,不管是對統計科學家仍是數據工程師。程序員

數據科學是一個廣闊的領域。我自認是一個數據科學家,但和另一批數據科學家又有不少的不一樣。數據科學家一般分爲統計科學家和數據工程師兩個陣營,而我正處於第二陣營。
統計科學家使用交互式的統計工具(好比R)來回答數據中的問題,得到全景的認識。與之相比,數據工程師則更像一名程序員,他們在服務器上編寫代碼,建立和應用機器學習模型,熟悉C++和Java等系統級語言,常常須要和企業級數據中心的某些組件打交道,好比Hadoop。
而有的數據科學家專一於更細的領域,就像精通R但從未據說過Python或者scikit-learn(反之亦然),即使二者都提供了豐富的統計庫。算法

不完美的統計工具

若是能夠提供一種統一的工具,運行在統一的架構,用統一的語言編程,並能夠同時知足統計科學家和數據工程師的需求,那該多好啊。我一開始就精通Java,難道爲了研究數據,我就必須去學一種像Python或R的語言?我一直使用傳統的數據分析工具,難道爲了應對大規模計算,就必須去懂MapReduce?正是統計工具的不完美造就了這種局面:sql

  • R提供了一個豐富的統計分析和機器學習的解釋器。但R難以在分佈式條件下執行數據的分析和清洗,以便開展其所擅長的數據分析,也不以一種主流的開發語言爲人所知。
  • Python是一種通用的編程語言,也不乏出色的第三方數據分析庫(像Pandas和scikit-learn),但Python也有和R同樣的缺陷:只能侷限在處理單機能負載的數據量。
  • 在經典的MapReduce計算框架上開發分佈式的機器學習算法是可行的(參考Mahout),但程序員須要從零開始,更別說移植複雜計算的難度。
  • 爲下降複雜計算移植到MapReduce的難度,Crunch提供一個簡單的、傻瓜式的Java API,但MapReduce天生決定了它在迭代計算方面是低效的,儘管大多數機器學習算法都須要迭代計算。

其餘的數據科學工具同樣沒法盡善盡美。基於Java和Hadoop的背景,我開始幻想一個理想的數據科學利器:一個像R和Python的能實現RPEL(讀取-估值-打印-循環)的自帶統計庫函數的命令行解釋器,又具有自然的分佈式可擴展的屬性;擁有像Crunch同樣的分佈式集合,並且能經過命令行解釋器調用。shell

Spark的優點

這就是Spark讓我興奮的緣由。大部分人討論到Spark時,老是注意到將數據駐留內存以提升計算效率的方面(相對MapReduce),但對我來講這根本不是關鍵。Spark擁有許多的特徵,使之真正成爲一個融合統計科學和數據工程的交叉點:apache

  • Spark附帶了一個機器學習庫MLib,雖然只是在初始階段。
  • Spark是用Scala語言編寫的,運行在Java虛擬機上,同時也提供像R和Python的命令行解釋器。
  • 對Java程序員,Scala的學習曲線是比較陡峭的,但所幸Scala能夠兼容一切的Java庫。
  • Spark的RDD(彈性分佈式數據集),是Crunch開發者熟知的一種數據結構。
  • Spark模仿了Scala的集合計算API,對Java和Scala開發者來講耳熟能詳,而Python開發者也不難上手,而Scala對統計計算的支持也不錯。
  • Spark和其底層的Scala語言,並不僅是爲機器學習而誕生的,除此以外,像數據訪問、日誌ETL和整合均可以經過API輕鬆搞定。就像Python,你能夠把整個數據計算流程搬到Spark平臺上來,而不只僅是模型擬合和分析。
  • 在命令行解釋器中執行的代碼,和編譯後運行的效果相同。並且,命令行的輸入能夠獲得實時反饋,你將看到數據透明地在集羣間傳遞與計算。

Spark和MLib還有待完善:整個項目有很多bug,效率也還有提高的空間,和YARN的整合也存在問題。Spark還沒辦法提供像R那樣豐富的數據分析函數。但Spark已然是世界上最好的數據平臺,足以讓來自任何背景的數據科學家側目。編程

實戰:Stack Overflow問題的自動標註

Stack Overflow是一個著名的軟件技術問答平臺,在上面提的每一個問題有可能被打上若干個短文本的標籤,好比java或者sql,咱們的目標在於創建一套系統,使用ALS推薦算法,爲新問題的標籤提供預測和建議。從推薦系統的角度,你能夠把問題想象成user,把標籤想象成item
首先,從Stack Overflow下載官方提供的截至20140120的問答數據stackoverflow.com-Posts.7z
這是一個可以直接用於分佈式計算的bzip格式文件,但在咱們的場景下,必須先解壓並拷貝到HDFS:ruby

bzcat stackoverflow.com-Posts.7z | hdfs dfs -put - /user/srowen/Posts.xml

解壓後的文件大約是24.4GB,包含210萬個問題,1800萬個回答,總共標註了930萬個標籤,這些標籤排重以後大概是34000個。
確認機器安裝了Spark以後,輸入spark-shell便可打開Scala的REPL環境。首先,咱們讀取一個存儲在HDFS的Posts.xml文件:服務器

scalaval postsXML = sc.textFile("hdfs:///user/srowen/Posts.xml")

這時命令行工具會返回:數據結構

postsXML: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12

顯示文本文件已轉化爲一個String型的RDD,你能夠經過調用RDD的函數,實現任意的查詢運算。好比統計文件的行數:

scalapostsXML.count

這條指令生成大量的輸出,顯示Spark正在利用分佈式的環境計數,最終打印出18066983
下一步,將XML文件的每一行都存入形如(questionID, tag)的元組。得益於Scala的函數式編程的風格,RDD和Scala集合同樣可使用map等方法:

scalaval postIDTags = postsXML.flatMap { line =>
  // Matches Id="..." ... Tags="..." in  line
  val idTagRegex = "Id=\"(\\d+)\".+Tags=\"([^\"]+)\"".r

  // // Finds tags like <TAG> value from above
  val tagRegex = "&lt;([^&]+)&gt;".r

  // Yields 0 or 1 matches:
  idTagRegex.findFirstMatchIn(line) match {
    // No match -- not a  line
    case None => None
    // Match, and can extract ID and tags from m
    case Some(m) => {
      val postID = m.group(1).toInt
      val tagsString = m.group(2)
      // Pick out just TAG matching group
      val tags = tagRegex.findAllMatchIn(tagsString).map(_.group(1)).toList
      // Keep only question with at least 4 tags, and map to (post,tag) tuples
      if (tags.size >= 4) tags.map((postID,_)) else None
     }
  }
  // Because of flatMap, individual lists will concatenate
  // into one collection of tuples
}

你會發現這條指令的執行是當即返回的,而不像count同樣須要等待,由於到目前爲止,Spark並未啓動任何主機間的數據變換。
ALS的MLib實現必須使用數值ID而非字符串做爲唯一標識,而問題的標籤數據是字符串格式的,因此須要把字符串哈希成一個非負整數,同時保留非負整數到字符串的映射。這裏咱們先定義一個哈希函數以便複用。

scaladef nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

如今把元組轉換爲ALS計算所需的輸入:

scalaimport org.apache.spark.mllib.recommendation._
// Convert to Rating(Int,Int,Double) objects
val alsInput = postIDTags.map(t => Rating(t._1, nnHash(t._2), 1.0))
// Train model with 40 features, 10 iterations of ALS
val model = ALS.trainImplicit(alsInput, 40, 10)

這一步生成特徵矩陣,能夠被用來預測問題與標籤之間的關聯。因爲目前MLib還處於不完善的狀態,沒有提供一個recommend的接口來獲取建議的標籤,咱們能夠簡單定義一個:

scaladef recommend(questionID: Int, howMany: Int = 5): Array[(String, Double)] = {
  // Build list of one question and all items and predict value for all of them
  val predictions = model.predict(tagHashes.map(t => (questionID,t._1)))
  // Get top howMany recommendations ordered by prediction value
  val topN = predictions.top(howMany)(Ordering.by[Rating,Double](_.rating))
  // Translate back to tags from IDs
  topN.map(r => (tagHashes.lookup(r.product)(0), r.rating))
}

經過上述函數,咱們能夠得到任意一個問題好比ID爲7122697的How to make substring-matching query work fast on a large table?的至少4個標籤:

scalarecommend(7122697).foreach(println)

推薦結果以下所示:

scala(sql,0.17745152481166354)
(database,0.13526622226672633)
(oracle,0.1079428707621154)
(ruby-on-rails,0.06067207312463499)
(postgresql,0.050933613169706474)

注意:
- 每次運行獲得的結果不盡相同,是由於ALS是從隨機解開始迭代的
- 若是你但願得到實時性更高的結果,能夠在recommend前輸入tagHashes = tagHashes.cache

真實的問題標籤是postgresqlquery-optimizationsubstringtext-search。不過,預測結果也有必定的合理性(postgresql常常和ruby-on-rails一塊兒出現)。
固然,以上的示例還不夠優雅和高效,可是,我但願全部來自R的分析師、鼓搗Python的黑客和熟悉Hadoop的開發者,都能從中找到大家熟悉的部分,從而找到一條適合大家的路徑去探索Spark,並從中獲益。

來自:建造者說

相關文章
相關標籤/搜索