好程序員分享大數據的架構體系

flume採集數據
        MapReduce
        HBse (HDFS)
        Yarn   資源調度系統

展現平臺 數據平臺node

1,提交任務
        2,展現結果數據

spark 分析引擎 S3 能夠進行各類的數據分析 , 可能夠和hive進行整合 ,spark任務能夠運行在Yarnapache

提交任務到集羣的入口類 SC編程

爲何用spark : 速度快,易用,通用,兼容性高分佈式

hadoop
scala
jdk
spark函數

若是結果爲定長的 toBuffer編程變長的oop

啓動流程優化

spark集羣啓動流程 和任務提交ui

主節點 master
子節點work 多臺
start-all。sh 腳本 先啓動master服務 啓動work
master 提交註冊信息 work 響應 work會定時發送心跳信息spa

集羣啓動流程線程

一、調用start-all腳本   ,開始啓動Master
  二、master啓動之後,preStart方法調用了一個定時器,定時的檢查超時的worker
  三、啓動腳本會解析slaves配置文件,找到啓動work的相應節點,開始啓動worker
  四、worker服務啓動後開始調用prestart方法(生命週期方法)開始向全部的master註冊
  五、master接收到work發送過來的註冊信息,master 開始保存註冊信息並把本身的URL響應給worker
  六、worker接收到master的URL後並更新,開始掉用一個定時器,定時的向master發送心跳信息

任務提交流程

將任務rdd經過客戶端submit 提交給Master 的管道 (隊列:先進先出)

worker啓動Executor子進程   來從master拿取任務信息
                  Executor  向客戶端Driver端註冊
                     客戶端收到註冊信息  客戶端就會將任務給 Executor進行人物計算

任務提交流程

一、首先Driver端會經過spark-submit腳本啓動sparkSubmint進程,此時開始建立重要的對象(SparkContext),啓動後開始向Master發送信息開始通訊
    二、Master接收到發送過來的信息後,開始生成任務信息,並把任務信息放到隊列中
    三、master開始把全部有效的worker過濾出來並進行排序,按照空閒的資源進行排序
    四、Master開始向有效的worker通知拿取任務信息,並啓動相應的Executor
    五、worker啓動Executor ,並向Driver反向註冊
    六、Driver開始把生成的task發送給相應的Executor,Executor

WordCount中產生的RDD

hdfs上有三個文件 sc.textFile(「路徑」)方法 生成第一個RDD HadoopRDD 第二個RDD MapPartitionsRDD flatMap(_.split()"") 生成 第三個RDD MapPartitionsRDD

map((_,1))生成第四個RDD  MapPartitionsRDD    reduceByKey  生成第五個 ShuffledRDD      saveAsTextFile  生成第六個RDD MapPartitionsRDD

.toDebugString 能夠看出RDD

分區
Partition 後跟分區 分區自己不會改變 會生成以一個新的RDD分區爲修改後 由於rdd自己不可變 修改後大於本來分區的會發生shullfer 小於的不會發生

coalesce 後跟分區少於原來的分區則會改變 由於不會發生shuffle 大於時則不可改變

PartitionBy 後跟新的分區器 new 全名稱的分區器org.apache.spark.hparPartition

客戶端提交Job任務信息給Master
Master生成任務信息 master 生成任務信息描述任務的數據 通知work 建立相應的Executor

客戶端將job信息給work  work讓Executor 進行計算數據

object Demo {
def main(args: Array[String]): Unit = {

//SparkConf:構架配置信息類,優先於集羣配置文件
//setAppName:指定應用程序名稱,若是不指定,會自動生成相似於uuid產生的名稱
//setMaster:指定運行模式:local[1]-用一個線程模擬集羣運行,local[2] -用兩個集羣模擬線程集羣運行,local[*] -有多少線程就用多少線程運行

val conf= new SparkConf().setAppName("")    // setAppName起名稱  setMaster  在哪裏運行  是本地仍是  []是調用多少線程來運行

.setMaster("local[2]") //在打包上傳集羣時 不須要這一步直接刪除或是註釋掉
//建立提交任務到集羣的入口類(上下文對象)

val sc =  new SparkContext(conf)

//獲取hdfs的數據
val lines = sc.textFile("hdfs://suansn:9000/wc")
val words= lines.flatMap(_.split(" ")) // 切分後生成單詞
val tuples=words.map((_,1)) //將單詞生成一個元組
val sum= tuples.reduceBykey(_+_) // 進行聚合
val PX = sum.sortBy(_._2,false) // 倒敘拍尋
print(PX.collect.toBuffer) // 打印至控制檯 在打包上傳集羣時 不須要這一步直接刪除或是註釋掉

PX.saveAsTextFile("hdfs://suansn:9000/ssss")
sc.stop //釋放資源

}

}

RDD 提供的方法 叫作算子

RDD 數據集 數據的抽象 是一種類型,提供方法處理數據 分佈式 僅僅是指向數據 , 不可變 若是想要其餘的操做 就在另外定義一個 RDD 。 可分區 若是一個文件 小於128M 就是一個分區 若是大於將根據大小來分區

一組分片 一個計算每一個分區的函數 RDD之間的依賴關係 一個Partitioner,即RDD的分片函數。 一個列表,存儲存取每一個Partition的優先位置(preferred location)。

RDD 有兩種類型 一個算子對應一個Action的job

1 、 Transformation  轉換的類型    延遲加載   只是記錄計算過程 並不執行     只有調用  Action 類型的算子後  觸發job 生成計算
                  若是沒有Transformation 算子  而全是Action算子  就沒法優化  集羣一直處於繁忙狀態。

  二、 Action

sc.parallelize 並行方法建立RDD

val rdd1 = sc.parallelize(List(3,4,6,5,8,7,9,2,1),2)

每一個數據乘10
val rdd2 = rdd1.map(_*10)

Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

利用分區計算 mapPartitions

val rdd2= rdd1.mapPartitions(_.map(_*10)) //map前_ 表示每一個分區的數據 封裝到Iterator

Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

mapWith //map的變異 也可將元素數據遍歷出來 將分區號做爲輸入 返回到A類型做爲輸出

(constructA: Int => A)(f: (T, A) => RDD[U])

參數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U) // Int => A 操做的每一個分區的分區號,preservesPartitioning: Boolean = false 是否記錄rdd的分區信息 (T, A) T時rdd中的元素

//    實現了柯里化的步驟  兩個A的傳入

rdd1.mapWith(i => i*10)((a, b) => b+2).collect //分區號 i 乘以10 B接收 A 時RDD的元素

Array[Int] = Array(2,2,2,2,12,12,12,12,12)

flatMapWith //分區排序

(constructA: Int => A)(f: (T, A) => Seq[U])
參數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])
rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect // i爲分區號 原樣不懂輸出 true 至關於 容許記錄分區信息 Y爲拿到的分區號 X 爲RDD的元素

Array[(Int,Int)] = Array((0,3)(0,4)(0,6)(0,5)(1,8)(1,7)(1,9)(1,2)(1,1))

mapPartitions f: Iterator[T] => Iterator[U]
rdd1.mapPartitions(_.toList.reverse.iterator).collect // 每一個分區顛倒排列

Array[Int] = Array(5, 6, 4, 3, 1, 2, 9, 7, 8)

mapPartitionsWithIndex 循環分區並能夠操做分區號
參數列表:(f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false) //Iterator[(Int) 分區信息 index: Int 分區號
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect

Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

aggregate // 聚合算子
(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect

Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

rdd1.aggregate(0)(math.max(_, ), + ) // 循環時 第一個 拿到的時初始值0 第二個_拿到的0分區第一個元素 而後判斷最大值 依次類推 局部聚合完,最後全局聚合時 初始值+ 第0分區的最大值。第1分區的最大值

Int=13

rdd1.aggregate(5)(math.max(_, ), + _) //原理和上面的相同不過初始值時5 這樣獲得的第0 分區的最大值就是 初始值 5 第1分區的最大值仍是9 最後的全局聚合時 就是5 + 5+9

Int=19

val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
rdd2.mapPartitionsWithIndex(func2).collect

Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

rdd2.aggregate("")(_ + , + _) //全局聚合和局部聚合 都屬於字符串拼接 初始值爲空

String = abcdef   String = defabc  //由於不肯定那個分區先完成任務因此 會出現兩種結果

rdd2.aggregate("=")(_ + , + _)

String = ==abc=def

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) // 取每一個字符串的長度 第一次與初始值 比較 然後用第二個數據的長度與上一次比較後的長度相比較, 最後全局聚合時 兩個分區最長的字符串和初始值相加

String = 24  String = 42

val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) // 運算方法與上面的相同 這個求的字符串是最短的 由於在第二個分區內有個空數據字符串爲0 第一個分區的由於初始值也爲空 因此爲空 tostring後第一次的變爲字符串 0 長度爲1 全局後爲10

String = 10

val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) 與上面相同

String = 11

aggregateByKey 經過相同的key 進行聚合
(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)
//Partitioner 分區器
val pairRDD = sc.parallelize(List(("mouse", 2),("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
// 全局聚合時 不會加 初始值

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect  // 相同的key的value值進行操做

pairRDD.aggregateByKey(100)(math.max(_, ), + _).collect

combineByKey // 聚合的算子

(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m

countByKey

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey //相同key 的 value的個數
rdd1.countByValue // 把整個rdd當作Value

filterByRange //給定範圍 求

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("c", "d")
rdd2.collect

flatMapValues
val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3.flatMapValues(_.split(" "))

foldByKey

val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey("")(_+_)

val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)

foreachPartition //
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _))) 表示每一個分區的數據的聚合值

keyBy
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val rdd2 = rdd1.keyBy(_.length) 元素數據的長度生成爲key 元素數據生成爲value
rdd2.collect

keys values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

checkpoint
sc.setCheckpointDir("hdfs://node01:9000/cp")
val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
rdd.checkpoint 將checkpoint後的文件 準備存儲 還未存儲 沒有Action 算子沒有運行job
rdd.isCheckpointed 查看是否運行checkpoint
rdd.count 隨便調動Avtion的算子 提交job
rdd.isCheckpointed
rdd.getCheckpointFile 查看checkpoint的文件存儲的位置

repartition, coalesce, partitionBy
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length

collectAsMap Array 轉換map (kv)對
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap

在必定時間範圍內,求全部用戶在通過全部基站停留時間最長的TOP2

思路:獲取用戶產生的log日誌並切分

用戶在基站停留的總時長
  過去基站的基礎信息
  把經緯度信息join到用戶數據中
  求出用戶在某些基站停留的時長的TOP2


  object Demo  {
    def main(args: Array[String]): Unit = {

//模板代碼
val conf = new SparkConf()
.setAppName("ML")
.setMaster("local[2]")
val sc= new SparkContext(conf)

//獲取用戶訪問基站的log
val files=sc.textFile("地址")
//切分用戶的log
val userInfo=files.map(line=>{
val fields=line.split(",")
val phone = fields(0)//用戶手機號
val time = fields(1).toLong//時間戳
val lac = fields(2) //基站ID
val eventType = fields(3)//事件類型
val time_long = if(eventType.equals("1")) -time else time

((phone,lac),time_long)
})

//用戶在相同的基站停留的總時長
val sumedUserAndTime = userInfo.reduceByKey(_+_)

//爲了便於和基站基礎信息進行Join 須要把數據調整,把基站ID做爲key
val lacAndPhoneAndTime sumedUserAndTime.map(tup =>{

val phone = tup._1._1 //用戶手機號
val lac= tup._1._2//基站的ID
val time = tup._2 //用戶在某個基站停留的總時長
(lac,(phone,time))
})
//獲取基站的基礎信息
val lacInfo= sc.textFile("路徑")
//切分基站基礎數據
val lacAndXY=lacInfo.map (line =>{
val fields = line.split(",")
val lac= fields(0)//基站ID
val x = files(1)//經度
val y = fields(2)//緯度

(lac,(x,y))

})

//把經緯度信息join到用戶的訪問信息
val joined=lacAndPhoneAndTime join lacAndXY

//爲了便於之後發呢組排序計算,須要整合數據
val phoneAndTimeAndXY=joined,map(tup=>{
val phone = tup._2._1._1//手機號
val lac = tup._1// ID
val time = tup._2._1._2
val xy = tup._2._2 //經緯度
(phone,time,xy)

})
//按照用戶手機號進行分組
val grouped=phoneAndTimeAndXY.groupBy(_._1)
//按照時長進行組內排序
//val sorted = grouped.map(x => (x._,x._2.toList.sortBy(_._2).reverse))
val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse)
//整合數據
val filterede=sorted.map(tup =>{
val phone= tup._1

val list = tup._2
val filteredList=list.map(x =>{

val time = x._2
val xy = x._3

List(time,xy)
})

(phone,filteredList)

})

val res = filterede.mapValues(_.take(2))

sc.stop()

}

  }
相關文章
相關標籤/搜索