Apache Spark是一個開源分佈式運算框架,最初是由加州大學柏克萊分校AMPLab所開發。html
Hadoop MapReduce的每一步完成必須將數據序列化寫到分佈式文件系統致使效率大幅下降。Spark儘量地在內存上存儲中間結果, 極大地提升了計算速度。java
MapReduce是一路計算的優秀解決方案, 但對於多路計算的問題必須將全部做業都轉換爲MapReduce模式並串行執行。python
Spark擴展了MapReduce模型,容許開發者使用有向無環圖(DAG)開發複雜的多步數據管道。而且支持跨有向無環圖的內存數據共享,以便不一樣的做業能夠共同處理同一個數據程序員
Spark不是Hadoop的替代方案而是其計算框架Hadoop MapReduce的替代方案。Hadoop更多地做爲集羣管理系統爲Spark提供底層支持。算法
Spark可使用本地Spark, Hadoop YARN或Apache Mesos做爲集羣管理系統。Spark支持HDFS,Cassandra, OpenStack Swift做爲分佈式存儲解決方案。sql
Spark採用Scala語言開發運行於JVM上,並提供了Scala,Python, Java和R語言API,可使用其中的Scala和Python進行交互式操做。shell
本文測試環境爲Spark 2.1.0, Python API.apache
彈性分佈式數據集(Resilient Distributed Dataset, RDD)是Saprk的基本數據結構, 表明能夠跨機器進行分割的只讀對象集合。api
RDD能夠由Hadoop InputFormats建立(好比HDFS上的文件)或者由其它RDD轉換而來, RDD一旦建立便不可改變。RDD操做分爲變換和行動兩種:緩存
變換(Transformation): 接受一個RDD做爲參數,返回一個新的RDD, 原RDD不變。
包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe以及coalesce
行動(Action): 接受一個RDD做爲參數, 進行查詢並返回一個值。
包括: reduce,collect,count,first,take,countByKey以及foreach
Spark的核心組件包括:
Spark Core: 核心功能, 提供RDD及其API和操做。
Spark SQL: 提供經過Apache Hive的SQL變體HiveQL與Spark進行交互的API。每一個數據表被當作一個RDD,Spark SQL查詢被轉換爲Spark操做。
Spark Streaming:容許對實時數據流進行處理和控制,park Streaming容許程序可以像普通RDD同樣處理實時數據。
MLlib:一個經常使用機器學習算法庫,算法被實現爲對RDD的Spark操做。這個庫包含可擴展的學習算法,好比分類、迴歸等須要對大量數據集進行迭代的操做
GraphX: 圖計算框架, GraphX擴展了RDD API,包含控制圖、建立子圖、訪問路徑上全部頂點的操做。
對於Linux和Mac用戶只須要在本地安裝java運行環境並在官網中下載Pre-built版本的壓縮包, 解壓縮以後便可以單機模式使用Spark。
進入解壓後的spark目錄, 其中包含一些腳本和二進制程序:
sbin
: 管理員命令目錄
spark-config.sh
將spark運行配置寫入環境變量spark-daemon.sh
在本地啓動守護進程spark-daemons.sh
在全部slave主機上啓動守護進程start-master.sh
啓動master進程start-slave.sh
在本地上啓動slave進程start-slaves.sh
根據conf/slaves配置文件在slave主機上啓動slave進程start-all.sh
啓動全部守護進程,啓動本地master進程, 根據conf/slaves啓動slave進程stop-all.sh
中止全部守護進程及其下的master/slave進程stop-master.sh
中止master進程stop-slave.sh
中止本地的slave進程stop-slaves.sh
中止全部slave進程bin
普通用戶工具目錄
pyspark
: python交互環境spark-shell
scala交互環境sparkR
R交互環境spark-submit
將Spark應用提交到集羣上運行spark-sql
spark-sql交互環境run-example
運行示例使用sbin/start-all.sh
啓動spark而後調用bin/pyspark
進入Python交互界面:
SparkSession和SparkContext初始化成功後, 能夠確認交互界面已正確啓動。
>>> txt = sc.textFile("README.md") >>> txt.count() 104
上述代碼中,sc是SparkContext的別名, 咱們根據"README.md"的內容建立了一個RDD並用count()
方法取出RDD中項目的數量。
bin/spark-submit
能夠將使用python編寫的Spark應用提交到集羣上運行。
咱們將上文中的示例寫成腳本, 與交互模式不一樣的是腳本須要手動進行一些配置:
from pyspark import SparkConf, SparkContext APP_NAME = "My Spark Application" MASTER_URL = "local[*]" conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster(MASTER_URL) sc = SparkContext(conf=conf) def main(sc): txt = sc.textFile("README.md") print(txt.count()) if __name__ == '__main__': main(sc)
保存爲my_test.py
, 使用spark-submit提交做業:
$ bin/spark-submit my_test.py 104
如今對上述代碼作一些說明。
APP_NAME
是應用的名稱由程序員自定義,MASTER_URL
用於指定集羣Master的位置:
URL | 含義 |
---|---|
local | 用一個worker線程本地運行Spark |
local[K] | 用k個worker線程本地運行Spark(一般設置爲機器核心數) |
local[*] | 用盡量多的worker線程本地運行Spark |
spark://HOST:PORT | 鏈接到給定的Spark獨立部署集羣master, 默認端口7077 |
mesos://HOST:PORT | 鏈接到給定的mesos集羣 |
yarn-client | 以client模式鏈接到Yarn集羣。集羣位置將基於經過HADOOP_CONF_DIR變量找到 |
yarn-cluster | 以cluster模式鏈接到Yarn集羣。羣集位置將基於經過HADOOP_CONF_DIR變量找到 |
並行集合(Parallelized collections)基於python可迭代對象(iterable)建立:
>>> data = [1,2,3,4] >>> para_data = sc.parallelize(data) >>> para_data ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 >>> para_data.reduce(lambda x, y: x+y) 10
RDD一旦建立便可以並行模式運算.
除了使用內部的iterable對象建立RDD外, 也可使用外部數據源建立RDD.
Spark 能夠從任何一個 Hadoop 支持的存儲源建立分佈式數據集,包括你的本地文件系統,HDFS,Cassandra,HBase等.
>>> src_uri = "README.md" >>> txt = sc.textFile(src_uri) >>> txt.count() 104
Spark支持textFile, SequenceFile和其它Hadoop InputFormat做爲外部數據源。
src_uri支持的協議包括hdfs://
, s3n://
和file://
等。直接填寫路徑則默認採用file://
即本地文件系統路徑.
若是src_uri使用本地文件系統路徑,文件必須能在 worker 節點上用相同的路徑訪問到。要麼複製文件到全部的 workers,要麼使用網絡共享文件系統.
Spark採用惰性求值的機制進行運算, 咱們用一個簡單的例子說明Spark的運算過程:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行從外部數據集建立了一個名爲lines的RDD, lines只是一個指針文件內容沒有真的被讀入內存。
第二行執行了map操做, 一樣的lineLength並無被當即求值。
第三行執行了reduce操做, Spark 把計算分紅多個任務(task),而且讓它們運行在多個機器上。每臺機器都運行本身的 map 部分和本地 reduce 部分, 並把結果返回給Master。
前文已經說明transformation是根據RDD建立新的RDD的操做,這裏將說明一些經常使用的操做,更多內容請參見官方文檔.
rdd.map(func): 將數據源的每一個元素傳遞給func函數, 獲得func的返回值組成新RDD
在示例lines.map(lambda s: len(s))
中lines的元素類型爲str, map函數將其映射爲長度元素長度的集合。
>>> r = sc.parallelize([1,2,3,4]).flatMap(lambda x: [x, x+1]) >>> r.collect() # show all elements >>> [1, 2, 2, 3, 3, 4, 4, 5]
rdd.filter(func): 將數據源的每一個元素傳遞給func函數, 使func返回True的元素加入到結果RDD中
rdd1.union(rdd2): 求rdd1與rdd2的並集
rdd1.intersection(rdd2): 求rdd1和rdd2的交集
rdd.distinct(): 返回去除重複元素後的rdd
action是對RDD進行查詢並返回單個元素的操做, 這裏將說明一些經常使用的操做,更多內容請參見官方文檔。
rdd.reduce(func): func是接受兩個參數並返回一個值的函數, reduce使用func對集合進行聚合。
這個過程能夠理解爲從集合中任取兩個元素傳給func, 而後將返回值加入集合中並刪除兩個參數, 反覆迭代直至集合只有一個元素, 該元素即爲最後的返回值。
示例lineLengths.reduce(lambda a, b: a + b)
中, reduce函數對RDD內全部函數進行了求和。
rdd.collect(): 以python list的形式返回集合中全部元素
rdd.first(): 返回集合中第一個元素, 對集合不產生影響
rdd.take(n): 返回集合中前n個元素組成的list, 下標從1開始
rdd.count(): 返回集合中元素的數目
rdd.foreach(func): func是接受一個參數的函數, 對集合中每一個元素調用func函數, foreach返回None
上文中的RDD對元素的類型是基本沒有限制的, 相似於python內置的list(其實更相似於ORM的查詢集)。RDD在使用二元組做爲元素時, Spark會將二元組做爲一個鍵值對處理, 二元組的第一個元素被認爲是鍵, 第二個元素認爲是值。
元素爲二元組的RDD仍然可使用普通RDD的操做,Spark也爲這類RDD定義了一些基於鍵值對的操做:
groupByKey():將key相同的鍵值對合併爲一個鍵值對: (key,[val, val, ...])
reduceByKey(func): 對key相同的鍵值對應用func進行聚合: (key,RDD<val>.reduce(func))
rdd.sortByKey(ascending=True): 按key對鍵值對進行排序,默認爲升序
rdd1.join(rdd2): 對兩個鍵值對形式的rdd進行合併,(k, v)和(k,w)將被合併爲(k, (v,w))
countByKey(): 返回每一個鍵對應鍵值對的個數(key, count), 返回值爲dict而非RDD.
RDD持久化是Spark的一個重要功能, 上文已經說起Spark提供了持久化到內存的功能極大的提升了運算速度, 也是Spark比Hadoop MapReduce更先進的緣由之一。
rdd.persist([level])能夠根據指定等級執行持久化:
>>> from pyspark import StorageLevel >>> r.persist(StorageLevel.MEMORY_ONLY) PythonRDD[16]
Spark支持的持久化級別包括:
MEMORY_ONLY: 將RDD做爲java對象存儲在JVM中,若RDD的某部分沒法做爲java對象存儲,則不對該部分進行緩存。默認緩存級別。
MEMORY_AND_DISK: 將RDD做爲java對象存儲在JVM中,若RDD的某部分沒法做爲java對象存儲,則將該部分用pickle序列化後緩存到磁盤上。
MEMORY_ONLY_SER: 將RDD序列化後做爲java byte[]存儲在內存中,不合適的分區不緩存。 比較節省內存可是消耗時間
MEMORY_AND_DISK_SER: 將RDD序列化後做爲java byte[]存儲在內存中,不合適的分區序列化後存儲到磁盤上
DISK_ONLY: 序列化後僅存儲在磁盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等: 與上述存儲級別相似, 不過是存儲到兩個節點上
OFF_HEAP: 將RDD序列化後緩存到分佈式內存存儲Tachyon上
Spark官方文檔給出了一些關於選擇存儲級別的建議:
若是你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。由於這是cpu利用率最高的選項,會使RDD上的操做盡量的快。
若是不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提升對象的空間使用率,可是仍可以至關快的訪問。
除非函數計算RDD的花費較大或者它們須要過濾大量的數據,不要將RDD存儲到磁盤上,不然,重複計算一個分區就會和重磁盤上讀取數據同樣慢。
若是你但願更快的錯誤恢復,能夠利用重複(replicated)存儲級別。全部的存儲級別均可以經過重複計算丟失的數據來支持完整的容錯,可是重複的數據可以使你在RDD上繼續運行任務,而不須要重複計算丟失的數據。
Spark提供了rdd.cache()
方法, 它與rdd.persist(StorageLevel.MEMORY_ONLY)
功能相同。
Spark自動的監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。rdd.unpersist()
能夠手動刪除緩存。
一個傳遞給Spark操做(例如map和reduce)的函數在遠程節點上面運行時,Spark操做實際上操做的是這個函數所用變量的一個獨立副本。
這些變量被複制到每臺機器上,而且這些變量在遠程機器上 的全部更新都不會傳遞迴驅動程序,一般這種跨任務的讀寫變量是低效的。
Spark提供了兩個共享變量: 廣播變量(broadcast variable)和累加器(accumulator)進行跨任務共享。
廣播變量在每臺機器上面緩存一個只讀變量,而不是每一個任務保存一個副本。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減小通訊的成本。
>>> broadcastVar = sc.broadcast([1, 2, 3]) >>> broadcastVar.value [1, 2, 3]
廣播變量建立後咱們可使用它代替原變量,其操做與RDD基本相同。
累加器特性與廣播變量相似, 另外定義了add方法用於累加。
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) >>> accum.value 10
累加器默認使用python內置int類型計數, 咱們能夠自定義計數類型。一般自定義類型爲多維向量,用來進行復雜計數。