1. RDD (Resilient Distributed Dataset)
Spark is built around the concept of the RDD, and its execution revolves around RDDs. There are two kinds of RDDs: the first is the parallelized collection, a Scala collection that can be operated on in parallel; the second is the Hadoop dataset, which processes every record of an HDFS file in parallel, and any file system that Hadoop supports can be used. Both kinds of RDDs are handled in the same way.
1.1 RDDs from Parallelized Collections
Parallelized collections are created with SparkContext's parallelize method on an existing Scala collection. The elements of the collection are copied to form a distributed dataset, that is, copied to all nodes, which can then be operated on in parallel. For example:
// a Scala collection
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

// a parallelized collection
scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
The first statement creates a Scala collection; the second turns it into a parallelized collection. One important parameter of a parallelized collection is the number of slices: when Spark runs a computation, each slice corresponds to one task. Typically there are 2 to 4 slices per CPU. Normally Spark computes the number of slices automatically based on the state of the cluster, but it can also be set manually; for example, parallelize(data, 10) specifies 10 slices.
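A minimal sketch of the slice parameter, assuming the data array from the example above; with an explicit slice count of 10, the reduce action below runs as 10 tasks:

val distData10 = sc.parallelize(data, 10) // explicit slice count
val sum = distData10.reduce(_ + _)        // action runs one task per slice; sum = 15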
1.2 RDDs from Hadoop Datasets
Spark can create distributed datasets on any storage system that Hadoop can handle, including the local file system, Amazon S3, Hypertable, HBase, and so on. Spark supports text files, SequenceFiles, and any Hadoop InputFormat. For example, a dataset is created from a text file as follows:
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
To set the number of slices for distFile, use the form sc.textFile("data.txt", 5). By default, Spark creates one slice for each block of data.txt.
Note: a manually set slice count can only be larger than the file's number of blocks, not smaller.
For SequenceFiles, SparkContext's sequenceFile[K, V] method converts them into an RDD. Other Hadoop InputFormats are handled by the SparkContext.hadoopRDD method.
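A minimal sketch of reading a SequenceFile, assuming a hypothetical file at "SparkTest/TestData/events.seq" whose keys and values can be converted to String and Int (the implicit converters come with import org.apache.spark.SparkContext._):

scala> val seqData = sc.sequenceFile[String, Int]("SparkTest/TestData/events.seq")
scala> seqData.take(3).foreach(println)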
2. RDD Operations
RDDs support two kinds of operations: transformations, which create a new RDD from an existing one (for example map), and actions, which run a computation on the RDD and return a result (for example reduce). Spark's transformations are all lazy: Spark records them without computing results immediately, and only computes when an action needs a result to be returned.
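A minimal sketch of this lazy behaviour, assuming a hypothetical local file data.txt:

scala> val lines = sc.textFile("data.txt")   // transformation: nothing is read yet
scala> val lengths = lines.map(_.length)     // transformation: still only recorded
scala> val total = lengths.reduce(_ + _)     // action: triggers reading the file and computing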
Note: by default, an RDD's transformations are recomputed every time it is used, but if the RDD is persisted or cached in memory with persist or cache, it is not recomputed, which speeds up later queries.
3. RDD Persistence
If an RDD is persisted, each node keeps the slices of the RDD that it computes, so they can be reused from memory, which makes later actions faster, usually by at least a factor of ten. Persistence is crucial for iterative computations. Both the persist method and the cache method of an RDD perform persistence. RDDs are fault-tolerant: if any part is lost, it is recomputed and rebuilt.
Note: an RDD can be stored in different ways: on disk, in memory, or replicated to other nodes. The cache method uses only the default storage level, which is memory.
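A minimal sketch of persisting an RDD, assuming a hypothetical log file access.log; MEMORY_ONLY is the same storage level that cache() uses:

scala> import org.apache.spark.storage.StorageLevel
scala> val logs = sc.textFile("access.log")
scala> val errors = logs.filter(_.contains("ERROR"))
scala> errors.persist(StorageLevel.MEMORY_ONLY)   // equivalent to errors.cache()
scala> errors.count()   // first action computes the RDD and caches its slices
scala> errors.count()   // second action reuses the cached slices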
4. Shared Variables: Broadcast Variables and Accumulators
4.1 Broadcast Variables
A broadcast variable is a read-only variable cached on every node machine of the cluster; for example, each node can keep a read-only copy of the input data.
Broadcast variables are used as follows:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Note: once the broadcast variable has been created, use broadcastVar.value instead of the original value, so that the data is not shipped to the nodes more than once.
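A minimal sketch of using the broadcast value inside a closure that runs on the cluster, assuming a hypothetical lookup table; each task reads broadcastLookup.value from the node's local cache:

scala> val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")   // hypothetical lookup table
scala> val broadcastLookup = sc.broadcast(lookup)
scala> val labelled = sc.parallelize(Array(1, 2, 3)).map(x => (x, broadcastLookup.value.getOrElse(x, "?")))
scala> labelled.collect().foreach(println)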
4.2 Accumulators
Accumulators can only be used as counters or for sums; only the add operation is supported. For example:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
Spark in Practice 1: Computing the Hottest-Selling Items in a Given Time Window
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.husor.Project

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

/**
 * Created by Kelvin Lee on 2014/12/10.
 */

/* Test cases
   log=item&au=0&id=470158&sid=&_t=1417674589632
   log=item&au=0&id=357332&sid=&_t=1417737534715
   log=item&au=0&id=431796&sid=&_t=1417739107530
   log=item&au=0&id=488016&sid=&_t=1417780009676
   log=item&au=0&id=468117&sid=&_t=1417780024422
   log=item&au=0&id=468117&sid=&_t=1417780025946
   log=item&au=0&id=468117&sid=&_t=1417780025946
   log=item&au=0&id=468117&sid=&_t=1417780024422
   log=item&au=0&id=141073&sid=&_t=1418054075319
   log=item&au=0&id=141073&sid=&_t=1418054264602
*/
object Hot_Product_TopK {
  def main(args: Array[String]) {

    println("Test is starting......")

    System.setProperty("hadoop.home.dir", "d:\\winutil\\")

    /*if (args.length < 5) {
      System.err.println("Usage: Input <directory> , Output <directory>")
      System.exit(1)
    }

    val inputFile  = args(0)
    val outDir     = args(1)
    val start_time = args(2).split("_")(0) + " " + args(2).split("_")(1)
    val end_time   = args(3).split("_")(0) + " " + args(3).split("_")(1)
    val kNum       = args(4).toInt*/

    val inputFile  = "SparkTest/TestData/order.txt"
    val inputFile1 = "SparkTest/TestData/order1.txt"
    val outDir     = "SparkTest/Output5"
    val start_time = "2014-12-04 14:20:14"
    val end_time   = "2014-12-08 23:59:14"
    val kNum       = 2

    // Checks argument formats
    val logPattern = """^log=(.+)&au=(.+)&id=(.+)&sid=&_t=(.+)""".r

    val conf = new SparkConf().setAppName("Hot_Product_TopK").setMaster("local")
    // Create the context
    val sc = new SparkContext(conf)

    val orderlog1s = sc.textFile(inputFile)
    val orderlogs  = orderlog1s.union(sc.textFile(inputFile1))

    val transferOrderLogs = orderlogs.map( (line: String) => {
      // Matches related Data By Regex logPattern
      val logPattern(itemType, userType, itemId, orderTime) = line
      // Converts unixTimeStamp type to Date
      val createdTime = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .format(new java.util.Date(orderTime.substring(0, 10).toLong * 1000))
      // Outputs related data what you want given the order log
      (itemType, userType, itemId, createdTime)
    })

    // Gets related data between start_time and end_time
    val givenDateLogs = transferOrderLogs.filter(
      info => info._4 >= start_time && info._4 <= end_time
    )

    // Counts the related item Id
    val itemIdCounts = givenDateLogs.map( info1 => (info1._3, 1)).reduceByKey(_ + _)

    //itemIdCounts.saveAsTextFile(outDir)

    // Sorts related item Ids according to the counts of Item Id
    val sorted = itemIdCounts.map {
      // exchange key and value
      case (key, value) => (value, key)
    }.sortByKey(true, 1)

    println("sorted: " + sorted)

    // Gets the top K's Item Ids
    val topK = sorted.top(kNum)

    // Outputs Value and Key to the Console
    topK.foreach(println)

    val ex_VK_KV = topK.map {
      // exchange key and value
      case (value, key) => (key, value)
    }

    // Outputs Key and Value to the Console
    ex_VK_KV.foreach(println)

    // Transfers Tuple's to RDD's type, storing result to the file system (such as HDFS or local file)
    sc.parallelize(ex_VK_KV, 2).saveAsTextFile(outDir)

    sc.stop()

    println("Test is Succeed!!!")
  }
}