1、初識Spark和Hadoophtml
Apache Spark 是一個新興的大數據處理通用引擎,提供了分佈式的內存抽象。Spark 正如其名,最大的特色就是快(Lightning-fast),可比 Hadoop MapReduce 的處理速度快 100 倍。web
Hadoop實質上更可能是一個分佈式數據基礎設施: 它將巨大的數據集分派到一個由普通計算機組成的集羣中的多個節點進行存儲,意味着你不須要購買和維護昂貴的服務器硬件。shell
同時,Hadoop還會索引和跟蹤這些數據,讓大數據處理和分析效率達到史無前例的高度。Spark則是一個專門用來對那些分佈式存儲的大數據進行處理的工具,它並不會進行分佈式數據的存儲。apache
Hadoop除了提供爲你們所共識的HDFS分佈式數據存儲功能以外,還提供了叫作MapReduce的數據處理功能。因此咱們徹底能夠拋開Spark,使用Hadoop自身的MapReduce來完成數據的處理。vim
固然,Spark也不是非要依附在Hadoop身上才能生存。但它沒有提供文件管理系統,因此,它必須和其餘的分佈式文件系統進行集成才能運做。咱們能夠選擇Hadoop的HDFS,也能夠選擇其餘的基於雲的數據系統平臺。但Spark默認來講仍是被用在Hadoop上面的,畢竟你們都認爲它們的結合是最好的。緩存
2、安裝Spark
bash
如下都是在Ubuntu 16.04 系統上的操做
服務器
1.安裝Java JDK並配置好環境變量(這部分就不詳細說了)閉包
2.安裝Hadoop
app
2.1 建立Hadoop用戶:
打開終端,輸入命令:
sudo useradd -m hadoop -s /bin/bash
添加hadoop用戶,並設置/bin/bash做爲shell
2.2 設置Hadoop用戶的登陸密碼:
sudo passwd hadoop
而後根據系統提示輸入兩次本身的密碼,再給hadoop用戶添加管理員權限:
sudo adduser hadoop sudo
2.3 將當前的用戶切換到剛剛建立的hadoop用戶(屏幕右上角有個齒輪,點進去就看到)
2.4 更新系統的apt。打開終端,輸入命令:
sudo apt-get update
2.5 安裝ssh、配置ssh無密碼登陸
集羣、單節點模式都須要遇到SSH登陸。Ubuntu默認安裝了SSH Client,但須要本身安裝SSH Server:
sudo apt-get install openssh-server
安裝後,直接登陸本機
ssh localhost
SSH首次登陸須要確認,根據提示輸入:yes,而後再按提示輸入剛剛設置的hadoop的密碼,就登陸了。
2.6 下載Hadoop
下載地址:http://mirror.bit.edu.cn/apache/hadoop/common/
選擇「stable」文件夾,點擊下載「hadoop-2.x.y.tar.gz」文件。默認會下載到「下載」目錄中,
在該文件夾下打開終端,將該文件解壓到/usr/local文件中,執行命令:
sudo tar -zxf ~/hadoop-2.9.0.tar.gz cd /usr/local/ sudo mv ./hadoop-2.9.0/ ./hadoop #將文件名修改成hadoop sudo chown -R hadoop ./hadoop #修改文件權限
Hadoop的文件夾解壓以後就能夠直接使用,檢查一下Hadoop是否能夠正常使用,若是正常則顯示Hadoop的版本信息
cd /usr/local/hadoop ./bin/hadoop version
這裏就初步完成了Hadoop的安裝,還有不少配置什麼的用到的時候在寫,好比僞分佈式系統配置。
3.安裝Spark
3.1 下載Spark:http://spark.apache.org/downloads.html
第一項我選擇的版本是最新當前的最新版本:2.3.1,第二項選擇「Pre-build with user-provided Apache Hadoop」,而後點擊第三項後面的下載「spark-2.3.1-bin-without-hadoop-tgz」。
3.2 解壓文件
這一步與Hadoop的解壓是同樣的,咱們都把它解壓到/usr/local路徑下:
$ sudo tar -zxf ~/下載/spark-2.3.1-bin-without-hadoop.tgz -C /usr/local/ $ cd /usr/local $ sudo mv ./spark-2.3.1-bin-without-hadoop/ ./spark $ sudo chown -R hadoop:hadoop ./spark
3.3 設置環境變量
執行以下命令拷貝一個配置文件:
$ cd /usr/local/spark $ ./conf/spark-env.sh.template ./conf/spark-env.sh
而後編輯spark-env.sh:
$ vim ./conf/spark-env.sh
打開以後在這個文件的最後一行加上下面的內容:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
而後保存退出Vim,就可使用Spark了。
3、Spark入門示例
在文件路徑「/usr/local/spark/examples/src/main」中,咱們能夠找到spark自帶的一些實例,以下圖能夠看到Spark支持Scala、Python、Java、R語言等。
1.Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark:
~$ pyspark
2.PySpark將會自動使用本地Spark配置建立一個SparkContext。咱們能夠經過sc變量來訪問它,來建立第一個RDD:
>>>text=sc.textFile(「file\\\usr\local\spark\exp\test1.txt") >>>print text
3.轉換一下這個RDD,來進行分佈式計算的「hello world」:「字數統計」
首先導入了add操做符,它是個命名函數,能夠做爲加法的閉包來使用。咱們稍後再使用這個函數。首先咱們要作的是把文本拆分爲單詞。咱們建立了一個tokenize函數,參數是文本片斷,返回根據空格拆分的單詞列表。而後咱們經過給flatMap操做符傳遞tokenize閉包對textRDD進行變換建立了一個wordsRDD。你會發現,words是個PythonRDD,可是執行本應該當即進行。顯然,咱們尚未把整個數據集拆分爲單詞列表。
4.將每一個單詞映射到一個鍵值對,其中鍵是單詞,值是1,而後使用reducer計算每一個鍵的1總數
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString()
我使用了一個匿名函數(用了Python中的lambda關鍵字)而不是命名函數。這行代碼將會把lambda映射到每一個單詞。所以,每一個x都是一個單詞,每一個單詞都會被匿名閉包轉換爲元組(word, 1)。爲了查看轉換關係,咱們使用toDebugString方法來查看PipelinedRDD是怎麼被轉換的。可使用reduceByKey動做進行字數統計,而後把統計結果寫到磁盤。
5.使用reduceByKey動做進行字數統計,而後把統計結果寫到磁盤
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦咱們最終調用了saveAsTextFile動做,這個分佈式做業就開始執行了,在做業「跨集羣地」(或者你本機的不少進程)運行時,你應該能夠看到不少INFO語句。若是退出解釋器,你能夠看到當前工做目錄下有個「wc」目錄。每一個part文件都表明你本機上的進程計算獲得的被保持到磁盤上的最終RDD。
4、Spark數據形式
4.1 彈性分佈式數據集(RDD)
Spark 的主要抽象是分佈式的元素集合(distributed collection of items),稱爲RDD(Resilient Distributed Dataset,彈性分佈式數據集),它可被分發到集羣各個節點上,進行並行操做。RDDs 能夠經過 Hadoop InputFormats 建立(如 HDFS),或者從其餘 RDDs 轉化而來。
得到RDD的三種方式:
Parallelize:將一個存在的集合,變成一個RDD,這種方式試用於學習spark和作一些spark的測試
>>>sc.parallelize(['cat','apple','bat’])
MakeRDD:只有scala版本纔有此函數,用法與parallelize相似
textFile:從外部存儲中讀取數據來建立 RDD
>>>sc.textFile(「file\\\usr\local\spark\README.md」)
RDD的兩個特性:不可變;分佈式。
RDD支持兩種操做;Transformation(轉化操做:返回值仍是RDD)如map(),filter()等。這種操做是lazy(惰性)的,即從一個RDD轉換生成另外一個RDD的操做不是立刻執行,只是記錄下來,只有等到有Action操做是纔會真正啓動計算,將生成的新RDD寫到內存或hdfs裏,不會對原有的RDD的值進行改變;Action(行動操做:返回值不是RDD)會實際觸發Spark計算,對RDD計算出一個結果,並把結果返回到內存或hdfs中,如count(),first()等。
4.2 RDD的緩存策略
Spark最爲強大的功能之一即是可以把數據緩存在集羣的內存裏。這經過調用RDD的cache函數來實現:rddFromTextFile.cache,
調用一個RDD的cache函數將會告訴Spark將這個RDD緩存在內存中。在RDD首次調用一個執行操做時,這個操做對應的計算會當即執行,數據會從數據源裏讀出並保存到內存。所以,首次調用cache函數所須要的時間會部分取決於Spark從輸入源讀取數據所須要的時間。可是,當下一次訪問該數據集的時候,數據能夠直接從內存中讀出從而減小低效的I/O操做,加快計算。多數狀況下,這會取得數倍的速度提高。
Spark的另外一個核心功能是能建立兩種特殊類型的變量:廣播變量和累加器。廣播變量(broadcast variable)爲只讀變量,它由運行SparkContext的驅動程序建立後發送給會參與計算的節點。對那些須要讓各工做節點高效地訪問相同數據的應用場景,好比機器學習,這很是有用。Spark下建立廣播變量只需在SparkContext上調用一個方法便可:
>>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))