Spark Machine Learning 01 Spark Use @Scala、Python

Preface

Apache Spark

簡化了海量數據的存儲(HDFS) 和 計算 (MR--在集羣多個節點進行並行計算的框架) 流程, MapReduce 缺點, 如 : 啓動任務時的高開銷、對中間數據 和 計算結果 寫入磁盤的依賴。這使得 Hadoop 不適合 迭代式低延遲 的任務。html

機器學習 算法並不是爲並行架構而設計。 機器學習模型通常具備迭代式的特性, 這與 Spark 的設計目標一致。並行計算框架 不多能 兼顧 速度、擴展性、內存處理、容錯性的同時,還提供靈活、表達力豐富的 API。Spark 全新的分佈式計算框架, 注重 : 低延遲任務的優化、並將中間數據和結果保存在內存中。Spark 提供函數式API,併兼容 Hadoop Ecosystemjava

Spark 提供了對 Scala、Java、Python 語言的原生 API。python

Spark 框架對 資源調度、任務提交、執行、跟蹤, 節點間通訊以及數據並行處理的內在底層操做都進行了抽象。它提供了高級別 API 用於處理分佈式數據。git

Apche Spark

全新的分佈式計算框架, 注重 : 低延遲任務的優化、並將中間數據和結果保存在內存中。Spark 提供函數式API,併兼容 Hadoop Ecosystemgithub

Spark 提供了對 Scala、Java、Python 語言的原生 API。算法

Spark 框架對 資源調度、任務提交、執行、跟蹤, 節點間通訊以及數據並行處理的內在底層操做都進行了抽象。它提供了高級別 API 用於處理分佈式數據。shell

Spark 的四種運行模式apache

  1. 本地單機模式 -- Spark 進程 all run in One JVM編程

  2. 集羣單機模式 -- 使用 Spark 本身內置的 任務調度框架緩存

  3. 基於 Mesos

  4. 基於 YARN

Chap 1 Spark 搭建使用

1.1 Spark的運行

運行示例程序來測試是否一切正常:

./bin/run-example org.apache.spark.examples.SparkPi

該命令將在本地單機模式下執行SparkPi這個示例。在該模式下,全部的Spark進程均運行於同一個JVM中,而並行處理則經過多線程來實現。默認狀況下,該示例會啓用與本地系統的CPU核心數目相同的線程。

要在本地模式下設置並行的級別,以local[N]的格式來指定一個master變量便可。好比只使用兩個線程時,可輸入以下命令:

MASTER=local[2] ./bin/run-example org.apache.spark.examples.SparkPi

1.2 Spark集羣

Spark集羣由兩類程序構成:一個驅動程序和多個執行程序。本地模式時全部的處理都運行在同一個JVM內,而在集羣模式時它們一般運行在不一樣的節點上。

舉例來講,一個採用單機模式的Spark集羣包括:

  1. 一個運行Spark單機主進程和驅動程序的主節點;

  2. 各自運行一個執行程序進程的多個工做節點。

好比在一個Spark單機集羣上運行,只需傳入主節點的URL便可:

MASTER=spark://IP:PORT ./bin/run-example org.apache.spark.examples.SparkPi

其中的IP和PORT分別是主節點IP地址和端口號。這是告訴Spark讓示例程序運行在主節點所對應的集羣上

(? 貌似和單機效果同樣)

1.3 Spark編程模型

1.3.1 SparkContext類

SparkContext類與SparkConf類

任何Spark程序的編寫都是從SparkContext開始的。SparkContext的初始化須要一個SparkConf對象,後者包含了Spark集羣配置的各類參數(好比主節點的URL)。

初始化後,咱們即可用SparkContext對象所包含的各類方法來建立和操做RDD。Spark shell(在Scala和Python下能夠,但不支持Java)能自動完成上述初始化。若要用Scala代碼來實現的話,可參照下面的代碼:

val conf = new SparkConf().setAppName("Test Spark App").setMaster("local[4]")
val sc = new SparkContext(conf)

這段代碼會建立一個4線程的SparkContext對象,並將其相應的任務命名爲Test Spark APP。咱們也可經過以下方式調用SparkContext的簡單構造函數

val sc = new SparkContext("local[4]", "Test Spark App")

1.3.2 Spark shell

Spark支持 用 Scala or Python REPL(Read-Eval-Print-Loop,即交互式shell)來進行交互式的程序編寫。

./bin/spark-shell

會啓動Scala shell 並初始化一個SparkContext對象。咱們能夠經過 sc這個Scala值來調用這個對象

1.3.3 RDD

一個 RDD 表明一系列的「記錄」(嚴格來講,某種類型的對象)。
這些記錄被分配或分區到一個集羣的多個節點上(在本地模式下,能夠相似地理解爲單個進程裏的多個線程上)。

Spark中的RDD具有容錯性,即當某個節點或任務失敗時(因非用戶代碼緣由而引發,如硬件故障、網絡不通等),RDD會在餘下的節點上自動重建,以便任務能最終完成。

1. 建立RDD

RDD可從現有的集合建立 :

val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)

RDD也能夠基於Hadoop的輸入源建立,好比本地文件系統、HDFS。基於Hadoop的RDD可使用任何實現了Hadoop InputFormat接口的輸入格式,包括文本文件、其餘Hadoop標準格式、HBase等。如下舉例說明如何用一個本地文件系統裏的文件建立RDD:

val rddFromTextFile = sc.textFile("LICENSE")

上述代碼中的textFile函數(方法)會返回一個RDD對象。該對象的每一條記錄都是一個表示文本文件中某一行文字的String(字符串)對象。

2. Spark操做

在Spark編程模式下,全部的操做被分爲 transformationaction 兩種。

transformation 操做是對一個數據集裏的全部記錄執行某種函數,從而使記錄發生改變;

action 一般是運行某些計算或聚合操做,並將結果返回運行 SparkContext 的那個驅動程序。

Spark 的操做一般採用函數式風格。

Spark程序中最經常使用的轉換操做即是map操做。該操做對一個RDD裏的每一條記錄都執行某個函數,從而將輸入映射成爲新的輸出。

好比,下面這段代碼便對一個從本地文本文件建立的RDD進行操做。它對該RDD中的每一條記錄都執行size函數。
建立一個這樣的由若干String構成的RDD對象。經過map函數,咱們將每個字符串都轉換爲一個整數,從而返回一個由若干Int構成的RDD對象。

scala> rddFromTextFile.count
res2: Long = 294

scala> val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:23

scala> intsFromStringsRDD.count
res3: Long = 294

scala> val sumOfRecords = intsFromStringsRDD.sum
sumOfRecords: Double = 17062.0

scala> val numRecords = intsFromStringsRDD.count
numRecords: Long = 294

scala> val aveLengthOfRecord = sumOfRecords / numRecords
aveLengthOfRecord: Double = 58.034013605442176

// 等價於

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

示例中 => 是Scala下表示匿名函數的語法。語法 line => line.size 表示以 => 操做符左邊的部分做爲輸入,對其執行一個函數,並以 => 操做符右邊代碼的執行結果爲輸出。在這個例子中,輸入爲line,輸出則是 line.size 函數的執行結果。在Scala語言中,這種將一個String對象映射爲一個Int的函數被表示爲String => Int。

Spark的大多數操做都會返回一個新RDD,但多數的Action操做則是返回計算的結果

注 : Spark 中的轉換操做是延後的。也就是說,在RDD上調用一個轉換操做並不會當即觸發相應的計算。 只有必要時才計算結果並將其返回給驅動程序,從而提升了Spark的效率。

scala> val transformedRDD = rddFromTextFile.map(line => line.size).
     | filter(size => size > 10).map(size => size * 2)
transformedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:24

scala>

沒有觸發任何計算,也沒有結果被返回。
若是咱們如今在新的RDD上調用一個執行操做,好比sum,該計算將會被觸發:

觸發計算

scala> val computation = transformedRDD.sum
computation: Double = 34106.0

3. RDD緩存策略

Spark最爲強大的功能之一即是可以把數據緩存在集羣的內存裏。這經過調用RDD的cache函數來實現:

scala> rddFromTextFile.cache
res4: rddFromTextFile.type = MapPartitionsRDD[2] at textFile at <console>:21

scala> val aveLengthOfRecordChainedFromCached = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count
aveLengthOfRecordChainedFromCached: Double = 58.034013605442176

在RDD首次調用一個執行操做時,這個操做對應的計算會當即執行,數據會從數據源裏讀出並保存到內存。所以,首次調用cache函數所須要的時間會部分取決於Spark從輸入源讀取數據所須要的時間。可是,當下一次訪問該數據集的時候,數據能夠直接從內存中讀出從而減小低效的I/O操做,加快計算。多數狀況下,這會取得數倍的速度提高。

Spark支持更爲細化的緩存策略。經過persist函數能夠指定Spark的數據緩存策略。關於RDD緩存的更多信息可參見:http://spark.apache.org/docs/...

1.3.4 廣播變量和累加器

Spark的另外一個核心功能是能建立兩種特殊類型的變量:廣播變量 和 累加器。

廣播變量(broadcast variable)爲只讀變量,它由運行SparkContext的驅動程序建立後發送給會參與計算的節點。對那些須要讓各工做節點高效地訪問相同數據的應用場景,好比機器學習,這很是有用。

Spark下建立廣播變量只需在SparkContext上調用一個方法便可:

scala> val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
broadcastAList: org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(11)

scala>

廣播變量 也能夠被非驅動程序所在的節點(即工做節點)訪問,訪問的方法是調用該變量的value方法:

scala> val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
broadcastAList: org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(11)

scala> sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect
res5: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b, c, d, e, 2), List(a, b, c, d, e, 3))

注意,collect 函數通常僅在的確須要將整個結果集返回驅動程序並進行後續處理時纔有必要調用。若是在一個很是大的數據集上調用該函數,可能耗盡驅動程序的可用內存,進而致使程序崩潰。

高負荷的處理應儘量地在整個集羣上進行,從而避免驅動程序成爲系統瓶頸。然而在很多狀況下,將結果收集到驅動程序的確是有必要的。不少機器學習算法的迭代過程便屬於這類狀況。

累加器(accumulator)也是一種被廣播到工做節點的變量。累加器與廣播變量的關鍵不一樣,是後者只能讀取而前者卻可累加。

關於累加器的更多信息,可參見《Spark編程指南》:http://spark.apache.org/docs/...

1.4 Spark Scala 編程入門

scala-spark-app

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark app in Scala
 */
object ScalaApp {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")

    // we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

    // let's count the number of purchases
    val numPurchases = data.count()

    // let's count how many unique users made purchases
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

    // let's sum up our total revenue
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

    // let's find our most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)

    // finally, print everything out
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

    sc.stop()
  }
}

1.5 Spark Java 編程入門

Java API與Scala API本質上很類似。Scala代碼能夠很方便地調用Java代碼,但某些Scala代碼卻沒法在Java裏調用,特別是那些使用了隱式類型轉換、默認參數和採用了某些Scala反射機制的代碼。

SparkContext有了對應的Java版本JavaSparkContext,而RDD則對應JavaRDD。
Spark提供對Java 8匿名函數(lambda)語法的支持。

用Scala編寫時,鍵/值對記錄的RDD能支持一些特別的操做(好比reduceByKey和saveAsSequenceFile)。這些操做能夠經過隱式類型轉換而自動被調用。用Java編寫時,則須要特別類型的JavaRDD來支持這些操做。它們包括用於鍵/值對的JavaPairRDD,以及用於數值記錄的JavaDoubleRDD。

Java 8 RDD以及Java 8 lambda表達式更多信息可參見《Spark編程指南》:http://spark.apache.org/docs/...

1.6 Spark Python 編程入門

"""用Python編寫的一個簡單Spark應用"""
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# 將CSV格式的原始數據轉化爲(user,product,price)格式的記錄集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求總購買次數
numPurchases = data.count()
# 求有多少不一樣客戶購買過商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出總收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最暢銷的產品是什麼
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

匿名函數在Python語言中亦稱lambda函數,lambda也是語法表達上的關鍵字。

用Scala編寫時,一個將輸入x映射爲輸出y的匿名函數表示爲x => y,而在Python中則是lambda x : y。

>$SPARK_HOME/bin/spark-submit pythonapp.py

Spark的Python API幾乎覆蓋了全部Scala API所能提供的功能. 但的確有些特性,好比Spark Streaming和個別的API方法,暫不支持。

具體參見《Spark編程指南》的Python部分

1.7 小結

體會了 函數式 編程的威力, scala、python 均可以。java 不適合寫 spark 程序

相關文章
相關標籤/搜索