本教程簡要介紹瞭如何使用Spark。 咱們將首先經過Spark的交互式shell(在Python或Scala中)介紹API,而後展現如何使用Java,Scala和Python編寫應用程序。html
要繼續本指南,首先,從Spark網站下載Spark的打包版本。 因爲咱們不會使用HDFS,您能夠下載任何版本的Hadoop的軟件包。java
請注意,在Spark 2.0以前,Spark的主要編程接口是彈性分佈式數據集(RDD)。 在Spark 2.0以後,RDD被數據集取代,數據集相似於RDD同樣強類型,但在底層有更豐富的優化。 仍然支持RDD接口,您能夠在RDD編程指南中得到更詳細的參考。 可是,咱們強烈建議您切換到使用Dataset,它具備比RDD更好的性能。 請參閱SQL編程指南以獲取有關數據集的更多信息。python
Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。 它能夠在Scala(在Java VM上運行,所以是使用現有Java庫的好方法)或Python中使用。 經過在Spark目錄中運行如下命令來啓動它:git
./bin/spark-shell
Spark的主要抽象是一個名爲Dataset的分佈式項目集合。 能夠從Hadoop InputFormats(例如HDFS文件)或經過轉換其餘數據集來建立數據集。 讓咱們從Spark源目錄中的README文件的文本中建立一個新的數據集:es6
scala > val textFile = spark . read . textFile ( "README.md" ) textFile : org.apache.spark.sql.Dataset [ String ] = [ value: string ]
您能夠經過調用某些操做直接從Dataset獲取值,或者轉換數據集以獲取新值。 有關更多詳細信息,請閱讀API文檔 。github
scala > textFile . count () // Number of items in this Dataset res0 : Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala > textFile . first () // First item in this Dataset res1 : String = # Apache Spark
如今讓咱們將這個數據集轉換爲新數據集。 咱們調用filter
來返回一個新的數據集,其中包含文件中項目的子集。算法
scala > val linesWithSpark = textFile . filter ( line => line . contains ( "Spark" )) linesWithSpark : org.apache.spark.sql.Dataset [ String ] = [ value: string ]
咱們能夠將轉換和行動聯繫在一塊兒:sql
scala > textFile . filter ( line => line . contains ( "Spark" )). count () // How many lines contain "Spark"? res3 : Long = 15
數據集操做和轉換可用於更復雜的計算。 假設咱們想要找到含有最多單詞的行:shell
scala > textFile . map ( line => line . split ( " " ). size ). reduce (( a , b ) => if ( a > b ) a else b ) res4 : Long = 15
這首先將一行映射爲整數值,從而建立一個新的數據集。 在該數據集上調用reduce
以查找最大字數。 map
和reduce
的參數是Scala函數文字(閉包),可使用任何語言特性或Scala / Java庫。 例如,咱們能夠輕鬆調用其餘地方聲明的函數。 咱們將使用Math.max()
函數使這段代碼更容易理解:apache
scala > import java.lang.Math import java.lang.Math scala > textFile . map ( line => line . split ( " " ). size ). reduce (( a , b ) => Math . max ( a , b )) res5 : Int = 15
一種常見的數據流模式是MapReduce,由Hadoop推廣。 Spark能夠輕鬆實現MapReduce流程:
scala > val wordCounts = textFile . flatMap ( line => line . split ( " " )). groupByKey ( identity ). count () wordCounts : org.apache.spark.sql.Dataset [( String , Long )] = [ value: string , count ( 1 ) : bigint ]
在這裏,咱們調用flatMap
將行數據集轉換爲單詞數據集,而後將groupByKey
和count
結合起來計算文件中的單詞計數做爲(String,Long)對的數據集。 要收集咱們的shell中的單詞計數,咱們能夠調用collect
:
scala > wordCounts . collect () res6 : Array [( String , Int )] = Array (( means , 1 ), ( under , 2 ), ( this , 3 ), ( Because , 1 ), ( Python , 2 ), ( agree , 1 ), ( cluster ., 1 ), ...)
Spark還支持將數據集提取到羣集範圍的內存緩存中。 這在重複訪問數據時很是有用,例如查詢小的「熱」數據集或運行像PageRank這樣的迭代算法時。 舉個簡單的例子,讓咱們標記要緩存的linesWithSpark
數據集:
scala > linesWithSpark . cache () res7 : linesWithSpark. type = [ value: string ] scala > linesWithSpark . count () res8 : Long = 15 scala > linesWithSpark . count () res9 : Long = 15
使用Spark來探索和緩存100行文本文件彷佛很愚蠢。 有趣的是,這些相同的功能可用於很是大的數據集,即便它們跨越數十個或數百個節點進行條帶化。 您也能夠經過將bin/spark-shell
鏈接到羣集來交互式地執行此操做,如RDD編程指南中所述 。
假設咱們但願使用Spark API編寫一個自包含的應用程序。 咱們將在Scala(使用sbt),Java(使用Maven)和Python(pip)中使用簡單的應用程序。
咱們將在Scala中建立一個很是簡單的Spark應用程序 - 事實上,它簡單地命名爲SimpleApp.scala
:
/* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main ( args : Array [ String ]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession . builder . appName ( "Simple Application" ). getOrCreate () val logData = spark . read . textFile ( logFile ). cache () val numAs = logData . filter ( line => line . contains ( "a" )). count () val numBs = logData . filter ( line => line . contains ( "b" )). count () println ( s"Lines with a: $numAs , Lines with b: $numBs " ) spark . stop () } }
請注意,應用程序應定義main()
方法,而不是擴展scala.App
。 scala.App
子類可能沒法正常工做。
該程序只計算包含'a'的行數和Spark README中包含'b'的數字。 請注意,您須要將YOUR_SPARK_HOME替換爲安裝Spark的位置。 與以前使用Spark shell(初始化本身的SparkSession)的示例不一樣,咱們將SparkSession初始化爲程序的一部分。
咱們調用SparkSession.builder
來構造[[SparkSession]],而後設置應用程序名稱,最後調用getOrCreate
來獲取[[SparkSession]]實例。
咱們的應用程序依賴於Spark API,所以咱們還將包含一個sbt配置文件build.sbt
,它解釋了Spark是一個依賴項。 此文件還添加了Spark依賴的存儲庫:
name := "Simple Project" version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
爲了使sbt正常工做,咱們須要根據典型的目錄結構佈局build.sbt
和build.sbt
。 一旦到位,咱們能夠建立一個包含應用程序代碼的JAR包,而後使用spark-submit
腳原本運行咱們的程序。
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [ info ] Packaging { .. } / { .. } /target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local [ 4 ] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46 , Lines with b: 23
恭喜您運行第一個Spark應用程序!
examples
目錄( Scala , Java , Python , R )中包含了幾個示例。 您能夠按以下方式運行它們:# For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R