Spark2.1.0——Spark初體驗

      學習一個工具的最好途徑,就是使用它。這就比如《極品飛車》玩得好的同窗,未必真的會開車,要學習車的駕駛技能,就必須用手觸摸方向盤、用腳感覺剎車與油門的力道。在IT領域,在深刻了解一個系統的原理、實現細節以前,應當先準備好它的運行環境或者源碼閱讀環境。若是能在實際環境下安裝和運行Spark,顯然可以提高讀者對於Spark的一些感覺,對系統能有個大致的印象,有經驗的工程師甚至可以猜出一些Spark在實現過程當中採用的設計模式、編程模型。html

      考慮到大部分公司在開發和生產環境都採用Linux操做系統,因此筆者選用了64位的Linux。在正式安裝Spark以前,先要找臺好機器。爲何?由於筆者在安裝、編譯、調試的過程當中發現Spark很是耗費內存,若是機器配置過低,恐怕會跑不起來。Spark的開發語言是Scala,而Scala須要運行在JVM之上,於是搭建Spark的運行環境應該包括JDK和Scala。java

      本文只介紹最基本的與Spark相關的準備工做,至於Spark在實際生產環境下的配置,則須要結合具體的應用場景進行準備。linux

安裝JDK

      自Spark2.0.0版本開始,Spark已經準備放棄對Java 7的支持,因此咱們須要選擇Java 8。咱們還須要使用命令getconf LONG_BIT查看linux機器是32位仍是64位,而後下載相應版本的JDK並安裝。shell

下載地址:apache

http://www.oracle.com/technetwork/java/javase/downloads/index.html編程

配置環境:vim

cd ~
vim .bash_profile

添加以下配置:設計模式

exportJAVA_HOME=/opt/java
exportPATH=$PATH:$JAVA_HOME/bin
exportCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

輸入如下命令使環境變量快速生效:bash

source .bash_profile

安裝完畢後,使用java –version命令查看,確認安裝正常,如圖1所示。架構

圖1 查看java安裝是否正常

安裝Scala

      因爲從Spark 2.0.0開始,Spark默認使用Scala 2.11來編譯、打包,再也不是之前的Scala 2.10,因此咱們須要下載Scala 2.11。

    下載地址:

    http://www.scala-lang.org/download/

選擇Scala 2.11的版本進行下載,下載方法以下:

wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz

移動到選好的安裝目錄,例如:

mv scala-2.11.8.tgz~/install/

進入安裝目錄,執行如下命令:

chmod 755scala-2.11.8.tgz
tar -xzvfscala-2.11.8.tgz 

配置環境:

cd ~
vim .bash_profile

添加以下配置:

export SCALA_HOME=$HOME/install/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH

輸入如下命令使環境變量快速生效:

source .bash_profile

安裝完畢後鍵入scala,進入scala命令行以確認安裝正常,如圖2所示。

圖2 進入Scala命令行

安裝Spark

      Spark進入2.0時代以後,目前一共有兩個大的版本:一個是2.0.0,一個是2.1.0。本書選擇2.1.0。

下載地址:

http://spark.apache.org/downloads.html

下載方法以下:

wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.6.tgz

移動到選好的安裝目錄,如:

mv spark-2.1.0-bin-hadoop2.6.tgz~/install/

進入安裝目錄,執行如下命令:

chmod 755 spark-2.1.0-bin-hadoop2.6.tgz
tar -xzvf spark-2.1.0-bin-hadoop2.6.tgz

配置環境:

cd ~
vim .bash_profile

添加以下配置:

    export SPARK_HOME=$HOME/install/spark-2.1.0-bin-hadoop2.6
    export PATH=$SPARK_HOME/bin:$PATH

輸入如下命令使環境變量快速生效:

source .bash_profile

安裝完畢後鍵入spark-shell,進入scala命令行以確認安裝正常,如圖3所示。

圖3 執行spark-shell進入Scala命令行

既然已經介紹瞭如何準備好基本的Spark運行環境,如今是時候實踐一下,以便於在使用過程當中提高讀者對於Spark最直接的感觸!本文經過Spark的基本使用,讓讀者對Spark能有初步的認識,便於引導讀者逐步深刻學習。

運行spark-shell

  在《Spark2.1.0——運行環境準備》一文曾經簡單運行了spark-shell,並用下圖進行了展現(此處再次展現此圖)。

圖4    執行spark-shell進入Scala命令行

圖4中顯示了不少信息,這裏進行一些說明:

  • 在安裝完Spark 2.1.0後,若是沒有明確指定log4j的配置,那麼Spark會使用core模塊的org/apache/spark/目錄下的log4j-defaults.properties做爲log4j的默認配置。log4j-defaults.properties指定的Spark日誌級別爲WARN。用戶能夠到Spark安裝目錄的conf文件夾下從log4j.properties.template複製一份log4j.properties文件,並在其中增長本身想要的配置。
  • 除了指定log4j.properties文件外,還能夠在spark-shell命令行中經過sc.setLogLevel(newLevel)語句指定日誌級別。
  • SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是筆者安裝Spark的機器的ip地址,4040是SparkContext的Web UI的默認監聽端口。
  • 指定的部署模式(即master)爲local[*]。當前應用(Application)的ID爲local-1497084620457。
  • 能夠在spark-shell命令行經過sc使用SparkContext,經過spark使用SparkSession。sc和spark實際分別是SparkContext和SparkSession在Spark REPL中的變量名,具體細節已在《Spark2.1.0——剖析spark-shell》一文有過度析。

  因爲Spark core的默認日誌級別是WARN,因此看到的信息不是不少。如今咱們將Spark安裝目錄的conf文件夾下的log4j.properties.template以以下命令複製出一份: 

cp log4j.properties.template log4j.properties

並將log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改成log4j.logger.org.apache.spark.repl.Main=INFO,而後咱們再次運行spark-shell,將打印出更豐富的信息,如圖5所示。

圖5  Spark啓動過程打印的部分信息

從圖5展現的啓動日誌中咱們能夠看到SecurityManager、SparkEnv、BlockManagerMasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlockTransferService、BlockManager、BlockManagerMaster等信息。它們是作什麼的?剛剛接觸Spark的讀者只須要知道這些信息便可,具體內容將在後邊的博文給出。

執行word count

      這一節,咱們經過word count這個耳熟能詳的例子來感覺下Spark任務的執行過程。啓動spark-shell後,會打開Scala命令行,而後按照如下步驟輸入腳本:

步驟1    

      輸入val lines =sc.textFile("../README.md", 2),以Spark安裝目錄下的README.md文件的內容做爲word count例子的數據源,執行結果如圖6所示。

圖6   步驟1執行結果

圖6告訴咱們lines的實際類型是MapPartitionsRDD。

步驟2

       textFile方法對文本文件是逐行讀取的,咱們須要輸入val words =lines.flatMap(line => line.split(" ")),將每行文本按照空格分隔以獲得每一個單詞,執行結果如圖7所示。

圖7   步驟2執行結果

圖7告訴咱們lines在通過flatMap方法的轉換後獲得的words的實際類型也是MapPartitionsRDD。

步驟3

     對於獲得的每一個單詞,經過輸入val ones = words.map(w => (w,1)),將每一個單詞的計數初始化爲1,執行結果如圖8所示。

圖8   步驟3執行結果

圖8告訴咱們words在通過map方法的轉換後獲得的ones的實際類型也是MapPartitionsRDD。

步驟4

    輸入val counts = ones.reduceByKey(_ + _),對單詞進行計數值的聚合,執行結果如圖9所示。

圖9   步驟4執行結果

圖9告訴咱們ones在通過reduceByKey方法的轉換後獲得的counts的實際類型是ShuffledRDD。

步驟5

       輸入counts.foreach(println),將每一個單詞的計數值打印出來,做業的執行過程如圖10和圖11所示。做業的輸出結果如圖12所示。

圖10   步驟5執行過程第一部分

圖11  步驟5執行過程第二部分

圖10和圖11展現了不少做業提交、執行的信息,這裏挑選關鍵的內容進行介紹:

  • SparkContext爲提交的Job生成的ID是0。
  • 一共有四個RDD,被劃分爲ResultStage和ShuffleMapStage。ShuffleMapStage的ID爲0,嘗試號爲0。ResultStage的ID爲1,嘗試號也爲0。在Spark中,若是Stage沒有執行完成,就會進行屢次重試。Stage不管是首次執行仍是重試都被視爲是一次Stage嘗試(Stage Attempt),每次Attempt都有一個惟一的嘗試號(AttemptNumber)。
  • 因爲Job有兩個分區,因此ShuffleMapStage和ResultStage都有兩個Task被提交。每一個Task也會有屢次嘗試,於是也有屬於Task的嘗試號。從圖中看出ShuffleMapStage中的兩個Task和ResultStage中的兩個Task的嘗試號也都是0。
  • HadoopRDD則用於讀取文件內容。

圖12  步驟5輸出結果

 

圖12展現了單詞計數的輸出結果和最後打印的任務結束的日誌信息。

       本文介紹的word count例子是以SparkContext的API來實現的,讀者朋友們也能夠選擇在spark-shell中經過運用SparkSession的API來實現。

有了對Spark的初次體驗,下面能夠來分析下spark-shell的實現原理了,請看——《Spark2.1.0——剖析spark-shell》

想要對Spark源碼進行閱讀的同窗,能夠看看《Spark2.1.0——代碼結構及載入Ecplise方法》

關於《Spark內核設計的藝術 架構設計與實現》

通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
 
紙質版售賣連接以下:
相關文章
相關標籤/搜索