spark雜記2

#######################################################
Ctrl+Alt+T:生成try catch

Ctrl+N:查找類(enter class name);Ctrl+shift+N:查找文件(enter file name);Ctrl+shift+alt+N:查找文件(enter file name);

Ctrl + F:當前文件查找特定文字、代碼等內容

Ctrl + shift + F:當前項目中查找特定的文字、代碼等內容。(edit—find—find in path)
#######################################################
spark[源碼]-sparkContext詳解[一]
https://www.cnblogs.com/chushiyaoyue/p/7468952.html


Spark :Master、Worker、Driver、Executor工做流程詳解
https://blog.csdn.net/weixin_38750084/article/details/83025172

大量數據去重:Bitmap和布隆過濾器(Bloom Filter)
https://blog.csdn.net/zdxiq000/article/details/57626464
BloomFilter(大數據去重)+Redis(持久化)策略
https://blog.csdn.net/qq_18495465/article/details/78500472

Spark核心技術原理透視一(Spark運行原理)
https://www.jianshu.com/p/1b5f97d5a22a

Spark 核心篇-SparkContext
https://www.cnblogs.com/xia520pi/p/8609602.html

大數據技術,Spark核心技術之運行原理
https://blog.51cto.com/13854477/2347535?source=dra

Spark基本架構及運行原理
https://blog.csdn.net/zxc123e/article/details/79912343

Spark運行原理【史上最詳細】
https://blog.csdn.net/lovechendongxing/article/details/81746988

Spark Mllib
https://www.cnblogs.com/dadadechengzi/p/6993757.html

Spark MLlib機器學習算法、源碼及實戰詳解 書籍及代碼.zip
https://download.csdn.net/download/zjm362/10205368


基於spark和sparkstreaming的word2vec
https://www.cnblogs.com/ulysses-you/p/6863585.html


基於spark word2vec實踐
https://blog.csdn.net/hjj974834257/article/details/79089686

word2vec學習 spark版
https://www.cnblogs.com/aezero/p/4586605.html

spark Word2Vec+LSH類似文本推薦(scala)
https://blog.csdn.net/u013090676/article/details/82716911

Spark MLlib 機器學習
https://www.cnblogs.com/swordfall/p/9456222.html

http://www.r66r.net/(hadoop筆記本【真心贊】)
http://leezk.com/  (有推薦書籍和一些不錯的文章)

#######################################################
RDD(Resilient Distributed Datasets)彈性分佈式數據集:
    分區,只讀,不可變,並行

    RDD[T]
    RDD[(t,s)]

有向無環圖(DAG)

窄依賴:一個父分區只有一個子分區(能夠多個父對應一個子)
寬依賴:一個父分區有多個子分區
Shuffle:含義就是洗牌,將數據打散,父RDD一個分區中的數據若是給了子RDD的多個分區(只要存在這種可能),就是shuffle。Shuffle會有網絡傳輸數據,可是有網絡傳輸,並不意味着就是shuffle。

join多是寬依賴也多是窄依賴

#######################################################
sparkContext構建的頂級三大核心:
    ①DAGScheduler:面向Job的Stage的高層調度器   
    ②TaskScheduler:一個接口,是低層調度器,根據具體的ClusterManager的不一樣會有不一樣的實現。Standalone模式下具體實現的是TaskSchedulerlmpl  
    ③SchedulerBackend:一個接口,根據具體的ClusterManger的不一樣會有不一樣的實現,Standalone模式下具體的實現是SparkDeloySchedulerBackend



#######################################################
transformation/轉換

action/行動




#######################################################
################面試題總結##########################
1.rdd有幾種操做類型?
    ①transformation 轉換操做 (rdd →rdd)
    ②action 行動操做 (rdd →結果集)
    ③controller 控制算子 (對性能效率和容錯方面的支持:persist,cache,checkpoint)

2.寬窄依賴的區別?
    寬依賴:多個子RDD的Partition會依賴同一個父RDD的Partition
    窄依賴:每個父RDD的Partition最多被子RDD的一個Partition使用

3.cache和persist的區別?
    cache:緩存數據,默認是緩存在內存中,本質是調用persist
    persist:緩存數據,能夠指定緩存策略(MEMORY_ONLY,MEMORY_AND_DISK,等等)

4.spark的有幾種部署模式?
    ①本地模式
    ②standalone模式
    ③spark on yarn模式:
        分佈式部署集羣,資源和任務監控交給yarn管理,cluster和client
    ④mesos模式:
        粗粒度模式(Coarse-grained Mode)
        細粒度模式(Fine-grained Mode)

5.Spark爲何比mapreduce快?
    ①基於內存計算,減小低效的磁盤交互;
    ②高效的調度算法,基於DAG;
    ③容錯機制Linage;


6.spark有哪些組件?
    ①master:管理集羣和節點,不參與計算
    ②worker:計算節點,進程自己不參與計算,和master彙報
    ③Driver:運行程序的main方法,建立sparkContext對象
    ④sparkContext: 控制整個Application的生命週期,包括dagscheduler和taskscheduler
    ⑤client:客戶端,用戶提交程序的入口
    ⑥Executor:執行器,在worker上執行任務的組件,用於啓動線程池運行任務
    ⑦RDD: spark基本計算單元
    ⑧DAG Scheduler: 根據job構建基於stage的DAG,並提交stage給TaskScheduler
    ⑨TaskScheduler: 將任務(task)分發給Executor
    ⑩SparkEnv: 線程級別的上下文,存儲運行時的重要組件的引用
    (11)BlockManager:負責存儲管理,建立和查找塊
    (12)SparkConf:負責存儲配置信息




################面試題總結##########################
#######################################################


#######################################################
################hbase##########################
hbase(main):005:0> scan 'label_event_index_table',{LIMIT=>1}
ROW                                   COLUMN+CELL
 10H20190701101233N0                  column=6:1092078ccf0604e3b49abfdfb2b5b46d681, timestamp=1561947156974, value=0.027:1:1561614000000:1092078ccf
                                      0604e3b49abfdfb2b5b46d681:6:{"sourceIDName":"109_|_|Yes\xE5\xA8\xB1\xE4\xB9\x90\xE7\x8E\xB0\xE5\x9C\xBA","org
                                      Categ":"\xE5\xA8\xB1\xE4\xB9\x90"}
 10H20190701101233N0                  column=6:1098aca82c9389f39b8afe70099b18107cd, timestamp=1561947156974, value=0.027:3:1561633525000:1098aca82c
                                      9389f39b8afe70099b18107cd:6:{"sourceIDName":"109_|_|\xE5\xA2\xA8\xE5\x85\xAE\xE7\x9A\x84\xE7\x88\xB1\xE6\x84\
                                      x8F","orgCateg":"\xE5\xA8\xB1\xE4\xB9\x90"}

hbase(main):006:0> get 'label_event_index_table','10H20190701101233N0'
COLUMN                                CELL
 6:1092078ccf0604e3b49abfdfb2b5b46d681 timestamp=1561947156974, value=0.027:1:1561614000000:1092078ccf0604e3b49abfdfb2b5b46d681:6:{"sourceIDName":"1
                                       09_|_|Yes\xE5\xA8\xB1\xE4\xB9\x90\xE7\x8E\xB0\xE5\x9C\xBA","orgCateg":"\xE5\xA8\xB1\xE4\xB9\x90"}
 6:1098aca82c9389f39b8afe70099b18107cd timestamp=1561947156974, value=0.027:3:1561633525000:1098aca82c9389f39b8afe70099b18107cd:6:{"sourceIDName":"1
                                       09_|_|\xE5\xA2\xA8\xE5\x85\xAE\xE7\x9A\x84\xE7\x88\xB1\xE6\x84\x8F","orgCateg":"\xE5\xA8\xB1\xE4\xB9\x90"}


hbase存儲結構:
    RowKey:是Byte array,是表中每條記錄的「主鍵」,方便快速查找,Rowkey的設計很是重要;
    Column Family:列族,擁有一個名稱(string),包含一個或者多個相關列;
    Column:屬於某一個columnfamily,familyName:columnName,每條記錄可動態添加;
    Version Number:類型爲Long,默認值是系統時間戳,可由用戶自定義;
    Value(Cell):Byte array。


################hbase##########################
#######################################################

#######################################################
################redis##########################
NoSQL(Not Only SQL):泛指非關係型數據庫

CAP定理:
    Consistency(一致性), 數據一致更新,全部數據變更都是同步的;
    Availability(可用性), 好的響應性能;
    Partition tolerance(分區容錯性) 可靠性;
定理:任何分佈式系統只可同時知足二點,無法三者兼顧
    CA:傳統Oracle數據庫
    AP:大多數網站架構的選擇
    CP:Redis、Mongodb

DCS,即一種分佈式緩存數據庫服務,將如今很火的幾類內存數據庫Redis、Memcached和內存數據網格進行包裝,提供即開即用、安全可靠、彈性擴容、便捷管理的在線分佈式緩存能力

1.redis:
    redis是一個開源的、使用C語言編寫的、支持網絡交互的、可基於內存也可持久化的Key-Value數據庫(非關係性數據庫)

2.優勢:
    ①速度快,由於數據存在內存中,相似於HashMap,HashMap的優點就是查找和操做的時間複雜度都是O(1)

    ②支持豐富數據類型,支持string,list,set,sorted set,hash

    ③支持事務,操做都是原子性,所謂的原子性就是對數據的更改要麼所有執行,要麼所有不執行

    ④豐富的特性:可用於緩存,消息,按key設置過時時間,過時後將會自動刪除

3.redis數據類型:
    ①字符串(string)
        set key value
        get key
        exists  key   //key是否存在
    ②哈希(hash)
        hset hashKey  key1 value1 key2 value2
        hget hashkey  key1
    ③集合(set)
        sadd setKey value
        scard setKey    //返回集合中元素數量
        sismember setKey value   //查看value是否在集合setKey中
        srem setKey value    //從集合setKey中刪除value
    ④列表(list)
        lpush list  value
        rpop  list
        llen  list
    ⑤有序集合(sort set)
        zadd zset1 key1 value1
        zcard zset1   //統計zset1下key的個數
        zrank zset1 value2   //查看value2在zset1中排名位置
        zrange zset1 0 2 withscores   //查看0到2的全部值和分數按照排名

################redis##########################
#######################################################



#####################################################################
Spark運行模式:
    1. local: 本地線程方式,主要用於開發調試
    Hadoop YARN: 集羣運行在Yarn資源管理器上,資源管理交給Yarn,spark只負責進行任務調度和計算
    2. 各Spark應用程序以相互獨立的進程集合運行於集羣之上,由SparkContext對象進行協調,SparkContext對象能夠視爲Spark應用程序的入口,被稱爲driver program,SparkContext能夠與不一樣種類的集羣資源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 進行通訊,從而分配到程序運行所需的資源,獲取到集羣運行所需的資源後,SparkContext將獲得集羣中其它工做節點(Worker Node) 上對應的Executors (不一樣的Spark應用程序有不一樣的Executor,它們之間也是獨立的進程,Executor爲應用程序提供分佈式計算及數據存儲功能),以後SparkContext將應用程序代碼分發到各Executors,最後將任務(Task)分配給executors執行。
    3. RDD操做包含 Transformations和Action;全部的transformation都是lazy的

    




#######################################################
application
job
stage
task

master
worker
executor
driver


碰到第一個action算子的時候,至關於觸發了sc.runjob方法執行

DAGScheduler工做:從action算子開始,到把stage變成TaskSet提交到taskScheduler中去執行結束

###########################################################
distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition

數據傾斜的解決方案:
    ①使用Hive ETL預處理數據
        從根源上提早處理好hive表中的數據;
        (缺點:治標不治本,spark程序沒有解決數據傾斜的能力)
    ②過濾少數致使傾斜的key
    ③提升shuffle的並行度
        實現簡單,增長shuffle read task數量
        (缺點:緩解傾斜)
    ④兩階段聚合(局部聚合+全局聚合)
        第一階段隨機打亂,好比key能夠加上前綴,將其分開;第二步再將前綴去掉
        (對於聚合類的shuffle操做,最好的解決方案)
    ⑤將reduce join轉爲 map join
        //reduce join 通用的join實現;但容易產生數據傾斜
        rdd1.join(rdd2)
        //map join  完美的避開了shuffle階段,因此沒有數據傾斜;但適用場景有限,只適合大小表作關聯
        val bc=sc.broadCast(rdd1.toList)
        rdd1.foreachPartition(data =>{
            val data2=bc.value
            val data1=data
            data1.join(data2)
        })        
    ⑥採樣傾斜key並分拆join操做
        將出現數據傾斜的key的全部數據,造成單獨的數據集
        
    ⑦使用隨機前綴和擴容的RDD進行join
    ⑧多種方案組合使用
#########################################################################
sc.read.format("json").load("/test/")
sc.read.json("test/")

df.write.json("test/")
df.write.format("json").save("/test/")

sc.read.load("test/xxx.parquet") //默認parquet格式,等價下一
sc.read.format("parquet").load("test/")

#######################################################################

①rdd.toDF
②sqlContext.createDataFrame(rdd)








#####################################################################
Block
輸入可能以多個文件的形式存儲在HDFS上,每一個File都包含了不少塊,稱爲Block。

InputSplit
當Spark讀取這些文件做爲輸入時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件。
隨後將爲這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關係。

job
在spark rdd中,有action、transform操做,當真正觸發action時,才真正執行計算,此時產生一個job任務

stage
以shuffle爲界,當在一個job任務中涉及shuffle操做時,會進行stage劃分,產生一個或多個stage。
Stage概念是spark中獨有的。通常而言一個Job會切換成必定數量的stage。各個stage之間按照順序執行。至於stage是怎麼切分的,首選得知道spark論文中提到的narrow dependency(窄依賴)和wide dependency( 寬依賴)的概念。其實很好區分,看一下父RDD中的數據是否進入不一樣的子RDD,若是隻進入到一個子RDD則是窄依賴,不然就是寬依賴。寬依賴和窄依賴的邊界就是stage的劃分點

task
一個stage可能包含一個或者多個task任務,task任務與partition、executor息息相關,即並行度。
Task是Spark中最新的執行單元。RDD通常是帶有partitions的,每一個partition的在一個executor上的執行能夠任務是一個Task。

partition
partition個數即rdd的分區數,不一樣的數據源讀進來的數據分區數默認不一樣,能夠經過repartition進行重分區操做。

executor
executor運行在work上,一個work能夠運行一個或多個executor,一個executor能夠運行一個或者多個task(取決於executor的core個數,默認是一個task佔用一個core,即有多少個core就能夠啓動多少個task任務)


Application
application(應用)其實就是用spark-submit提交的程序。比方說spark examples中的計算pi的SparkPi。一個application一般包含三部分:從數據源(比方說HDFS)取數據造成RDD,經過RDD的transformation和action進行計算,將結果輸出到console或者外部存儲(比方說collect收集輸出到console)。

Driver
Spark中的driver感受其實和yarn中Application Master的功能相相似。主要完成任務的調度以及和executor和cluster manager進行協調。有client和cluster聯衆模式。client模式driver在任務提交的機器上運行,而cluster模式會隨機選擇機器中的一臺機器啓動driver。從spark官網截圖的一張圖能夠大體瞭解driver的功能。



######################################################################

schema: 表的字段的定義(名稱,類型);當前這個表的數據存儲目錄
data:真實數據



離線計算   
批任務
實時處理   
流式處理
################################

storm  
    嚴格的一條數據計算一次,流式處理,但不必定是實時處理
    實時要求能夠很高,吞吐量低
sparkStreaming
    一批數據計算一次(每一個批次的時間間隔用戶自由設置)
    實時要求低一點,吞吐量高一點(折中一點)
    流式計算是離線計算的一個特例
    
flink
    
################################
mapreduce: 每隔一段時間,執行一次任務,對兩次任務之間的數據進行累積

storm: 一條一條的執行計算,(流式處理的核心思路);延遲低,亞秒級;數據消費至少一次(trident API 有且僅一次)

sparkStreaming: 很小的一批批的取執行計算(常見在1s到5min);僞實時;基於sparkCore,基於離線處理;有且僅一次

flink: 把離線處理看作是流式處理的一個特例,離散的批處理,基於流式處理;有且僅一次

#################################################

 
          節點            進程         線程
spark     worker          executor      task
storm     supervisor      worker        executor

###################################################
一站式通用解決方案

DStream:
    離散的RDD組成
    
UpdateStateByKey


輸入數據流 → (接收器) →結果輸出


有狀態計算
無狀態計算

#####################################################
窗口長度
滑動週期
兩者都要是時間片的整數倍
每隔多長時間(滑動週期)計算過去多長時間(窗口長度)的數據

###################################################
package com.huawei.rcm.newsfeed.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}


object test38 {

  case class Fruit(name: String,age:Int,address:String)

  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .config("Spark SQL basic example","some-value")
      .master("local[2]")
      .appName("hello")
      //.enableHiveSupport()
      .getOrCreate()
    //enableHiveSupport,必需要有配置文件hive-site.xml
    val sc = ss.sparkContext
    val sqlContext =new SQLContext(sc)
    ss.sparkContext.setLogLevel("ERROR")


    val rdd=sc.parallelize(List(("apple",1,"China"),("strawberry",2,"China"),("banana",3,"America"),("orange",4,"France")))
    val fruitRDD: RDD[Fruit] = rdd.map(x=>Fruit(x._1,x._2,x._3))
    import ss.implicits._
    val fruitDF=fruitRDD.toDF("name","age","Address")
    fruitDF.show()
    //sql風格
    fruitDF.registerTempTable("fruitTable")

    val resutDF: DataFrame = sqlContext.sql("select * from fruitTable where age >2")
    resutDF.show()

    val resultDF2: DataFrame = ss.sql("select * from fruitTable where age >=2")
    resultDF2.show()


    sc.stop()
  }
}

###################################################
mapreduce  典型的離線處理計算引擎
storm     流失處理計算引擎
spark    離線處理+流式處理;基於離線處理的執行引擎來設計的;把流式處理看做是離線處理的特例
flink   流式處理+批處理;基於流式處理的執行引擎來進行設計的; 把離線處理看做是流式處理的特例


#############################################
storm核心概念:
    topology   一個storm的應用程序
    spout   數據源
    bolt    數據處理組件
    tuple   一個消息一條記錄
    StreamingGrouping   分組規則
    
##############################################
實時流式處理的大體架構:
tomcat → log4j → logFile(實時監聽) → flume(實時收集)  → kafka(緩衝做用,上游收集和下游消費的速度調節) →計算引擎(實時計算/流式計算)  → redis/mysql/hbase

離線:
flume → hdfs → mapreduce/hive → hbase/mysql/hdfs/redis

流式:
flume → kafka → storm/sparkStreaming → redis/mysql/hbase


flume既能作到監控文件的變化(tail -F exec),也能由事件進行驅動收集(spooldir)

##############################################
connection 不能序列化
因此connection在driver端,而寫入數據在executor端,

鏈接池:第一次訪問的時候,須要創建鏈接。 可是以後的訪問,均會複用以前建立的鏈接


##########################################
checkpoint
    1.checkpoint的數據類型:
        元數據
        RDD數據
    2.合適啓用checkpoint
        有狀態計算:updateStateByKey,window
        若是有須要對diver進行HA
    3.如何配置checkpoint
        streamingContext.checkpoint(hdfspath)
        def functionToCreateStreamingContext():StreamingContext
        StreamingContext.getOrCreate(chkDir,functionToCreateStreamingContext)

        
###############################################
對數據的消費有三種語義:
at most once  最多一次,有可能會漏消費
at least once   最少一次,有可能重複消費
exactly once    有且僅有一次,效率低


#################################################
每個stage中的task數量,都是有這個stage中最後一個RDD的分區數巨鼎



#####################################################
Resilient Distributed Dataset
彈性(可在內存/磁盤,分區可變)分佈式數據集


transformation/轉換
    延遲計算,rdd → rdd
action/行動
    觸發sparkcontext提交job做業,rdd → 輸出結果

寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
窄依賴:指的是每個父RDD的Partition最多被子RDD的一個Partition使用

#############################################
在基於standalone的Spark集羣,Cluster Manger就是Master。Master負責分配資源,在集羣啓動時,Driver向Master申請資源,Worker負責監控本身節點的內存和CPU等情況,並向Master彙報

每一個worker能夠起一個或多個Executor
每一個Executor由若干core組成,每一個Executor的每一個core一次只能執行一個Task

一個application經過action劃分不一樣job,在job中最後一個算子往前推按寬依賴劃分不一樣stage

經過DAGScheduler劃分階段,造成一系列的TaskSet,而後傳給TaskScheduler,把具體的Task交給Worker節點上的Executor的線程池處理。線程池中的線程工做,經過BlockManager來讀寫數據。
######################################################
密集向量dense
稀疏向量sparse







############################################
需求:有兩份數據集,一份是邊,一份是點。求點的PageRank
格式:
邊:sourceID destID
點:name sourceID





















html

相關文章
相關標籤/搜索