翻譯自:Why Apache Spark is a Crossover Hit for Data Scientists,有刪減。java
Spark是一個超有潛力的通用數據計算平臺,不管是對統計科學家仍是數據工程師。程序員
數據科學是一個廣闊的領域。我自認是一個數據科學家,但和另一批數據科學家又有不少的不一樣。數據科學家一般分爲統計科學家和數據工程師兩個陣營,而我正處於第二陣營。
統計科學家使用交互式的統計工具(好比R)來回答數據中的問題,得到全景的認識。與之相比,數據工程師則更像一名程序員,他們在服務器上編寫代碼,建立和應用機器學習模型,熟悉C++和Java等系統級語言,常常須要和企業級數據中心的某些組件打交道,好比Hadoop。
而有的數據科學家專一於更細的領域,就像精通R但從未據說過Python或者scikit-learn(反之亦然),即使二者都提供了豐富的統計庫。算法
若是能夠提供一種統一的工具,運行在統一的架構,用統一的語言編程,並能夠同時知足統計科學家和數據工程師的需求,那該多好啊。我一開始就精通Java,難道爲了研究數據,我就必須去學一種像Python或R的語言?我一直使用傳統的數據分析工具,難道爲了應對大規模計算,就必須去懂MapReduce?正是統計工具的不完美造就了這種局面:sql
其餘的數據科學工具同樣沒法盡善盡美。基於Java和Hadoop的背景,我開始幻想一個理想的數據科學利器:一個像R和Python的能實現RPEL(讀取-估值-打印-循環)的自帶統計庫函數的命令行解釋器,又具有自然的分佈式可擴展的屬性;擁有像Crunch同樣的分佈式集合,並且能經過命令行解釋器調用。shell
這就是Spark讓我興奮的緣由。大部分人討論到Spark時,老是注意到將數據駐留內存以提升計算效率的方面(相對MapReduce),但對我來講這根本不是關鍵。Spark擁有許多的特徵,使之真正成爲一個融合統計科學和數據工程的交叉點:apache
Spark和MLib還有待完善:整個項目有很多bug,效率也還有提高的空間,和YARN的整合也存在問題。Spark還沒辦法提供像R那樣豐富的數據分析函數。但Spark已然是世界上最好的數據平臺,足以讓來自任何背景的數據科學家側目。編程
Stack Overflow是一個著名的軟件技術問答平臺,在上面提的每一個問題有可能被打上若干個短文本的標籤,好比java
或者sql
,咱們的目標在於創建一套系統,使用ALS推薦算法,爲新問題的標籤提供預測和建議。從推薦系統的角度,你能夠把問題想象成user
,把標籤想象成item
。
首先,從Stack Overflow下載官方提供的截至20140120的問答數據stackoverflow.com-Posts.7z
。
這是一個可以直接用於分佈式計算的bzip格式文件,但在咱們的場景下,必須先解壓並拷貝到HDFS:ruby
bzcat stackoverflow.com-Posts.7z | hdfs dfs -put - /user/srowen/Posts.xml
解壓後的文件大約是24.4GB,包含210萬個問題,1800萬個回答,總共標註了930萬個標籤,這些標籤排重以後大概是34000個。
確認機器安裝了Spark以後,輸入spark-shell
便可打開Scala的REPL環境。首先,咱們讀取一個存儲在HDFS的Posts.xml
文件:服務器
scalaval postsXML = sc.textFile("hdfs:///user/srowen/Posts.xml")
這時命令行工具會返回:數據結構
postsXML: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12
顯示文本文件已轉化爲一個String型的RDD,你能夠經過調用RDD的函數,實現任意的查詢運算。好比統計文件的行數:
scalapostsXML.count
這條指令生成大量的輸出,顯示Spark正在利用分佈式的環境計數,最終打印出18066983
。
下一步,將XML文件的每一行都存入形如(questionID, tag)
的元組。得益於Scala的函數式編程的風格,RDD和Scala集合同樣可使用map等方法:
scalaval postIDTags = postsXML.flatMap { line => // Matches Id="..." ... Tags="..." in line val idTagRegex = "Id=\"(\\d+)\".+Tags=\"([^\"]+)\"".r // // Finds tags like <TAG> value from above val tagRegex = "<([^&]+)>".r // Yields 0 or 1 matches: idTagRegex.findFirstMatchIn(line) match { // No match -- not a line case None => None // Match, and can extract ID and tags from m case Some(m) => { val postID = m.group(1).toInt val tagsString = m.group(2) // Pick out just TAG matching group val tags = tagRegex.findAllMatchIn(tagsString).map(_.group(1)).toList // Keep only question with at least 4 tags, and map to (post,tag) tuples if (tags.size >= 4) tags.map((postID,_)) else None } } // Because of flatMap, individual lists will concatenate // into one collection of tuples }
你會發現這條指令的執行是當即返回的,而不像count
同樣須要等待,由於到目前爲止,Spark並未啓動任何主機間的數據變換。
ALS的MLib實現必須使用數值ID而非字符串做爲唯一標識,而問題的標籤數據是字符串格式的,因此須要把字符串哈希成一個非負整數,同時保留非負整數到字符串的映射。這裏咱們先定義一個哈希函數以便複用。
scaladef nnHash(tag: String) = tag.hashCode & 0x7FFFFF var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))
如今把元組轉換爲ALS計算所需的輸入:
scalaimport org.apache.spark.mllib.recommendation._ // Convert to Rating(Int,Int,Double) objects val alsInput = postIDTags.map(t => Rating(t._1, nnHash(t._2), 1.0)) // Train model with 40 features, 10 iterations of ALS val model = ALS.trainImplicit(alsInput, 40, 10)
這一步生成特徵矩陣,能夠被用來預測問題與標籤之間的關聯。因爲目前MLib還處於不完善的狀態,沒有提供一個recommend的接口來獲取建議的標籤,咱們能夠簡單定義一個:
scaladef recommend(questionID: Int, howMany: Int = 5): Array[(String, Double)] = { // Build list of one question and all items and predict value for all of them val predictions = model.predict(tagHashes.map(t => (questionID,t._1))) // Get top howMany recommendations ordered by prediction value val topN = predictions.top(howMany)(Ordering.by[Rating,Double](_.rating)) // Translate back to tags from IDs topN.map(r => (tagHashes.lookup(r.product)(0), r.rating)) }
經過上述函數,咱們能夠得到任意一個問題好比ID爲7122697的How to make substring-matching query work fast on a large table?
的至少4個標籤:
scalarecommend(7122697).foreach(println)
推薦結果以下所示:
scala(sql,0.17745152481166354) (database,0.13526622226672633) (oracle,0.1079428707621154) (ruby-on-rails,0.06067207312463499) (postgresql,0.050933613169706474)
注意:
- 每次運行獲得的結果不盡相同,是由於ALS是從隨機解開始迭代的
- 若是你但願得到實時性更高的結果,能夠在recommend
前輸入tagHashes = tagHashes.cache
真實的問題標籤是postgresql
、query-optimization
、substring
和text-search
。不過,預測結果也有必定的合理性(postgresql
常常和ruby-on-rails
一塊兒出現)。
固然,以上的示例還不夠優雅和高效,可是,我但願全部來自R的分析師、鼓搗Python的黑客和熟悉Hadoop的開發者,都能從中找到大家熟悉的部分,從而找到一條適合大家的路徑去探索Spark,並從中獲益。
來自:建造者說