『 Spark 』3. spark 編程模式

寫在前面

本系列是綜合了本身在學習spark過程當中的理解記錄 + 對參考文章中的一些理解 + 我的實踐spark過程當中的一些心得而來。寫這樣一個系列僅僅是爲了梳理我的學習spark的筆記記錄,因此一切以可以理解爲主,沒有必要的細節就不會記錄了,並且文中有時候會出現英文原版文檔,只要不影響理解,都不翻譯了。若想深刻了解,最好閱讀參考文章和官方文檔。html

其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好仍是必要的。python

最後,若是各位以爲內容有誤,歡迎留言備註,全部留言 24 小時內一定回覆,很是感謝。 git

Tips: 若是插圖看起來不明顯,能夠:1. 放大網頁;2. 新標籤中打開圖片,查看原圖哦。github

1. spark 基本編程模式

spark 裏有兩個很重要的概念:SparkContext 通常簡稱爲 sc] 和 RDD,在上一篇文章中 [『 Spark 』2. spark 基本概念解析 有講到。能夠說,sc 和 RDD 貫穿了 spark app 的大部分生命週期,從 app 的初始化,到數據的清洗,計算,到最後獲取,展現結果。數據庫

爲了更加深刻的瞭解 RDD 和基於 RDD 的編程模型,咱們先把 RDD 的屬性簡單的分一個類,而後再經過一張流程圖來理解。apache

1.1 RDD 的屬性

接觸過 RDD 的人確定都知道 transformaction 這兩個核心概念,甚至不少人都認爲 RDD 僅僅有 transformaction 這兩個概念。卻不知其實 RDD 裏面還有不少其餘方法,下面咱們來簡單的分個類,在看這裏的時候最好參考一下官方的 api 文檔編程

RDDapi

  • action : count, take, sample, first, collect ...緩存

  • transform : foreach, glom, map ...微信

  • method : cache, checkpoint, id, isCheckpointed, isEmpty, keys, lookup, max, mean, name, setName ...

  • property : context

看到了嗎,這裏其實 RDD 其實有不少既不是 transform 也不是 action 的函數和屬性,在編寫 spark app 的時候,其實不少時候咱們都會用到那些 method,這樣在開發調試過程當中都會更加方便。好比說 cache, setName, lookup, id 這些,在開發過程當中都頗有用。

1.2 spark 編程模式圖

spark-programming-model.jpg

如圖所示,咱們構建 spark app,通常都是三個步驟:

  • 加載數據集,這裏的數據集大概分爲兩組:

    • 一種是不變的,靜態數據集,大多數場景都是從數據庫,文件系統上面加載進來

    • 另外一種是動態的數據集,通常作 streaming 應用的時候用到,大多數場景是經過 socket 來加載數據,複雜場景能夠經過文件系統,akka actors,kafka,kinesis 和 一些第三方提供的 streaming api [twitter 等] 來做爲數據源加載數據

  • 處理數據,這是重點中的重點,不過不外乎都是從三個方面來完成這裏的數據清理,邏輯運算等:

    • 自定義的一些複雜處理函數或者第三方包 [下面咱們稱爲函數集]

    • 經過 RDD 的 transform,action 和函數集來完成整個處理,計算流程

    • 經過 RDD 提供的 cache,persist,checkpoint 方法把一些處理流程中的重要處理節點和經常使用數據緩存和備份,以加速處理,計算速度

  • 結果展現,這裏通常狀況都是使用 RDD 的 collect,take,first,top 等方法把結果取出來,更經常使用的是先把結果取出來,放到一個數據庫或文件系統上,而後再提供給專門展現結果的另外一個 application 使用。

2. 例子:MC [Monte Carlo]

下面我將從幾個方面來介紹這個例子:首先是介紹蒙特卡羅方法的基本概念和應用,而後是介紹如何用蒙特卡羅方法來估算 pi 的值,最後是看在 spark 集羣中如何用多種方法來實現一個蒙特卡洛應用來計算 pi 的值。

2.1 蒙特卡羅方法介紹

from wiki:

Monte Carlo methods (or Monte Carlo experiments) are a broad class of computational algorithms that rely on repeated random sampling to obtain numerical results. They are often used in physical and mathematical problems and are most useful when it is difficult or impossible to use other mathematical methods. Monte Carlo methods are mainly used in three distinct problem classes:[1] optimization, numerical integration, and generating draws from a probability distribution.

總的來講,蒙特卡羅是一種基於隨機樣本實驗來進行估值的一種計算方法。

2.2 蒙特卡羅方法估算 pi 值原理

用蒙特卡羅方法估算 pi 值,核心方法是利用正方形和圓形面積的比例:

  • 首先,咱們在座標軸上構造一個邊長爲 1 的正方形

  • 其次,咱們以 (0, 0) 爲圓心,構造一個半徑爲 1 的圓形

  • 此時咱們知道這個圓形有 1/4 是在正方形中的,正方形的面積和這 1/4 圓的面積分別是:1 和 pi/4,即 1/4 圓的面積和正方形面積之比恰好是 pi/4

  • 而後經過蒙特卡羅模擬,看看這個比例大概是多少,模擬方法以下:

    • 隨機扔 n 個點 (x, y),其中 x, y 都在 0 和 1 之間

    • 若是 x^2 + y^2 < 0,則把這個點標註爲紅色,表示這個點落在圓內

    • 最後數數有 n 個點中有多少點是紅點,即落在圓內,假設點數爲 m

    • 則這個 1/4 圓的面積和正方形面積的比例應該是:m/n,即 m/n = pi/4 => pi = 4*m/n

mc.gif

2.3 Python 實現蒙特卡羅方法估算 pi 值

import numpy as np

def mc_pi(n=100):
    """Use Monte Calo Method to estimate pi.
    """
    m = 0
    i = 0
    while i < n:
        x, y = np.random.rand(2)
        if x**2 + y**2 < 1:
            m += 1
        i += 1

    pi = 4. * m / n
    res = {'total_point': n, 'point_in_circle': m, 'estimated_pi': pi}
    
    return res

spark-programming-model-11.jpg

2.4 在 spark 集羣中實現蒙特卡羅方法

咱們按照上面寫的三大步驟來寫這個 spark 應用:

  • 加載數據集

    ### iterate number
    
    total = int(100 * 10000)
    local_collection = xrange(1, total)
    
    ### parallelize a data set into the cluster
    
    rdd = sc.parallelize(local_collection)       \
        .setName("parallelized_data")        \
        .cache()
  • 處理數據

    ### randomly generate points
    
    def map_func(element):
        x = random.random()       ## [0, 1)
        y = random.random()       ## [0, 1)
        
        return (x, y)             ## random point
    
    def map_func_2(element):
        x, y = element
        return 1 if x**2 + y**2 < 1 else 0
    
    rdd2 = rdd.map(map_func)            \
              .setName("random_point")  \
              .cache()
    
    ### calculate the number of points in and out the circle
    
    rdd3 = rdd2.map(map_func_2)                 \
               .setName("points_in_out_circle") \
               .cache()
  • 結果展現

    ### how many points are in the circle
    
    in_circle = rdd3.reduce(operator.add)
    pi = 4. * in_circle / total
    print 'iterate {} times'.format(total)
    print 'estimated pi : {}'.format(pi)

2.5 Seems a little complex, really?

上面這個例子,可能會讓一些初步接觸 spark 的人很困惑,"明明幾行代碼就能解決的問題在 spark 裏還有按照這些步驟寫這麼多代碼?難道是老溼又騙我了嗎?"。

wawawa.gif

其實,就從上面這個例子看起來,彷佛 spark 真的沒有什麼優點,可是,上面這個例子的目的是代表 spark 的編程模式,若是你還不相信,能夠把模擬次數加到千萬或者億次以上看看效果。

若是,若是你仍是糾結於 "我騙了你,spark 沒有夢想中的那麼好" 的話,那看下面這一行代碼吧,它也完成了一樣的事情:

### version 1
sc.parallelize(xrange(total))                                 \
    .map(lambda x: (random.random(), random.random()))        \
    .map(lambda x: 1 if x[0]**2 + x[1]**2 < 1 else 0)         \
    .reduce(lambda x, y: x + y)                               \
    / float(total) * 4

### version 2
sc.parallelize(xrange(total))                                  \
    .map(lambda x: 1 if sum(np.random.random(2) ** 2) else 0)  \
    .reduce(lambda x, y: x + y)                                \
    / float(total) * 4

3. Next

下一篇,介紹 spark 的 RDD,以後會單獨介紹 spark 的 dataframe 和 datasets。

4. 打開微信,掃一掃,點一點,棒棒的,^_^

wechat_pay.png

參考文章

本系列文章連接

相關文章
相關標籤/搜索