map
, filter
, flatMap
, sample
, groupByKey
, reduceByKey
, union
, join
, cogroup
,mapValues
, sort
,partionBy
等多種操做類型,Spark把這些操做稱爲Transformations。同時還提供Count
, collect
, reduce
, lookup
, save
等多種actions操做。
RDD的特色:java
RDD的好處python
RDD的存儲與分區git
RDD的內部表示
在RDD的內部實現中每一個RDD均可以使用5個方面的特性來表示:github
RDD的存儲級別
RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別:web
val NONE=newStorageLevel(false,false,false) val DISK_ONLY=newStorageLevel(true,false,false) val DISK_ONLY_2=newStorageLevel(true,false,false,2) val MEMORY_ONLY=newStorageLevel(false,true,true) val MEMORY_ONLY_2=newStorageLevel(false,true,true,2) val MEMORY_ONLY_SER=newStorageLevel(false,true,false) val MEMORY_ONLY_SER_2=newStorageLevel(false,true,false,2) val MEMORY_AND_DISK=newStorageLevel(true,true,true) val MEMORY_AND_DISK_2=newStorageLevel(true,true,true,2) val MEMORY_AND_DISK_SER=newStorageLevel(true,true,false) val MEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,2)
RDD定義了各類操做,不一樣類型的數據由不一樣的RDD類抽象表示,不一樣的操做也由RDD進行抽實現。算法
下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://...")
,file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼以下:shell
// SparkContext根據文件/目錄及可選的分片數建立RDD, 這裏咱們能夠看到Spark與Hadoop MapReduce很像 // 須要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。 def textFile(path:String,minSplits:Int=defaultMinSplits):RDD[String]={ hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable], classOf[Text],minSplits).map(pair=>pair._2.toString)} // 根據Hadoop配置,及InputFormat等建立HadoopRDD newHadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minSplits)
對RDD進行計算時,RDD從HDFS讀取數據時與Hadoop MapReduce幾乎同樣的:數據庫
// 根據hadoop配置和分片從InputFormat中獲取RecordReader進行數據的讀取。 reader=fmt.getRecordReader(split.inputSplit.value,conf,Reporter.NULL) val key:K=reader.createKey() val value:V=reader.createValue() //使用Hadoop MapReduce的RecordReader讀取數據,每一個Key、Value對以元組返回。 override def getNext()={ try{ finished=!reader.next(key,value) }catch{ caseeof:EOFException=> finished=true } (key,value) }
val sc=newSparkContext(master,"Example",System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A=sc.textFile(hdfs://.....) val rdd_B=rdd_A.flatMap((line=>line.split("\\s+"))).map(word=>(word,1)) val rdd_C=sc.textFile(hdfs://.....) val rdd_D=rdd_C.map(line=>(line.substring(10),1)) val rdd_E=rdd_D.reduceByKey((a,b)=>a+b) val rdd_F=rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
對於RDD能夠有兩種計算方式:轉換(返回值仍是一個RDD)與操做(返回值不是一個RDD)。
下面使用一個例子來示例說明Transformations與Actions在Spark的使用。編程
Spark對於資源管理與做業調度可使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。 Spark on Yarn在Spark0.6時引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方規範實現,得益於Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就很是容易,Spark on Yarn的大體框架圖。緩存
讓Spark運行於YARN上與Hadoop共用集羣資源能夠提升資源利用率。
Spark使用Scala開發,默認使用Scala做爲編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,能夠在Spark-Shell測試程序。寫SparK程序的通常步驟就是建立或使用(SparkContext)實例,使用SparkContext建立RDD,而後就是對RDD進行操做。如:
1 val sc=newSparkContext(master,appName,[sparkHome],[jars])
2 val textFile=sc.textFile("hdfs://.....")
3 textFile.map(....).filter(.....).....
Spark支持Java編程,但對於使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是同樣的,由於都是JVM上的語言,Scala與Java能夠互操做,Java編程接口其實就是對Scala的封裝。如:
JavaSparkContext sc=newJavaSparkContext(...); JavaRDD lines=ctx.textFile("hdfs://..."); JavaRDD words=lines.flatMap( newFlatMapFunction<String,String>(){ publicIterable call(Strings){ returnArrays.asList(s.split(" ")); } } );
如今Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操做,從而實現使用python編寫Spark程序。Spark也一樣提供了pyspark,一個Spark的python shell,能夠以交互式的方式使用Python編寫Spark程序。 如:
1 from pyspark import SparkContext
2 sc=SparkContext("local","Job Name",pyFiles=['MyFile.py','lib.zip','app.egg'])
3 words=sc.textFile("/usr/share/dict/words")
4 words.filter(lambdaw:w.startswith("spar")).take(5)
以Standalone模式運行Spark集羣
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
)修改配置(conf/*) slaves: 配置工做節點的主機名 spark-env.sh:配置環境變量。
1 SCALA_HOME=/home/spark/scala-2.9.3 2 JAVA_HOME=/home/spark/jdk1.6.0_45 3 SPARK_MASTER_IP=spark1 4 SPARK_MASTER_PORT=30111 5 SPARK_MASTER_WEBUI_PORT=30118 6 SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=4g 7 SPARK_WORKER_PORT=30333 8 SPARK_WORKER_WEBUI_PORT=30119 9 SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目錄下
在master主機上對其它機器作ssh無密碼登陸
把配置好的Spark程序使用scp copy到其它機器
在master啓動集羣:$SPARK_HOME/start-all.sh
以Yarn模式運行Spark
下載Spark代碼:git clonegit://github.com/mesos/spark
切換到branch-0.8
使用sbt編譯Spark並
$SPARK_HOME/sbt/sbt >package >assembly
把Hadoop yarn配置copy到conf目錄下
運行測試
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar\
./run spark.deploy.yarn.Client--jar examples/target/scala-2.9.3/\
--classspark.examples.SparkPi--args yarn-standalone
$SPARK_HOME/spark-shell
進入shell便可,在Spark-shell中SparkContext已經建立好了,實例名爲sc能夠直接使用,還有一個須要注意的是,在Standalone模式下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell做爲一個Spark程序一直運行在Spark上,其它的Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。在Spark-shell上寫程序很是簡單,就像在Scala Shell上寫程序同樣。
1 scala>val textFile=sc.textFile("hdfs://hadoop1:2323/user/data")
2 textFile:spark.RDD[String]=spark.MappedRDD@2ee9b6e3
3
4 scala>textFile.count()// Number of items in this RDD
5 res0:Long=21374
6
7 scala>textFile.first()// First item in this RDD
8 res1:String=# Spark
在Spark中Spark程序稱爲Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是同樣的,不一樣的地方就是SparkContext須要本身建立。如WorkCount程序以下:
1 import spark.SparkContext
2 import SparkContext._
3
4 objectWordCount{
5 def main(args:Array[String]){
6 if(args.length==0){
7 println("usage is org.test.WordCount <master>")
8 }
9 println("the args: ")
10 args.foreach(println)
11
12 val hdfsPath="hdfs://hadoop1:8020"
13
14 // create the SparkContext, args(0)由yarn傳入appMaster地址
15 val sc=newSparkContext(args(0),"WrodCount",
16 System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))
17
18 val textFile=sc.textFile(hdfsPath+args(1))
19
20 val result=textFile.flatMap(line=>line.split("\\s+"))
21 .map(word=>(word,1)).reduceByKey(_+_)
22
23 result.saveAsTextFile(hdfsPath+args(2))
24 }
25 }