來自官網的Spark Programming Guide,包括我的理解的東西。node
這裏有一個疑惑點,pyspark是否支持Python內置函數(list、tuple、dictionary相關操做)?思考加搜索查詢以後是這麼考慮的:要想在多臺機器上分佈式處理數據,首先須要是spark支持的數據類型(要使用spark的文件I/O接口來讀取數據),pyspark主要是Dataframe;而後須要用到spark的API。原本spark是支持Python的C語言開發的庫包,那麼Python的內置函數都是能夠運行的,可是要想實現分佈式處理,提升計算效率,在涉及到數據分發處理時要使用spark的transformation和action。潛臺詞是非分佈式處理的操做能夠用內置函數。是這樣的吧?python
RDD是spark中最重要的抽象概念(數據結構),是集羣中各節點上並行處理的分隔元素的集合(彙總),總會用到collect()方法。程序員
RDD能夠從Hadoop文件系統中的文件建立,也能夠從執行程序中的Scala集合中建立或轉換。spark能夠在內存中留存一份RDD,方便在並行運算中高效重用。shell
還有個抽象概念,共享變量。spark在不一樣的節點並行執行任務集時,須要把每一個變量的副本傳送一份到每一個任務中,有時候變量須要在任務中共享。數組
共享變量有兩種:廣播變量(Broadcast Variables)和累加器(Accumulators)。前者緩存在全部節點的內存中,後者用來疊加計數或求和。緩存
Spark2.2.0可使用標準的CPython接口,故C庫如Numpy可使用,Pandas亦可。數據結構
1)spark程序的第一件事是建立一個spark上下文對象,其中,先要配置本身的應用信息。閉包
from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName('myFirstAPP').setMaster('local[*]') sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
#SparkContext.parallelize()用於將本地Python集合分佈式處理爲RDD格式,以便並行處理。能夠設置分隔的數量,如sc.parallelize(data,6)
#即,要想並行處理,數據必需要是RDD或DataSets或DataFrame格式。數據轉換成這些格式後,就可使用C庫包來進行其餘運算操做。
2)外部文件,spark支持文本文件、序列文件及其餘Hadoop輸入格式。分佈式
distFile = sc.textFile("data.txt") #文本文件,以行集合的格式讀取 distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) #textFile可以使用DataSets的操做
3)RDD操做:兩種操做Transformation(從現存數據集中建立新的數據集DataSets)和Action(執行運算後將值返回給執行程序)。好比,map是transformation,reduce是action。ide
全部的transformation都是‘懶的’,只記憶並不執行,只有當action須要返回值給執行程序時才執行計算,這樣spark能夠更高效。這樣只會返回reduce結果,而沒有龐大的map數據集。
可是,若是有多個reduce,那麼每次都要從新map,解決方法是:能夠經過persist
(or cache
)方法將RDD留存在內存中。
lines = sc.textFile("data.txt") #此處只建立一個指針 lineLengths = lines.map(lambda s: len(s)) #此處未計算 lineLengths.persist() #留存,以重用 totalLength = lineLengths.reduce(lambda a, b: a + b) #此處開始計算,只返回計算結果。任務在多臺機器上運行,每臺機器只負責本身的map部分及本地reduce,並返回本身的值給執行程序。
4)傳遞函數給spark:lambda表達式(不支持多語句,且要求有返回值),本地自定義def(適用於長代碼),模塊的Top-level函數。
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) #分隔後返回長度。s是文件數據,下面的textFile是RDD #關於if __name__=="__main__"這種寫法的用處,前面必然定義了一些函數,那麼只在本程序中執行時運行該段代碼,載入到其餘程序時,就能夠只用所定義的函數,而不會執行該段代碼 sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意:若是建立新的MyClass並調用doStuff()時,須要調用self.field,這樣就須要把整個對象傳送到集羣中。把field複製到本地變量中可避免該狀況。
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s) def doStuff(self, rdd): #複製field到本地變量 field = self.field return rdd.map(lambda s: field + s)
5)理解閉包:全局變量須要聚合時,建議使用Accumulator(累加器)。
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)
本地模式(使用相同的JVM)時可能能夠執行,但集羣模式就不會如預期般執行。執行以前,spark會計算任務的(序列化)閉包(對每一個執行器均可見的變量或方法),但counter變量傳遞給執行器的是副本(copies),當foreach方法引用counter時,這已經不是執行節點的counter,而是工做節點的counter,那麼最終counter可能仍是0。
執行節點(driver node)執行程序存在的地方,工做節點(work node)把任務分發到集羣中的地方。
此外,想要使用rdd.foreach(println)
或rdd.map(println)
打印時,並不能實現預期效果。由於閉包模式中,stdout在工做節點的執行器中,並不在執行節點,故須要先使用collect()將全部元素彙總到執行節點。但把全部元素彙總到一臺機器上可能會內存溢出,解決方法是take()
: rdd.take(100).foreach(println),
只打印部分元素。
6)用鍵值對進行操做:reduceByKey,sortByKey。鍵值對可以使用Python內建的tuple輕鬆得到。
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) #統計該文件每行的值出現幾回,大概有重複的行 counts.collect()
7)常見transformation和action。列出經常使用操做,知道都能實現哪些功能。
Transformation:
map(func) | 通過func映射後,返回新的分佈式數據集 |
filter(func) | 返回新的數據集,由func爲True時的元素的組成。過濾 |
flatMap(func) | 類map,但每一個輸入項可映射到0或多個輸出項,故func返回的是個序列 |
mapPartitions(func) | 類map,在RDD的每一個分區上分別執行,那麼func的類型必須是迭代器Iterator<T> => Iterator<U> |
mapPartitionsWithIndex(func) | func提供整型值來表示分區的index,func的類型(Int, Iterator<T>) => Iterator<U> |
sample(withReplacement, fraction, seed) | 採樣數據的fraction部分,可替換可不替換,隨機數種子 |
union(otherDataset) | 返回新的數據集,包括源數據和其餘數據的元素,聯合 |
intersection(otherDataset) | 插入 |
distinct([numTasks])) | 去重 |
groupByKey([numTasks]) | 分組,note:若分組後要聚合,那麼直接使用reduceByKey()或aggregateByKey()效率更高。任務數可選 |
reduceByKey(func, [numTasks]) | 聚合 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 聚合 |
sortByKey([ascending], [numTasks]) | 排序 |
join(otherDataset, [numTasks]) | 鏈接, (K, V) and (K, W)->(K,(V,W))。外鏈接leftOuterJoin , rightOuterJoin , and fullOuterJoin |
cogroup(otherDataset, [numTasks]) | (K, V) and (K, W)->(K, (Iterable<V>, Iterable<W>)) tuples |
cartesian(otherDataset) | 用於T和U類型RDD時,返回(T, U)對(類型鍵值對RDD)。笛卡爾的(笛卡爾乘積?) |
pipe(command, [envVars]) | 經過shell命令管道處理每一個RDD分片 |
coalesce(numPartitions) | 減小分片數,適用於大的數據集過濾後 |
repartition(numPartitions) | 從新分片,生成多的或少的分片數 |
repartitionAndSortWithinPartitions(partitioner) | 從新分片並排序,若是重分片後須要排序,那麼直接使用該函數 |
Action:
reduce(func) | 使用func聚合元素,(兩個參數,而後返回一個結果),要求func是可交換、可組合的(加法交換律、結合律?),以便並行處理 |
collect() | 返回數據集的全部元素,做爲執行程序的數組 |
count() | 返回數據集的元素數 |
first() | 返回數據集的第一個元素 |
take(n) | 返回前n個元素組成的數組 |
takeSample(withReplacement, num, [seed]) | 返回隨機採樣的num個元素組成的數組 |
takeOrdered(n, [ordering]) | 返回排序後的前n個元素,天然順序或自定義比較器 |
saveAsTextFile(path) | 把數據集的元素做爲TextFile寫入到指定路徑。spark會對每一個元素調用toString,將其轉換爲文件中的一行文本 |
saveAsSequenceFile(path) (Java and Scala) |
將數據集的元素保存到序列文件中 |
saveAsObjectFile(path) (Java and Scala) |
將數據集的元素使用Java的序列化特性寫到文件中 |
countByKey() | 只適用於鍵值對RDD,返回哈希映射(key,int),對每一個key計數 |
foreach(func) | 對數據集的每一個元素執行func。適用於帶反作用的操做,如更新累加器或與外部存儲系統交互 |
8)洗牌(Shuffle)操做:包括重分片操做(repartition和coalesce),ByKey操做(reduceByKey、groupByKey、sortByKey,除去countByKey),鏈接操做(cogroup和join)
好比reduceByKey(),須要按照某個能夠去reduce時,同一個能夠可能在不一樣的分片或者不一樣的機器上,那麼每一個分片執行以後,須要從每一個分片讀數據而後計算出最終的結果,這個過程就是洗牌。
9)共享變量(broadcast變量 and accumulators)
若是spark操做額函數是在遠程集羣節點上運行,那麼函數所用到的全部變量都會分發一個副本到每臺機器上,可是這些副本的修改(操做結果)並不能反饋回到執行程序(若是是原始變量的引用就能夠修改原始變量)。那麼多任務之間共享變量就是無效的。so,spark提供了兩個限制類型的共享變量:廣播變量和累加器。看具體用在什麼場景:
一、廣播變量:容許在每臺機器上緩存只讀變量,好比給每一個節點一個大型輸入集的副本。顯式地建立廣播變量僅適用於跨多階段須要相同數據的任務或者以非序列化的形式緩存數據。
使用SparkContext.broadcast(v)建立廣播變量。
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
二、累加器,只適用於在可交換、可結合的操做中去疊加。好比計數或是加和。spark自然支持數值類型,程序員也能夠自行添加新的類型。
使用SparkContext.accumulator(v)建立累加器。
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10