sparkjava
spark的優點:node
(1)每個做業獨立調度,能夠把全部的做業作一個圖進行調度,各個做業之間相互依賴,在調度過程當中一塊兒調度,速度快。mysql
(2)全部過程都基於內存,因此一般也將Spark稱做是基於內存的迭代式運算框架。web
(3)spark提供了更豐富的算子,讓操做更方便。算法
爲何Spark比Map Reduced運算速度快:sql
(1)Spark計算比MapReduce快的根本緣由在於DAG計算模型。通常而言,DAG相比Hadoop的MapReduce在大多數狀況下能夠減小shuffle次數。數據庫
(2)Hadoop每次計算的結果都要保存到hdfs,而後每次計算都須要從hdfs上讀書數據,磁盤上的I/O開銷比較大。 spark一次讀取數據緩存在內存中,內存的數據讀取比磁盤數據讀取快不少。還有一點就是spark的RDD數據結構,RDD在每次transformation後並不當即執行,並且action後才執行,有進一步減小了I/O操做。apache
(3)MR它必須等map輸出的全部數據都寫入本地磁盤文件之後,才能啓動reduce操做,由於mr要實現默認的根據Key的排序!因此要排序確定得寫完全部數據,才能排序,而後reduce來拉取。可是spark不須要,spark默認狀況下,是不會對數據進行排序的。所以shufflemaptask每寫入一點數據,resulttask就能夠拉取一點數據,而後再本地執行咱們定義的聚合函數和算子,進行計算.編程
spark的DAG有向無循環圖:api
DAG叫作有向無環圖,原始的RDD經過一系列的轉換就造成了DAG,根據RDD之間依賴關係的不一樣將DAG劃分紅不一樣的Stage(調度階段)。對於窄依賴,partition的轉換處理在一個Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
spark如何分區: 分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被劃分爲多個分片,每個分片稱爲分區,分區的格式決定了並行計算的粒度,而每一個分區的數值計算都是在一個任務中進行的,所以任務的個數,也是由RDD(準確來講是做業最後一個RDD)的分區數決定。spark默認分區方式是HashPartitioner.
只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None,每一個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬於那個分區的。
(1)HashPartitioner分區:partition = key.hashCode () % numPartitions,若是餘數小於0,則用餘數+分區的個數,最後返回的值就是這個key所屬的分區ID。
缺點:可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據
(2)RangePartitioner分區(範圍分區):RangePartitioner會對key值進行排序,而後將key值按照分區個數進行劃分分區.儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大;可是分區內的元素是不能保證順序的。分界的算法尤其重要。算法對應的函數是rangeBounds.
(3)CustomPartitioner自定義分區:須要繼承org.apache.spark.Partitioner類,sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))
spark從HDFS中讀取數據是如何分區的:
Spark從HDFS讀入文件的分區數默認等於HDFS文件的塊數(blocks),HDFS中的block是分佈式存儲的最小單元。若是咱們上傳一個30GB的非壓縮的文件到HDFS,HDFS默認的塊容量大小128MB,所以該文件在HDFS上會被分爲235塊(30GB/128MB);Spark讀取SparkContext.textFile()讀取該文件,默認分區數等於塊數即235。
通常合理的分區數設置爲總核數的2~3倍
spark數據傾斜出現的緣由:
根本緣由是分區不均勻,在執行shuffle操做的時候,是按照key,來進行values的數據的輸出、拉取和聚合的。同一個key的values,必定是分配到一個reduce task進行處理的。某個或者某些key對應的數據,遠遠的高於其餘的key。定位數據傾斜就是看哪些地方用了會產生shuffle的算子,groupByKey、countByKey、reduceByKey、join
數據傾斜發生的現象:
(1)大部分的task執行的特別快,剩下的幾個task執行的特別慢.
(2)運行一段時間後,其餘task都已經執行完成,可是有的task可能會出現OOM異常由於task的所分配的數據量太大,並且task每處理一條數據還要建立大量的對象,內存存儲不下.
解決數據傾斜的方法:
(1)聚合源數據:在數據的源頭將數據聚合成一個key對應多個value值.這樣在進行操做時就可能不會出現shuffle過程.
(2)將致使數據傾斜的key提取出來,如果key對應的null或者無效數據,就將其刪除,如果正常的數據,就將其單獨處理,再與正常處理的數據進行union操做.
(3)提升shuffle操做reduce的並行度:將reduce task的數量變多,好比groupByKey、countByKey、reduceByKey。在調用的時候,傳入進去一個參數。那個數字,就表明了那個shuffle操做的reduce端的並行度。那麼在進行shuffle操做的時候,就會對應着建立指定數量的reduce task。
(4)對key先添加隨機值,進行操做後,去掉隨機值,再進行一次操做。將原始的 key 轉化爲 key + 隨機值(例如Random.nextInt),對數據進行操做後,將 key + 隨機值 轉成 key.
reduceByKey與groupByKey的區別:
pairRdd.reduceByKey(_+_).collect.foreach(println) 等價於pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)
reduceByKey的結果:(hello,2)(world,3) groupByKey的結果:(hello,(1,1))(world,(1,1,1))
使用reduceByKey()的時候,本地的數據先進行merge而後再傳輸到不一樣節點再進行merge,最終獲得最終結果。而使用groupByKey()的時候,並不進行本地的merge,所有數據傳出,獲得所有數據後纔會進行聚合成一個sequence.groupByKey()傳輸速度明顯慢於reduceByKey()。雖然groupByKey().map(func)也能實現reduceByKey(func)功能,可是,優先使用reduceByKey(func).
Spark運行的全流程:
(1)首先經過spark-submit提交Application應用,後臺就會建立相應的driver進程,driver進程會運行Application中的代碼,
(2)初始化Sparkcontext,Sparkcontext是用戶通向spark集羣的入口,在初始化sparkContext時,同時的會初始化DAGScheduler、TaskScheduler,初始化TaskScheduler的同時,會建立兩個很是重要的對象,分別是 DriverActor 和 ClientActor.
(3)clientActor向master註冊Application,master收到Application的註冊請求後,會使用本身的資源調度算法,通知相應的worker節點爲Application啓動多個Executor.
(4)多個Executor啓動以後,會反向註冊到DriverActor,以後driver結束sparkcontext的初始化,繼續執行接下來的代碼.
(5)在接下來的代碼中,將所遇到的對RDD的全部操做造成一個DAG有向無循環圖,每執行到action操做就會建立一個job到DAGScheduler中,而job又根據RDD的依賴關係劃分紅多個stage,每一個stage里根據最後一個RDD的分區數目來建立task,多個task造成一個taskset
(6)將taskset送到taskscheduler中,而後taskscheduler對task進行序列化,而後將序列化好的task封裝到launchTask中,而後將launchTask發送給指定的executor中運行.
(7)executor接收到了DriverActor 發送過來的launchTask 時,會對launchTask 進行反序列化,封裝到一個TaskRunner 中,而後從executor這個線程池中獲取一個線程來執行指定的任務.
(8)最終當全部的task任務完成以後,整個application執行完成,關閉sparkContext對象。
spark處理任務的過程:
(1)構建DAG(有向無環圖)(調用RDD上的方法)
(2)DAGScheduler將DAG切分Stage(切分的依據是Shuffle),將Stage中生成的Task以TaskSet的形式給TaskScheduler
Spark 的運行模式中有哪幾種:
(1)本地模式:driver和Executors處於同一個jvm
(2)standalone模式:基本Spark內置的集羣搭建模式,運行時要開起master和worker的守護進程.適合於不太依賴Hadoop的運算環境.
(3)基於yarn-cluster模式:做業調度、資源調度由Yarn分配。Yarn在這方面作得比Spark standalone集羣好,適用於存儲計算合一,或者須要依賴MR、Hive等做業的場景,通常用於生產模式.
(4)基於yarn-client模式,通常用來測試,傳輸消耗大,方便調試.
spark的yarn-cluster模式:
在 Yarn-Cluster 模式中,當用戶向 Yarn 中提交一個應用程序後, Yarn 將分兩個階段運行該應用程序:第一個階段是把 Spark 的 Driver 做爲一個 ApplicationMaster 在 Yarn 集羣中先啓動;第二個階段是由 ApplicationMaster 建立應用程序,而後爲它向 ResourceManager 申請資源,並啓動 Executor 來運行 Task,同時監控它的整個運行過程,直到運行完成。(過程相似於mapreduce)
spark的shuffle過程:
未優化:
(1)每個ShufflleMapTask會爲每個ReduceTask建立一個bucket緩存,而且會爲每個bucket建立一個文件。這個bucket存放的數據就是通過Partitioner操做(默認是HashPartitioner)以後找到對應的bucket而後放進去,最後將bucket緩存的數據刷新到磁盤上,即對應的block file.
(2)而後ShuffleMapTask將輸出做爲MapStatus發送到DAGScheduler的MapOutputTrackerMaster,每個MapStatus包含了每個ResultTask要拉取的數據的位置和大小.
(3)ResultTask而後去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster獲取MapStatus,看哪一份數據是屬於本身的,而後底層經過BlockManager將數據拉取過來.
(4)拉取過來的數據會組成一個內部的ShuffleRDD,優先放入內存,內存不夠用則放入磁盤,而後ResulTask開始進行聚合,最後生成咱們但願獲取的那個MapPartitionRDD
缺點:如上圖所示:在這裏有1個worker,2個executor,每個executor運行2個ShuffleMapTask,有三個ReduceTask,因此總共就有4 * 3=12個bucket和12個bucket和12個block file。
若是數據量較大,將會生成M*R個小文件,好比ShuffleMapTask有100個,ResultTask有100個,這就會產生100*100=10000個小文件
bucket緩存很重要,須要將ShuffleMapTask全部數據都寫入bucket,纔會刷到磁盤,那麼若是Map端數據過多,這就很容易形成內存溢出,儘管後面有優化,bucket寫入的數據達到刷新到磁盤的閥值以後,就會將數據一點一點的刷新到磁盤,可是這樣磁盤I/O就多了.與MR徹底不同的是,MR它必須將全部的數據都寫入本地磁盤文件之後,才能啓動reduce操做,來拉取數據。由於mr要實現默認的根據Key的排序!因此要排序確定得寫完全部數據,才能排序,而後reduce來拉取。可是spark不須要,spark默認狀況下,是不會對數據進行排序的。所以shufflemaptask每寫入一點數據,resulttask就能夠拉取一點數據,而後再本地執行咱們定義的聚合函數和算子.spark這種機制的好處在於速度比mr快多了.因爲這種事實拉取的機制,一次提供不了直接處理key對應的valur的算子,只能經過reducebykey,先shuffle,有一個maptartitionsRDD,而後用map算子,來處理每一個key對應的values.
優化後:
(1)每個Executor進程根據核數,決定Task的併發數量,好比executor核數是2,就是能夠併發運行兩個task,若是是一個則只能運行一個task,
(2)假設executor核數是1,ShuffleMapTask數量是M,那麼executor依然會根據ResultTask的數量R,建立R個bucket緩存,而後對key進行hash,數據進入不一樣的bucket中,每個bucket對應着一個block file,用於刷新bucket緩存裏的數據
(3)而後下一個task運行的時候,那麼不會再建立新的bucket和block file,而是複用以前的task已經建立好的bucket和block file。即所謂同一個Executor進程裏全部Task都會把相同的key放入相同的bucket緩衝區中
這樣的話,生成文件的數量就是(本地worker的executor數量*executor的cores*ResultTask數量)如上圖所示,即2 * 1* 3 = 6個文件,每個Executor的shuffleMapTask數量100,ReduceTask數量爲100,那麼 未優化的HashShuffle的文件數是2 *1* 100*100 =20000,優化以後的數量是2*1*100 = 20
缺點:缺點:若是 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生不少小文件。
spark的checkpoint操做:
checkpoint的意思就是創建檢查點,相似於快照,例如在spark計算裏面 計算流程DAG特別長,服務器須要將整個DAG計算完成得出結果,可是若是在這很長的計算流程中忽然中間算出的數據丟失了,spark又會根據RDD的依賴關係從頭至尾計算一遍,這樣子就很費性能,固然咱們能夠將中間的計算結果經過cache或者persist放到內存或者磁盤中,可是這樣也不能保證數據徹底不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會致使spark從頭再根據RDD計算一遍,因此就有了checkpoint,其中checkpoint的做用就是將DAG中比較重要的中間數據作一個檢查點將結果存儲到一個高可用的地方(一般這個地方就是HDFS裏面)
spark的cache和persist的區別:
計算流程DAG特別長,服務器須要將整個DAG計算完成得出結果,可是若是在這很長的計算流程中忽然中間算出的數據丟失了,spark又會根據RDD的依賴關係從頭至尾計算一遍,這樣子就很費性能,固然咱們能夠將中間的計算結果經過cache或者persist放到內存或者磁盤中
cache最終調用了persist方法,默認的存儲級別僅是存儲內存中的,persist有好幾個存儲級別,persist是最根本的底層函數,executor執行時,60%用來緩存RDD,40%用來存放數據.
spark中transform與action操做的區別:
transformation是獲得一個新的RDD,方式不少,好比從數據源生成一個新的RDD,從RDD生成一個新的RDD,action是獲得一個值,或者一個結果(直接將RDD cache到內存中).
全部的transformation都是採用的懶策略,就是若是隻是將transformation提交是不會執行計算的,計算只有在action被提交的時候才被觸發。
spark的RDD與DataFrame以及Dataset的區別:
spark的基本數據結構:RDD是彈性分佈式數據集,編譯時類型安全,具備面向對象的風格RDD是一組表示數據的Java或Scala對象,可是序列化性能開銷大,須要頻繁非刪除對象致使GC性能開銷大.
彈性:RDD的每一個分區在spark節點上存儲時默認是放在內存中的,若內存存儲不下,則存儲在磁盤中
分佈性:每一個RDD中的數據能夠處在不一樣的分區中,而分區能夠處在不一樣的節點中.
容錯性:當一個RDD出現故障時,能夠根據RDD之間的依賴關係來從新計算出發生故障的RDD.
從如下方面是區別三者之間的關係:
(1)數據的表示形式:RDD是數據元素的分佈式集合,RDD是一組表示數據的Java或Scala對象;DataFrame是以列方式構成的分佈式數據集合,相似於關係數據庫中的表;Dataset是DataFrame API的擴展.
(2)數據格式:RDD能夠輕鬆有效地處理結構化和非結構化的數據,DataFrame僅適用於結構化和半結構化數據,Dataset能夠有效地處理結構化和非結構化數據它表示行(row)的JVM對象或行對象集合形式的數據.
(3)編譯時類型安全:RDD提供了一種熟悉的面向對象編程風格,具備編譯時類型安全性。DataFrame嘗試訪問表中不存在的列,則持編譯錯誤僅在運行時檢測屬性錯誤,DataSet能夠在編譯時檢查類型, 它提供編譯時類型安全性。
(4)性能開銷:RDD:分發數據或者將數據寫入磁盤時,會使用java序列化,序列化單個Java或者scala對象的開銷較大,銷燬單個對象時,會致使垃圾回收.
DataFrame:能夠將數據序列化爲二進制的格式存儲在堆外內存中,而後直接在內存中進行轉換,無需使用java序列化來編碼數據.避免在爲數據集中的每一行構造單個對象時引發的垃圾回收。
Dataset:在序列化數據時,它使用spark內部Tungsten二進制格式存儲表格表示,由於序列化是經過Tungsten進行的,它使用了off heap()數據序列化,不須要垃圾回收器來摧毀對象
spark廣播變量及其原理:
當在Executor端用到了Driver變量,不使用廣播變量,在每一個Executor中有多少個task就有多少個Driver端變量副本。若是使用廣播變量在每一個Executor端中只有一份Driver端的變量副本。廣播變量在Driver定義,在Exector端不可改變,在Executor端不能定義
原理:實際上就是Executor端用到了driver端的變量,若是在executor端你使用到了driver端的廣播變量,若是不使用廣播變量,在每一個executor中有多少task就有多少變量副本。使用了廣播變量,實際上就是爲了減小executor端的備份,最終減小executor端的內存。
spark streaming從kafka中讀數據的兩種方式:
Receiver方式是經過zookeeper來鏈接kafka隊列,Direct方式是直接鏈接到kafka的節點上獲取數據
(1)receiver方式:Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的(若是忽然數據暴增,大量batch堆積,很容易出現內存溢出的問題),而後Spark Streaming啓動的job會去處理那些數據。 然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。
然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。
注意:Kafka中的topic的partition,與Spark中的RDD的partition是沒有關係的。因此,在一、KafkaUtils.createStream()中,提升partition的數量,只會增長一個Receiver中,讀取partition的線程的數量。不會增長Spark處理數據的並行度。 能夠建立多個Kafka輸入DStream,使用不一樣的consumer group和topic,來經過多個receiver並行接收數據。
(2)基於Direct方式:這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。
優勢:簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。
高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制,會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。
receiver與和direct的比較:
(1)基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。
(2)基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。
spark的內存管理機制:
堆內內存:
做爲一個 JVM 進程,Executor 的內存管理創建在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。
堆內內存:堆內內存的大小,由 Spark 應用程序啓動時的 –executor-memory參數配置,一個Executor中的內存分爲三塊,一塊是execution內存,一塊是storage內存,一塊是other內存。
(1)execution內存是執行內存,文檔中說join,aggregate都在這部份內存中執行,shuffle的數據也會先緩存在這個內存中,滿了再寫入磁盤,可以減小IO。其實map過程也是在這個內存中執行的。
(2)storage內存是存儲broadcast,cache,persist數據的地方。
(3)other內存是程序執行時預留給本身的內存。
堆外內存:
Off-heap memory不在 JVM 內申請內存,而是調用 Java 的 unsafe 相關 API (相似於malloc()函數)直接向操做系統申請內存因爲這種方式不進過 JVM 內存管理,因此能夠避免頻繁的 GC,這種內存申請的缺點是必須本身編寫內存申請和釋放的邏輯。堆外內存只區分 Execution 內存和 Storage 內存.
不管堆內和堆外內存目前 Execution 內存和 Storage 內存能夠互相共享的。也就是說,若是 Execution 內存不足,而 Storage 內存有空閒,那麼 Execution 能夠從 Storage 中申請空間;反之亦然.
Spark中的OOM問題不外乎如下兩種狀況:
(1)map執行中內存溢出如flatMap,filter,mapPatitions
map端過程產生大量對象致使內存溢出:這種溢出的緣由是在單個map中產生了大量的對象致使的針對這種問題,在不增長內存的狀況下,能夠經過減小每一個Task的大小,以便達到每一個Task即便產生大量的對象Executor的內存也可以裝得下。具體作法能夠在會產生大量對象的map操做以前調用repartition方法,分區成更小的塊傳入map。
(2)shuffle後內存溢出如join,reduceByKey,repartition
shuffle內存溢出的狀況能夠說都是shuffle後,單個文件過大致使的。在shuffle的使用,須要傳入一個partitioner,大部分Spark中的shuffle操做,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數.這個參數spark.default.parallelism只對HashPartitioner有效.若是是別的partitioner致使的shuffle內存溢出就須要重寫partitioner代碼了.
task之間的內存分配:
爲了更好地使用使用內存,Executor 內運行的 Task 之間共享着 Execution 內存。具體的,Spark 內部維護了一個 HashMap 用於記錄每一個 Task 佔用的內存。當 Task 須要在 Execution 內存區域申請 numBytes 內存,其先判斷 HashMap 裏面是否維護着這個 Task 的內存使用狀況,若是沒有,則將這個 Task 內存使用置爲0,而且以 TaskId 爲 key,內存使用爲 value 加入到 HashMap 裏面。以後爲這個 Task 申請 numBytes 內存,若是 Execution 內存區域正好有大於 numBytes 的空閒內存,則在 HashMap 裏面將當前 Task 使用的內存加上 numBytes,而後返回;若是當前 Execution 內存區域沒法申請到每一個 Task 最小可申請的內存,則當前 Task 被阻塞,直到有其餘任務釋放了足夠的執行內存,該任務才能夠被喚醒。每一個 Task 可使用 Execution 內存大小範圍爲 1/2N ~ 1/N,其中 N 爲當前 Executor 內正在運行的 Task 個數。一個 Task 可以運行必須申請到最小內存爲 (1/2N * Execution 內存);當 N = 1 的時候,Task 可使用所有的 Execution 內存。好比若是 Execution 內存大小爲 10GB,當前 Executor 內正在運行的 Task 個數爲5,則該 Task 能夠申請的內存範圍爲 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的範圍。
Hadoop
HA(高可用集羣):
(1)腦裂現象:指在一個高可用(HA)系統中,當聯繫着的兩個節點斷開聯繫時,原本爲一個總體的系統,分裂爲兩個獨立節點,這時兩個節點開始爭搶共享資源,結果會致使系統混亂,數據損壞。
(2)如何防止腦裂:仲裁:當兩個節點出現分歧時,由第3方的仲裁者決定聽誰的。這個仲裁者,多是一個鎖服務,一個共享盤.
fencing: 當不能肯定某個節點的狀態時,經過fencing把對方幹掉,確保共享資源被徹底釋放,前提是必需要有可靠的fence設備。
hdfs的優勢:
(1)高容錯性,保存多個副本,且提供容錯機制。副本丟失或宕機自動恢復。默認存3份。
(2)流式數據訪問,一次寫入,屢次讀取,高吞吐量,因此能夠同時處理大量數據
(3)運行在廉價的機器上,經過副本提升可靠性,提供了容錯和恢復機制
(4)能夠存儲大量文件,如PB級
缺點:不擅長低延遲數據訪問,不擅長小文件的分區,不擅長併發寫入,文件隨機修改
namenode與datanode:
namenode:namenode管理文件系統的命名空間。它維護着文件系統樹及整棵樹內的全部文件和目錄。這些信息以兩個文件形式永久保存在本地磁盤上:命名空間鏡像文件和編輯日誌文件。namenode也記錄着每一個文件中各個塊所在的數據節點信息,可是它並不會永久保存塊的位置信息,由於這些信息會在系統啓動時根據數據節點信息重建。
datanode:一個數據塊在datanode上以文件形式存儲在磁盤上,包括兩個文件,一個是數據自己,一個是元數據包括數據塊的長度,塊數據的校驗和,以及時間戳。心跳是每3秒一次,心跳返回結果帶有namenode給該datanode的命令如複製塊數據到另外一臺機器,或刪除某個數據塊。若是超過10分鐘沒有收到某個datanode的心跳,則認爲該節點不可用。
如何肯定map與reduce的任務個數:
map個數取決於文件的個數,能夠設置map的數量,可是不能小於文件分塊的數量.
正確的reduce任務的個數應該是0.95或者1.75 *(節點數 ×mapred.tasktracker.tasks.maximum參數值)。Reduce任務也可以與 map任務同樣,經過設定JobConf 的conf.setNumReduceTasks(int num)方法來增長任務個數。
hdfs的高可用性:
配置了一對活動備用namenode,當活動namenode失效,備用namenode就會接管它的任務並開始服務於來自客戶端的請求.
hdfs的容錯性:
(1)心跳機制:namenode 和datanode是維護心跳的檢測。可能網絡故障,致使namenode接收不到datanode的心跳包,namenode就不會將任何新的I/O操做派發給那個datanode,namenode會檢測到文件塊的副本數目小於設置值,若是小於就自動開始複製新的副本並分發到其餘的datanode.
(2)集羣的負載均衡:節點的丟失或者增長會使數據分佈不均,當某個Datanode節點的空閒空間大於一個臨界點值時,HDFS會自動從其餘節點把數據遷移過來
(3)當namenode的損壞時,會利用secondNamenode來恢復namenode(爲namenode內存中的文件系統元數據建立檢查點).
(4)datanode的容錯性:DataNode是以數據塊做爲容錯單元,默認狀況下,每一個數據塊會被備份在三分,分別存在不一樣的DataNode上。當一個數據塊訪問失效,則會從備份的DataNode中選取一個,並備份該數據塊,以保證數據
爲namenode內存中的元數據建立檢查點機制:
(1)secondNamenode請求主namenode中止使用正在進行中的edit文件,這樣新的編輯操做記錄到一個新文件中edit.new
(2)secondNamenode從主nomanode獲取最近的fsimage和edit文件.
(3)SecondNamenode將fsimage文件載入內存,逐一執行edit文件中的操做.建立新的合併的fsimage.
(4)輔助namenode將新的fsimage文件發送回namenode,主namenode將其保存爲臨時的.ckpt
(5)主namenode從新命名爲臨時的fsimage文件,以便之後使用.
利用檢查點進行故障恢復:
當namenode發生故障時,能夠從輔助namenode恢復數據.
(1)方法1:將相關的存儲目錄恢復到新的namenode中.
(2)方法2:啓動namenode的輔助進程,將secondNamenode用做新的主namenode.
hdfs上傳寫數據的過程:
(1)client跟NameNode通訊請求上傳文件,NameNode檢查目標文件是否存在以及client是否有新建此文件的權限.
(2)若經過檢查,NameNode就會爲建立一條記錄,記錄該新建文件的信息.若未經過檢查就會拋出異常.
(3)而後NameNode向客戶端返回能夠新建文件的信息以及相應的一組datanode,Client就近原則請求3臺中的一臺DataNode 1上傳數據(本質上是一個RPC調用,創建pipeline),DataNode 1收到請求會繼續調用DataNode 2,而後DataNode 2調用DataNode 3,將整個pipeline(管線)創建完成,而後逐級返回客戶端
(4)Client開始往DataNode 1上傳第一個block,以pocket爲單位.DataNode 1收到一個packet就會傳給DataNode 2,DataNode 2傳給DataNode 3,DataNode 1每傳一個pocket會放入一個應答隊列等待應答.以此類推.
(5)當DataNode 3寫完最後一個pocket時,就會返回確認消息給DataNode 2,DataNode 2返回確認消息給DataNode 1,而後返回給客戶端,最後由客戶端返回給namenode.
hdfs讀取數據的過程:
(1)client與NameNode通訊查詢元數據,找到文件塊所在的DataNode,namenode將相應的元數據信息返回給client.
(2)client根據元數據信息挑選一臺DataNode(網絡拓撲上的就近原則,若是都同樣,則隨機挑選一臺DataNode)服務器,請求創建socket流.以packet(一個packet爲64kb)爲單位進行發送.
(3)DataNode開始發送數據,客戶端以packet爲單位接收,先在本地緩存,而後寫入目標文件
HDFS的塊劃分:FileInputFormat對文件的切分是嚴格按照偏移量來的,FileInputFormat默認爲文件在HDFS上的每個Block生成一個對應的FileSplit。那麼天然,FileSplit.start 就是對應Block在文件中的Offset、FileSplit.length就是對應Block的Length、FileSplit.hosts就是對應Block的Location(尋找距離最近的location)。劃分後數據如何讀取:在Map執行的時候,會使用InputFormat.getRecordReader()所返回的RecordReader對象來讀取Split中的每一條記錄.
hadoop實現冗餘備份的難點:
對於不一樣的數據備份,須要放到不一樣的節點上面,利用Hash函數,這樣能夠把每一個備份id對應到一個哈希值,而後再將這個哈希值與某個節點對應起來,就完成了一個數據備份的分配。這樣作在時間複雜度上只有O(1),可是不少哈希函數有一個問題,就是不穩定。這裏所謂的不穩定是指,當節點個數發生變化的時候,原來被分配到節點K上的數據備份可能就會被分配到另外一個節點上。經常使用的哈希函數爲:hash(x) = x % N,其中N爲節點個數,x爲備份id,這樣當集羣中節點出現故障或者擴展新的節點時,原來的計算的哈希值幾乎全都變了,那麼對於整個系統中的數據訪問來講,無疑是一個災難,由於訪問位置全都得改變,而且須要從新遷移數據。
那麼有沒有可能在N變化的侯,原有數據備份的哈希值不改變呢?這就是一致性哈希的優點所在。一致性哈希的原理能夠這麼理解:原來哈希是用x%N,如今是用x%S且N%S,這裏的S表示哈希函數自己能夠表示的哈希值範圍,好比它的範圍是0~2^32 - 1,那麼S=2^32。
以下圖若是選取的哈希函數取值範圍在0到2^32 - 1之間(Hash Range),那麼咱們能夠同時把Data Blocks和Data Nodes同時哈希到這個範圍裏面,這些Nodes會把Hash Range劃分爲若干區域,規定每一個Node存儲與其相鄰的前一個區域中的Blocks,從而完成數據的分配。這種方式的好處在於,即便出現Data Nodes數量變化的狀況,也不會影響其它Nodes和Blocls的位置狀況,最可能是在被刪除節點或者新增節點的附近進行調整,好比將原有區域中的Blocks進一步劃分或者合併。
上圖展現的方式中,三個Nodes將Hash Range分爲了4個區域,顯然不方便分配,因此提出一致性哈希環的概念,即將Hash Range的首位相連,而後在一個環路上面進行劃分,N個Nodes必定可以劃分出N個區域,而後讓每一個Node存儲前一個相鄰區域便可。以下圖所示:
如何解決client和hdfs讀寫延遲太高:
(1)Chunk緩衝機制:數據會被寫入一個chunk緩衝數組,這個chunk是一個512字節大小的數據片斷,這個緩衝數組能夠容納多個chunk大小的數據在裏面緩衝。
(2)Packet數據包機制:當chunk緩衝數組都寫滿了以後,就會把這個chunk緩衝數組進行一下chunk切割,切割爲一個一個的chunk,一個chunk是一個數據片斷。而後多個chunk會直接一次性寫入另一個內存緩衝數據結構,就是Packet數據包,一個Packet數據包,設計爲能夠容納127個chunk
(3)當一個Packet被塞滿了chunk以後,就會將這個Packet放入一個內存隊列來進行排隊。而後有一個DataStreamer線程會不斷的獲取隊列中的Packet數據包,經過網絡傳輸直接寫一個Packet數據包給DataNode。
mapreduce的排序:
在Hadoop中,排序是MapReduce框架中最重要的操做之一,Map Task和Reduce Task都會對數據按照key排序,無論邏輯上是否真的須要排序,任何程序中的數據都會被排序,這是Haddoop的默認行爲。MapReduce中使用了兩種排序算法:快速排序和優先隊列。在Map和Reduce Task的緩衝區使用的是快速排序,而對磁盤上的IFile文件合併使用的是優先隊列
優先隊列:本質上是最小堆,在優先隊列中,元素被賦予優先級。當訪問元素時,具備最高優先級的元素最早刪除。優先隊列具備最高級先出 (first in, largest out)的行爲特徵。一般採用堆數據結構來實現。優先隊列的兩個核心函數就是 upHeap() 和 downHeap(),全部優先隊列的操做都是圍繞這兩個函數進行的。upHeap()函數名裏的up表明目的方向,實際是從down 到 up 的 過程。downHeap()函數名裏的down也表明目的方向,實際是從up到 down 的 過程。upHeap() 是建小頂堆,downHeap() 從新調整節點,從新成爲小頂堆。
YARN結構:
yarn經過兩類長期運行的守護進程提供本身的核心服務:管理集羣上的資源使用的資源管理器(ResourceManager),以及運行在集羣中的全部節點上且可以啓動和監控容器的節點管理器(nodeManager).容器Container 是 YARN 中的資源抽象,它封裝了某個節點上的多維度資源,如內存、CPU、磁盤、網絡等,當AM向RM申請資源時,RM爲AM返回的資源即是用Container表示的。YARN會爲每一個任務分配一個Container,且該任務只能使用該Container中描述的資源。
YARN 採用了一種分層的集羣框架,經過將資源管理和應用程序管理兩部分分剝離開,分別由ResouceManager和ApplicationMaster負責,其中,ResouceManager專管資源管理和調度,而ApplicationMaster則負責與具體應用程序相關的任務切分、任務調度和容錯等,每一個應用程序對應一個ApplicationMaster.
YARN的運行機制:
(1)客戶端提交application到資源管理器,要求資源管理器運行ApplicationMaster進程.
(2)資源管理器找到一個可以在容器中啓動application的節點管理器.若是是一個簡單的計算過程,就會在資源管理器所處的容器中進行計算,將計算結果返回給客戶端.
(3)當計算任務比較複雜時,就會向資源管理器請求更多的節點的容器,以用於運行一個分佈式計算.
yarn在什麼層面調度,內存調度是什麼怎麼調度的,若是考慮CPU怎麼調度的。如何實現隔離的:
一些分佈式框架MR和Spark做爲YARN的應用運行在集羣計算層和集羣的存儲層上的.
Yarn的資源隔離是指爲運行着不一樣任務的「Container」提供可獨立使用的計算資源,以免它們之間相互干擾。目前支持兩種類型的資源隔離:CPU和內存,對於這兩種類型的資源,Yarn使用了不一樣的資源隔離方案。
(1)CPU的隔離:對於CPU而言,它是一種「彈性」資源,使用量大小不會直接影響到應用程序的存亡,所以CPU的資源隔離方案採用了Linux Kernel提供的輕量級資源隔離技術ControlGroup;yarn使用cgroup的兩種方式來控制cpu資源分配分別是嚴格按核數隔離資源和按比例隔離資源.
(2)內存的隔離:對於內存而言,它是一種「限制性」資源,使用量大小直接決定着應用程序的存亡,Cgroup會嚴格限制應用程序的內存使用上限,一旦使用量超過預先定義的上限值,就會將該應用程序「殺死」,所以沒法使用Cgroup進行內存資源隔離,而是選擇了線程監控的方式。
yarn的資源調度器:
YARN中有三種調度器可用:FIFO調度器(FIFO Scheduler),容量調度器(Capacity Scheduler)和公平調度器(Fair Scheduler)。
(1)FIFO調度器將應用放置在第一個隊列中,而後按照提交的順序(先進先出)運行應用。首先爲隊列中第一個應用的請求分配資源,第一個應用的請求被知足後再一次爲隊列中下一個應用服務。優勢是簡單易懂不須要任何配置,可是不適合共享集羣,當使用FIFO調度器(i)時,小做業一直被阻塞,直至大做業完成。
(2)容量調度器時,一個獨立的專門隊列保證小做業一提交就能夠啓動,因爲隊列容量是爲那個隊列中的做業所保留的,所以這種策略是以整個集羣的利用率爲代價的。這意味着與使用FIFO調度器相比,大做業執行的時間要長。
(3)公平調度器不須要預留必定量的資源,由於調度器會在全部運行的做業之間動態平衡資源。第一個(大)做業啓動時,它是惟一運行的做業,於是得到集羣中全部的資源。當第二個(小)做業啓動時,它被分配到集羣的一半資源,這樣每一個做業都能公平共享資源。
公平調度器支持搶佔功能:
所謂搶佔就是容許調度器終止那些佔用資源超過了其公平共享份額的隊列的容器,這些容器資源釋放後能夠分配給資源數量低於應得份額的隊列。注意,搶佔會下降整個集羣的效率,由於被終止的containers須要從新執行。若是隊列在指定的時間內未得到的資源仍然低於其公平共享份額的一半,那麼調度器就會搶佔其餘容器。
任務的分配:
若是不是小做業, 那麼應用管理器向資源管理器請求container來運行全部的map任務和reduce任務,每一個任務對應一個container,且只能在該container上運行。這些請求是經過心跳來傳輸的, 包括每一個map任務的數據位置,好比存放輸入分片的主機名和機架(rack). 調度器利用這些信息來調度任務, 儘可能將任務分配給存儲數據的節點, 或者退而分配給和存放輸入分片的節點相同機架的節點。 請求也包括了任務的內存需求, 默認狀況下map和reduce任務的內存需求都1024MB.
Mapreduce中的shuffle過程:
shuffle 開始和結束時間:
開始時間:map執行完成有輸出文件產生,shuffle開始;
結束時間:reduce輸入文件最終肯定了,shuffle結束;
shuffle優化核心:減小拉取數據的量(io操做)及儘可能使用內存而不是磁盤。
每一個map task都有一個內存緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉數據。
具體分爲如下步驟:
shuffle前半段:
(1)在通過mapper的運行後,mapper的輸出結果是key/value對。MapReduce提供Partitioner接口,它根據key決定當前的這對輸出數據最終應該交由哪一個reduce task處理。默認是對key進行 hashcode後再以reduce task數量取模。
(2)接下來,須要將數據寫入內存緩衝區中,緩衝區的做用是批量收集map結果,減小磁盤IO的影響。咱們的key/value對以及Partition的結果都會被寫入環形緩衝區。環形緩衝區分爲兩個部分,數據區和索引區。數據區是存放用戶真實的數據,索引區存放數據對應的key值,partition和位置信息。整個環形內存緩衝區就是一個字節數組,當map task的輸出結果不少時,就可能會撐爆內存,因此須要在達到設定的環形緩衝區的閾值後將緩衝區中的數據臨時寫入磁盤,而後從新利用這塊緩衝區。在溢寫前,會將數據根據key和partition進行排序,這個從內存往磁盤寫數據的過程被稱爲Spill(溢寫),這個溢寫是由單獨線程來完成,不影響往緩衝區寫map結果的線程。溢寫線程啓動時不該該阻止map的結果輸出,因此整個緩衝區有個溢寫的比例默認是0.8。也就是當緩衝區的數據已經達到閾值,溢寫線程啓動,鎖定這80%的內存,執行溢寫過程。Map task的輸出結果還能夠往剩下的20%內存中寫,互不影響。 當溢寫線程啓動後,須要對這80MB空間內的key作排序(Sort)。若是有一個combiner函數,就在排序後的輸出上運行。(運行combiner函數使map的結果更加緊湊,所以減小到寫到磁盤上的數據和傳遞給reduce的數據)
(3)merger溢寫文件:每次溢寫會在磁盤上生成一個溢寫文件,若是map的輸出結果然的很大,有屢次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩衝區中的數據也所有溢寫到磁盤中造成一個溢寫文件。而後將這些溢寫文件歸併到一塊兒
至此,map端的全部工做都已結束,最終生成的這個文件也存放在TaskTracker夠得着的某個本地目錄內。每一個reduce task不斷地經過RPC從JobTracker那裏獲取map task是否完成的信息,若是reduce task獲得通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啓動。
Shuffle後半段:
(4)Copy過程:簡單地拉取數據,將Copy過來的數據會先放入reduce端的內存緩衝區中。Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。
(5)Merge階段:合併不一樣map端copy來的數值。
(6)Reducer的輸入文件:不斷地merge後,最後會生成一個「最終文件」,這個文件可能存在於磁盤上,也可能存在於內存中。對咱們來講,固然但願它存放於內存中,直接做爲Reducer的輸入,但默認狀況下,這個文件是存放於磁盤中的。
java的序列化方式,hadoop的序列化方式:
序列化就是把內存中的對象的狀態信息轉換成字節序列,以便於存儲(持久化)和網絡傳輸,反序列化就是就將收到的字節序列或者是硬盤的持久化數據,轉換成內存中的對象。
(1)java序列化:只要實現了serializable接口就能實現序列化與反序列化,必定要加上序列化版本ID serialVersionUID,這個是用來識別序列化的以前的類究竟是哪個。好比但願類的不一樣版本對序列化兼容,須要確保類的不一樣版本具備相同的serialVersionUID;
(2)hadoop的序列化信息:hadoop原生的序列化類須要實現一個叫Writeable的接口,相似於serializable接口,實現Writable接口必須實現兩個方法:write(DataOutputStream out);readField(DataInputStream in)方法。
特色:緊湊,對象可重用( java序列化每次序列化都要從新建立對象,內存消耗大。Writable能夠重用)可拓展性(hadoop本身寫序列化很容易,能夠利用實現hadoop的Writable接口 實現了直接比較字符流以肯定兩個Writable對象的大小)
hive
hive的做用:
hive是基於Hadoop的一個數據倉庫工具,能夠將結構化的數據文件映射爲一張數據庫表,並提供簡單的sql查詢功能,能夠將sql語句轉換爲MapReduce任務進行運行。 相對於mapreduce 離線計算須要寫不少java代碼去實現數據提取,hive能夠經過類SQL語句快速實現簡單的MapReduce統計,沒必要開發專門的MapReduce應用開發程序,更適合數據倉庫的統計分析。
hive的架構:
(1)用戶接口:主要有:CLI,client,wui.
(2)元數據存儲:Hive 將元數據存儲在數據庫中,如 mysql、derby。Hive 中的元數據包括表的名字,表的列和分區及其屬性,表的屬性(是否爲外部表等),表的數據所在目錄等
(3)解釋器,編譯器,優化器,執行器:生成的查詢計劃存儲在 HDFS 中,並在隨後由 MapReduce 調用執行。
hive是如何將HQL語句轉化爲mapreduce的:
接收到用戶的指令(HQL),使用本身的Driver,結合元數據(MetaStore),將這些指令翻譯成MapReduce,提交到Hadoop中執行,最後,將執行返回的結果輸出到用戶交互接口。HiveSQL ->AST(抽象語法樹) -> QB(查詢塊) ->OperatorTree(操做樹)->優化後的操做樹->mapreduce任務樹->優化後的mapreduce任務樹
hive的數據傾斜問題:
緣由:key分佈不均勻,業務數據自己的特性,建表時考慮不周,某些SQL語句自己就有數據傾斜.
解決方法:
(1)參數調節:hive.map.aggr = true,Map 端部分聚合,至關於Combiner
hive.groupby.skewindata=true,數據傾斜的時候進行負載均衡,當項設定爲 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不一樣的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做。
(2)HQL語句調節:使用map join讓小的維度表(1000條如下的記錄條數) 先進內存。在map端完成reduce.
(3)空值產生的數據傾斜:賦與空值分新的key值.
udf(用戶自定義函數)
雖然有不少內置的函數,可是生產上確定不夠那麼全面,全部,用戶須要自定義函數來知足自身的要求.只須要
kafka
kafka的三種消費模式(消息接收):
(1)自動提交offset:在建立一個消費者時,默認是自動提交偏移量,這種方式也被稱爲【at most once】,fetch到消息後就能夠更新offset,不管是否消費成功。
(2)手動提交:對偏移量實行更加精確的管理,以保證消息不被重複消費以及消息不被丟失。這種方式稱爲【at least once】。fetch到消息後,等消費完成再調用方法【consumer.commitSync()】,手動更新offset;若是消費失敗, 則offset也不會更新,此條消息會被重複消費一次。
手動提交又能夠分爲:(1)同步提交:同步模式下提交失敗時一直嘗試提交,直到遇到沒法重試的狀況下才會結束,同時,同步方式下消費者線程在拉取消息時會被阻塞,直到偏移量提交操做成功或者在提交過程當中發生錯誤。
(2)異步提交:而異步方式下消費者線程不會被阻塞,可能在提交偏移量操做的結果還未返回時就開始進行下一次的拉取操做,在提交失敗時也不會嘗試提交。
kafka在什麼地方須要用到zookeeper:
Kafka將元數據信息保存在Zookeeper中,可是發送給Topic自己的數據是不會發到Zk上的
(1)kafka使用zookeeper來實現動態的集羣擴展,不須要更改客戶端(producer和consumer)的配置。broker會在zookeeper註冊並保持相關的元數據(topic,partition信息等)更新。
(2) 而客戶端會在zookeeper上註冊相關的watcher。一旦zookeeper發生變化,客戶端能及時感知並做出相應調整。這樣就保證了添加或去除broker時,各broker間仍能自動實現負載均衡。這裏的客戶端指的是Kafka的消息生產端(Producer)和消息消費端(Consumer)
(3)Broker端使用zookeeper來註冊broker信息,以及監測partitionleader存活性.
(4) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partitionleader創建socket鏈接,並獲取消息.
(5)Zookeer和Producer沒有創建關係,只和Brokers、Consumers創建關係以實現負載均衡,即同一個ConsumerGroup中的Consumers能夠實現負載均衡(由於Producer是瞬態的,能夠發送後關閉,無需直接等待)
kafka的高效讀寫:
(1)順序磁盤寫:producer生產出來的數據一直是追加到文件末端,不是在文件的隨機位置來修改數據.順序讀寫能省去類大量磁頭尋址的時間.
(2)零拷貝技術:一般是指計算機在網絡上發送文件時,不須要將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中傳輸到網絡的方式。根本沒有把數據複製到咱們的應用緩存中,實際上只複製一次到cpu
kafka結構,爲何最新版本不用zookeeper來維護offset:
之前的版本是將offset 存儲在zookeeper上的,kafka在傳輸數據時,數據消費成功就會修改偏移量,這樣就能夠保證數據不會丟失而致使傳輸出錯;可是這也存在一個問題:那就是每次消費數據時都要將數據的offset寫入一次,效率比較低,並且zookeeper與kafka的offset變化確認也須要走網絡IO,這樣就會給offset的維護帶來不穩定性和低效。
offset的使用了內部的roker來管理,這樣僅僅只須要broker,而不要zookeeper來維護,都是將topic提交給__consumer_offsets函數來執行。
consumer group:
(1)consumer group下能夠有一個或多個consumer instance,consumer instance能夠是一個進程,也能夠是一個線程
(2)consumer instance共享一個公共的ID,即group ID。
(3)consumer group下訂閱的topic下的每一個分區只能分配給某個group下的一個consumer(固然該分區還能夠被分配給其餘group)
kafka完整運行流程:
https://blog.csdn.net/qq_35641192/article/details/80956244