Spark Shell簡單使用

基礎html

Spark的shell做爲一個強大的交互式數據分析工具,提供了一個簡單的方式學習API。它可使用Scala(在Java虛擬機上運行現有的Java庫的一個很好方式)或Python。在Spark目錄裏使用下面的方式開始運行:java

 

[plain]  view plain  copy
 
  1. ./bin/spark-shell  
在Spark Shell中,有一個專有的SparkContext已經爲您建立好了,變量名叫作sc。本身建立的SparkContext將沒法工做。能夠用--master參數來設置SparkContext要鏈接的集羣,用--jars來設置須要添加到CLASSPATH的jar包,若是有多個jar包,可使用逗號分隔符鏈接它們。例如,在一個擁有4核的環境上運行spark-shell,使用:
[plain]  view plain  copy
 
  1. ./bin/spark-shell --master local[4]  
或在CLASSPATH中添加code.jar,使用:
[plain]  view plain  copy
 
  1. ./bin/spark-shell --master local[4] --jars code.jar  
能夠執行spark-shell --help獲取完整的選項列表。 
Spark最主要的抽象是叫Resilient Distributed Dataset(RDD)的彈性分佈式集合。RDDs可使用Hadoop InputFormats(例如HDFS文件)建立,也能夠從其餘的RDDs轉換。讓咱們在Spark源代碼目錄裏從README.md文本文件中建立一個新的RDD。
[plain]  view plain  copy
 
  1. scala> val textFile = sc.textFile("file:///home/hadoop/hadoop/spark/README.md")  
  2. 16/07/24 03:30:53 INFO storage.MemoryStore: ensureFreeSpace(217040) called with curMem=321016, maxMem=280248975  
  3. 16/07/24 03:30:53 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 212.0 KB, free 266.8 MB)  
  4. 16/07/24 03:30:53 INFO storage.MemoryStore: ensureFreeSpace(20024) called with curMem=538056, maxMem=280248975  
  5. 16/07/24 03:30:53 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.6 KB, free 266.7 MB)  
  6. 16/07/24 03:30:53 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:43303 (size: 19.6 KB, free: 267.2 MB)  
  7. 16/07/24 03:30:53 INFO spark.SparkContext: Created broadcast 2 from textFile at <console>:21  
  8. textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at <console>:21  
注意:1. 其中2~7行是日誌信息,暫且沒必要關注,主要看最後一行。以後的運行日誌信息將再也不貼出。用戶也能夠進入到spark目錄/conf文件夾下,此時有一個log4j.properties.template文件,咱們執行以下命令將其拷貝一份爲log4j.properties,並對log4j.properties文件進行修改。

 

 

[plain]  view plain  copy
 
  1. cp log4j.properties.template log4j.properties  
  2. vim log4j.properties  
以下圖所示,將INFO改成WARN,這樣就不輸出藍色部分的日誌信息:

 

log4j.properties修改

2. 另外,file:///home/hadoop/hadoop/spark/README.md,首部的file表明本地目錄,注意file:後有三個斜槓(/);中間紅色部分是個人spark安裝目錄,讀者可根據本身的狀況進行替換。es6

 

RDD的actions從RDD中返回值,transformations能夠轉換成一個新RDD並返回它的引用。下面展現幾個action:shell

 

[plain]  view plain  copy
 
  1. scala> textFile.count()  
  2. res0: Long = 98  
  3.   
  4. scala> textFile.first()  
  5. res1: String = # Apache Spark  
其中,count表明RDD中的總數據條數;first表明RDD中的第一行數據。

 

下面使用一個transformation,咱們將使用filter函數對textFile這個RDD進行過濾,取出包含字符串"Spark"的行,並返回一個新的RDD:apache

 

[plain]  view plain  copy
 
  1. scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))  
  2. linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23  
固然也能夠把actions和transformations鏈接在一塊兒使用:

 

 

[plain]  view plain  copy
 
  1. scala> textFile.filter(line => line.contains("Spark")).count()  
  2. res2: Long = 19  
上面這條語句表示有多少行包括字符串"Spark"。

 

 

更多RDD操做vim

RDD actions和transformations能被用在更多的複雜計算中。好比想要找到一行中最多的單詞數量:緩存

 

[plain]  view plain  copy
 
  1. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)  
  2. res3: Int = 14  
首先將行映射成一個整型數值產生一個新的RDD。在這個新的RDD上調用reduce找到行中最大的單詞數個數。map和reduce的參數是Scala的函數串(閉包),而且可使用任何語言特性或者Scala/Java類庫。例如,咱們能夠很方便地調用其餘的函數聲明。咱們使用Math.max()函數讓代碼更容易理解:

 

 

[plain]  view plain  copy
 
  1. scala> import java.lang.Math  
  2. import java.lang.Math  
  3.   
  4. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))  
  5. res4: Int = 14  
你們都知道,Hadoop流行的一個通用數據流模式是MapReduce。Spark可以很容易地實現MapReduce:

 

 

[plain]  view plain  copy
 
  1. scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  
  2. wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:24  
這裏,咱們結合了flatMap、map和reduceByKey來計算文件裏每一個單詞出現的數量,它的結果是包含一組(String, Int)鍵值對的RDD。咱們可使用collect操做收集單詞的數量:

 

 

[plain]  view plain  copy
 
  1. scala> wordCounts.collect()  
  2. res5: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1), (cluster.,1), (its,1), ([run,1), (APIs,1), (have,1), (Try,1), (computation,1), (through,1), (several,1), (This,2), ("yarn-cluster",1), (graph,1), (Hive,2), (storage,1), (["Specifying,1), (To,2), (page](http://spark.apache.org/documentation.html),1), (Once,1), (application,1), (prefer,1), (SparkPi,2), (engine,1), (version,1), (file,1), (documentation,,1), (processing,,2), (the,21), (are,1), (systems.,1), (params,1), (not,1), (different,1), (refer,2), (Interactive,2), (given.,1), (if,4), (build,3), (when,1), (be,2), (Tests,1), (Apache,1), (all,1), (./bin/run-example,2), (programs,,1), (including,3), (Spark.,1), (package.,1), (1000).count(),1), (Versions,1), (HDFS,1), (Data.,1), (>...  

 

 

緩存

Spark支持把數據集緩存到內存之中,當要重複訪問時,這是很是有用的。舉一個簡單的例子:閉包

[plain]  view plain  copy
 
  1. scala> linesWithSpark.cache()  
  2. res6: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:23  
  3.   
  4. scala> linesWithSpark.count()  
  5. res7: Long = 19  
  6.   
  7. scala> linesWithSpark.count()  
  8. res8: Long = 19  
  9.   
  10. scala> linesWithSpark.count()  
  11. res9: Long = 19  
首先緩存linesWithSpark數據集,而後重複訪問count函數返回的值。固然,咱們並不能察覺明顯的查詢速度變化,可是當在大型的數據集中使用緩存,將會很是顯著的提高相應的迭代操做速度。
相關文章
相關標籤/搜索