2、SparkContext
SparkContext 的做用
用Python來鏈接Spark:可使用RD4s並經過庫Py4j來實現。python
PySpark Shell將Python API連接到Spark Core並初始化Spark Context。(SparkContext是Spark應用程序的核心)git
1.Spark Context設置內部服務並創建到Spark執行環境的鏈接。redis
2.驅動程序中的Spark Context對象協調全部分佈式進程並容許進行資源分配。sql
3.集羣管理器執行程序,它們是具備邏輯的JVM進程。編程
4.Spark Context對象將應用程序發送給執行者。api
5.Spark Context在每一個執行器中執行任務。數組
SparkContext 初始化
統計帶有字符「a」或「b」的行數。緩存
from pyspark import SparkContext sc = SparkContext("local", "first app") logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md" logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
3、SparkConf
SparkConf包含了Spark集羣配置的各類參數。
from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077") sc = SparkContext(conf=conf)
如下是SparkConf最經常使用的一些屬性
-
-
set(key,value) - 設置配置屬性。
-
setMaster(value) - 設置主URL。
-
setAppName(value) - 設置應用程序名稱。
-
get(key,defaultValue = None) - 獲取密鑰的配置值。
-
setSparkHome(value) - 在工做節點上設置Spark安裝路徑。
-
RDD建立
1、加載文件
.textFile()方法從三個方式讀取內容:HDFS, LOCAL, S3
Local
讀取本地文件,生成一個RDD (就是lines)。
# RDD lines變量
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 三個斜槓
lines.foreach(print)
lines.first()
lines.saveAsTextFile("...") # 把RDD寫入到文本文件中
HDFS
如下三個等價。爲了區別本地讀取,讀方式採用了三斜槓。
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") lines = sc.textFile("/user/hadoop/word.txt") lines = sc.textFile("word.txt")
lines.saveAsTextFile("writeback") # 把RDD寫入到HDFS文件中
addFile函數
原文連接:https://blog.csdn.net/guohecang/article/details/52095387
在Apache Spark中,您可使用 sc.addFile 上傳文件(sc是您的默認SparkContext),並使用 SparkFiles.get 獲取工做者的路徑。
咱們在使用Spark的時候有時候須要將一些數據分發到計算節點中。
(1)一種方法是將這些文件上傳到HDFS上,而後計算節點從HDFS上獲取這些數據。
(2)咱們也可使用addFile函數來分發這些文件。
注意,若是是spark程序經過yarn集羣上加載配置文件,path必須是集羣hdfs的絕對路徑,如:viewfs://58-cluster//home/hdp_lbg_supin/resultdata/zhaopin/recommend/config/redis.properties。
from pyspark import SparkContext from pyspark import SparkFiles finddistance = "/home/hadoop/examples_pyspark/finddistance.R" finddistancename = "finddistance.R" sc = SparkContext("local", "SparkFile App") sc.addFile(finddistance) print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
2、經過並行集合列表建立RDD
每一個元素i至關與一行。
array = [1,2,3,4,5] rdd = sc.parallelize(array) rdd.foreach(print)
序列化
序列化用於Apache Spark的性能調優。
經過 "網絡發送" 或 "寫入磁盤" 或 "持久存儲在內存中" 的全部數據都應序列化。
PySpark支持用於性能調優的自定義序列化程序。
from pyspark.context import SparkContext from pyspark.serializers import MarshalSerializer sc = SparkContext("local", "serialization app", serializer = MarshalSerializer()) print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)) sc.stop()
RDD操做
[Transformation操做]
對一個數據爲{1, 2, 3, 3}的RDD進行基本的RDD轉化操做
對數據分別爲{1, 2, 3}和{3, 4, 5}的RDD進行鍼對兩個RDD的轉化操做
[Action操做]
對一個數據爲{1, 2, 3, 3}的RDD進行基本的RDD行動操做
1、"轉換" 操做
filter(func)
lines = sc.textFile("file:///<path>") linesWithSpark = lines.filter(lambda line: "Spark" in lines) linesWithSpark.foreach(print)
map(func)
data = [1,2,3,4,5] rdd1 = sc.parallelize(data) rdd2 = rdd1.map(lambda x: x+10) rdd2.foreach(print)
flatMap
所謂flat,就是最後要的是 「單詞的集合」。
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") words = lines.flatMap(lambda line:line.split(" "))
流程本質上就是:[[...], [...], [...]] --> flat降維 --> [...]
groupByKey
根據key把value歸併起來。
words = sc.parallelize([("Hadoop",1), ("is",1), ...) words1 = words.groupByKey() words1.foreach(print)
下圖左邊的輸入,能夠經過map(lambda word: (word,1))來得到。
reduceByKey
進一步地,直接將groupByKey的values通過reduce處理後可變爲一個值。
2、"行動" 操做
惰性機制。
rdd = sc.parallelize([1,2,3,4,5]) rdd.count() rdd.take(3) # 以數組的形式返回數據集中的前n個元素 rdd.reduce(lambda a,b:a+b) rdd.collect() # 以數組的形式返回數據集中的全部元素 rdd.foreach(lambda elem:print(elem))
3、RDD常見操做
(1) RDD表明 Resilient Distributed Dataset,它們是在多個節點上運行和操做以在集羣上進行並行處理的元素。
from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
方法測試:
>>> words_filter = words.filter(lambda x: 'spark' in x) >>> filtered = words_filter.collect() >>> print("Fitered RDD -> %s" % (filtered)) Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark'] >>> words_map = words.map(lambda x: (x, 1)) >>> mapping = words_map.collect() >>> print("Key value pair -> %s" % (mapping)) Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)] >>> words.cache() ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> caching = words.persist().is_cached >>> print("Words got chached > %s" % (caching)) Words got chached > True
(2) 這個也是很是相似與python中的api。
from pyspark import SparkContext from operator import add
sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5])
方法測試:
>>> adding = nums.reduce(add) >>> print("Adding all the elements -> %i" % (adding)) Adding all the elements -> 15
(3) 相似sql中的join。
from pyspark import SparkContext sc = SparkContext("local", "Join app")
方法測試:
>>> x = sc.parallelize([("spark", 1), ("hadoop", 4)]) >>> y = sc.parallelize([("spark", 2), ("hadoop", 5)]) >>> joined = x.join(y) >>> joined PythonRDD[16] at RDD at PythonRDD.scala:53 # 說明joined是個惰性的rdd。
>>> final = joined.collect() >>> print("Join RDD -> %s" % (final)) Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]
4、「持久化」 的必要性
沒用持久化
list = ["Hadoop」, 「Spark", "Hive"] rdd = sc.parallelize(list)
print(rdd.count()) # 動做操做,觸發一次從頭至尾的計算 print(','.join(rdd.collect())) # 以逗號做爲分隔把這三個字符串鏈接起來,python使用; # 動做操做,觸發一次從頭至尾的計算
使用持久化
.persist(MEMORY_AND_DISK)
.persist(MEMORY_ONLY) ---> .cache() # 簡潔寫法
list = ["Hadoop」, 「Spark", "Hive"] rdd = sc.parallelize(list)
# 第一次行動計算時,才真正的緩存持久化 rdd.cache()
print(rdd.count()) print(','.join(rdd.collect())) # 這裏便不須要在從頭至尾計算,由於rdd已緩存
rdd.unpersist()
原始的方式,以下所示:
StorageLevel決定如何存儲RDD。在Apache Spark中,StorageLevel決定RDD是應該存儲在內存中仍是存儲在磁盤上,或二者都存儲。它還決定是否序列化RDD以及是否複製RDD分區。
讓咱們考慮如下StorageLevel示例,其中咱們使用存儲級別 MEMORY_AND_DISK_2, 這意味着RDD分區將具備2的複製。
from pyspark import SparkContext import pyspark sc = SparkContext ( "local", "storagelevel app" ) rdd1 = sc.parallelize([1,2]) rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 ) rdd1.getStorageLevel() print(rdd1.getStorageLevel())
RDD分區
Ref: Spark-RDD 分區
1、分區好處
增長並行性
多節點同時計算。
減小通訊開銷
(UserId, UserInfo) join (UserID, LinkInfo) ---> (UserID, UserInfo, LinkInfo)
一個文件很是大,分塊存儲在不一樣的機器上,謂之 「分塊」。
左圖:每個塊,有全部rows的一部分信息;
右圖:沒一個塊,只有一部分rows的信息。
左圖三步驟:
(1) join操做會將兩個數據集中的全部的鍵的哈希值都求出來,
(2) 將哈希值相同的記錄傳送到同一臺機器上,
(3) 以後在該機器上對全部鍵相同的記錄進行join操做。
這種狀況之下,每次進行join都會有數據混洗的問題,形成了很大的網絡傳輸開銷。
右圖三過程:
(1) 因爲UserData表比events表要大得多,因此選擇將UserData進行分區。
(2) 以後Spark就會知曉該RDD是根據鍵的哈希值來分區的。
(3) 這樣在調用join()時,Spark就會利用這一點。當調用UserData.join(events)時,Spark只會對events進行數據混洗操做,將events中特定的UserID的記錄發送到userData的對應分區所在的那臺機器上。
2、分區原則
手動分區
儘可能等於集羣中的邏輯cpu core的數量。
彈性RDD的演示:
list = [1,2,3,4,5] data = sc.parallelize(list, 2) len(data.glom().collect())
2 rdd = data.repartition(1) len(rdd.glom().collect())
1
自定義分區
三種分區方式:
- HashPartitioner(默認)
- RangePartitioner(默認)
- 自定義分區
分配分區的index。
from pyspark import SparkConf, SparkContext def MyPartitioner(key): print("MyPartitioner is running") print('The key is %d' % key) return key % 10
自定義分區。
def main(): print("The main function is running") conf = SparkConf().setMaster("local").setAppName("MyApp") sc = SparkContext(conf = conf) # 把這些數字分紅5個分區 data = sc.parallelize(range(10), 5) data.map(lambda x: (x,1)) \ .partitionBy(10, MyPartitioner) \ .map(lambda x: x[0]) \ .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner") # 目錄地址,10個分區是10個文件 if __name__ == '__main__': main()
map(lambda x: (x, 1))
map(lambda x: x[0])
3、調試方式
由於分了10個區,因此最後生成了10個文件。
# python sol
python3 TestPartitioner.py
# spark sol
spark-submit TestPartitioner.py
栗子:word count
1、進行詞頻統計
其實就是上文中RDD操做的一個綜合應用。
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
wordCount = lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word,1))
.reduceByKey(lambda a,b:a+b)
print(wordCount.collect())
2、過程解析
將統計內容分配到各個節點,計算出分區的統計結果,以後再reduce到master統計出最終結果。
鍵值對RDD
1、如何建立
從文件中加載
flatMap, map便可得。
經過並行集合建立
直接flat便可。
2、如何轉換
常見的轉換方法以下。
reduceByKey(lambda a,b: a+b)
# 等價於:
groupByKey().map(lambda t: (t[0], sum(t[1]))) # (one,1) (two,(1,1)) (three,(1,1,1))
pairRDD.keys().foreach() pairRDD.values().foreach()
pairRDD.sortByKey().foreach() # 降序排序sortByKey(False) d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x,False) # 默認是key排序 d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x[0],False) # key排序 d1.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x[1],False) # value排序
pairRDD.mapValues(lambda x: x+1) # 只針對dict的value操做 pairRdd1.join(pairRDD2) # 根據key,把value歸併起來,相似於:flatMap+map
栗子:average sales
1、計算天天平均銷量
2、代碼
x[0]表明值的總和。
x[1]表明值的個數。
rdd = sc.parallelize([("spark",2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]) rdd.mapValues(lambda x: (x,1)). \
... reduceByKey(lambda x,y: (x[0]+y[0], x[1]+[1]). \
... mapValues(lambda x: x[0]/x[1]).collect()
廣播與累積器
1、共享變量
默認狀況下,若是在一個算子的函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每一個task中。此時每一個task只能操做本身的那份變量副本。若是多個task想要共享某個變量,那麼這種方式是作不到的。
所以,Spark提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另外一種是Accumulator(累加變量)。
- Broadcast Variable會將使用到的變量,僅僅爲每一個節點拷貝一份,而不會爲每一個task都拷貝一份副本。更大的用處是優化性能,減小網絡傳輸以及內存消耗。
- Accumulator則可讓多個task共同操做一份變量,主要能夠進行累加操做。
2、實例演示
Broadcast
廣播變量用於跨全部節點保存數據副本。此變量緩存在全部計算機上,而不是在具備任務的計算機上發送。
class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
示例代碼:
from pyspark import SparkContext sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value print "Stored data -> %s" % (data) elem = words_new.value[2] print "Printing a particular element in RDD -> %s" % (elem)
Accumulator
累加器變量用於經過關聯和交換操做聚合信息。例如,您可使用累加器進行求和操做或計數器(在MapReduce中)。
from pyspark import SparkContext sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x): global num num+=x
rdd = sc.parallelize([20,30,40,50]) rdd.foreach(f) final = num.value print "Accumulated value is -> %i" % (final)
End.