本文中,咱們將首先討論如何在本地機器上利用Spark進行簡單分析。而後,將在入門級水平探索Spark,瞭解Spark是什麼以及它如何工做(但願能夠激發更多探索)。最後兩節將開始經過命令行與Spark進行交互,而後演示如何用Python寫Spark應用,並做爲Spark做業提交到集羣上。同時也會提供相應的 Scala 版本。html
在本機設置和運行Spark很是簡單。你只須要下載一個預構建的包,只要你安裝了Java 6+和Python 2.6+,就能夠在Windows、Mac OS X和Linux上運行Spark。確保java程序在PATH環境變量中,或者設置了JAVA_HOME環境變量。相似的,python也要在PATH中。java
假設你已經安裝了Java和Python,以及 Spark,若是沒有請參照以前的教程:python
《Spark 僞分佈式 & 全分佈式 安裝指南》:http://my.oschina.net/leejun2005/blog/394928git
注意:若是要用到下文的 pyspark,則須要設置 python 相關的 spark 包路徑:github
vi .bashrc export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
不然會報錯: ImportError: No module named pyspark 或者 ImportError: No module named py4j.java_gateway算法
source這些配置(或者重啓終端)以後,你就能夠在本地運行一個pyspark解釋器。執行pyspark命令,你會看到如下結果:數據庫
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
如今Spark已經安裝完畢,能夠在本機以」單機模式「使用。你能夠在本機開發應用並提交Spark做業,這些做業將以多進程/多線程模式運行的,或者,配置該機器做爲一個集羣的客戶端(不推薦這樣作,由於在Spark做業中,驅動程序(driver)是個很重要的角色,而且應該與集羣的其餘部分處於相同網絡)。apache
Spark(和PySpark)的執行能夠特別詳細,不少INFO日誌消息都會打印到屏幕。開發過程當中,這些很是惱人,由於可能丟失Python棧跟蹤或者print的輸出。爲了減小Spark輸出 – 你能夠設置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉「.template」擴展名。編程
編輯新文件,用WARN替換代碼中出現的INFO。你的log4j.properties文件相似:segmentfault
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
如今運行PySpark,輸出消息將會更簡略!
talk is cheap,show you the code. 我們先來測試下 Spark 環境是否正常:
from pyspark import SparkContext from pyspark.streaming import StreamingContext #sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("MY First App") sc = SparkContext(conf = conf) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) distData.reduce(lambda a, b: a + b)
若是你能獲得一個數字 15,並且沒有錯誤發生,那麼你的context正確工做了!
既然設置好了Spark,如今咱們討論下Spark是什麼。Spark是個通用的集羣計算框架,經過將大量數據集計算任務分配到多臺計算機上,提供高效內存計算。若是你熟悉Hadoop,那麼你知道分佈式計算框架要解決兩個問題:如何分發數據和如何分發計算。Hadoop使用HDFS來解決分佈式數據問題,MapReduce計算範式提供有效的分佈式計算。相似的,Spark擁有多種語言的函數式編程API,提供了除map和reduce以外更多的運算符,這些操做是經過一個稱做彈性分佈式數據集(resilient distributed datasets, RDDs)的分佈式數據框架進行的。
本質上,RDD是種編程抽象,表明能夠跨機器進行分割的只讀對象集合。RDD能夠從一個繼承結構(lineage)重建(所以能夠容錯),經過並行操做訪問,能夠讀寫HDFS或S3這樣的分佈式存儲,更重要的是,能夠緩存到worker節點的內存中進行當即重用。因爲RDD能夠被緩存在內存中,Spark對迭代應用特別有效,由於這些應用中,數據是在整個算法運算過程當中均可以被重用。大多數機器學習和最優化算法都是迭代的,使得Spark對數據科學來講是個很是有效的工具。另外,因爲Spark很是快,能夠經過相似Python REPL的命令行提示符交互式訪問。
Spark庫自己包含不少應用元素,這些元素能夠用到大部分大數據應用中,其中包括對大數據進行相似SQL查詢的支持,機器學習和圖算法,甚至對實時流數據的支持。
核心組件以下:
Spark Core:包含Spark的基本功能;尤爲是定義RDD的API、操做以及這二者上的動做。其餘Spark的庫都是構建在RDD和Spark Core之上的。
Spark SQL:提供經過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每一個數據庫表被當作一個RDD,Spark SQL查詢被轉換爲Spark操做。對熟悉Hive和HiveQL的人,Spark能夠拿來就用。
Spark Streaming:容許對實時數據流進行處理和控制。不少實時數據庫(如Apache Store)能夠處理實時數據。Spark Streaming容許程序可以像普通RDD同樣處理實時數據。
MLlib:一個經常使用機器學習算法庫,算法被實現爲對RDD的Spark操做。這個庫包含可擴展的學習算法,好比分類、迴歸等須要對大量數據集進行迭代的操做。以前可選的大數據機器學習庫Mahout,將會轉到Spark,並在將來實現。
GraphX:控制圖、並行圖操做和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、建立子圖、訪問路徑上全部頂點的操做。
因爲這些組件知足了不少大數據需求,也知足了不少數據科學任務的算法和計算上的須要,Spark快速流行起來。不只如此,Spark也提供了使用Scala、Java和Python編寫的API;知足了不一樣團體的需求,容許更多數據科學家簡便地採用Spark做爲他們的大數據解決方案。
編寫Spark應用與以前實如今Hadoop上的其餘數據流語言相似。代碼寫入一個惰性求值的驅動程序(driver program)中,經過一個動做(action),驅動代碼被分發到集羣上,由各個RDD分區上的worker來執行。而後結果會被髮送回驅動程序進行聚合或編譯。本質上,驅動程序建立一個或多個RDD,調用操做來轉換RDD,而後調用動做處理被轉換後的RDD。
這些步驟大致以下:
(1)定義一個或多個RDD,能夠經過獲取存儲在磁盤上的數據(HDFS,Cassandra,HBase,Local Disk),並行化內存中的某些集合,轉換(transform)一個已存在的RDD,或者,緩存或保存。
(2)經過傳遞一個閉包(函數)給RDD上的每一個元素來調用RDD上的操做。Spark提供了除了Map和Reduce的80多種高級操做。
(3)使用結果RDD的動做(action)(如count、collect、save等)。動做將會啓動集羣上的計算。
當Spark在一個worker上運行閉包時,閉包中用到的全部變量都會被拷貝到節點上,可是由閉包的局部做用域來維護。Spark提供了兩種類型的共享變量,這些變量能夠按照限定的方式被全部worker訪問。廣播變量會被分發給全部worker,可是是隻讀的。累加器這種變量,worker可使用關聯操做來「加」,一般用做計數器。
Spark應用本質上經過轉換和動做來控制RDD。後續文章將會深刻討論,可是理解了這個就足以執行下面的例子了。
簡略描述下Spark的執行。本質上,Spark應用做爲獨立的進程運行,由驅動程序中的SparkContext協調。這個context將會鏈接到一些集羣管理者(如YARN),這些管理者分配系統資源。集羣上的每一個worker由執行者(executor)管理,執行者反過來由SparkContext管理。執行者管理計算、存儲,還有每臺機器上的緩存。
重點要記住的是應用代碼由驅動程序發送給執行者,執行者指定context和要運行的任務。執行者與驅動程序通訊進行數據分享或者交互。驅動程序是Spark做業的主要參與者,所以須要與集羣處於相同的網絡。這與Hadoop代碼不一樣,Hadoop中你能夠在任意位置提交做業給JobTracker,JobTracker處理集羣上的執行。
使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。
PySpark將會自動使用本地Spark配置建立一個SparkContext。你能夠經過sc變量來訪問它。咱們來建立第一個RDD。
>>> text = sc.textFile("shakespeare.txt") >>> print text shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法將莎士比亞所有做品加載到一個RDD命名文本。若是查看了RDD,你就能夠看出它是個MappedRDD,文件路徑是相對於當前工做目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。咱們轉換下這個RDD,來進行分佈式計算的「hello world」:「字數統計」。
>>> from operator import add >>> def tokenize(text): ... return text.split() ... >>> words = text.flatMap(tokenize) >>> print words PythonRDD[2] at RDD at PythonRDD.scala:43
咱們首先導入了add操做符,它是個命名函數,能夠做爲加法的閉包來使用。咱們稍後再使用這個函數。首先咱們要作的是把文本拆分爲單詞。咱們建立了一個tokenize函數,參數是文本片斷,返回根據空格拆分的單詞列表。而後咱們經過給flatMap操做符傳遞tokenize閉包對textRDD進行變換建立了一個wordsRDD。你會發現,words是個PythonRDD,可是執行本應該當即進行。顯然,咱們尚未把整個莎士比亞數據集拆分爲單詞列表。
若是你曾使用MapReduce作過Hadoop版的「字數統計」,你應該知道下一步是將每一個單詞映射到一個鍵值對,其中鍵是單詞,值是1,而後使用reducer計算每一個鍵的1總數。
首先,咱們map一下。
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString() (2) PythonRDD[3] at RDD at PythonRDD.scala:43 | shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 | shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一個匿名函數(用了Python中的lambda關鍵字)而不是命名函數。這行代碼將會把lambda映射到每一個單詞。所以,每一個x都是一個單詞,每一個單詞都會被匿名閉包轉換爲元組(word, 1)。爲了查看轉換關係,咱們使用toDebugString方法來查看PipelinedRDD是怎麼被轉換的。可使用reduceByKey動做進行字數統計,而後把統計結果寫到磁盤。
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦咱們最終調用了saveAsTextFile動做,這個分佈式做業就開始執行了,在做業「跨集羣地」(或者你本機的不少進程)運行時,你應該能夠看到不少INFO語句。若是退出解釋器,你能夠看到當前工做目錄下有個「wc」目錄。
$ ls wc /_SUCCESS part-00000 part-00001
每一個part文件都表明你本機上的進程計算獲得的被保持到磁盤上的最終RDD。若是對一個part文件進行head命令,你應該能看到字數統計元組。
$ head wc/part-00000 (u'fawn', 14) (u'Fame.', 1) (u'Fame,', 2) (u'kinghenryviii@7731', 1) (u'othello@36737', 1) (u'loveslabourslost@51678', 1) (u'1kinghenryiv@54228', 1) (u'troilusandcressida@83747', 1) (u'fleeces', 1) (u'midsummersnightsdream@71681', 1)
注意這些鍵沒有像Hadoop同樣被排序(由於Hadoop中Map和Reduce任務中有個必要的打亂和排序階段)。可是,能保證每一個單詞在全部文件中只出現一次,由於你使用了reduceByKey操做符。你還可使用sort操做符確保在寫入到磁盤以前全部的鍵都被排過序。
一個完整的例子:
from pyspark import SparkContext sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b) print totalLength # # lines.count()
scala 版本以下:
val lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
PS:我這邊用上面的兩段代碼測試發現(一億五千萬隨機數,500MB),scala 比 python 快了 20 倍,跟官方的性能數據相差太遠了,
應該是pythonAPI或者環境哪裏有問題~
編寫Spark應用與經過交互式控制檯使用Spark相似。API是相同的。首先,你須要訪問<SparkContext,它已經由<pyspark自動加載好了。
使用Spark編寫Spark應用的一個基本模板以下:
## Spark Application - execute with spark-submit: spark-submit app.py ## Imports from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): pass if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
這個模板列出了一個Spark應用所需的東西:導入Python庫,模塊常量,用於調試和Spark UI的可識別的應用名稱,還有做爲驅動程序運行的一些主要分析方法學。在ifmain中,咱們建立了SparkContext,使用了配置好的context執行main。咱們能夠簡單地導入驅動代碼到pyspark而不用執行。注意這裏Spark配置經過setMaster方法被硬編碼到SparkConf,通常你應該容許這個值經過命令行來設置,因此你能看到這行作了佔位符註釋。
使用<sc.stop()或<sys.exit(0)來關閉或退出程序。
## Spark Application - execute with spark-submit ## Imports import csv import matplotlib.pyplot as plt from StringIO import StringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields) ## Closure Functions def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11]) def split(line): """ Operator function for splitting a line with csv module """ reader = csv.reader(StringIO(line)) return reader.next() def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center') # Set the ticks ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) # minimize chart junk plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') plt.show() ## Main functionality def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("ontime/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print "%0.0f minutes delayed\t%s" % (d[1], d[0]) # Show a bar chart of the delays plot(delays) if __name__ == "__main__": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
使用<spark-submit命令來運行這段代碼(假設你已有ontime目錄,目錄中有兩個CSV文件):
~$ spark-submit app.py
這個Spark做業使用本機做爲master,並搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結果顯示,4月的總延誤時間(單位分鐘),既有早點的(若是你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,咱們在app.py中使用matplotlib直接將結果可視化出來了:
這段代碼作了什麼呢?咱們特別注意下與Spark最直接相關的main函數。首先,咱們加載CSV文件到RDD,而後把split函數映射給它。split函數使用csv模塊解析文本的每一行,並返回表明每行的元組。最後,咱們將collect動做傳給RDD,這個動做把數據以Python列表的形式從RDD傳回驅動程序。本例中,airlines.csv是個小型的跳轉表(jump table),能夠將航空公司代碼與全名對應起來。咱們將轉移表存儲爲Python字典,而後使用sc.broadcast廣播給集羣上的每一個節點。
接着,main函數加載了數據量更大的flights.csv([譯者注]做者筆誤寫成fights.csv,此處更正)。拆分CSV行完成以後,咱們將parse函數映射給CSV行,此函數會把日期和時間轉成Python的日期和時間,並對浮點數進行合適的類型轉換。每行做爲一個NamedTuple保存,名爲Flight,以便高效簡便地使用。
有了Flight對象的RDD,咱們映射一個匿名函數,這個函數將RDD轉換爲一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發的延誤時間總和。使用reduceByKey動做和add操做符能夠獲得每一個航空公司的延誤時間總和,而後RDD被傳遞給驅動程序(數據中航空公司的數目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制檯,而且使用matplotlib進行了可視化。
這個例子稍長,可是但願能演示出集羣和驅動程序之間的相互做用(發送數據進行分析,結果取回給驅動程序),以及Python代碼在Spark應用中的角色。
Spark Streaming 主要用來作實時處理,其原理本質上是「更細粒度的批處理」:
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") ssc = StreamingContext(sc, 3) # Create a DStream that will connect to hostname:port, like localhost:9999 lines = ssc.socketTextStream("110.9.17.187", 9999) # Split each line into words words = lines.flatMap(lambda line: line.split(" ")) # Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # Print the first ten elements of each RDD generated in this DStream to the console wordCounts.pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
Scala 版本:
import org.apache.spark._ import org.apache.spark.streaming._ // not necessary in Spark 1.3+ // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. object Streaming { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }
測試:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ...
# TERMINAL 2: RUNNING NetworkWordCount spark-submit TestScala.jar localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ...
儘管算不上一個完整的Spark入門,咱們但願你能更好地瞭解Spark是什麼,如何使用進行快速、內存分佈式計算。至少,你應該能將Spark運行起來,並開始在本機或Amazon EC2上探索數據。
Spark不能解決分佈式存儲問題(一般Spark從HDFS中獲取數據),可是它爲分佈式計算提供了豐富的函數式編程API。這個框架創建在伸縮分佈式數據集(RDD)之上。RDD是種編程抽象,表明被分區的對象集合,容許進行分佈式操做。RDD有容錯能力(可伸縮的部分),更重要的時,能夠存儲到節點上的worker內存裏進行當即重用。內存存儲提供了快速和簡單表示的迭代算法,以及實時交互分析。
因爲Spark庫提供了Python、Scale、Java編寫的API,以及內建的機器學習、流數據、圖算法、類SQL查詢等模塊;Spark迅速成爲當今最重要的分佈式計算框架之一。與YARN結合,Spark提供了增量,而不是替代已存在的Hadoop集羣,它將成爲將來大數據重要的一部分,爲數據科學探索鋪設了一條康莊大道。
附上代碼:
#pyspark --master yarn-client --queue root.spark.username.spark --executor-memory 12G --num-executors 20 from __future__ import division import decimal from pyspark import SparkConf, SparkContext, StorageLevel import sys reload(sys) sys.setdefaultencoding('utf-8') #sc.stop conf = SparkConf() conf.setAppName("lego") #print conf.toDebugString() file_rdd = sc.textFile("/ooxx/20170317/") #統計全天日誌量、關鍵詞出現次數、3月17日15:30-16:45關鍵詞次數 def parse_line(line): a = ("uploadFail_all", 0) b = ("uploadFail_34", 0) if "uploadFail" in line: a = ("uploadFail_all", 1) if 1489735800000 < int(line.split("\x01")[5]) < 1489740300000: b = ("uploadFail_34", 1) return (("all_log",1), a, b) file_rdd.map(parse_line).flatMap(lambda item: [item[0], item[1], item[2]]).reduceByKey(lambda accum, n: accum + n).collect() #注意 parse_line 不能 return None,不然 value list 爲 None: >>> for i in None: ... print i ... Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: 'NoneType' object is not iterable >>> #def parse_kv(kv): # return ((kv[0],kv[1]), (kv[0]+"*2",kv[1]*2)) # #def sumFunc(accum, n): # return accum + n # #x = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("a", 4),("b", 2), ("b", 4), ("b", 8), ("b", 16)], 3) #x.map(parse_kv).flatMap(lambda item: [(item[0][0],item[0][1]), (item[1][0],item[1][1])]).reduceByKey(sumFunc).collect()
[1] Spark入門(Python版)
http://blog.jobbole.com/86232/
[2] Spark編程指南筆記
http://blog.javachen.com/2015/02/03/spark-programming-guide/#
[3] Spark Streaming Programming Guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html
[4] 大數據算命系列(8): spark框架與pyspark簡介
https://github.com/renewjoy/bigdata-fortune-telling/blob/master/08_pyspark/pyspark.rst
[5] PySpark內部實現
http://blog.csdn.net/lantian0802/article/details/36376873
[6] Spark Streaming
http://debugo.com/spark-streaming/
[7] Spark入門階段一之掃盲筆記
http://www.javashuo.com/article/p-mulggfym-cu.html
[8] Spark通訊原理之Python與JVM的交互