原本這篇是準備5.15更的,可是上週一直在忙簽證和工做的事,沒時間就推遲了,如今終於有時間來寫寫Learning Spark最後一部份內容了。html
第10-11 章主要講的是Spark Streaming 和MLlib方面的內容。咱們知道Spark在離線處理數據上的性能很好,那麼它在實時數據上的表現怎麼樣呢?在實際生產中,咱們常常須要即便處理收到的數據,好比實時機器學習模型的應用,自動異常的檢測,實時追蹤頁面訪問統計的應用等。Spark Streaming能夠很好的解決上述相似的問題。算法
瞭解Spark Streaming ,只須要掌握如下幾點便可:apache
- DStream
- 概念:離散化流(discretized stream),是隨時間推移的數據。由每一個時間區間的RDD組成的序列。DStream能夠從Flume、Kafka或者HDFS等多個輸入源建立。
- 操做:轉換和輸出,支持RDD相關的操做,增長了「滑動窗口」等於時間相關的操做。
下面以一張圖來講明Spark Streaming的工做流程:緩存
從上圖中也能夠看到,Spark Streaming把流式計算當作一系列連續的小規模批處理來對待。它從各類輸入源讀取數據,並把數據分組爲小的批次,新的批次按均勻的時間間隔建立出來。在每一個時間區間開始的時候,一個新的批次就建立出來,在該區間內收到的數據都會被添加到這個批次中去。在時間區間結束時,批次中止增加。機器學習
轉化操做分佈式
- 無狀態轉化操做:把簡單的RDDtransformation分別應用到每一個批次上,每一個批次的處理不依賴於以前的批次的數據。包括map()、filter()、reduceBykey()等。
- 有狀態轉化操做:須要使用以前批次的數據或者中間結果來計算當前批次的數據。包括基於滑動窗口的轉化操做,和追蹤狀態變化的轉化操做(updateStateByKey())
無狀態轉化操做ide
有狀態轉化操做函數
Windows機制(一圖盛千言)oop
上圖應該很容易看懂,下面舉個實例(JAVA寫的):性能
UpdateStateByKey()轉化操做
主要用於訪問狀態變量,用於鍵值對形式的DStream。首先會給定一個由(鍵,事件)對構成的DStream,並傳遞一個指定如何我的劇新的事件更新每一個鍵對應狀態的函數,它能夠構建出一個新的DStream,爲(鍵,狀態)。通俗點說,加入咱們想知道一個用戶最近訪問的10個頁面是什麼,能夠把鍵設置爲用戶ID,而後UpdateStateByKey()就能夠跟蹤每一個用戶最近訪問的10個頁面,這個列表就是「狀態」對象。具體的要怎麼操做呢,UpdateStateByKey()提供了一個update(events,oldState)函數,用於接收與某鍵相關的時間以及該鍵以前對應的狀態,而後返回這個鍵對應的新狀態。
- events:是在當前批次中收到的時間列表()可能爲空。
- oldState:是一個可選的狀態對象,存放在Option內;若是一個鍵沒有以前的狀態,能夠爲空。
- newState:由函數返回,也以Option形式存在。若是返回一個空的Option,表示想要刪除該狀態。
UpdateStateByKey()的結果是一個新的DStream,內部的RDD序列由每一個時間區間對應的(鍵,狀態)對組成。
接下來說一下輸入源
- 核心數據源:文件流,包括文本格式和任意hadoop的輸入格式
- 附加數據源:kafka和flume比較經常使用,下面會講一下kafka的輸入
- 多數據源與集羣規模
Kafka的具體操做以下:
基於MLlib的機器學習
通常咱們經常使用的算法都是單機跑的,可是想要在集羣上運行,不能把這些算法直接拿過來用。一是數據格式不一樣,單機上咱們通常是離散型或者連續型的數據,數據類型通常爲array、list、dataframe比較多,以txt、csv等格式存儲,可是在spark上,數據是以RDD的形式存在的,如何把ndarray等轉化爲RDD是一個問題;此外,就算咱們把數據轉化成RDD格式,算法也會不同。舉個例子,你如今有一堆數據,存儲爲RDD格式,而後設置了分區,每一個分區存儲一些數據準備來跑算法,能夠把每一個分區看作是一個單機跑的程序,可是全部分區跑完之後呢?怎麼把結果綜合起來?直接求平均值?仍是別的方式?因此說,在集羣上跑的算法必須是專門寫的分佈式算法。並且有些算法是不能分佈式的跑。Mllib中也只包含可以在集羣上運行良好的並行算法。
MLlib的數據類型
- Vector:向量(mllib.linalg.Vectors)支持dense和sparse(稠密向量和稀疏向量)。區別在與前者的沒一個數值都會存儲下來,後者只存儲非零數值以節約空間。
- LabeledPoint:(mllib.regression)表示帶標籤的數據點,包含一個特徵向量與一個標籤,注意,標籤要轉化成浮點型的,經過StringIndexer轉化。
- Rating:(mllib.recommendation),用戶對一個產品的評分,用於產品推薦
- 各類Model類:每一個Model都是訓練算法的結果,通常都有一個predict()方法能夠用來對新的數據點或者數據點組成的RDD應用該模型進行預測
通常來講,大多數算法直接操做由Vector、LabledPoint或Rating組成的RDD,一般咱們從外部數據讀取數據後須要進行轉化操做構建RDD。具體的聚類和分類算法原理很少講了,能夠本身去看MLlib的在線文檔裏去看。下面舉個實例----垃圾郵件分類的運行過程:
步驟:
1.將數據轉化爲字符串RDD
2.特徵提取,把文本數據轉化爲數值特徵,返回一個向量RDD
3.在訓練集上跑模型,用分類算法
4.在測試繫上評估效果
具體代碼:
1 from pyspark.mllib.regression import LabeledPoint 2 from pyspark.mllib.feature import HashingTF 3 from pyspark.mllib.calssification import LogisticRegressionWithSGD 4 5 spam = sc.textFile("spam.txt") 6 normal = sc.textFile("normal.txt") 7 8 #建立一個HashingTF實例來把郵件文本映射爲包含10000個特徵的向量 9 tf = HashingTF(numFeatures = 10000) 10 #各郵件都被切分爲單詞,每一個單詞背映射爲一個特徵 11 spamFeatures = spam.map(lambda email: tf.transform(email.split(" "))) 12 normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) 13 14 #建立LabeledPoint數據集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子 15 positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1,features)) 16 negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0,features)) 17 trainingData = positiveExamples.union(negativeExamples) 18 trainingData.cache#由於邏輯迴歸是迭代算法,因此緩存數據RDD 19 20 #使用SGD算法運行邏輯迴歸 21 model = LogisticRegressionWithSGD.train(trainingData) 22 23 #以陽性(垃圾郵件)和陰性(正常郵件)的例子分別進行測試 24 posTest = tf.transform("O M G GET cheap stuff by sending money to...".split(" ")) 25 negTest = tf.transform("Hi Dad, I stared studying Spark the other ...".split(" ")) 26 print "Prediction for positive test examples: %g" %model.predict(posTest) 27 print "Prediction for negative test examples: %g" %model.predict(negTest)
這個例子很簡單,講的也頗有限,建議你們根據本身的需求,直接看MLlib的官方文檔,關於聚類,分類講的都很詳細。
注:圖片參考同事的PPT講義^_^,已受權哈哈