spark應用程序結構python
Spark應用程序可分兩部分:driver部分和executor部分初始化SparkContext和主體程序web
A:driver部分編程
driver部分主要是對SparkContext進行配置、初始化以及關閉。初始化SparkContext是爲了構建Spark應用程序的運行環境,在初始化SparkContext,要先導入一些Spark的類和隱式轉換;在executor部分運行完畢後,須要將SparkContext關閉。瀏覽器
B:executor部分緩存
Spark應用程序的executor部分是對數據的處理,數據分三種:session
原生數據,包含輸入的數據和輸出的數據app
生成Scala標量數據,如count(返回RDD中元素的個數)、reduce、fold/aggregate;返回幾個標量,如take(返回前幾個元素)。函數
生成Scala集合數據集,如collect(把RDD中的全部元素倒入 Scala集合類型)、lookup(查找對應key的全部值)。oop
生成hadoop數據集,如saveAsTextFile、saveAsSequenceFile搜索引擎
scala集合數據集,如Array(1,2,3,4,5),Spark使用parallelize方法轉換成RDD。
hadoop數據集,Spark支持存儲在hadoop上的文件和hadoop支持的其餘文件系統,如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法能夠將本地文件或HDFS文件轉換成RDD。
對於輸入原生數據,Spark目前提供了兩種:
對於輸出數據,Spark除了支持以上兩種數據,還支持scala標量
RDD,Spark進行並行運算的基本單位,其細節參見RDD 細解。RDD提供了四種算子:
窄依賴算子
寬依賴算子,寬依賴會涉及shuffle類,在DAG圖解析時以此爲邊界產生Stage,如圖所示。
輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatMap;
輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce;
從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample。
對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey;
對兩個RDD基於key進行join和重組,如join、cogroup。
輸入算子,將原生數據轉換成RDD,如parallelize、txtFile等
轉換算子,最主要的算子,是Spark生成DAG圖的對象,轉換算子並不當即執行,在觸發行動算子後再提交給driver處理,生成DAG圖 --> Stage --> Task --> Worker執行。按轉化算子在DAG圖中做用,能夠分紅兩種:
緩存算子,對於要屢次使用的RDD,能夠緩衝加快運行速度,對重要數據能夠採用多備份緩存。
行動算子,將運算結果RDD轉換成原生數據,如count、reduce、collect、saveAsTextFile等。
共享變量,在Spark運行時,一個函數傳遞給RDD內的patition操做時,該函數所用到的變量在每一個運算節點上都複製並維護了一份,而且各個節點之間不會相互影響。可是在Spark Application中,可能須要共享一些變量,提供Task或驅動程序使用。Spark提供了兩種共享變量:
廣播變量,能夠緩存到各個節點的共享變量,一般爲只讀,使用方法:
>>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.broadcast([1, 2, 3, 4, 5]) >>> b.value[1, 2, 3, 4, 5] >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
累計器,只支持加法操做的變量,能夠實現計數器和變量求和。用戶能夠調用SparkContext.accumulator(v)建立一個初始值爲v的累加器,而運行在集羣上的Task能夠使用「+=」操做,但這些任務卻不能讀取;只有驅動程序才能獲取累加器的值。使用方法:
python編程
實驗項目
sogou日誌數據分析
實驗數據來源:sogou精簡版數據下載地址
數據格式說明:
訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL
其中,用戶ID是根據用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不一樣查詢對應同一個用戶ID。
以上數據格式是官方說明,實際上該數據集中排名和順序號之間不是\t分割,而是空格分割。
一個session內查詢次數最多的用戶的session與相應的查詢次數
import sys from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) != 2: print >> sys.stderr, "Usage: SogouC <file>" exit(-1) sc = SparkContext(appName="SogouC") sgRDD = sc.textFile(sys.argv[1]) print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10) sc.stop()
虛擬集羣中任意節點運行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt
運行結果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]