Spark入門級小玩

·背景 html

隨着周邊吐槽hadoop的聲音漸漸多起來以後,spark也逐漸進入了你們的視野。以前,筆者有粗略的寫過一篇spark的安裝和性能比較[http://www.cnblogs.com/zacard-orc/p/3526007.html],加上這兩天重讀着大學時候的一些基礎書籍,感受IT領域大局勢就像DNA的結構同樣。百家齊鳴卻又萬象歸一,就像APP與H5的戰爭同樣,內存計算及磁盤計算在各領風騷數十年後,可能漸漸也有了一絲明朗的陽光,同時也給了一次屌絲走向高富帥的機會。此次再寫一篇,不作枯燥理論的複製粘貼,就把這幾天工做上碰到的一些內容更形象地與SPARK貼合起來。因爲以前接觸python很少,花了一天時間在上面,終於喝了兩口python的湯,也正好切好本文的角度,已一個局外人的視角來解析spark的方便。寫的很差之處,請拍~。 python

·Spark裏的經常使用名字 算法

pySpark.SparkContext:字面理解spark專屬的上下文,承上啓下。更形象的說,就像你的一份簡歷,上面有不少的字段(屬性),方便你來告訴spark此次的 任務你想幹什麼。若是你忘了一些設置,或者想個性化一些設置,能夠再從pySpark.conf中從新進行設定。 運維

pySpark.SparkContext.textFile:知道了任務,總要告訴Spark你具體要處理的對象。對大多數人來講,讀文件是繞不開的一些。這個函數就是讀取文件的神器。雖然如今的pyspark還不支持streaming,可是預計時間上也是早晚的事。 分佈式

RDD:字面理解,彈性數據集合。再粗俗點,你們把日誌讀到"內存",這個內存形式的會比較怪異。見下圖,它可能存在的多種形式。函數

Partition:在一些常見的PPT介紹中,每一個人對其的理解也有很大的差異,有的認爲,RDD是PARTITION的一部分,多個RDD組成PARTITION。也有認爲PARTITION是RDD的一部分。這個只能說,E文單詞有時候還真的挺隱晦。從官網API的mapPartition上看,筆者以爲Spark更推薦上圖3的使用形式。可是,有了Yarn以後,Spark立馬從張遼變成了張飛。oop

 

Map+Reduce:OK。數據有了存在的形式,接下來幹嗎。那就是本身改刀。在此須要提醒的時,在pyspark提示符號下,做爲剛接觸的同窗能夠多敲敲type。敲完以後,便一目瞭然。 性能

    spark 提供了不少API供你們發揮,惟獨???須要從python中本身打造,有時候神和人的區別也許就在那???中能找到答案。PS:筆者是人。 spa

Shared Variables:MR這就走完了,可是Spark除了內存和並行計算是主打賣點外,還有一件事情是他的賣點,就是共享變量,其重要性不管在哪一種語言的API中都位於一級目錄。這個還真是方便,在有些MR的任務中,每每要插入第三方數據或者亂入的數據。以前hadoop streaming 能夠有conf參數提供,可是是靜態,若是像中途變動,就是重啓服務。正是這種狀況下,Shared Variables中的broadcast發揮了強大的功能,能寫能讀,方便靈活,類同範圍攻擊。官網介紹它時,說他能夠放LARGE DATA並且還能用Spark本身的算法最快地發佈到每一個Worker上。因爲筆者未讀源碼,只是抓包看了一下,不是UDP的組播。另外還有一個變量叫Accumulators,這個能讀不能寫,字面義是"累加器",官網也是這麼演示,可是筆者更看重它的另一層E文翻譯"蓄水池",除傳統累加外,還能作一些MR過程當中的臨時統計,但又不輸出到RDD結果。 翻譯

 

·Spark入門演示

    此次的演示不從官網角度出發,不從複製粘貼開始,就從最實際的工做切入。舉個栗子,要基礎的運維統計,統計每行日誌中哪些耗時超過5100毫秒的操做記錄。而後咱們一步一步來。日誌的樣本以下所示,爲了文章的效果,只顯示5行,而且已經放到了hdfs://cent8:9000/input的目錄下

 

先進入pyspark目錄

因爲pyspark啓動時默認加載了spark許多類庫,因此本來寫在腳本中的import xxx from sparkxxx均可以省略

先試着打開文件,很EAYS,打開的同時就已經分佈式地加載到了內存中,此時words就是RDD類。

words = sc.textFile("/input/2.txt")

看下words裏面是什麼,words.collect(),並且它返回了一個列表。

 

知道它是一個列表後,python同窗的發揮空間就打開了,接下來咱們繼續。把日誌明細拆開,這裏會開啓MAP,固然不開也能夠。

def f(x):
    a=x.split(' ')
    b=a[8].split(':')
    return int(b[0])

words.map(f).collect()

同樣的拆完以後,它返回了一個列表,咱們稍加改動一下,把耗時大於5100的記錄展現出來,它就變成了。

def f(x):
    a=x.split(' ')
    b=a[8].split(':')
    if(int(b[0])>5100):
        print x
        return x    

words.map(f).collect()

找日誌的活就結束了,前面咱們還說到了一個broadcast變量,官網給的例子過短了,讓人理解困難。我來把它從新改造一下。

bv = sc.broadcast([13,23,33]) 設置了broadcast,而後就能夠在自定義函數中本身飲用了。

def f(x):
    a=x.split(' ')
    b=a[8].split(':')
    if(int(b[0])>5100):
        print x
        return int(b[0])+int(bv.value[2])        

words.map(f).collect()

咱們把BV的值再改一次改爲[100,200,300]看會發生什麼

OK,此次爲此,Spark應用大門就此完全打開了。固然這個spark的世界很大很大,包括不少屌絲逆襲高富帥的ML類,此文僅僅滄海一藕。若是想深刻進去,涉及的知識面可謂覆蓋了幾乎整個時下流行的計算機體系的邊邊角角。

相關文章
相關標籤/搜索