Link: http://spark.apache.org/docs/latest/programming-guide.html
After installing Spark, it comes with a few demos located under <Spark root>/examples/src/main/python/.
Among them are several examples, such as wordcount.py, which counts words:
import sys
from operator import add
from pyspark import SparkContext

reload(sys)
sys.setdefaultencoding("utf-8")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)
    sc.stop()
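For a quick sanity check, the same flatMap/map/reduceByKey chain can be tried on a tiny in-memory RDD before pointing it at a real file. This is only a minimal sketch (the sample sentence and app name are made up), not part of the Spark distribution:

# Minimal sketch: run the word-count logic on an in-memory RDD.
from operator import add
from pyspark import SparkContext

sc = SparkContext(appName="WordCountSketch")
lines = sc.parallelize(["to be or not to be"])
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) \
              .collect()
print counts  # something like [('not', 1), ('be', 2), ('or', 1), ('to', 2)]
sc.stop()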
Also see the Spark Python API: http://spark.apache.org/docs/latest/api/python/pyspark.html
I wrote a small demo just to practice using the API; it makes this kind of work very convenient. It targets statistical analysis of large data files: files of tens to a few hundred MB we handle on a single machine, while anything over a GB goes onto Hadoop's HDFS, as sketched below.
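The only thing that changes between a small local file and a large file on HDFS is the path passed to textFile. A hedged sketch, where the paths, hostname and port are made-up placeholders:

from pyspark import SparkContext

sc = SparkContext(appName="LoadExample")
# tens or hundreds of MB: read straight from the local filesystem
local_rdd = sc.textFile("file:///tmp/students_small.txt")
# GB-scale and up: read from HDFS (the namenode address is a placeholder)
hdfs_rdd = sc.textFile("hdfs://namenode:9000/data/students_big.txt")
print local_rdd.count(), hdfs_rdd.count()
sc.stop()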
Below is a student score sheet with four columns: the student's name followed by the scores of three subjects. Some students appear more than once (for example when extra credit is awarded), so those rows need to be merged in the analysis:
yang 85 90 30
wang 20 60 50
zhang 90 90 90
li 100 54 0
yanf 0 0 0
yang 12 0 0
Of course, real data would be much larger, with many more columns and hundreds of thousands or even millions of rows. This is just a demo, effectively a test before deployment.
Create a new students.py under <Spark root>/examples/src/main/python/:
# coding=utf-8
import sys
from pyspark import SparkContext

reload(sys)
sys.setdefaultencoding("utf-8")


def map_func(x):
    # "yang 85 90 30" -> ('yang', [85, 90, 30])
    s = x.split()
    return (s[0], [int(s[1]), int(s[2]), int(s[3])])

def f(x):
    # createCombiner: the first score list seen for a key is used as-is
    return x

def add(a, b):
    # mergeValue: fold another score list of the same key into the combiner
    return [a[r] + b[r] for r in range(len(a))]

def _merge(a, b):
    # mergeCombiners: combine partial results from different partitions
    print '****'
    return [a[r] + b[r] for r in range(len(a))]

# whether a student has a score of 100 in some subject
def has100(x):
    for y in x:
        if y == 100:
            return True
    return False

# whether all three scores are 0 (absent from the exam)
def allIs0(x):
    if type(x) == list and sum(x) == 0:
        return True
    return False

def subMax(x, y):
    # keep the higher score of each subject
    m = [x[1][i] if x[1][i] > y[1][i] else y[1][i] for i in range(3)]
    return ('', m)

def sumAll(x, y):
    # add up the scores of each subject across students
    return ('', [x[1][i] + y[1][i] for i in range(3)])


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: students <file>"
        exit(-1)
    sc = SparkContext(appName="Students")

    # Load the student file and map each line to a key-value pair where the key
    # is the student and the value is the list of scores, e.g. ('yang', [85, 90, 30]).
    # Then call combineByKey to add up (merge) the scores of duplicate students,
    # and cache the whole RDD so it is not recomputed for every action below.
    lines = sc.textFile(sys.argv[1], 1).map(map_func).combineByKey(f, add, _merge).cache()
    #print lines
    count = lines.count()

    # students who scored 100 in at least one subject, found with filter
    whohas100 = lines.filter(lambda x: has100(x[1])).collect()
    # students whose three scores are all 0 (absent)
    whoIs0 = lines.filter(lambda x: allIs0(x[1])).collect()
    # total score of each student
    sumScore = lines.map(lambda x: (x[0], sum(x[1]))).collect()
    # highest score of each subject
    subM = lines.reduce(subMax)
    # sum of each subject over all students, used for the per-subject average
    sumA = lines.reduce(sumAll)
    # the student with the highest total score
    maxScore = max(sumScore, key=lambda x: x[1])
    # the student with the lowest total score
    minScore = min(sumScore, key=lambda x: x[1])
    # average score of each subject over all students
    avgA = [x / count for x in sumA[1]]
    # sort the students by total score (ascending by default)
    sorted = lines.sortBy(lambda x: sum(x[1]))
    # the sorted students together with their rank (index)
    sortedWithRank = sorted.zipWithIndex().collect()
    # the top three students, who get the award!
    first3 = sorted.takeOrdered(3, key=lambda x: -sum(x[1]))

    #print '*' * 50
    print whohas100
    print maxScore
    print whoIs0
    print subM
    print avgA
    print sorted.collect()
    print sortedWithRank
    print first3

    # write the summary to a result file
    file = open('/home/yanggaofei/downloads/result.txt', 'w')
    file.write('students num:' + str(count) + '\n')
    file.write('who has a 100 scores:' + str(whohas100) + '\n')
    file.write('who all is 0:' + str(whoIs0) + '\n')
    file.write('the max score of each subject:' + str(subM) + '\n')
    file.write('the avg score of each subject:' + str(avgA) + '\n')
    file.write('sorted the students:' + str(sorted.collect()) + '\n')
    file.write('sorted the students with the rank:' + str(sortedWithRank) + '\n')
    file.write('the first 3 who will get the award:' + str(first3) + '\n')
    file.close()
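Before running it, a quick aside on combineByKey, since it does the heavy lifting of merging duplicate students: its three arguments are createCombiner, mergeValue and mergeCombiners, which here are f, add and _merge. A minimal, self-contained sketch of the same pattern on made-up in-memory data (the app name and numbers are placeholders):

# Minimal combineByKey sketch: merge the score lists of duplicate keys.
from pyspark import SparkContext

sc = SparkContext(appName="CombineByKeySketch")
scores = sc.parallelize([("yang", [85, 90, 30]),
                         ("yang", [12, 0, 0]),
                         ("wang", [20, 60, 50])])
merged = scores.combineByKey(lambda v: v,                                   # createCombiner
                             lambda c, v: [c[i] + v[i] for i in range(3)],  # mergeValue
                             lambda c1, c2: [c1[i] + c2[i] for i in range(3)])  # mergeCombiners
print merged.collect()  # something like [('yang', [97, 90, 30]), ('wang', [20, 60, 50])]
sc.stop()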
OK, run it:
[root@cyouemt spark-1.1.1] # ./bin/spark-submit examples/src/main/python/students.py temp/student.txt
The resulting result.txt looks like this:
students num:5
who has a 100 scores:[(u'li', [100, 54, 0])]
who all is 0:[(u'yanf', [0, 0, 0])]
the max score of each subject:('', [100, 90, 90])
the avg score of each subject:[61, 58, 34]
sorted the students:[(u'yanf', [0, 0, 0]), (u'wang', [20, 60, 50]), (u'li', [100, 54, 0]), (u'yang', [97, 90, 30]), (u'zhang', [90, 90, 90])]
sorted the students with the rank:[((u'yanf', [0, 0, 0]), 0), ((u'wang', [20, 60, 50]), 1), ((u'li', [100, 54, 0]), 2), ((u'yang', [97, 90, 30]), 3), ((u'zhang', [90, 90, 90]), 4)]
the first 3 who will get the award:[(u'zhang', [90, 90, 90]), (u'yang', [97, 90, 30]), (u'li', [100, 54, 0])]
While running, Spark prints the start and end of each task's execution. I haven't studied that output thoroughly yet, so I won't go into it here.
Compared with Hadoop, Spark is an in-memory MapReduce engine, and thanks to its caching mechanism it performs much better. It does not ship with a storage system of its own, but it supports HDFS, Mesos, HBase, plain text files, and so on. From an architecture and application point of view, Spark is essentially a development library that contains only the computation logic (it does provide a standalone master/slave service, but considering stability and interoperability with other kinds of jobs, that mode is usually not adopted) and includes no resource management or scheduling of its own. This lets Spark run flexibly on today's mainstream resource management systems, most notably Mesos and YARN, in deployments known as "Spark on Mesos" and "Spark on YARN". Running Spark on a resource management system brings many benefits, including sharing cluster resources with other computing frameworks and allocating resources on demand, which improves overall cluster utilization.
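As a concrete illustration of "Spark on YARN": assuming a YARN cluster and an HDFS copy of the input already exist (the HDFS path below is a placeholder), the same students.py job can be pointed at the cluster just by adding a --master argument to spark-submit; for PySpark on Spark 1.1, yarn-client is the mode to use:

./bin/spark-submit --master yarn-client examples/src/main/python/students.py hdfs:///user/yang/student.txt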