Spark入門(Python)

Hadoop是對大數據集進行分佈式計算的標準工具,這也是爲何當你穿過機場時能看到」大數據(Big Data)」廣告的緣由。它已經成爲大數據的操做系統,提供了包括工具和技巧在內的豐富生態系統,容許使用相對便宜的商業硬件集羣進行超級計算機級別的計算。2003和2004年,兩個來自Google的觀點使Hadoop成爲可能:一個分佈式存儲框架(Google文件系統),在Hadoop中被實現爲HDFS;一個分佈式計算框架(MapReduce)。html

這兩個觀點成爲過去十年規模分析(scaling analytics)、大規模機器學習(machine learning),以及其餘大數據應用出現的主要推進力!可是,從技術角度上講,十年是一段很是長的時間,並且Hadoop還存在不少已知限制,尤爲是MapReduce。對MapReduce編程明顯是困難的。對大多數分析,你都必須用不少步驟將Map和Reduce任務串接起來。這形成類SQL的計算或機器學習須要專門的系統來進行。更糟的是,MapReduce要求每一個步驟間的數據要序列化到磁盤,這意味着MapReduce做業的I/O成本很高,致使交互分析和迭代算法(iterative algorithms)開銷很大;而事實是,幾乎全部的最優化和機器學習都是迭代的。java

爲了解決這些問題,Hadoop一直在向一種更爲通用的資源管理框架轉變,即YARN(Yet Another Resource Negotiator, 又一個資源協調者)。YARN實現了下一代的MapReduce,但同時也容許應用利用分佈式資源而沒必要採用MapReduce進行計算。經過將集羣管理通常化,研究轉到分佈式計算的通常化上,來擴展了MapReduce的初衷。node

Spark是第一個脫胎於該轉變的快速、通用分佈式計算範式,而且很快流行起來。Spark使用函數式編程範式擴展了MapReduce模型以支持更多計算類型,能夠涵蓋普遍的工做流,這些工做流以前被實現爲Hadoop之上的特殊系統。Spark使用內存緩存來提高性能,所以進行交互式分析也足夠快速(就如同使用Python解釋器,與集羣進行交互同樣)。緩存同時提高了迭代算法的性能,這使得Spark很是適合數據理論任務,特別是機器學習。python

本文中,咱們將首先討論如何在本地機器上或者EC2的集羣上設置Spark進行簡單分析。而後,咱們在入門級水平探索Spark,瞭解Spark是什麼以及它如何工做(但願能夠激發更多探索)。最後兩節咱們開始經過命令行與Spark進行交互,而後演示如何用Python寫Spark應用,並做爲Spark做業提交到集羣上。git

設置Spark算法

在本機設置和運行Spark很是簡單。你只須要下載一個預構建的包,只要你安裝了Java 6+和Python 2.6+,就能夠在Windows、Mac OS X和Linux上運行Spark。確保java程序在PATH環境變量中,或者設置了JAVA_HOME環境變量。相似的,python也要在PATH中。shell

假設你已經安裝了Java和Python:數據庫

  1. 訪問Spark下載頁
  2. 選擇Spark最新發布版(本文寫做時是1.2.0),一個預構建的Hadoop 2.4包,直接下載。

如今,如何繼續依賴於你的操做系統,靠你本身去探索了。Windows用戶能夠在評論區對如何設置的提示進行評論。apache

通常,個人建議是按照下面的步驟(在POSIX操做系統上):編程

1.解壓Spark

~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

 

2.將解壓目錄移動到有效應用程序目錄中(如Windows上的

~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

 

3.建立指向該Spark版本的符號連接到<spark目錄。這樣你能夠簡單地下載新/舊版本的Spark,而後修改連接來管理Spark版本,而不用更改路徑或環境變量。

~$ ln -s /srv/spark-1.2.0 /srv/spark

 

4.修改BASH配置,將Spark添加到PATH中,設置SPARK_HOME環境變量。這些小技巧在命令行上會幫到你。在Ubuntu上,只要編輯~/.bash_profile或~/.profile文件,將如下語句添加到文件中:

export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

 

5.source這些配置(或者重啓終端)以後,你就能夠在本地運行一個pyspark解釋器。執行pyspark命令,你會看到如下結果:

~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
 
Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>

 

如今Spark已經安裝完畢,能夠在本機以」單機模式「(standalone mode)使用。你能夠在本機開發應用並提交Spark做業,這些做業將以多進程/多線程模式運行的,或者,配置該機器做爲一個集羣的客戶端(不推薦這樣作,由於在Spark做業中,驅動程序(driver)是個很重要的角色,而且應該與集羣的其餘部分處於相同網絡)。可能除了開發,你在本機使用Spark作得最多的就是利用spark-ec2腳原本配置Amazon雲上的一個EC2 Spark集羣了。

簡略Spark輸出

Spark(和PySpark)的執行能夠特別詳細,不少INFO日誌消息都會打印到屏幕。開發過程當中,這些很是惱人,由於可能丟失Python棧跟蹤或者print的輸出。爲了減小Spark輸出 – 你能夠設置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉「.template」擴展名。

~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

 

編輯新文件,用WARN替換代碼中出現的INFO。你的log4j.properties文件相似:

# Set everything to be logged to the console
 log4j.rootCategory=WARN, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

 

如今運行PySpark,輸出消息將會更簡略!感謝@genomegeek在一次District Data Labs的研討會中指出這一點。

在Spark中使用IPython Notebook

當搜索有用的Spark小技巧時,我發現了一些文章提到在PySpark中配置IPython notebook。IPython notebook對數據科學家來講是個交互地呈現科學和理論工做的必備工具,它集成了文本和Python代碼。對不少數據科學家,IPython notebook是他們的Python入門,而且使用很是普遍,因此我想值得在本文中說起。

這裏的大部分說明都來改編自IPython notebook: 在PySpark中設置IPython。可是,咱們將聚焦在本機以單機模式將IPtyon shell鏈接到PySpark,而不是在EC2集羣。若是你想在一個集羣上使用PySpark/IPython,查看並評論下文的說明吧!

  1. 1.爲Spark建立一個iPython notebook配置
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

 

記住配置文件的位置,替換下文各步驟相應的路徑:

2.建立文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,並添加以下代碼:

import os
import sys
 
# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'
 
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
 
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

 

3.使用咱們剛剛建立的配置來啓動IPython notebook。

~$ ipython notebook --profile spark

 

4.在notebook中,你應該能看到咱們剛剛建立的變量。

print SPARK_HOME

 

5.在IPython notebook最上面,確保你添加了Spark context。

from pyspark import  SparkContext
sc = SparkContext( 'local', 'pyspark')

 

6.使用IPython作個簡單的計算來測試Spark context。

def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
    return False
# 2 is the only even prime number
if n == 2:
    return True
# all other even numbers are not primes
if not n & 1:
    return False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
    if n % x == 0:
        return False
return True
 
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
 
# Compute the number of primes in the RDD
print nums.filter(isprime).count()

 

若是你能獲得一個數字並且沒有錯誤發生,那麼你的context正確工做了!

編輯提示:上面配置了一個使用PySpark直接調用IPython notebook的IPython context。可是,你也可使用PySpark按如下方式直接啓動一個notebook: $ IPYTHON_OPTS=」notebook –pylab inline」 pyspark

哪一個方法好用取決於你使用PySpark和IPython的具體情景。前一個容許你更容易地使用IPython notebook鏈接到一個集羣,所以是我喜歡的方法。

在EC2上使用Spark

在講授使用Hadoop進行分佈式計算時,我發現不少能夠經過在本地僞分佈式節點(pseudo-distributed node)或以單節點模式(single-node mode)講授。可是爲了瞭解真正發生了什麼,就須要一個集羣。當數據變得龐大,這些書面講授的技能和真實計算需求間常常出現隔膜。若是你肯在學習詳細使用Spark上花錢,我建議你設置一個快速Spark集羣作作實驗。 包含5個slave(和1個master)每週大概使用10小時的集羣每個月大概須要$45.18。

完整的討論能夠在Spark文檔中找到:在EC2上運行Spark在你決定購買EC2集羣前必定要通讀這篇文檔!我列出了一些關鍵點:

  1. 經過AWS Console獲取AWS EC2 key對(訪問key和密鑰key)。
  2. 將key對導出到你的環境中。在shell中敲出如下命令,或者將它們添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

 

注意不一樣的工具使用不一樣的環境名稱,確保你用的是Spark腳本所使用的名稱。

3.啓動集羣:

~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>

 

4.SSH到集羣來運行Spark做業。

ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

 

5.銷燬集羣

ec2$ ./spark-ec2 destroy &lt;cluster-name&gt;.

 

這些腳本會自動建立一個本地的HDFS集羣來添加數據,copy-dir命令能夠同步代碼和數據到該集羣。可是你最好使用S3來存儲數據,建立使用s3://URI來加載數據的RDDs。

Spark是什麼?

既然設置好了Spark,如今咱們討論下Spark是什麼。Spark是個通用的集羣計算框架,經過將大量數據集計算任務分配到多臺計算機上,提供高效內存計算。若是你熟悉Hadoop,那麼你知道分佈式計算框架要解決兩個問題:如何分發數據和如何分發計算。Hadoop使用HDFS來解決分佈式數據問題,MapReduce計算範式提供有效的分佈式計算。相似的,Spark擁有多種語言的函數式編程API,提供了除map和reduce以外更多的運算符,這些操做是經過一個稱做彈性分佈式數據集(resilient distributed datasets, RDDs)的分佈式數據框架進行的。

本質上,RDD是種編程抽象,表明能夠跨機器進行分割的只讀對象集合。RDD能夠從一個繼承結構(lineage)重建(所以能夠容錯),經過並行操做訪問,能夠讀寫HDFS或S3這樣的分佈式存儲,更重要的是,能夠緩存到worker節點的內存中進行當即重用。因爲RDD能夠被緩存在內存中,Spark對迭代應用特別有效,由於這些應用中,數據是在整個算法運算過程當中均可以被重用。大多數機器學習和最優化算法都是迭代的,使得Spark對數據科學來講是個很是有效的工具。另外,因爲Spark很是快,能夠經過相似Python REPL的命令行提示符交互式訪問。

Spark庫自己包含不少應用元素,這些元素能夠用到大部分大數據應用中,其中包括對大數據進行相似SQL查詢的支持,機器學習和圖算法,甚至對實時流數據的支持。

核心組件以下:

  • Spark Core:包含Spark的基本功能;尤爲是定義RDD的API、操做以及這二者上的動做。其餘Spark的庫都是構建在RDD和Spark Core之上的。
  • Spark SQL:提供經過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每一個數據庫表被當作一個RDD,Spark SQL查詢被轉換爲Spark操做。對熟悉Hive和HiveQL的人,Spark能夠拿來就用。
  • Spark Streaming:容許對實時數據流進行處理和控制。不少實時數據庫(如Apache Store)能夠處理實時數據。Spark Streaming容許程序可以像普通RDD同樣處理實時數據。
  • MLlib:一個經常使用機器學習算法庫,算法被實現爲對RDD的Spark操做。這個庫包含可擴展的學習算法,好比分類、迴歸等須要對大量數據集進行迭代的操做。以前可選的大數據機器學習庫Mahout,將會轉到Spark,並在將來實現。
  • GraphX:控制圖、並行圖操做和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、建立子圖、訪問路徑上全部頂點的操做。

因爲這些組件知足了不少大數據需求,也知足了不少數據科學任務的算法和計算上的須要,Spark快速流行起來。不只如此,Spark也提供了使用Scala、Java和Python編寫的API;知足了不一樣團體的需求,容許更多數據科學家簡便地採用Spark做爲他們的大數據解決方案。

對Spark編程

編寫Spark應用與以前實如今Hadoop上的其餘數據流語言相似。代碼寫入一個惰性求值的驅動程序(driver program)中,經過一個動做(action),驅動代碼被分發到集羣上,由各個RDD分區上的worker來執行。而後結果會被髮送回驅動程序進行聚合或編譯。本質上,驅動程序建立一個或多個RDD,調用操做來轉換RDD,而後調用動做處理被轉換後的RDD。

這些步驟大致以下:

  1. 定義一個或多個RDD,能夠經過獲取存儲在磁盤上的數據(HDFS,Cassandra,HBase,Local Disk),並行化內存中的某些集合,轉換(transform)一個已存在的RDD,或者,緩存或保存。
  2. 經過傳遞一個閉包(函數)給RDD上的每一個元素來調用RDD上的操做。Spark提供了除了Map和Reduce的80多種高級操做。
  3. 使用結果RDD的動做(action)(如count、collect、save等)。動做將會啓動集羣上的計算。

當Spark在一個worker上運行閉包時,閉包中用到的全部變量都會被拷貝到節點上,可是由閉包的局部做用域來維護。Spark提供了兩種類型的共享變量,這些變量能夠按照限定的方式被全部worker訪問。廣播變量會被分發給全部worker,可是是隻讀的。累加器這種變量,worker可使用關聯操做來「加」,一般用做計數器。

Spark應用本質上經過轉換和動做來控制RDD。後續文章將會深刻討論,可是理解了這個就足以執行下面的例子了。

Spark的執行

簡略描述下Spark的執行。本質上,Spark應用做爲獨立的進程運行,由驅動程序中的SparkContext協調。這個context將會鏈接到一些集羣管理者(如YARN),這些管理者分配系統資源。集羣上的每一個worker由執行者(executor)管理,執行者反過來由SparkContext管理。執行者管理計算、存儲,還有每臺機器上的緩存。

重點要記住的是應用代碼由驅動程序發送給執行者,執行者指定context和要運行的任務。執行者與驅動程序通訊進行數據分享或者交互。驅動程序是Spark做業的主要參與者,所以須要與集羣處於相同的網絡。這與Hadoop代碼不一樣,Hadoop中你能夠在任意位置提交做業給JobTracker,JobTracker處理集羣上的執行。

與Spark交互

使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。

~$ pyspark
[… snip …]
>>>

 

PySpark將會自動使用本地Spark配置建立一個SparkContext。你能夠經過sc變量來訪問它。咱們來建立第一個RDD。

>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

 

textFile方法將莎士比亞所有做品加載到一個RDD命名文本。若是查看了RDD,你就能夠看出它是個MappedRDD,文件路徑是相對於當前工做目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。咱們轉換下這個RDD,來進行分佈式計算的「hello world」:「字數統計」。

>>> from operator import add
>>> def tokenize(text):
...     return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43

 

咱們首先導入了add操做符,它是個命名函數,能夠做爲加法的閉包來使用。咱們稍後再使用這個函數。首先咱們要作的是把文本拆分爲單詞。咱們建立了一個tokenize函數,參數是文本片斷,返回根據空格拆分的單詞列表。而後咱們經過給flatMap操做符傳遞tokenize閉包對textRDD進行變換建立了一個wordsRDD。你會發現,words是個PythonRDD,可是執行本應該當即進行。顯然,咱們尚未把整個莎士比亞數據集拆分爲單詞列表。

若是你曾使用MapReduce作過Hadoop版的「字數統計」,你應該知道下一步是將每一個單詞映射到一個鍵值對,其中鍵是單詞,值是1,而後使用reducer計算每一個鍵的1總數。

首先,咱們map一下。

>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
|  shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
|  shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2

 

我使用了一個匿名函數(用了Python中的lambda關鍵字)而不是命名函數。這行代碼將會把lambda映射到每一個單詞。所以,每一個x都是一個單詞,每一個單詞都會被匿名閉包轉換爲元組(word, 1)。爲了查看轉換關係,咱們使用toDebugString方法來查看PipelinedRDD是怎麼被轉換的。可使用reduceByKey動做進行字數統計,而後把統計結果寫到磁盤。

>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")

 

一旦咱們最終調用了saveAsTextFile動做,這個分佈式做業就開始執行了,在做業「跨集羣地」(或者你本機的不少進程)運行時,你應該能夠看到不少INFO語句。若是退出解釋器,你能夠看到當前工做目錄下有個「wc」目錄。

$ ls wc/
_SUCCESS   part-00000 part-00001

 

每一個part文件都表明你本機上的進程計算獲得的被保持到磁盤上的最終RDD。若是對一個part文件進行head命令,你應該能看到字數統計元組。

$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)

 

注意這些鍵沒有像Hadoop同樣被排序(由於Hadoop中Map和Reduce任務中有個必要的打亂和排序階段)。可是,能保證每一個單詞在全部文件中只出現一次,由於你使用了reduceByKey操做符。你還可使用sort操做符確保在寫入到磁盤以前全部的鍵都被排過序。

編寫一個Spark應用

編寫Spark應用與經過交互式控制檯使用Spark相似。API是相同的。首先,你須要訪問<SparkContext,它已經由<pyspark自動加載好了。

使用Spark編寫Spark應用的一個基本模板以下:

## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    pass
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

 

這個模板列出了一個Spark應用所需的東西:導入Python庫,模塊常量,用於調試和Spark UI的可識別的應用名稱,還有做爲驅動程序運行的一些主要分析方法學。在ifmain中,咱們建立了SparkContext,使用了配置好的context執行main。咱們能夠簡單地導入驅動代碼到pyspark而不用執行。注意這裏Spark配置經過setMaster方法被硬編碼到SparkConf,通常你應該容許這個值經過命令行來設置,因此你能看到這行作了佔位符註釋。

使用<sc.stop()或<sys.exit(0)來關閉或退出程序。

## Spark Application - execute with spark-submit
 
## Imports
import csv
import matplotlib.pyplot as plt
 
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
 
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
 
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
 
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
 
def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
 
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))
 
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
 
    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
 
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
 
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
 
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
 
## Main functionality
def main(sc):
 
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
 
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
 
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
 
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
 
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
 
    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])
 
    # Show a bar chart of the delays
    plot(delays)
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

 

使用<spark-submit命令來運行這段代碼(假設你已有ontime目錄,目錄中有兩個CSV文件):

~$ spark-submit app.py

 

這個Spark做業使用本機做爲master,並搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結果顯示,4月的總延誤時間(單位分鐘),既有早點的(若是你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,咱們在app.py中使用matplotlib直接將結果可視化出來了:

這段代碼作了什麼呢?咱們特別注意下與Spark最直接相關的main函數。首先,咱們加載CSV文件到RDD,而後把split函數映射給它。split函數使用csv模塊解析文本的每一行,並返回表明每行的元組。最後,咱們將collect動做傳給RDD,這個動做把數據以Python列表的形式從RDD傳回驅動程序。本例中,airlines.csv是個小型的跳轉表(jump table),能夠將航空公司代碼與全名對應起來。咱們將轉移表存儲爲Python字典,而後使用sc.broadcast廣播給集羣上的每一個節點。

接着,main函數加載了數據量更大的flights.csv([譯者注]做者筆誤寫成fights.csv,此處更正)。拆分CSV行完成以後,咱們將parse函數映射給CSV行,此函數會把日期和時間轉成Python的日期和時間,並對浮點數進行合適的類型轉換。每行做爲一個NamedTuple保存,名爲Flight,以便高效簡便地使用。

有了Flight對象的RDD,咱們映射一個匿名函數,這個函數將RDD轉換爲一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發的延誤時間總和。使用reduceByKey動做和add操做符能夠獲得每一個航空公司的延誤時間總和,而後RDD被傳遞給驅動程序(數據中航空公司的數目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制檯,而且使用matplotlib進行了可視化。

這個例子稍長,可是但願能演示出集羣和驅動程序之間的相互做用(發送數據進行分析,結果取回給驅動程序),以及Python代碼在Spark應用中的角色。

結論

儘管算不上一個完整的Spark入門,咱們但願你能更好地瞭解Spark是什麼,如何使用進行快速、內存分佈式計算。至少,你應該能將Spark運行起來,並開始在本機或Amazon EC2上探索數據。你應該能夠配置好iPython notebook來運行Spark。

Spark不能解決分佈式存儲問題(一般Spark從HDFS中獲取數據),可是它爲分佈式計算提供了豐富的函數式編程API。這個框架創建在伸縮分佈式數據集(RDD)之上。RDD是種編程抽象,表明被分區的對象集合,容許進行分佈式操做。RDD有容錯能力(可伸縮的部分),更重要的時,能夠存儲到節點上的worker內存裏進行當即重用。內存存儲提供了快速和簡單表示的迭代算法,以及實時交互分析。

因爲Spark庫提供了Python、Scale、Java編寫的API,以及內建的機器學習、流數據、圖算法、類SQL查詢等模塊;Spark迅速成爲當今最重要的分佈式計算框架之一。與YARN結合,Spark提供了增量,而不是替代已存在的Hadoop集羣,它將成爲將來大數據重要的一部分,爲數據科學探索鋪設了一條康莊大道。

有用的連接

但願你喜歡這篇博文!寫做並非憑空而來的,如下是一些曾幫助我寫做的有用連接;查看這些連接,可能對進一步探索Spark有幫助。注意,有些圖書連接是推廣連接,意味着若是你點擊併購買了這些圖書,你將會支持District Data Labs!

這篇更可能是篇入門文章,而不是District Data Labs的典型文章,有些與此入門相關的數據和代碼你能夠在這裏找到:

Spark論文

Spark與Hadoop同樣,有一些基礎論文,我認爲那些須要對大數據集進行分佈式計算的嚴謹數據科學家必定要讀。首先是HotOS(「操做系統熱門話題」的簡寫)的一篇研討會論文,簡單易懂地描述了Spark。第二個是偏理論的論文,具體描述了RDD。

  1. M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, 「Spark: cluster computing with working sets,」 in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
  2. M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, 「Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,」 in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.

Spark圖書

  1. 學習Spark
  2. 使用Spark進行高級分析

有用的博文

  1. 設置IPython以使用PySpark
  2. Databricks的Spark參考應用程序
  3. 在EC2上運行Spark
  4. 在Amazon Elastic MapReduce上運行Spark和SparkSQL
相關文章
相關標籤/搜索