[大數據之Spark]——快速入門

爲了良好的閱讀下面的文檔,最好是結合實際的練習。首先須要下載spark,而後安裝hdfs,能夠下載任意版本的hdfs。html

Spark Shell 交互

基本操做

Spark Shell提供給用戶一個簡單的學習API的方式 以及 快速分析數據的工具。在shell中,既可使用scala(運行在java虛擬機,所以可使用java庫)也可使用python。能夠在spark的bin目錄下啓動spark shell:java

./bin/spark-shell.sh複製代碼

spark操做對象是一種分佈式的數據集合,叫作Resilient Distributed Dataset(RDD)。RDD能夠經過hdfs文件建立,也能夠經過RDD轉換得來。python

下面就實際操做下,看看效果。個人本地有個文件——test.txt,內容爲:es6

hello world
haha nihao複製代碼

能夠經過這個文件建立一個新的RDD算法

val textFile = sc.textFile("test.txt")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21複製代碼

在Spark中,基於RDD能夠做兩種操做——Actions算子操做以及Transformations轉換操做。shell

咱們可使用一些算子操做體驗下:apache

scala> textFile.count() //RDD有用的數量
res1: Long = 2

scala> textFile.first() //RDD第一行
res3: String = hello world複製代碼

再執行一些轉換操做,好比使用filter轉換,返回一個新的RDD集合:編程

scala> val lines = textFile.filter(line=>line.contains("hello"))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23

scala> lines.count()
res4: Long = 1

scala> val lines = textFile.filter(line=>line.contains("haha"))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23

scala> lines.count()
res5: Long = 1

scala> lines.first()
res6: String = haha nihao複製代碼

更多RDD操做

RDD算子和轉換能夠組成不少複雜的計算,好比咱們想找出最多一行中單詞最多的單詞數量:api

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15複製代碼

這個操做會把一行經過split切分計數,轉變爲一個整型的值,而後建立成新的RDD。reduce操做用來尋找單詞最多的那一行。緩存

用戶能夠在任什麼時候候調用方法和庫,可使用Math.max()函數:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15複製代碼

一個很常見的數據操做就是map reduce,這個操做在hadoop中很常見。Spark能夠輕鬆的實現Mapreduce任務:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28複製代碼

這裏使用了flatMap,map以及reduceByKey等轉換操做來計算每一個單詞在文件中的數量。爲了在shell中顯示,可使用collect()觸發計算:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)複製代碼

緩存

Spark也支持在分佈式的環境下基於內存的緩存,這樣當數據須要重複使用的時候就頗有幫助。好比當須要查找一個很小的hot數據集,或者運行一個相似PageRank的算法。

舉個簡單的例子,對linesWithSpark RDD數據集進行緩存,而後再調用count()會觸發算子操做進行真正的計算,以後再次調用count()就不會再重複的計算,直接使用上一次計算的結果的RDD了:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19複製代碼

看起來緩存一個100行左右的文件很愚蠢,可是若是再很是大的數據集下就很是有用了,尤爲是在成百上千的節點中傳輸RDD計算的結果。你也能夠經過bin/spark-shell向集羣提交任務,能夠參考編程指南

獨立應用

要使用spark api寫一個本身的應用也很簡單,能夠基於scala、java、python去寫一些簡單的應用。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}複製代碼

注意應用須要定義main()方法。這個程序僅僅是統計文件中包含字符ab的分別都有多少行。你能夠設置YOUR_SPARK_HOME替換本身的文件目錄。不像以前在shell中的例子那樣,咱們須要本身初始化sparkContext。

經過SparkConf構造方法建立SparkContext。

應用依賴於spark api,所以須要在程序中配置sbt的配置文件——simple.sbt,它聲明瞭spark的依賴關係。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"複製代碼

爲了讓sbt正確的工做,還須要建立SimpleApp.scala以及simple.sbt。而後就能夠執行打包命令,經過spark-submit運行了:

# Your directory layout should look like this 你的工程目錄應該向下面這樣
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application 運行sbt命令進行打包
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application 經過spark-submit提交任務jar包
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23複製代碼

其餘地址

經過上面的例子,就能夠運行起來本身的Spark應用了。

那麼能夠參考下面的連接得到更多的內容:

  • 爲了更深刻的學習,能夠閱讀Spark編程指南
  • 若是想要運行Spark集羣,能夠參考部署指南
  • 最後,Spark在examples目錄中內置了多種語言版本的例子,如scala,java,python,r等等。你能夠經過下面的命令運行:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R複製代碼

相關文章
相關標籤/搜索