spark python編程

spark應用程序結構python

  Spark應用程序可分兩部分:driver部分和executor部分初始化SparkContext和主體程序web

A:driver部分編程

      driver部分主要是對SparkContext進行配置、初始化以及關閉。初始化SparkContext是爲了構建Spark應用程序的運行環境,在初始化SparkContext,要先導入一些Spark的類和隱式轉換;在executor部分運行完畢後,須要將SparkContext關閉。瀏覽器

B:executor部分緩存

      Spark應用程序的executor部分是對數據的處理,數據分三種:session

  • 原生數據,包含輸入的數據和輸出的數據app

    • 生成Scala標量數據,如count(返回RDD中元素的個數)、reduce、fold/aggregate;返回幾個標量,如take(返回前幾個元素)。函數

    • 生成Scala集合數據集,如collect(把RDD中的全部元素倒入 Scala集合類型)、lookup(查找對應key的全部值)。oop

    • 生成hadoop數據集,如saveAsTextFile、saveAsSequenceFile搜索引擎

    • scala集合數據集,如Array(1,2,3,4,5),Spark使用parallelize方法轉換成RDD。

    • hadoop數據集,Spark支持存儲在hadoop上的文件和hadoop支持的其餘文件系統,如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法能夠將本地文件或HDFS文件轉換成RDD。

    • 對於輸入原生數據,Spark目前提供了兩種:

    • 對於輸出數據,Spark除了支持以上兩種數據,還支持scala標量

  • RDD,Spark進行並行運算的基本單位,其細節參見RDD 細解。RDD提供了四種算子:

    • 窄依賴算子

    • 寬依賴算子,寬依賴會涉及shuffle類,在DAG圖解析時以此爲邊界產生Stage,如圖所示。

    • 輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatMap;

    • 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce;

    • 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample。

    • 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey;

    • 對兩個RDD基於key進行join和重組,如join、cogroup。

    • 輸入算子,將原生數據轉換成RDD,如parallelize、txtFile等

    • 轉換算子,最主要的算子,是Spark生成DAG圖的對象,轉換算子並不當即執行,在觸發行動算子後再提交給driver處理,生成DAG圖 -->  Stage --> Task  --> Worker執行。按轉化算子在DAG圖中做用,能夠分紅兩種:

    • 緩存算子,對於要屢次使用的RDD,能夠緩衝加快運行速度,對重要數據能夠採用多備份緩存。

    • 行動算子,將運算結果RDD轉換成原生數據,如count、reduce、collect、saveAsTextFile等。

  • 共享變量,在Spark運行時,一個函數傳遞給RDD內的patition操做時,該函數所用到的變量在每一個運算節點上都複製並維護了一份,而且各個節點之間不會相互影響。可是在Spark Application中,可能須要共享一些變量,提供Task或驅動程序使用。Spark提供了兩種共享變量:

    • 廣播變量,能夠緩存到各個節點的共享變量,一般爲只讀,使用方法:

    • >>> from pyspark.context import SparkContext                    >>> sc = SparkContext('local', 'test')                           >>> b = sc.broadcast([1, 2, 3, 4, 5])                                    >>> b.value[1, 2, 3, 4, 5]                                                        >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    • 累計器,只支持加法操做的變量,能夠實現計數器和變量求和。用戶能夠調用SparkContext.accumulator(v)建立一個初始值爲v的累加器,而運行在集羣上的Task能夠使用「+=」操做,但這些任務卻不能讀取;只有驅動程序才能獲取累加器的值。使用方法:

python編程

實驗項目

sogou日誌數據分析

實驗數據來源:sogou精簡版數據下載地址

數據格式說明:

訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL

其中,用戶ID是根據用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不一樣查詢對應同一個用戶ID。

以上數據格式是官方說明,實際上該數據集中排名和順序號之間不是\t分割,而是空格分割。

一個session內查詢次數最多的用戶的session與相應的查詢次數

import sys  
from pyspark import SparkContext  
  
if __name__ == "__main__":  
    if len(sys.argv) != 2:  
        print >> sys.stderr, "Usage: SogouC <file>"  
        exit(-1)  
    sc = SparkContext(appName="SogouC")  
    sgRDD = sc.textFile(sys.argv[1])  
    print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)  
    sc.stop()

 

虛擬集羣中任意節點運行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt

運行結果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

相關文章
相關標籤/搜索