Spark (Python版) 零基礎學習筆記(一)—— 快速入門

因爲Scala纔剛剛開始學習,仍是對python更爲熟悉,所以在這記錄一下本身的學習過程,主要內容來自於spark的官方幫助文檔,這一節的地址爲:html

http://spark.apache.org/docs/latest/quick-start.htmlpython

文章主要是翻譯了文檔的內容,但也在裏邊加入了一些本身在實際操做中遇到的問題及解決的方案,和一些補充的小知識,一塊兒學習。算法

環境:Ubuntu 16.04 LTS,Spark 2.0.1, Hadoop 2.7.3, Python 3.5.2,shell

 

利用spark shell進行交互式分析apache

1. 基礎api

首先打開spark與python交互的API緩存

$ cd /usr/local/spark
$ ./bin/pyspark

Spark最重要的一個概念就是RDD(Resilient Distributed Dataset),彈性分佈式數據集。RDD能夠利用Hadoop的InputFormats建立,或者從其餘RDD轉換。app

這裏,做爲入門,咱們利用spark安裝後文件夾中自帶的README.md(此文件位置爲/usr/local/spark/README.md)文件做爲例子,學習如何建立一個新的RDD。分佈式

建立新的RDD:函數

>>> textFile = sc.textFile(「README.md」)

 

RDD支持兩種類型的操做,actions和transformations:

actions: 在數據集上運行計算後返回值

transformations: 轉換, 從現有數據集建立一個新的數據集

 RDD能夠有執行一系列的動做(actions),這些動做能夠返回值(values),轉換(transformations),或者指向新的RDD的指針。下邊學習RDD的一些簡單的動做:

>>> textFile.count()  # 計數,返回RDD中items的個數,這裏就是README.md的總行# 數
99
>>> textFile.first()  # RDD中的第一個item,這裏就是文件README.md的第一行
u'# Apache Spark'

注意:若是以前是從/usr/local/spark啓動pyspark,而後讀取README.md文件的,若是執行count語句,會出現如下錯誤:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/spark/README.md

這是由於在使用相對路徑時,系統默認是從hdfs://localhost:9000/目錄下讀取README.md文件的,可是README.md文件並不在這一目錄下,因此sc.textFile()必須使用絕對路徑,此時代碼修改成:

>>> textFile = sc.textFile(「file:///usr/local/spark/README.md」)
99

下邊嘗試使用一個轉換(transformation)。例如,使用filter這一轉換返回一個新的RDD,這些RDD中的items都含有「Spark」字符串。

>>> linesWithSpark = textFile.filter(lambda line: 「Spark」 in line)

咱們還能夠將actions和transformation連接起來:

>>> textFile.filter(lambda line: 「Spark」 in line).count()  # 有多好行含有「Spark」這一字符串
19

 

2. 更多的RDD操做

利用RDD的動做和轉換可以完成不少複雜的計算。例如,咱們但願找到含有最後單詞的一句話:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b)
22

這個語句中,map函數將len(line.split())這一語句在全部line上執行,返回每一個line所含有的單詞個數,也就是將line都map到一個整數值,而後建立一個新的RDD。而後調用reduce,找到最大值。map和reduce函數裏的參數是python中的匿名函數(lambda),事實上,咱們這裏也能夠傳遞python中更頂層的函數。好比,咱們先定義一個比較大小的函數,這樣咱們的代碼會更容易理解:

>>> def max(a, b):
. . .     if a > b:
. . .         return a
. . .     else:
. . .         return b
. . .
>>> textFile.map(lambda line: len(line.split())).reduce(max)
22

Hadoop掀起了MapReduce的熱潮。在spark中,可以更加容易的實現MapReduce

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

上述語句中,利用flatMap, map和reduceByKey三個轉換,計算文件README.md中每一個單詞出現的個數,並返回一個新的RDD,每一個item的格式爲(string, int),即單詞和對應的出現次數。其中,

flatMap(func):與map類似,可是每一個輸入的item可以被map到0個或者更多的輸出items上,也就是說func的返回值應當是一個Seq,而不是一個單獨的item,上述語句中,匿名函數返回的就是一句話中所含的每一個單詞

reduceByKey(func):能夠做用於使用「鍵-值」(K, V)形式存儲的數據集上並返回一組新的數據集(K, V),其中,每一個鍵的值爲聚合使用func操做的結果,這裏至關於python中字典的含義。上述語句中,至關於當某個單詞出現一次時,就在這個單詞的出現次數上加1,每一個單詞就是一個Key,reducByKey中的匿名函數計算單詞的出現次數。

 

要收集上述語句的計算結果,可使用collect這一動做:

>>> wordCounts.collect()
[(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), ...]

 

3. 緩存Caching

Spark也支持將數據集存入集羣範圍的內存緩存中。這對於須要進行重複訪問的數據很是有用,好比咱們須要在一個小的數據集中執行查詢操做,或者須要執行一個迭代算法(例如PageRank)。下面,利用以前命令中獲得的linesWithSpark數據集,演示緩存這一操做過程:

>>> linesWithSpark.cache()
PythonRDD[26] at RDD at PythonRDD.scala:48
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19

利用Spark去緩存一個100行的文件可能並沒什麼意義。可是有趣的是,這一系列的操做能夠用於很是大的數據集上,甚至含有成千上萬的節點的數據集。

 

4. 自含式應用程序(self-contained applications)

假設咱們但願利用Spark API寫一個自含式應用程序,咱們能夠利用Scala,Java或者Python完成。

下邊,簡單介紹一下怎樣利用Python API (PySpark)寫一個應用程序,命名爲SimpleApp.py.

 

在spark所在目錄下輸入:

./bin/spark-submit --master local[4] SimpleApp.py

輸出爲:

Lines with a: 61, Lines with b: 27

 

此外,Spark自帶不少例子,能夠在spark目錄下輸入下列指令查看:

# For Scala and Java, use run-example:

./bin/run-example SparkPi

# For Python examples, use spark-submit directly:

./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:

./bin/spark-submit examples/src/main/r/dataframe.R
相關文章
相關標籤/搜索