答:大數據講的是沒法在必定時間內用常規軟件工具進行捕捉,管理和處理的數據集合.是須要新的處理模式才能具備更強的決策力,洞察力,洞察發現力和流程優化能力的海量,高增加率和多樣化的信息資產;主要解決-->海量的數據存儲的分析和計算問題.css
大數據的特性:html
應用場景:大數據無處不在,大數據應用於各個行業,包括金融、汽車、餐飲、電信、能源、體能和娛樂等在內的社會各行各業都已經融入了大數據的印跡。java
製造業,利用工業大數據提高製造業水平,包括產品故障診斷與預測、分析工藝流程、改進生產工藝,優化生產過程能耗、工業供應鏈分析與優化、生產計劃與排程。node
金融行業:大數據在高頻交易、社交情緒分析和信貸風險分析三大金融創新領域發揮重大做用。nginx
汽車行業:利用大數據和物聯網技術的無人駕駛汽車,在不遠的將來將走入咱們的平常生活。web
互聯網行業:藉助於大數據技術,能夠分析客戶行爲,進行商品推薦和針對性廣告投放。redis
電信行業:利用大數據技術實現客戶離網分析,及時掌握客戶離網傾向,出臺客戶挽留措施。算法
能源行業:隨着智能電網的發展,電力公司能夠掌握海量的用戶用電信息,利用大數據技術分析用戶用電模式,能夠改進電網運行,合理設計電力需求響應系統,確保電網運行安全。sql
物流行業:利用大數據優化物流網絡,提升物流效率,下降物流成本。shell
城市管理:能夠利用大數據實現智能交通、環保監測、城市規劃和智能安防。
生物醫學:大數據能夠幫助咱們實現流行病預測、智慧醫療、健康管理,同時還能夠幫助咱們解讀DNA,瞭解更多的生命奧祕。
體育娛樂:大數據能夠幫助咱們訓練球隊,決定投拍哪一種題財的影視做品,以及預測比賽結果。
安全領域:政府能夠利用大數據技術構建起強大的國家安全保障體系,企業能夠利用大數據抵禦網絡攻擊,警察能夠藉助大數據來預防犯罪。
我的生活:大數據還能夠應用於我的生活,利用與每一個人相關聯的「我的大數據」,分析我的生活行爲習慣,爲其提供更加周到的個性化服務。
大數據的價值,遠遠不止於此,大數據對各行各業的滲透,大大推進了社會生產和生活,將來必將產生重大而深遠的影響。
具體流程:
答題要點:沒法在必定時間內用常規軟件工具進行捕捉,管理和處理的數據集合;海量的數據存儲的分析和計算;數據量龐大;高速性;多樣性;價值性:價值密度低,商業價值高
Nginx是一個高性能的HTTP和反向代理服務器,及電子郵件(IMAP/POP3)代理服務器,同時也是一個很是高效的反向代理、負載平衡。
多進程異步非阻塞事件處理機制:運用了epoll模型
優勢: 跨平臺、配置簡單 非阻塞、高併發鏈接:處理2-3萬併發鏈接數,官方監測能支持5萬併發 內存消耗小:開啓10個nginx才佔150M內存,Nginx採起了分階段資源分配技術 nginx處理靜態文件好,耗費內存少 內置的健康檢查功能:若是有一個服務器宕機,會作一個健康檢查,再發送的請求就不會發送到宕機的服務器了。從新將請求提交到其餘的節點上。 節省寬帶:支持GZIP壓縮,能夠添加瀏覽器本地緩存 穩定性高:宕機的機率很是小 master/worker結構:一個master進程,生成一個或者多個worker進程 接收用戶請求是異步的:瀏覽器將請求發送到nginx服務器,它先將用戶請求所有接收下來,再一次性發送給後端web服務器,極大減輕了web服務器的壓力 一邊接收web服務器的返回數據,一邊發送給瀏覽器客戶端 網絡依賴性比較低,只要ping通就能夠負載均衡 能夠有多臺nginx服務器 事件驅動:通訊機制採用epoll模型
得益於它的事件處理機制: 異步非阻塞事件處理機制:運用了epoll模型,提供了一個隊列,排隊解決
Apache: 建立多個進程或線程,而每一個進程或線程都會爲其分配cpu和內存(線程要比進程小的多,因此worker支持比perfork高的併發),併發過大會榨乾服務器資源。
Nginx: 採用單線程來異步非阻塞處理請求(管理員能夠配置Nginx主進程的工做進程的數量)(epoll),不會爲每一個請求分配cpu和內存資源,節省了大量資源,同時也減小了大量的CPU的上下文切換。因此才使得Nginx支持更高的併發。
首先,nginx在啓動時,會解析配置文件,獲得須要監聽的端口與ip地址,而後在nginx的master進程裏面
先初始化好這個監控的socket(建立socket,設置addrreuse等選項,綁定到指定的ip地址端口,再listen)
而後再fork(一個現有進程能夠調用fork函數建立一個新進程。由fork建立的新進程被稱爲子進程 )出多個子進程出來,而後子進程會競爭accept新的鏈接。此時,客戶端就能夠向nginx發起鏈接了。當客戶端與nginx進行三次握手,與nginx創建好一個鏈接後此時,某一個子進程會accept成功,獲得這個創建好的鏈接的socket,而後建立nginx對鏈接的封裝,即ngx_connection_t結構體.接着,設置讀寫事件處理函數並添加讀寫事件來與客戶端進行數據的交換。最後,nginx或客戶端來主動關掉鏈接,到此,一個鏈接就壽終正寢了首先,nginx在啓動時,會解析配置文件,獲得須要監聽的端口與ip地址,而後在nginx的master進程裏面.先初始化好這個監控的socket,再進行listen,而後再fork出多個子進程出來, 子進程會競爭accept新的鏈接。
一個位於客戶端和原始服務器(origin server)之間的服務器,爲了從原始服務器取得內容,客戶端向代理髮送一個請求並指定目標(原始服務器),而後代理向原始服務器轉交請求並將得到的內容返回給客戶端。客戶端才能使用正向代理.正向代理總結就一句話:代理端代理的是客戶端
反向代理(Reverse Proxy)方式是指以代理服務器來接受internet上的鏈接請求,而後將請求,發給內部網絡上的服務器.並將從服務器上獲得的結果返回給internet上請求鏈接的客戶端,此時代理服務器對外就表現爲一個反向代理服務器 反向代理總結就一句話:代理端代理的是服務端
動態資源、靜態資源分離是讓動態網站裏的動態網頁根據必定規則把不變的資源和常常變的資源區分開來,動靜資源作好了拆分之後,咱們就能夠根據靜態資源的特色將其作緩存操做,這就是網站靜態化處理的核心思路 動態資源、靜態資源分離簡單的歸納是:動態文件與靜態文件的分離
在咱們的軟件開發中,有些請求是須要後臺處理的(如:.jsp,.do等等),有些請求是不須要通過後臺處理的(如:css、html、jpg、js等等文件) 這些不須要通過後臺處理的文件稱爲靜態文件,不然動態文件。所以咱們後臺處理忽略靜態文件。這會有人又說那我後臺忽略靜態文件不就完了嗎 固然這是能夠的,可是這樣後臺的請求次數就明顯增多了。在咱們對資源的響應速度有要求的時候,咱們應該使用這種動靜分離的策略去解決 動、靜分離將網站靜態資源(HTML,JavaScript,CSS,img等文件)與後臺應用分開部署,提升用戶訪問靜態代碼的速度,下降對後臺應用訪問 這裏咱們將靜態資源放到nginx中,動態資源轉發到tomcat服務器中
負載均衡便是代理服務器將接收的請求均衡的分發到各服務器中 負載均衡主要解決網絡擁塞問題,提升服務器響應速度,服務就近提供,達到更好的訪問質量,減小後臺服務器大併發壓力
Hadoop是Apache開源的一個處理大文件的系統,主要有三大核心件:HDFS;MapReduce;YARN它們的依據來源於谷歌的三大論文GFS----HDFS;MapReduce---MapReduce;BigTable----HBase它的模型主要有:Hadoop Common:基礎型模塊。RPC調用,Socket通訊;Hadoop Distributed File System 分佈式文件系統,用於存儲大數據的信息;Hadoop YARN 資源協調框架;Hadoop MapReduce 大數據計算框架;Hadoop Ozone: 對象存儲框架;Hadoop Submarine: 機器學習引擎
答題要點:他是什麼,它的組成,它的做用
HDFS是分佈式文件系統,它是一個基於硬盤之上的文件管理工具;能夠實現和硬盤解耦;HDFS是MapReduce計算的基礎;數據都是以字節數組的方式存放在硬盤上的;數據比較安全,會將數據備份多分;
在HDFS一旦文件被存儲,那麼就不能被修改.爲何這麼說呢?由於修改會影響偏移量,會致使數據傾斜,會致使一系列的蝴蝶效益
有兩個主要的節點:namenode和DataNode
元數據的信息:文件的描述信息-->名稱,大小,權限,建立時間
HDFS主要有三個主要的核心:
Namenode在啓動以後會先進入安全模式,若是DataNode丟失的block達到必定比例,則系統會一直處於安全模式狀態也就是隻讀狀態;能夠經過設置dfs.safemode.threshold.pct(缺省值0.999f)表示HDFS啓動的時候,若是DataNode上報的block個數達到了元數據記錄block個數的0.999倍才能夠離開安全模式,不然一直是隻讀模式。
數據存儲徹底基於內存有明顯的的優勢:那就是計算速度快;缺點是掉電易失
數據存儲介質徹底基於硬盤,數據的寫入和寫出速度慢
當集羣運行過程當中執行的不少操做都會被記錄到edits文件中.
fsimage對應着當前系統鏡像:
(1) 開機:當啓動集羣時namenode會將當前(上次關機時)的fsimage與最新的日誌進行合併產生新的fsimage
(2) 關機:直接關機便可
(3) 當達到閾值時(fs.checkpoint.size 默認64M;fs.checkpoint.period 默認3600秒)secondarynamenode會將namenode的fsimage與edits文件拷貝到seconamenode主機上,而後進行合併,而後生成fsimage.ckpt,而後將其拷貝到namenode上,檢查文件的完整性,修改fsimage.ckpt的名字爲正確的名字;這樣就解決了啓動時間過長和異常關機的問題.
Yarn是hadoop的集羣資源管理框架,在hadoop2被引入,具備足夠的通用性,一樣也支持其餘的分佈式計算模式
Yarn的基本思想就是將jobtracker的兩大主要職能:資源管理,做業的調度監控分爲兩個獨立的進程.一個是全局的resourcemanager,另外一個是每個應用對應的applicationmaster
Resourcemanager是一個純粹的調度器,它根據應用程序的資源請求嚴格限制系統的可用資源.在保證容量,公平性以及服務器等級的前提下,優化集羣資源利用率,即讓全部的資源都能被充分利用;主要做用:處理客戶端請求;啓動或監控ApplicationMaster;監控NodeManager;資源的分配與調度
ApplicationMaster負責與resourcemanager協商資源,並和namemanager進行協同工做來執行容器和監控容器的狀態;主要做用:負責數據的切分;爲應用程序申請資源並分配給內部的任務;任務的監控與容錯
Nodemanager是yarn節點上的工做進程,管理集羣中獨立的計算節點.其職責包括啓動應用程序的容器,監控他們的資源使用狀況,而且彙報給resourcemanager;主要做用:管理單個節點上的資源;處理來自ResourceManager的命令;處理來自ApplicationMaster的命令
Container 是 YARN 中的資源抽象,它封裝了某個節點上的多維度資源,如內存、CPU、磁盤、網絡等,當AM向RM申請資源時,RM爲AM返回的資源即是用Container表示的。YARN會爲每一個任務分配一個Container,且該任務只能使用該Container中描述的資源。
總的來講,Container有如下做用:對任務運行環境進行抽象,封裝CPU、內存等多維度的資源以及環境變量、啓動命令等任務運行相關的信息
要使用一個 YARN 集羣,首先須要一個包含應用程序的客戶的請求。ResourceManager 協商一個容器的必要資源,啓動一個 ApplicationMaster 來表示已提交的應用程序。經過使用一個資源請求協議,ApplicationMaster 協商每一個節點上供應用程序使用的資源容器。執行應用程序時,ApplicationMaster 監視容器直到完成。當應用程序完成時,ApplicationMaster 從 ResourceManager 註銷其容器,執行週期就完成了。
客戶端發送create請求到HDFS-->HDFS接收到請求經過PRC調用namenode.create方法-->namenode檢查文件路徑是否存在,是否有足夠的操做權限(失敗,拋出異常給DFS;成功在namenode上建立一個空的entry對象,給DFS返回成功信息)-->DFS接收到成功信息後,爲客戶端建立一個FSDataOutPutStream對象-->客戶端用FSDataOutPutStream開始上傳第一個快-->向namenode請求當前block的存放位置,namenode根據文件的副本數和機架感知策略,選擇對應副本數量的節點;客戶端開始和三個DataNode建立socket通信.
該模式稱之爲管道:能夠很好的解決IO阻塞問題;客戶端開始讀取要上傳的文件到本身的內存中,而且在內存中構建緩衝區buffered(減小物理IO);
發送數據的級別爲packet(默認大小64K);客戶端從緩存中取出一個packet,從client經過FSDataOutPutStream傳輸給第一個節點,並設置一個ACK狀態,該節點接收完繼續傳遞給下一個節點.
最後一個節點接收完畢後開始返回響應狀態,前一個節點接收到響應狀態後繼續向前發送直到第一個節點,最終client接收到響應轉檯說明當前片傳輸完成,重複執行packet的操做,直到整個快傳輸完成,告知DFS當前塊傳輸完成;DFS經過RPC調用將當前塊完成的信息傳遞給NN,NN將Entry中加入當前塊的位置信息繼續按照以上的流程傳遞剩餘的塊;當整個文件都上傳文成,客戶端關閉FSDataOutPutStream
這裏須要講一下pipeline管道:正向傳輸數據,逆向傳遞響應;將chunk和checknum都封裝到packet中;當packet裝滿以後,將其放在dataqueue隊列中等待發送;DataStreamer從dataqueue裏面按照順序依次取出packet;將要發送的packet放入到ackqueue裏面而後纔開始發送,每個DataNode接收到數據以後都會進行校驗:若是完整繼續向後傳遞,若是不完整返回false狀態;返回給客戶端,其接到返回狀態以後會去ackqueue查找回應的packet;將packet以後的數據從新掛載到dataqueue將會被從新發送
當發送數據時,加入DataNode掛掉,會向namenode彙報節點失效,namenode會記錄下備份失衡.DFS會認爲傳輸時只要有一個節點有完整數據,那麼這個數據就是完整的
首先客戶端將請求發送給DFS,DFS經過RPC調用namenode的open方法,namenode會去檢查該目錄是否存在,是否有讀的權限,成功以後,DFS會建立輸入流FSDataInPutStream,從namenode獲取塊的datanode信息,而後直接從DataNode上讀取數據便可,優先選擇同機架不繁忙的節點,把block讀取到客戶端後,最終合併成一個大文件.
(1) 沒有了namenode節點,有activenamenode徹底代替了namenode的功能;
(2) 增長了standbynamenode:備用namenode,刪除了secondarynamenode,其功能徹底由standbynamenode代替;
(3) 若是主節點掛掉,切換到standbynamenode節點上,該節點上的數據與主節點上的數據徹底一致;
(4) 新增Journalnode:幫助咱們拉取或者存放日誌文件,簡單地說就是當activenamenode生成edits的時候,拉取一份到journalnode上,standbynamenode會從journalnode集羣上,獲取到最新的日誌文件,進行重作;
(5) 引入zookeeper協助管理主備切換
(6) DataNode功能一直沒變.
(7) Hadoop2.X新增了聯邦機制:使用命名空間劃分namenode做用的範圍;固然具體的數據仍是放在DataNode上
(8) 聯邦機制有個缺點:由於命名空間是相互獨立的,因此當一個那麼濃的掛掉後,其餘命名空間是不會進行任何管理的,也就是說存在單點故障問題,故也要實現高可用;
(1) map流程
這裏就要說block,一個block大小默認是128M;再說的是split,它是map任務要梳理的數據的大小,默認等於block的大小;block太小會致使task的個數過多,過大會致使數據傾斜;每個split對應着一個maptask讀取的數據默認是一行;maptask對數據進行計算,分析,而後把臨時結果放入到一個叫kvbuffer的環形緩衝區中,它的大小默認是100M,當達到其閾值80%時就會進行溢寫,若是文件足夠大,那麼會溢寫出不少的小文件,大概是80M左右;接下來講一下分區,分區數和reduce的數量徹底相同,在溢寫的時候會提早計算出key所對應的分區reduce;進行排序,先按照分區排序,再按照key進行快排;將溢寫產生的多個小文件合併成一個大文件(這裏的大文件是相對於前面的小文件來講的),先按照分區,再按照key進行歸併排序;
(2) Reduce流程
先fetch,也就是從maptask上拉取reduce須要計算的數據;再按照key進行歸併排序;咱們必須將key相同的臨時數據拉取到同一個reducetask中進行計算;最後通過output輸出到HDFS上.
簡單的瞭解了一下:提交任務須要一個JOB主類,經過單例建立一個job類設置運行參數,此時job只有兩種狀態:defing和running,是用枚舉定義的;先肯定job的狀態,再設置使用API的版本,建立集羣對象,建立任務提交器;啓用多線程正式提交任務,而後切換job運行狀態;在提交任務以前,會做好切片(writeNewSplits)和設置好maptask的數量;在作切片的時候先獲取到要處理的文件路徑,肯定切片的大小,得到第一個切片後,會記錄下切片的偏移量,從新設置文件的大小(塊的大小減去這個切片的大小),再判斷一下最後一個切片的大小,通常不超過切片的1.1倍就不用再切割了,將這些切片放入集合中並返回;獲得行記錄讀取器,建立收集器時會進行初始化,獲取分區數量以及溢寫的指數,設置環形數據緩衝區,防止出現負數,對其進行哈希取值再取餘。讀取數據時,每次向下多讀一行,以防止數據被切割,除了最後一行。溢寫時設置守護線程(是個服務線程,準確地來講就是服務其餘的線程);reduce端,得到原生的keyvalue迭代器,建立拉取器,若是是本地文件就開啓一個拉取器,若是是遠端就開啓5個拉取器,獲取比較器,若是沒有設置分組比較器,那麼獲取key比較器,若沒有key比較器使用key類型自帶的比較器;若果上面全部的比較器都沒有則使用LongWritable.class。比較的時候會先判斷key是否爲本段的第一個,以及下一個key是否存在,並比較當前key和下一個key是否同樣,若是相同返回false。也就是相同key只會寫出一次。
Hbase是一個面向列的、開源的數據庫。徹底依賴於HDFS,用於存儲數據,適合存儲半結構化或者非結構化的數據。它是一個高可靠性、高性能、面向列、可伸縮的分佈式存儲系統,利用hbase能夠在廉價的PCserver上搭建起大規模結構化存儲集羣。可伸縮性主要體如今基於HDFS,能夠在保證數據安全的前提下完成存儲的擴容。須要四個維度才能定位到數據,分別是RowKey(相似於關係型數據庫的id,用來惟一標明對應列族的數據,通常佔用64k的數據長度,按照字典序排序)、Timestamp(數據的版本,不一樣的列族能夠有不一樣的事件版本)、Column Familly(有相贊成義的集合體,通常設計不超過3列族)、Qualifer(列族中的一個屬性)。全部存儲在hbase上的數據都是無數據類型的,統一爲字節數組。採用MapReduce處理數據。
鏈接hbase有多種方式如:shell、java、MapReduce等等。建立表時,咱們會將表的信息存放在zookeeper中,主要是由於:防止HMaster節點故障,丟失數據;主從master共享數據;更快的訪問數據;
HMaster接收客戶端的DDL請求,並監控HRegionServer,瞭解其當前節點狀態,接收到請求後,會選擇一個空閒的節點執行表的建立。若是發現HRegionServer掛掉,會將其上的region切換到其餘的HRegionServer上;
HRegionServer主要做用是負責管理當前節點的region,它上面可能有多個region,監控region的大小,達到閾值(10G)進行相對等分(保證數據完整爲前提),新分的region會交給另外的HRegionServer維護,負責維護與客戶端的IO請求;
HLog(WAL日誌)隸屬於HRegionServer,HRegionServer中全部的HRegion共享這個HLog;當達到閾值(內存的佔比日誌的條數)時,先寫日誌,而後將mem中的數據寫到Storefile;
HRegion:初適一張表對應一個Region,後面有可能一張表對應多個region;
Store:多個store組成region;一個store對應一個列族;一個store由memstore和Storefile組成;一個store只有一個memstore,能夠有多個Storefile;
Memstore:基於內存的數據存儲;新增數據時先寫入日誌,再寫入內存,提升數據插入速度;
Storefile:將memstore中的數據寫到HDFS上,Storefile達到必定數量會進行合併,客戶端進行數據查詢時,優先查找memstore,而後纔去Storefile中找。
熱點問題是這樣產生的:檢索hbase的記錄首先要經過rowkey來定位數據行。當大量的client訪問hbase集羣的一個或少數個節點,形成少數regionserver的讀/寫請求過多、負載過大,而其餘regionserver負載卻很小,就形成了「熱點」現象。
解決辦法:rowkey設計是熱點的源頭,region有二個重要的屬性:startkey與endkey表示這個region維護的rowkey範圍,當咱們要讀/寫數據時,若是rowkey落在某個start-endkey範圍內,那麼就會定位到目標region而且讀/寫到相關的數據。正常狀況下,建立一張表時只有1個region,start-endkey沒有邊界,全部的數據都在這個region裏放着,可是當數據愈來愈多,region的size愈來愈大時,達到閾值(10G)時,hbase認爲再在裏面放數據已經不合適了,就會找到midkey將region一分爲二,稱之爲分裂而midkey則是這兩個熱功能的臨界點。假設rowkey小於midkey數據放到1區中,反之放到2區中。隨着數據的增大,數據每每會放到2區裏,會一直這樣持續下去,從而致使集羣的資源得不到很好的利用。
數據傾斜:Hbase能夠被劃分爲多個Region,可是默認建立時只有一個Region分佈在集羣的一個節點上,數據一開始時都集中在這個Region,也就是集中在這一個節點上,就算region存儲達到臨界值時被劃分,數據也是存儲在少數節點上。這就是數據傾斜。
解決辦法:(1)設置預分區:讓表的數據能夠均衡的分散在集羣上;(2)加鹽:在rowkey的前面增長隨機數,具體就是給rowkey分配一個隨機前綴以使得它和以前的rowkey的開頭不一樣,這樣rowkey就會分散到各個region上,以免熱點;(3)哈希:哈希會使同一行永遠用一個前綴加鹽。哈希也能夠使負載分散到整個集羣,可是讀倒是能夠預測的。使用肯定的哈希可讓客戶端重構完整的rowkey,能夠使用get操做準確獲取某一個行數據;(4)反轉:反轉固定長度或者數字格式的rowkey,這樣能夠有效的隨機rowkey,可是犧牲了rowkey的有序性;(5)時間戳反轉:rowkey=哈希(主鍵<遞增的id\手機號碼等>)+Long.Max_Value - timestamp。
Hbase的優化主要從如下這幾方面考慮:
(1)設置預分區:建表時設置預分區防止前期數據集中到一個regionserver上;(2)rowkey的設置:任意字符,最大長度64KB,通常設置爲10~100bytes,主要防止出現rowkey冗餘,rowkey是按照字典序排列的,rowkey設置的越短越好;(3)列族設置,不要在一張表中涉及過多的列族,由於一個store對應着一個列族,當memstore溢寫時,會觸發臨近的列族進行溢寫,若列族過多會致使IO數量龐大,若是咱們按照rowkey查詢數據時會把全部的列族的數據查詢出來會致使讀取store文件數過多;(4)在regionserver中有一片共享區域稱之爲memory,創建表時能夠將其放入到memory中,增長緩存命中率;(5)數據文件的合併與拆分:合併文件能夠大大提升查詢數據的效率;數據文件隨着數據的增長而變大,拆分能夠減小HRegionServer的壓力,提升查詢效率,固然這些都在Storefile轉換成Hfile過程當中作的,也就是寫文件到HDFS上時;(6)能夠打開多個表連接增長寫入速度,能夠用事務完成全部數據操做,對不重要的數據能夠不寫入日誌,批量插入數據或者多線程批量插入數據(htable屬於線程不安全,須要建立多個htable鏈接才能使用多線程);(7)多客戶端讀取數據,使用池建立多個htable,設置scan緩存,批量讀取數據,多線程批量讀取數據,(Blockcache * 1+memstore * n < heapsize * 0.8,若是要求響應速度,就要增大Blockcache,若是要求寫入速度,須要增大memstore)
Hbase中常見的過濾器有:KeyOnlyFilter、FirstKeyOnlyFilter、MultipleColumnPrefixFilter、ColumnCountGetFilter、ColumnPaginationFilter、InclusiveStopFilter。
過濾器:用來提升數據處理的效率,用戶能夠經過內置或自定義的過濾器來對數據進行過濾,全部的過濾器都在服務端生效,即謂詞下推,減輕網絡傳輸和客戶端處理的壓力。
主要分爲三大類:比較過濾器、專用過濾器和包裝過濾器;
比較過濾器:RowFilter、Family Filter、QualifierFilter、ValueFilter;
專用過濾器:setFilterIfMissing、setlatestversiononly、PrefixFilter、ColumnPrefixFilter、PageFilter、TimeStampsFilter;
包裝過濾器:whilematchfilter。
https://blog.csdn.net/qq_15973399/article/details/89281778
Hive是一個基於hadoop的數據倉庫工具,能夠將結構化的數據文件映射成一張表,並提供簡單的SQL查詢功能,能夠將SQL語句轉換爲MapReduce任務進行運行;主要進行OLAP操做,也就是經過歷史的數據分析,對如今的業務揮着發展提供數據的支持;常見的模式有星型模型和雪花模型;
架構主要有:client(包括命令行、JDBC、ODBC模式以及webUI),metastore(元數據信息等等),驅動Driver(解析器《將SQL語句解析成語法樹》、編譯器《將語法樹轉成邏輯執行計劃》、優化器《優化執行計劃》、執行器《將邏輯計劃轉換成物理計劃》);
Hive的數據類型也分爲基本類型和複合類型,複合類型中多了一個struct類型;
內部表和外部表:內部表通常處理本身獨享的數據,刪除表時,數據文件會被跟着刪除,外部表能夠和別的表共享數據,刪除表時,不會刪除數據文件;
對錶進行查詢時,默認查詢這個表下的全部文件,設置分區主要目的是減小每次文件的掃描範圍,分爲靜態分區和動態分區;動態分區能夠自動識別分區,插入數據時會動態的建立分區文件夾;
Hive數據分桶:爲了提高咱們查詢數據的速度,常常被查詢的列或者散列比較好的列進行分桶。
常見的數據存儲格式:textfile、sequencefile、orc、rcfile、parquet、avro、json、自定義格式。如何選擇數據格式:若是隻讀取部分列,考慮orc或者parquet,hive用orc,spark用parquet,須要讀取多行使用avro
Hive優化的核心思想就是把HiveSQL當作MapReduce程序去優化;select僅查詢本字段和where僅對本字段進行條件過濾不須要轉換MapReduce程序;(1)將HQL轉換成MapReduce進行,map端進行join操做,小表join大表,大表join大表,過濾空的key;(2)對查詢頻率比較高的次創建分區表以減小數據掃描的範圍;(3)在對待groupBy的數據傾斜的方面,咱們設置hive.group.sviWind=ture,這代表它會自動進行負載均衡,去除了數據傾斜的問題;(4)map的數量要設置合適,過多會致使oom問題,太小會致使處理數據慢;(5)reduce數量設置要合適,數量過多會產生不少小文件,會對namenode產生壓力,過少會致使單個節點壓力大,會產生OOM問題;(6)JVM重用能夠使得JVM實例在同一個job中時候使用N次,可是設置開啓以後,task插槽會一直佔用資源,不管是否有task運行,直到全部的task即整個job所有執行完成時,纔會釋放全部的task插槽資源;(7)排序:order by全排,數據太大時謹慎用;sort by對reduce裏的數據作全排,局部排序;distribute by 按某個字段值,決定數據放入哪一個reduce中;cluster by至關於sort by 加上distribute by。
有時候咱們想要既顯示彙集前的數據,又要顯示彙集後的數據,這時便引入了窗口函數;
(1)、聚合函數+over
Kafka是一個高吞吐量、低延遲、分佈式的消息隊列系統,每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒;
Kafka提供了一個生產者、緩衝區、消費者的模型;kafka是由多個broker服務器組成,用於存儲數據;不一樣的數據被分爲不一樣的topic;producer往topic裏生產數據,consumer從topic中獲取數據;一個個消息組成partition,多個partition組成topic;每個消息被標識了一個遞增序列號表明其進來的前後順序,將進來的消息追加到該partition的後面;每次消費以後會產生偏移量offset,它是惟一的標識該分區中的每一個記錄;kafka對消息默認保存7天,固然這個數據時能夠調整的;每個消費者惟一保存的元數據信息就是消費者當前消費日誌的偏移量;該偏移量是由消費者控制,也就說消費者能夠經過修改該偏移量來讀取任何位置的數據;每一個consumer都保留本身的offset,互相不干擾,不存在線程安全問題;partition均勻分配到集羣server中。生產、消費消息時,會被路由到指定的partition,減小單臺服務器的壓力,增長了程序的並行能力;消息消費完成以後不會刪除,能夠經過重置offset從新消費;
應用場景:日誌收集開放給各類consumer,好比hadoop、hbase、solr等;用戶活動跟蹤等等,能夠裝載到hadoop、數倉作離線分析和挖掘等等;
Producer往某個partition中寫入數據時,只會往leader中寫入數據,而後數據纔會被複制進其餘的replica中;kafka是由follower到leader上拉取數據的方式進行同步的;簡單的說就是寫都往leader上寫,讀也只在leader上讀,flower只是數據的一個備份,只是爲了防止leader掛掉,並不往外提供服務。
說到同步,在分佈式架構中分爲兩種:第一種同步複製,即全部的follower把數據拿過去以後才commit,這樣一致性好,可是性能不高;另外一種是異步複製,只要leader拿到數據就當即commit,等follower慢慢複製,性能高,當即返回,可是一致性差;
咱們kafka並非同步也不是異步的,是一種獨特的ISR機制;leader會維護一個與其基本保持同步的replica列表,被稱之爲ISR,每個partition都會有一個ISR,由leader動態維護,若是一個follower比一個leader落後太多,或者超過必定時間沒有發起數據複製請求,則leader將其ISR中移除;涉及到兩個相關參數:replica.lag.time.max.ms和replica.lag.max.messages;當ISR中全部Replica都向Leader發送ACK時,leader才commit,當follower同時知足這兩個條件時,leader又會將它加入ISR機制中,故ISR是一個處於動態調整的狀況;
replica的做用:當partition的leader掛掉後,則會優先從ISR列表裏挑選一個follower選舉成新的leader,同時將舊的leader移除出ISR列表;
消費者API分爲兩種:highlevel consumer API偏移量由zookeeper來保存,使用簡單,可是不靈活;simplelevelconsumerAPI 不依賴於zookeeper,不管從自由度和性能上都有更好的表現,可是開發很複雜。
先說數據丟失問題,有兩種地方會數據丟失:第一種,producer端:有兩種緣由:第一,producer發數據給kafka時,它纔開始將數據存儲在服務器的pagecache中的,按期flush到磁盤上的,若是數據剛進來,這個時候斷電的話,數據就會丟失;第二,是在使用kafka的備份機制時,producer的ack設置爲0或1,最多隻能保證leader有數據,假如producer發送的數據leader剛接收完畢,leader就掛掉了,那麼partition的replica副本還將來得及同步,就會形成數據丟失;
這個問題怎麼解決呢,咱們能夠提升flush的頻率來減小數據丟失量,但這只是治標不治本,官方建議經過備份機制來解決數據丟失問題;
對於備份機制而致使的數據丟失問題,咱們能夠將ack設置爲all,也就是全部的備份分區都同步到這條數據,再發第二條數據,可是這樣就下降了性能,因此咱們每每得結合業務來平衡數據的一致性和系統的性能;
第二種:consumer端致使數據丟失:在使用kafka高級API時,消費者會每一個一段時間將offset自動保存到zookeeper上,假如剛提交完offset,數據還沒消費,此時機器宕機,那麼數據就丟失了;解決方案:關閉偏移量自動提交,改爲手動提交,待數據每次處理完後再提交;
關於數據重複消費產生:消費者自動提交offset到zookeeper後,程序又消費了幾條數據,可是還沒到下次offset提交的時候。這個時候機器宕機。重啓後,消費者會去zookeeper上讀offset進行消費,這就會致使數據重複消費;解決方法就是關閉自動提交,改爲手動提交。
這個是kafka的一個最大的特色:爲了保證數據寫入性能,kafka是基於操做系統的頁緩存來實現文件寫入,即就是在數據寫入文件時,先將數據寫入OS cache中也就是僅僅寫入內存中,接下來由操做系統本身決定何時把OS cache中的數據寫刷入磁盤中;最重要的一點是:以磁盤順序寫的方式來寫入文件的,也就是將數據追加到文件的末尾,不是在文件的隨機位置來修改數據;磁盤順序寫的性能會比隨機寫快上幾百倍;正由於這兩點造就了kafka的超高性能;
另一個緣由就是零拷貝技術:正常的讀取數據的順序是:操做系統從磁盤將數據讀取內核區的pagecache中;用戶進程把數據從內核區copy到用戶區的內存裏;用戶進程再把數據寫入到socket,數據流入內核區socket buffer上;OS再把數據從socket buffer中copy到網卡上,最後發送給客戶端消費者;
零拷貝技術就是直接讓操做系統的cache中的數據發送到網卡後傳輸下游的消費者,中間跳過了兩次拷貝數據的步驟;socket緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到socket緩存。
這麼說吧!!經過零拷貝技術,就不須要把OS cache裏的數據拷貝到應用緩存,再從應用緩存拷貝到socket緩存了,兩次拷貝都省略了,故叫零拷貝;對socket緩存僅僅就是拷貝數據的描述符過去,而後數據就直接從OS cache中發送到網卡上去了,這個過程大大的提高了數據消費時讀取文件數據的性能;這樣作以後相當於徹底基於內存提供數據的讀和寫了,因此性能會極其的高。
Kafka topic的數據存儲在磁盤時會默認存儲在/tmp/kafka-logs目錄下,固然也能夠本身設置,在該目錄下會按topic的每一個partition分區來存儲,一個分區一個目錄,一個partition目錄下會有多個segment文件,分爲.index和.log文件:其中.index文件爲索引文件,命名從0開始,後續由上一個文件的最大的offset偏移量來開頭;.log文件爲數據文件,存放具體消息數據;kafka從磁盤上查找數據時,會先根據offset偏移量,對index文件名字進行掃描,經過用二分法的查找方式,能夠快速定位到此offset所在的索引文件,而後經過索引文件裏的索引,去對應的log文件種查找數據。
具體的相關參數:message.max.bytes (默認:1000000) – broker能接收消息的最大字節數;log.segment.bytes (默認: 1GB) – segment數據文件的大小;log.roll.hours (默認:7天) - 當segment文件7天時間裏都沒達到log.segment.bytes 大小,也會產生一個新文件;replica.fetch.max.bytes (默認: 1MB) – broker可複製的消息的最大字節數;fetch.message.max.bytes (默認 1MB) – 消費者能讀取的最大消息;
Storm是一個實時的、分佈式、高可靠性、可維護性以及具有高容錯的異步流式計算框架;它是逐條處理數據的;它的架構主要有Nimbus,Supervisor,worker;編程模型:DAG、spout、Bolt;高可靠性體如今異常處理和消息的可靠性保障機制;可維護性體如今:提供UI界面圖形化監控端口;它是一個基於內存的處理框架;
Storm的計算模型:
spout--數據源,拓撲中數據的來源。通常會從指定外部的數據源讀取元組發送到拓撲中;一個spout能夠發送多個數據流;spout中最核心的方法時next Tuple,該方法會被storm線程不斷調用、主動從數據源拉取數據,再經過emit方法將數據生成元組發送給以後的Bolt計算。
Blot--數據流處理組件,拓撲中數據處理均由Bolt完成。對於簡單的任務或者數據流轉換,單個bolt能夠簡單實現,複雜的場景每每須要多個bolt分多個步驟完成;一個Bolt能夠發送多個數據流;bolt中最核心的方法時execute方法,該方法負責接收到一個元組數據、真正實現核心的業務邏輯。
Storm grouping--數據流分組(數據分發策略),有shuffle grouping隨機分組,field grouping按字段分組,all grouping廣播發送,global grouping全局分組,nonegrouping不分組,direct grouping指向型分組,local or shuffle grouping本地或隨機分組,customgrouping自定義分組。
主要是Nimbus、Supervisor、Worker和zookeeper:Nimbus主要做用是--資源調度,任務分配,接收jar包;Supervisor--接收nimbus分配的任務,啓動、中止本身管理的worker進程;worker--運行具體處理運算組件的進程,啓動executor,通常默認一個executor負責一個task任務;worker的任務類型有spout任務和bolt任務;zookeeper負責管理集羣;
具體的任務提交流程以下:
我看過他的通訊機制和容錯機制:
通訊機制主要有這幾個方面:ZeroMQ開源的消息傳遞框架,Netty基於NIO的網絡框架,更加高效;worker內部實現了「隊列」的功能,能夠理解爲一種事件監聽或者消息處理機制,也就是隊列當中一邊由生產者放入消息數據,另外一邊消費者並行取出消息數據處理。
容錯機制主要體如今這幾方面:nimbus服務器上,配置集羣防止服務器掛掉,非nimbus服務器發生故障時,該節點上的全部任務都會超時,nimbus會將這些task任務從新分配到其餘服務器上運行;
進程掛掉時:worker進程掛掉後,supervisor會從新啓動這個進程,若是一直啓動失敗,沒法向nimbus發送心跳,nimbus會將該worker從新分配到其餘服務器上;
Supervisor掛掉:無狀態(全部的狀態信息都存放在zookeeper中來管理),快速失敗;
Nimbus掛掉:無狀態(全部的狀態信息都存放在zookeeper中來管理),快速失敗;
消息的完整性:從spout中發出的tuple,以及基於它所產生tuple構成一棵tuple樹,當其發送完成,而且樹當中每一條消息都被正確處理,就標明spout發送消息被「完整處理」,即消息的完整性。實現機制--acker,負責跟蹤每一個spout發出的tuple的DAG。
Spark是專門爲大規模數據處理而設計的快速通用的計算引擎;spark計算主要是基於內存和DAG(有向無環圖);spark處理數據的能力通常是MR的十倍以上;
Spark最核心的是RDD,那麼什麼是RDD呢?? RDD就是彈性分佈式數據集;它主要有五大特性:RDD是由一系列的partition組成的(partition的數量,大小沒有限制,體現了它的彈性)(partition是分佈在不一樣的節點上,體現了它的分佈式);函數是做用在每個partition上的;RDD之間有必定的依賴關係(lineage血統);分區器是做用在K,V格式的RDD上的(RDD中存儲的數據都是二元組對象);RDD提供一系列最佳的計算位置(體現了數據本地化和大數據中「計算向數據移動」的理念);一個application應用程序中有幾個觸發算子,就有幾個job運行。
Spark的控制算子均可以將RDD持久化;持久化的單位是partition;cache和persist都是懶加載,須要觸發算子才能執行,checkpoint不只能持久化RDD到磁盤上,還能切斷RDD之間的依賴關係;cache默認將數據持久化到內存上;persist持久化級別有12種級別;cache是persist的一種持久化;
Checkpoint運行原理:當RDD的job執行完畢後,會從finalRDD從後往前回溯,當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD作一個標記;spark框架會自動啓動一個新的job,從新計算這個RDD的數據,將數據持久化到HDFS上;
優化:對RDD執行checkpoint以前,最好對這個RDD先執行cache,這樣新啓動的job只須要將內存中的數據拷貝到HDFS上就能夠,省略了從新計算這一步。
(1)、standalone模式:有兩種提交模式;
Standalone-client模式:
主要流程:client模式提交任務後,會在客戶端啓動driver進程;driver會向master申請啓動application啓動的資源;資源申請成功後,driver端將task發送到worker端執行;worker將task執行結果返回到driver端。
這種模式只適用於測試調試程序,由於driver每次都會從client端啓動,若是在生產條件下會致使網卡流量暴增。
Standalone-cluster模式提交任務:
主要流程:cluster模式提交應用程序後,會向master請求啓動driver;master接收請求,隨機在集羣一臺節點啓動driver進程;driver啓動後爲當前的應用程序申請資源;資源申請成功後,driver端發送task到worker節點上執行;worker端將執行狀況和執行結果返回給driver端。
這種模式的主要特色是:driver進程是隨機的在一臺worker上啓動,因此客戶端沒法查看task的執行狀況;
總結這兩種提交模式:driver的主要做用是:driver負責申請應用程序資源、分發任務、回收結果和監控task執行狀況。
(2)yarn模式:主要有兩種提交模式;
Yarn-client模式提交任務:
主要流程:客戶端提交一個application。在客戶端啓動一個driver進程;driver啓動後會向resourcemanager發送請求啓動applicationmaster;resourcemanager收到請求,隨機選擇一臺NodeManager啓動ApplicationMaster;ApplicationMaster啓動後會向resourcemanager請求一批container資源,用於啓動executor;resourcemanager會找到一批NodeManager返回給ApplicationMaster,用於啓動executor;ApplicationMaster會向NodeManager發送命令去啓動executor;executor啓動後,會反向註冊driver,driver發送task到executor,執行狀況和結果返回給driver端。
這種模式也會形成網卡流量暴增的問題。
第二種提交方式:yarn-cluster模式:
主要流程:客戶端提交application,發送請求到ResourceManager請求啓動ApplicationMaster;ResourceManager收到請求後隨機在一臺NodeManager上啓動ApplicationMaster;ApplicationMaster啓動後,發送請求到ResourceManager,請求一批container用於啓動excutor;ResourceManager返回一批NodeManager節點給ApplicationMaster;ApplicationMaster鏈接到NodeManager,發送請求到NodeManager啓動excutor;excutor反向註冊到NodeManager所在的節點的driver;driver發送task到excutor;excutor將執行狀況和執行結果返回給driver進程。
這種模式的特色:任務提交後不能查看日誌,只能經過yarn查看日誌。
父RDD與子RDDpartition之間的關係是一對一或者多對一,這樣稱之爲窄依賴;父RDD與子RDDpartition之間的關係是一對多,稱之爲寬依賴;簡單來講就是產生shuffle是寬依賴,反之是窄依賴。
說到寬窄依賴就不得說說DAG,那麼什麼是DAG呢。Spark任務會根據RDD之間的依賴關係,造成一個DAG有向無環圖,DAG提交給DAGScheduler;DAGScheduler會把DAG劃分爲相互依賴的多個stage,依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage;stage切割規則:從後往前,遇到寬依賴就切割stage,它的計算模式也是pipeline管道計算模式;那麼管道的數據何時會落地呢??有兩種狀況:對RDD持久化時;shuffle write的時候;
Stage的task並行度是由stage的最後一個RDD的分區數來決定的。
啓動集羣后,worker節點會向master節點彙報資源狀況,master掌握了集羣資源狀況,當spark提交一個application後,根據RDD之間的依賴關係將application造成一個DAG有向無環圖。任務提交後,spark會在driver端建立兩個對象:DAGScheduler和TaskScheduler;DAGScheduler是任務調度的高層調度器,主要做用就是將DAG根據RDD之間的寬窄依賴關係劃分爲一個個的Stage,而後將這些stage以TaskSet的形式提交給TaskScheduler;TaskScheduler會遍歷TaskSet集合,拿到每一個task後會將task發送到計算節點executor中去執行;當task執行失敗時,則由TaskScheduler負責重試,將task從新發送給executor去執行(其實就是發送到Executor中的線程池ThreadPool去執行),默認重試3次。若是過3次以後,這個task所在的stage就失敗了。而後由DAGScheduler來負責重試,從新發送TaskSet到TaskScheduler;DAGScheduler默認重試4次。若是4次都失敗了,那麼這個job就失敗了,application就失敗了。
固然TaskScheduler不只僅是重試失敗的task,還會重試stragging(落後、緩慢),也就是一個task比其餘task慢太多的話,它就會啓動一個新的task與這個task執行相同的處理邏輯,哪個先處理完,就一它的結果爲準。這就是spark的推測執行機制。默認是關閉的;
由於對於ETL數據清洗時就要關閉,防止數據重複入庫;若遇到數據傾斜的狀況。開啓推測機制,任務可能一直處於處理不完的狀態。
粗粒度資源申請(spark):在application執行以前,將全部的資源申請完畢,才進行任務的調度;全部task執行完成後,纔會釋放這部分資源。這樣的話task執行很快;可是資源沒法充分利用。
細粒度資源申請(MR):Job中的每個task在執行前本身去申請資源,task執行完成就釋放資源;集羣的資源能夠充分利用;可是task本身去申請資源,啓動會變慢,全部任務執行就慢。
(1)、--master MASTER_URL,能夠是spark://host:port,mesos://host:port,yarn,yarn-cluster,yarn-client,local;
(2)、--deploy-mode DEPLOY_MODE,driver程度運行的地方,client或者cluster,默認是client;
(3)、--class CLASS_NAME 主類名稱,含包名;
(4)、--jars driver和executor依賴的第三方jar包;
(5)、--files 用逗號隔開的文件列表,會放置在每一個executor工做目錄中;
(6)、--conf spark的配置屬性;
(7)、--driver-memory driver程序使用內存大小,默認1024M;
(8)、--executor-memory 每一個executor內存大小,默認1G;
(9)、--driver-cores driver程序的使用core個數,默認爲1,僅限於spark standalone模式;
(10)、--supervise 失敗後是否重啓driver,僅限於spark alone或者mesos模式;
(11)、--total-executor-cores executor使用的總核數,僅限於sparkstandalone、spark on mesos模式;
(12)、--executor-cores 每一個executor使用的core數,spark on yarn默認爲1,standalone默認爲worker上全部可用的core;
(13)、--driver-cores driver使用的core,僅在yarn-cluster模式下,默認爲1;
(14)、--queue QUEUE_NAME 指定資源隊列的名稱,默認:default;
(15)、--num-executors 一共啓動的executor數量,默認是2個。
SparkUI界面瞭解;配置historyserver;
廣播變量:只能在driver端定義,不能在executor端定義;diver端能夠修改廣播變量的值,在executor端沒法修改廣播變量的值;
累加器:在driver端定義賦初始值,累加器只能在driver端讀取,在excutor端更新。
Spark的shuffle有兩種類型:hashshuffle(1.2版本以前)和sortshuffle(1.2版本以後),spark2.0以後只有sortshuffle了;
(1)hashshuffle:
① 普通機制:
執行流程:每個map task將不一樣結果寫到不一樣的buffer中,每一個buffer的大小爲32K,它起到數據緩存做用;每一個buffer文件最後對應一個磁盤小文件;reducetask來拉取對應的磁盤小文件;map task的計算結果會根據分區器(默認hashpartitioner)來決定寫入到哪個磁盤小文件中去;reducetask會去map端拉取相應的磁盤小文件;產生小文件的個數:maptask的個數*reducetask的個數;shuffle write會產生不少寫磁盤小文件的對象,shuffle read會產生不少讀取磁盤小文件的對象;JVM會頻繁GC,若GC還沒法解決運行所須要的的內存,就會OOM;數據傳輸時會有頻繁的網絡通訊,那麼出現通訊故障可能性大大增長,會致使shuffle file cannot find 因爲這個錯誤致使的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。
② 合併機制:
產生小文件的個數:core數*reduce個數;
(2)Sortshuffle
① 普通機制:
執行流程:map task的計算結果會寫入到一個內存數據結構裏面,內存數據結構默認是5M,在shuffle時會有一個定時器,不按期的去估算這個內存結構的大小,會申請擴大內存,若是申請不成功,那麼發生溢寫到磁盤;溢寫以前內存中的結構數據會進行排序分區,而後開始溢寫,以batch的形式去寫,一個batch是1萬條數據;map task執行完成後,會將這些磁盤小文件合併成一個大的磁盤文件,同時生成一個索引文件;reduce task去map端拉取數據時,首先解析索引文件,再根據索引文件去拉取對應的數據;產生磁盤小文件的個數:2*map task的個數。
② Bypass機制:
Bypass運行機制的觸發條件:shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的參數值。這個值默認是200;不須要進行map端預聚合。產生磁盤小文件爲:2*map task的個數。
理解圖:
Shuffle文件尋址流程:當map task執行完成後,會將task的執行狀況和磁盤小文件的地址封裝到MpStatus對象中,經過MapOutPutTrackerWorker對象向driver中的MapOutPutTrackerMaster彙報;在全部的map task執行完畢後,driver中就掌握了全部的磁盤小文件的地址;在reduce task執行以前,會經過excutor中MapOutPutTrackerWorker向driver端的MapOutPutTrackerMaster獲取磁盤小文件的地址;獲取到磁盤小文件的地址後,會經過BlockManager中的ConnectionManager鏈接數據所在節點上的ConnectionManager,而後經過BlockTransferService進行數據的傳輸;BlockTransferService默認啓動5個task的節點拉取數據。默認狀況下5個task拉取數據量不能超過48M。
Spark內存管理分爲靜態內存管理和統一內存管理,spark1.6以前使用的是靜態內存管理,以後引入統一內存管理;
(1)靜態內存管理:
20%用於task計算;20%中的80%進行shuffle的聚合內存,20%預留內存,防止OOM;60%中10%預留,防止OOM問題,60%中的90%中的20%用於解壓序列化數據,剩下的存儲RDD的緩存數據和廣播變量。
(2)、統一內存管理
整個內存中空餘出300M用於JVM自身運行;總內存數-300M的25%用於task計算;剩下的75%中的50%用於shuffle聚合,其他的用於存儲RDD緩存數據和廣播變量,這兩部分是能夠相互動態借用的。
那麼reduce中OOM該如何處理呢?? 減小每次拉取的數據量;提升shuffle聚合的內存比例;提升excutor的總內存。
這裏就要說一下shuffle調優了,能夠在代碼中設置配置項new SparkConf().set(「spark.shuffle.file.buffer」,」64」),不推薦,這是硬編碼;推薦使用在提交spark任務時配置spark-submit --conf spark.shuffle.file.buffer=64 –conf …;在conf下的spark-default.conf配置文件中,不推薦,由於這個conf是全部的程序都要使用的。
Shark是基於spark計算框架之上且兼容hive語法的SQL執行引擎,底層的計算是採用spark,性能比MapReduce的hive快上2倍左右;spark底層依賴於hive的解析器,查詢優化器;spark on hive:hive只做爲儲存角色,spark負責SQL解析優化,執行;hive on spark:hive即做爲存儲又負責SQL的解析優化,spark負責執行。
說到這裏就得談談dataset了,它是一個分佈式數據容器,很像傳統數據庫的二維表;除了數據之外,還掌握了數據的結構信息,即schema;dataset的底層封裝的是RDD,當RDD的泛型時Row時,也可稱之爲dataframe。
SparkSQL的數據源能夠是json類型的字符串、JDBC、Parquent、hive、HDFS等;
建立dataset的方式:
讀取json格式的文件建立dataset和經過json格式的RDD建立dataset;經過反射的方式將非json格式的RDD轉換成dataset和動態建立schema將非json格式的RDD轉換成dataset;讀取parquet文件建立dataset;讀取JDBC中的數據建立dataset;讀取hive中的數據加載成dataset;
開窗函數:按照某個字段分組,而後取另外一字段的前幾個的值,至關於分組取topN,函數格式:row number() over (partition by XXX order by XXX)。
Sparkstreaming是一個支持可擴展、高吞吐量、容錯的準實時流處理框架,實時的數據來源能夠是:kafka、flume、Twitter或者TCP sockets,能夠使用高級功能的複雜算子來處理數據,好比map、reduce、join、window,處理後的數據能夠放在HDFS、數據庫等等。
Storm是純實時的流處理框架,sparkstreaming是準實時的處理框架(微批處理),吞吐量比storm高;storm的事務機制要比sparkstreaming的完善;storm支持動態資源調度(spark1.2以後也支持);sparkstreaming擅長複雜的業務處理,storm擅長簡單的彙總型計算。
這裏還要說一下sparkstreaming的窗口操做,也就是窗口長度和滑動間隔必須是batchInterval的整數倍,反之會檢測報錯。
Sparkstreaming2.2(包含之前)+kafka主要有兩種模式:
(1)、receiver模式
當sparkstreaming程序運行起來後,executor中會有receiver task接收kafka推送過來的數據,會被持久化,默認級別是MEMORY_AND_DISK_SER_2,固然這個級別也能夠修改,receiver task對接收過來的數據進行存儲和備份,完成以後去zookeeper中更新消費偏移量,而後向driver中的receiver tracker彙報數據的位置,最後driver根據數據本地化將task分發到不一樣節點上執行。
這樣的話有個問題:若是driver進程掛掉後,它下面的executor都會被殺掉,更新zookeeper消費偏移量時,它掛掉了,就會存在找不到數據的問題,也就是丟失數據;固然,你也能夠開啓WAL預寫日誌機制,在接收數據時在備份到其餘節點時,同時備份到HDFS上,這樣就能保證數據的安全性,可是這樣比較消耗性能,增長job的執行時間,提升了任務執行的延遲度。
Receiver模式的並行度設置:該並行度是由spark.streaming.blockInterval來決定的,默認是200MS想要提升並行度能夠減小blockInterval的數值,可是最好不要低於50MS。
(2)、direct模式:
該模式下是主動去kafka中取數據,sparkstreaming內部對消費者偏移量自動來維護,默認存放在內存中,若設置了checkpoint,那麼會保存在checkpoint中,也能夠實現zookeeper來管理。
Direct模式的並行度是由讀取的kafka中topic的partition數決定的。
如何優雅的中止sparkstreaming:spark.streaming.stopGracefullyOnShutdown設置成true,而後再殺死進程:kill -15/sigterm driverpid。
說了這麼多,總結下來也就幾點:
關於receiver模式:採用了receiver接收器模式,須要一個線程一直接收數據,將數據接收到executor中默認存儲級別:MEMORY_AND_DISK_SER_2;自動使用zookeeper管理消費者offset;底層讀取kafka採用 high level consumerAPI實現,不關心偏移量,只要數據;當driver掛掉後,有數據丟失問題,能夠開啓WAL機制防止,可是這樣的話又加大了數據處理延遲,也會存在數據重複消費風險;並行度默認spark.streaming.blockInterval=200ms,能夠減小這個參數增大並行度,最小不能低於50ms。
之因此不使用它:被動接收數據,有數據存儲問題,不能手動維護消費者offset。
關於direct模式:主動接收數據,本身管理消費者offset,默認存在內存中,能夠設置checkpoint,保存到checkpoint中;底層讀取kafka使用simple consumerAPI,能夠手動維護消費者offset;該模式的並行度與讀取的topic的partition一一對應;能夠設置checkpoint的方式管理消費者偏移量,使用StreamingContext.getOrCreate(ckDir,CreateStreamingContext) 恢復,可是這樣有缺點:代碼邏輯改變後,沒法從checkpoint中來恢復offset;從checkpoint中恢復數據時,有可能形成重複消費,須要咱們寫代碼來保證數據的輸出冪等。若是代碼邏輯改變,就不能使用checkpoint模式管理offset,能夠手動維護消費者offset,能夠將offset存儲到外部系統。
Kafka0.11版本的改變:因爲zookeeper集羣不能擴展寫能力,因此將消費者偏移量放在zookeeper中,每次寫操做代價很昂貴,依次kafka0.11版本默認使用新的API,將offset更新到一個kafka自帶的topic(_consumer_offsets)中,以消費者組groupid爲單位,能夠查詢每一個組的消費topic狀況。
Sparkstreaming2.3與kafka0.11的整合:丟棄了receiver模式;採用了新的消費者API實現,並行度仍是有topic中的partition來決定;大多數狀況下sparkstreaming讀取數據使用LocationStrategies.PreferConsistent 這種策略,這種策略會將分區均勻的分佈在集羣的Executor之間。若是Executor在kafka 集羣中的某些節點上,能夠使用 LocationStrategies.PreferBrokers 這種策略,那麼當前這個Executor 中的數據會來自當前broker節點。若是節點之間的分區有明顯的分佈不均,能夠使用 LocationStrategies.PreferFixed 這種策略,能夠經過一個map 指定將topic分區分佈在哪些節點中;新的消費者API能夠將kafka中的消息預讀到緩存區中,默認大小爲64K,在executor中,能夠經過參數設置spark.streaming.kafka.consumer.cache.maxCapacity 來增大,也能夠經過spark.streaming.kafka.consumer.cache.enabled 設置成false 關閉緩存機制;
關於offset的處理:checkpoint有咱們之前說的那兩種缺點;依靠kafka存儲offset有這樣的缺點:數據默認保存一天,若是這一天內數據沒有被消費,那麼將會被清除,沒法保證有且只有一次語義,由於offset的提交是異步的,全部結果的輸出依然要本身實現冪等性;本身存儲offset,在處理邏輯時,保證數據處理的事務,處理數據成功再保存offset,這樣能夠作到精準的處理一次處理數據。
Spark調優主要有一下幾個方面:
(1)、資源調優:能夠在集羣中指定資源分配的默認參數;在conf目錄下的spark-env.sh,提交命令時給當前的application分配更多的資源,也就是增長命令選項;動態的分配資源。
(2)、並行度調優:讀取HDFS中的數據時,下降block的大小,至關於提升了RDD中partition的個數,或者代碼中sc.parallelize(xxx,numPartitions)、sc.makeRDD(xxx,numPartitions)、sc.parallelizePaits(xxx,numPartitions)、repartions/coalesce、reduceByKey/groupByKey/join---(xxx,Patitions)、spark.default.parallelism net set、spark.sql.shuffle.partitions---200、自定義分區器;
若是讀取數據時在sparkstreaming中:receiver模式中:spark.streaming.blockInterval--200ms,
direct模式中:讀取的topic的分區數。
(3)、代碼調優:避免建立重複的RDD;對屢次使用的RDD進行持久化;儘可能避免使用shuffle類的算子,當一個RDD大,一個RDD小時,能夠將小的RDD用廣播變量廣播;4.使用map-side預聚合的shuffle操做:也就是儘可能使用有combiner的shuffle類算子,能夠下降shuffle write寫磁盤的數據量,下降shuffle read拉取數據量的大小,下降reduce端聚合的次數,常見的這類算子有:reduceByKey、aggregateByKey和combinerByKey;儘可能使用高性能的算子:reducebykey代替groupbykey,mappartition代替map、foreachpartition代替foreach;使用廣播變量,每一個executor中只保留一份副本,executor中task執行時共享這份副本,能夠減小網絡傳輸的性能開銷,減小對executor內存的佔用開銷,下降GC的頻率;使用kryo優化序列化性能,用到序列化的三個方面:在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸,將自定義的類型做爲RDD的泛型類型時會被序列化,使用可序列化的持久化策略;kryo序列化機制比java序列化機制高10倍左右,默認是java序列化機制,由於kryo要求要註冊全部須要進行序列化的自定義類型,開發比較麻煩;優化數據結構:對象、字符串和集合類型比較佔用內存,官方建議在spark編碼中儘可能用字符串代替對象,用原始類型代替字符串,使用數組代替集合類型,這樣能夠下降GC頻率,提高性能;使用高性能的庫fastutil。
(4)、數據本地化四個級別:PROCESS_LOCAL,task計算的數據在本進程的內存中;NODE_LOCAL,task所計算的數據在本節點所在的磁盤上或者數據在本節點其餘executor進程的內存中;NO_PREF,task所計算的數據在關係型數據庫中;RACK_LOCAL,task所計算的數據在同機架的不一樣節點的磁盤或者executor進程的內存中;ANY,跨機架;數據本地化調優:spark中任務調度時TaskScheduler須要依據數據的位置來分發task到數據所在的節點上,默認分發5次,每次等待3秒仍是不能執行,那麼TaskScheduler會下降一級數據本地化級別再次發送task;能夠將發送task的等待時間調大,可是要適量,在保證application的執行時間可接受的範圍內。
(5)、內存調優:JVM堆內存分爲一塊較大的Eden和兩塊較小的Survivor,每次只使用Eden和其中一塊Survivor,當回收時將Eden和Survivor中還活着的對象一次性複製到另一塊Survivor上,最後清理掉Eden和剛纔用過的Survivor,也就是說當Eden和其中一塊兒使用Survivor區域滿了以後就會觸發minor gc進行清理,若是另外一個Survivor滿了,那麼JVM就會將多餘的對象放入到老年代中,若年輕代內存不是很大,就會頻繁gc,當年齡過大(默認15歲),仍是沒有被回收就會跑到老年代中,老年代就會溢滿進行full gc(使用消耗性能和時間的垃圾回收算法)無論是minor gc仍是full gc都會致使JVM的工做線程中止;影響spark的性能的運行速度;調優方法:提升executor整體內存的大小;下降儲存內存的比例或者下降shuffle聚合內存的比例。
(6)、spark shuffle調優:buffer大小(默認32KB)能夠調大;shuffle read拉取數據量的大小(默認48M);shuffle聚合內存的比例(默認20%);拉取數據重試次數(默認5次);重試間隔時間(默認60S);spark shuffle的種類;sortshuffle bypass機制(默認200次)。
(7)、調節executor的堆外內存:spark底層shuffle傳輸方式是使用netty傳輸(零拷貝技術),堆外內存默認每個executor的內存大小的10%,運行spark是調節這個參數到1G或者更大;若是executor因爲內存不足或者堆外內存不足掛掉,怎樣調節堆外內存的大小:在提交任務腳本中添加--conf spark.yarn.executor.memoryOverhead=2048 單位M(yarn模式),--conf spark.executor.memoryOverhead=2048 單位M(standalone模式);spark默認網絡超時時長120S,若是超過那麼task就失敗了,會出現shuffle file cannot find錯誤;能夠在提交任務腳本里添加:--conf spark.core.connection.ack.wait.timeout=300。
(8)、數據傾斜:使用hive ETL預處理數據;過濾少數致使傾斜的key;提升shuffle操做的並行度;雙重聚合,核心實現思路就是進行兩階段聚合:第一次局部聚合,先給每一個key都打上隨機數,進行預聚合放入不一樣的分區內,再去掉各個key的前綴,再次進行全局聚合操做;使用廣播變量代替join操做;採樣傾斜key並分拆join操做;使用隨機前綴和擴容RDD進行join。
Impala是Cloudera公司推出,提供對HDFS、Hbase數據的高性能、低延遲的交互式SQL查詢功能;基於hive使用內存計算,兼顧數據倉庫、具備實時、批處理、多併發等優勢;是CDH平臺首選的PB級大數據實時查詢分析引擎;
主要特色:無需轉換爲MR,直接讀取HDFS及Hbase數據,從而大大下降了延遲;Impala沒有MapReduce批處理,而是經過使用與商並行關係數據庫中相似的分佈式查詢引擎(由Querry Planner、Querry coordinator和Querry Engine三部分組成);兼容HiveSQL,支持hive基本的一些查詢;具備數據倉庫的特性,可對hive數據直接作數據分析;支持Data Local,無需數據移動,減小數據的傳輸;支持列式存儲;支持JDBC/ODBC遠程訪問。
主要劣勢:對內存依賴大;徹底依賴hive;實踐過程當中分區超過1萬,性能嚴重降低;穩定性不如hive。
主要缺點:Impala不提供任何對序列化和反序列化的支持;Impala只能讀取文本文件,而不能讀取自定義的二進制文件;每當新的記錄/文件被添加到HDFS中的數據目錄時,該表須要被刷新。
Impala的核心組件:statestore Daemon,負責收集分佈在集羣中各個impala進程的資源信息、隔節點健康情況,同步節點信息;負責Querry的調度;catalog Daemon,從hive元數據庫中同步元數據,分發表的元數據信息到各個impala中;接收來自statestore的全部請求;Impala Daemon(具備數據本地化的特性故放在DataNode上),接收client、hue、jdbc或者odbc請求、Querry執行並返回給中心協調節點;子節點上的守護進程,負責向statestore保持通訊,彙報工做。
架構流程:客戶端向某一個Impala發送一個Querry(SQL);Impala將Querry解析爲具體的執行計劃Planner,交給當前機器coordinator即爲中心協調節點;coordinator根據執行計劃Planner,經過本機executor執行,並轉發給其餘有數據的impala用executor進行執行;impala的executor之間可進行通訊,可能須要一些數據的處理;各個impala的executor執行完成後,將結果返回給中心協調節點;由中心節點coordinator將匯聚的查詢結果返回客戶端;impala的執行計劃:把執行計劃表現爲一棵完整的執行計劃樹,能夠更天然地分發執行計劃到各個Impalad執行查詢,而不用像Hive那樣把它組合成管道型的 map->reduce模式,以此保證Impala有更好的併發性和避免沒必要要的中間sort與shuffle;任務調度:Impala的調度由本身完成,目前的調度算法會盡可能知足數據的局部性,即掃描數據的進程應儘可能靠近數據自己所在的物理機器。但目前調度暫時尚未考慮負載均衡的問題。從Cloudera的資料看,Impala程序的瓶頸是網絡IO,目前Impala中已經存在對Impalad機器網絡吞吐進行統計,但目前尚未利用統計結果進行調度。
Oozie是大數據中一個做業協調框架,它提供對hadoop、mapreduce和pig job的任務調度與協調;須要部署到java servlet容器中運行;功能類似的任務調度框架還有Azkaban和Zeus。
Oozie三大功能模塊:workflow:定義job任務執行;coordinator:定時觸發workflow,週期性執行workflow;bundle job:綁定多個coordinator,一塊兒提交或觸發全部coordinator。
Oozie工做流:本質就是一個做業協調工具,底層原理是經過將xml語言轉換成MapReduce程序來作,但只是集中在map端作處理,避免shuffle的過程;執行workflow以前首先要進行相關配置:job.properties定義相關屬性以及參數;workflow.xml定義控制流和動做節點;lib存放job任務的相關資料文件;
Oozie的工做流必須是一個有向無環圖,實際上Oozie就至關於Hadoop的一個客戶端,當用戶須要執行多個關聯的MR任務時,只須要將MR執行順序寫入workflow.xml,而後使用Oozie提交本次任務,Oozie會託管此任務流。
Oozie cli命令:使用oozie以前必須先啓動HDFS、yarn和jobhistory; 全部的命令都是以oozie job -oozie oozie_url 開頭的-config 制定job.properties文件夾的位置,-run 文件啓動後會返回一個惟一的jobId,供以後使用。
CDH是cloudera公司的發行版hadoop,咱們將該版本稱之爲CDH(cloudera distribute hadoop)核心功能:可擴展存儲,分佈式計算,基於web用戶界面操做。
優勢:版本劃分清晰;版本更新速度快;支持kerberos安全認證;文檔清晰;支持多種安裝方式。
Cloudera Manager是一個擁有集羣自動化安裝、中心化管理、集羣監控、報警功能的一個工具(軟件),使得安裝hadoop集羣的時間大大縮短,運維人員數大大下降,極大的提升集羣管理的效率;
Cloudera Manager四大功能:管理:對集羣進行管理,好比添加、刪除節點等操做;監控:監控集羣的健康狀況,對設置的各類指標和系統運行狀況進行全面監控;診斷:對集羣出現的問題進行診斷,對出現的問題給出建議解決方案;集成:對hadoop的多組件進行整合。利用CM能夠分廠方便快速的搭建CDH集羣。
Redis是一個開源的,內存中的數據結構存儲系統,它能夠用做數據庫、緩存和消息中間件;redis是鍵值對數據庫,支持多種類型的數據結構,不只能夠是字符串,同時還提供散列、列表、集合、有序集合等數據結構,redis是一個內存數據庫。
Redis的使用:key操做--爲給定key設置生存時間(expipre key seconds);string操做;list操做;set操做;sorted set操做;hash操做--KV模式不變,可是V是一個鍵值對(hset key field value),返回哈希表key中給定域field的值(hget key field)。
Redis的持久化:
RDB:在指定的時間間隔內將內存中的數據集快債寫入磁盤,恢復時就是將快照文件直接讀到內存裏;redis會單獨的建立一個子進程來進行持久化,會先將數據寫入到一個臨時文件中,待持久化過程結束了,再用該文件替換上次持久化的文件;因爲主進程不進行任何IO操做,因此擁有極高的性能;假如須要進行大規模的數據恢復,且對於數據恢復的完整性不是很是敏感,那麼RDB比AOF更加高效;可是他有一個明顯的特色:最後一次持久化後的數據可能丟失;RDB保存的是dump.rdb文件。
AOF:以日誌的形式將記錄每一個操做,將redis執行過的除了讀操做之外的全部寫指令記錄下來;只容許追加文件但不能夠改寫文件;redis重啓的haul就根據日誌文件的內容將寫指令從前到後執行一次以完成數據恢復工做。
AOF有三種策略:always--同步持久化,每次數據變動會被當即記錄到磁盤,性能差但數據完整性較好;everysec--出廠默認推薦,異步操做,每秒記錄,有數據丟失;no--從不fsync==將數據交給操做系統來處理,更快也最不安全的選擇。
AOF的rewrite:AOF採用追加方式,文件過大時就新增了重寫機制,也就是aof文件大小超過所設定的閾值,redis會自動將aof文件的內容壓縮,值保留能夠恢復數據的最小指令集;重寫原理:aof文件持續增加而變大時,會fork初一條新的進程來將文件重寫,遍歷新進程的內存中的數據,每條記錄有一條set語句,重寫aof文件的操做,不讀取舊的aof文件,而是將整個內存的數據庫內容用命令的方式重寫一個新的aof文件。
觸發機制:redis會記錄上次重寫的aof的大小,默認的配置當aof文件大小爲上次rewrite後大小的一倍且文件大於64M觸發。
AOF的優勢讓redis變得很是耐久;默認每秒fsync一次;它是以追加操做的方式來寫日誌文件,所以不須要seek;文件過大時發生重寫,防止數據丟失。
AOF的缺點:對於相同的數據集來講,AOF文件的體積一般大於RDB文件的體積;使用fsync策略上AOF速度可能會慢於RDB。
Redis的主從複製:異步複製;一個主服務器能夠有多個從服務器;從服務器也能夠有本身的從服務器;複製功能不會阻塞主服務器也不會阻塞從服務器。
Redis的哨兵模式:管理多個redis服務器主要有三大任務:監控、提醒和故障遷移。
Flume主要由三大部分組成:source、channel和sink;
Source:主要的數據來源,用於採集數據,產生數據流的地方,將產生的數據流傳輸到channel;能夠處理各類類型、各類格式的日誌數據,包括:avro、thrift、exec、spooling directory、netcat、syslog、http、legacy等等。
Channel:用於橋接source和sinks,相似於一個隊列;線程安全的,能夠同時處理幾個source的寫入操做和幾個sink的讀取操做;自帶兩種channel:一種是memory channel內存中的隊列,能夠在不須要關心數據丟失的情景下使用;另外一種是file channel 將全部時間寫到磁盤上,這樣不會由於程序關閉或者機器宕機而產生數據丟失
Sink:從channel收集數據,將數據寫到目標源;sink不斷地輪詢channel中的事件且批量地移除他們,並將這些事件批量寫入到存儲或索引系統、或者發送到另外一個flume agent;sink是徹底基於事務的,刪除數據前,每一個sink用channel了啓動一個事務。批量事件一旦成功寫出到存儲系統或下個 flume agent。Sink就利用channel提交事務,完成以後,該channel從本身的內部緩衝區刪除事件;
Sink的輸出包括HDFS、logger、avro、file、Hbase、kafka等
Flume event是數據流的基本單元,由一個裝載數據的字節數組和一些列可選的字符串屬性組成;
Sqoop是一個數據遷移工具,我曾經遇到兩個問題:
(1)、sqoop導入數據事件日期類型錯誤:用sqoop import從MySQL數據庫導入到HDFS時一直報錯,後面才發現是一個事件日期類型的非法值致使的,在hive中執行select語句查詢該字段的時候報錯;解決辦法是在建立hive表時用string字段類型;
(2)、sqoop導入導出null存儲一致性問題:hive中的null在底層是以「\N」來存儲的,而MySQL中的null在底層就是null,爲了保證數據兩端的一致性,轉化的過程當中遇到null-string,null-non-string數據都轉化成指定的類型,一般指定成「\N」。在導出數據時採用input-null-string 「\N」,input-null-string「\N」兩個參數;導入數據時採用null-string「\N」null-non-string 「\N」。
(3)、sqoop導出數據到MySQL一致性問題:使用 --staging table選項,將HDFS中的數據先導入到輔助表中,成功以後。輔助表中的數據在一個事務中導出到目標表中;
底層運行的任務中只有map階段,沒有reduce階段。
Kylin是一個開源的首個徹底由中國團隊設計開發的分佈式分析引擎,它提供hadoop之上的SQL查詢接口及多位分析(OLAP)能力以及支持大規模數據,可以處理TB乃至PB級別的分析任務,可以在亞秒級查詢巨大的hive表,並支持高併發。
Kylin的工做原理:指定數據模型,定義維度和度量;預計算cube,計算全部cuboid並保存爲物化視圖;執行查詢時,讀取cuboid,運算,產生查詢結果。
Kylin的核心思想是cube預計算,理論基礎是空間換時間,把高複雜度的聚合運算、多表鏈接等操做轉換成對預計算結果的查詢。
Kylin在數據集規模上的侷限性主要取決於維度的個數和基數,而不是數據集的大小,因此kylin能更好的支持海量數據集的查詢;也正是預計算技術,kylin的查詢速度很是快,亞秒級響應。而hive的查詢時間隨着數據量的增加成線性增加。
Cube的剪枝優化:聚合組,根據業務場景,將經常使用的維度組合定義到一個聚合組中,提升查詢性能;強制維度:若一個維度被定義爲強制維度,那麼這個分組產生的全部cuboid中每個cuboid都會包含該維度,下降預計算的維度;層次維度:好比:年月日,將它們設置爲層次維度,cuboid數量將降低;聯合維度:每一個聯合中包含兩個或更多個維度,那麼這些維度要麼一塊兒出現,要麼都不出現;衍生維度;
Rowkey編碼優化:kylin支持如下幾種編碼:data編碼:將日期類型的數據使用三個字節進行編碼;time編碼:僅僅支持到秒,只使用4個字節,若是可以接受 秒級的時間精度,請選擇Time編碼來表明時間的維度;Integer編碼:Integer編碼須要提供一個額外的參數「Length」來表明需 要多少個字節。Length的長度爲1~8。若是用來編碼int32類型的整數,能夠將Length設爲4;若是用來編碼int64類型的整數,能夠將Length設爲8。在更多狀況下,若是知道一個整數類型維度的可能值都很小,那麼就能使用 Length爲2甚至是1的int編碼來存儲,這將可以有效避免存儲空間的浪費;dict編碼:對於使用該種編碼的維度,每一個Segment在構建的時候都會爲這個維度全部可能的值建立一個字典,而後使用字典中每一個值的編 號來編碼。Dict的優點是產生的編碼很是緊湊,尤爲在維度值的基數較小且長度較大的狀況下,特別節約空間。因爲產生的字典是在查詢時加載入構建引擎和查詢引擎的,因此在維度的基數大、長度也大的狀況下,容易形成構建引擎或查詢引擎的內存溢出;fixed length編碼:編碼須要提供一個額外的參數「Length」來表明需 要多少個字節。該編碼能夠看做Dict編碼的一種補充。對於基數大、長度 也大的維度來講,使用Dict可能不能正常工做,因而能夠採用一段固定長度的字節來存儲表明維度值的字節數組,該數組爲字符串形式的維度值的UTF-8字節。若是維度值的長度大於預設的Length,那麼超出的部分將會被截斷。
Flink是一個低延遲、高吞吐、統一的大數據計算引擎,flink的計算平臺能夠實現毫秒級的延遲狀況下,每秒鐘處理上億次的消息或者事件,還提供了一個Exactly-once的一致性語義,保證了數據的正確性;也提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持event time,watermark對消息亂序的處理。
Flink以流式處理爲核心,用流處理去模擬批處理;支持流處理,SQL處理,批處理;對於流處理:實時性強,吞吐量高,延遲度低。
每一個flink程序由source operator+transformation operator+sink operator組成;
Flink中的key:flink處理數據不是K,V格式編程模型,它是虛擬的key;批處理用groupby算子,流式處理用keyby算子。
虛擬key的指定:使用Tuples來指定key;使用Field Expression來指定key;使用 key selector Functions來指定key。
Flink運行時包含了兩種類型處理器:jobmanager(master):用於協調分佈式執行,他們用來調度task,協調檢查點,協調失敗時恢復等等;flink運行時至少存在一個jobmanager,高可用模式中可能會存在多個,一個leader和多個standby;taskmanager(worker):用於執行一個dataflow的task(或者特殊的subtask)、數據緩衝和data stream的交換,flink運行時至少會存在一個taskmanager。
啓動方式:standalone模式、yarn模式、meson模式以及container(容器)。
Flink on yarn有兩種運行模式:yarn-session模式:預先在yarn上面劃分一部分資源給flink集羣用,flink提交的全部的任務共用這些資源;single-job模式:每次提交任務時,都會建立一個新的flink集羣,任務之間相互獨立,互不影響,方便管理;任務執行完成以後,flink集羣也會消失。
Flink on yarn的運行原理:
流程:當啓動一個新的flink yarn client會話,客戶端首先會檢查所請求的資源(容器和內存)是否可用,以後會上傳包含了flink配置文件和jar包到HDFS;客戶端的請求一個container資源去啓動applicationmaster進程;resourcemanager選一臺機器啓動AM(JobManager 和 AM 運行在同一個容器中。一旦它們成功地啓動了,AM 知道 JobManager 的地址(它本身)。它會爲 TaskManager 生成一個新的 Flink 配置文件(這樣它們才能連上 JobManager)。該文件也一樣會上傳到 HDFS。另外,AM 容器同時提供了 Flink 的 Web 界面服務);AM是爲flink的taskmanager分配容器在對應的NodeManager上面啓動taskmanager;初始化工做,從HDFS下載jar文件和修改過的配置文件。
Flink程序的任務並行度設置分爲四個級別:算子級別:能夠經過調用其setparallelism方法來定義單個運算符,數據源或數據接收器的並行度;執行環境級別:執行環境級別的並行度是本次任務中全部的操做符,數據源和數據接收器的並行度,能夠同顯式的配置運算符並行度來覆蓋執行環境並行度;客戶端級別:提交做業時,能夠在客戶端設置並行度,經過使用指定的parallelism參數 -p;系統級別:經過設置flink/conf/flink-conf .yaml配置文件中的parallelism.default配置項來定義默認並行度。
Windows是flink處理無限流的核心,flink底層引擎是一個流式引擎,在上面實現了流處理和批處理。而窗口就是從streaming到batch的一個橋樑。基於不一樣事件驅動的窗口能夠分爲:
翻滾窗口:將數據流切分紅不重疊的窗口,每個事件只能屬於一個窗口。具備固定的尺寸。基於時間驅動:咱們須要統計每一分鐘中用戶購買的商品的總數,須要將用戶的行爲事件按每一分鐘進行切分,這種切分被成爲翻滾時間窗口。基於事件驅動:當咱們想要每100個用戶的購買行爲做爲驅動,那麼每當窗口中填滿100個相同元素,就會對窗口進行計算;
滑動窗口:能夠有重疊的部分。在滑動窗口中,一個元素能夠對應多個窗口。基於時間驅動:每30s計算依次最近一分鐘用戶購買的商品總數(timewindow);基於事件:每10個相同元素計算依次最近100個元素的總和(countwindow);
會話窗口:窗口不重疊,沒有固定的開始和結束時間。當會話窗口在一段時間內沒有接收到元素時,會關閉窗口,後續元素將分配給新的會話窗口。
Flink中有三種時間類型:處理時間:當前機器處理該條事件的時間;事件時間:是每一個事件時間在其設備上發生的時間。對於亂序、延時或者數據重放等狀況都能給出正確結果;攝入時間:數據進入flink框架的時間,是在source operator中設置的。
流處理過程當中因爲網絡、背壓等緣由,致使數據亂序;對於延遲數據,咱們不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算,該機制就是waterMark。WaterMark是flink爲了處理eventtime時間類型的窗口計算提出的一種機制,本質上也是一種時間戳;waterMark是用於處理亂序事件的,而正確的處理亂序事件,一般用waterMark機制結合window來實現。
具體流程:waterMark包含一個時間戳,flink使用waterMark標記全部小於該時間戳的消息都已流入,flink的數據源在確認全部小於某個時間戳的消息都已輸出到flink流處理系統後,會生成一個包含該時間戳的waterMark,插入到消息流中輸出到flink流處理系統中,;flink operator算子按照時間窗口緩存全部流入的消息,當操做符處理到waterMark時,它對全部小於該waterMark時間戳的時間窗口的數據進行處理併發送到下一個操做符節點,而後也將waterMark發送到下一個操做符節點。
產生方式:punctuated-數據流中每個遞增的eventtime都會產生一個waterMark,可是這樣會產生大量的waterMark在必定程度上對下游算子形成壓力,故只有在實時性要求很是高的場景纔會選擇punctuated的方式進行waterMark的生成;periodic-週期性的(必定時間間隔或者達到必定的記錄條數)產生一個waterMark;在實際的生產中periodic的方式必須結合時間和積累條數兩個維度繼續週期性產生waterMark,不然在極端狀況下會有很大的延時。
累加器:能夠在分佈式統計數據,只有在任務結束以後才能獲取累加器的最終結果;計數器是累加器的具體實現,有intcounter、longcounter和doublecounter;注意點:須要在算子內部建立累加器對象;一般在Rich函數中的open方法中註冊累加器,指定累加器的名稱;在當前算子內任意位置能夠使用累加器;必須當任務執行結束後,經過env.execute(xxx)執行後的jobexecutionresult對象獲取累加器的值。
廣播變量:數據集合經過withBroadcastSet進行廣播;可經過getruntimecontext().getbroadcastvariable訪問。
分佈式緩存:flink提供了一個分佈式緩存,相似於hadoop;執行程序時,flink會自動將文件或目錄賦值到全部worker的本地文件系統;用戶函數能夠查找指定名稱下的文件或目錄,並從worker的本地文件系統訪問它。
Flink提供了一種容錯機制,能夠持續恢復數據流應用程序的狀態,確保即便出現故障,通過恢復,程序的狀態也會回到之前的狀態;支持at least once語義和exactly once語義;flink經過按期地作checkpoint來實現容錯和恢復,容錯機制不斷地生成數據流的快照;流應用程序的狀態在一個可配置的地方;若是出現程序故障,flink將中止分佈式流數據流,而後系統從新啓動operator,並將其設置爲最近一批的檢查點;
flink的容錯機制的核心部分是生成分佈式數據流和operator狀態一致的快照;這些快照充當檢查點,系統能夠在發生故障時將其回滾;分佈式快照是由chandy-lamport算法實現的。
Flink的分佈式快照的核心元素是stream barriers;這些barriers被注入到數據流中,做爲數據流的一部分和其餘數據一同流動,barriers不會超過其餘數據提早到達;一個barrier將數據流中的數據分割成兩個數據集,即進入當前快照的數據和進入下一次快照的數據;每一個barrier帶有一個id,該id爲將處於該barrier以前的數據納入快照的檢查點的id;barrier不會打斷數據流的流動,因此它是十分輕量級的;來自不一樣的快照的多個barrier能夠統一時間存在於統一個流中,也就是說,不一樣的快照能夠並行同時發生;barrier是在source處被插入到數據流中的。
這些barrier隨數據流流動向下游,當一箇中間operator在其輸入流接收到快照n的barrier時,它在其全部的輸出流中都發送一個快照n的barrier;當一個sink operator(流DAG的終點)從其輸入流接收到n的barrier,它將快照n通知給checkpoint coordinator(協調器);在全部sink都通知了一個快照後,這個快照就完成了;當快照n完成後,因爲數據源中先於sn的數據已經經過了整個 data flow topology,咱們就能夠肯定再也不須要這些數據了。
恢復數據:flink選擇最近的已經完成的檢查點k,系統接下來重部署整個數據流圖,而後給每一個operator在檢查點k時的相應狀態;數據源則被設置爲從數據流sk位置開始讀取。
先決條件:對於flink的checkpoint機制通常來講,它須要:持續的數據源;狀態存儲的持久化,一般是分佈式文件系統。
啓用和配置檢查點:默認狀況下,禁用checkpoint;開啓方式:調用env.enablecheckpointint(n),其中n是以毫秒爲單位的檢查點間隔。
流計算中在如下場景中須要保存狀態:窗口操做;使用了KV操做的函數;繼承了checkpointedfunction的函數。
當檢查點機制啓動時,狀態將在檢查點中持久化來應對數據丟失以及恢復;狀態是如何持久化到檢查點中以及持久化到哪裏都取決於選定的state backed。
Flink在保存狀態時,支持三種存儲方式:memorystatebacked、fsstatebacked和rockedbstatebacked;若沒有配置其餘任何內容,系統默認使用memorystatebacked。
Memorystatebacked,此種存儲策略將數據保存在java堆裏,當進行checkpoint時,這種策略會對狀態作快照;而後將快照做爲checkpoint中的一部分發送給JobManager,JM也將其保存在堆中;避免阻塞,使用異步的方式進行快照;單次狀態大小最大默認被限制5MB,這個值能夠經過構造函數來更改;不論單次狀態大小最大被限制爲多少,都不能夠大過akka的frame大小;聚合的狀態都會寫入JM的內存;適合用於本地開發和調試;狀態比較少的做業等場景下。
fsstatebacked經過文件系統的URL來設置,當選擇它時,會先將數據保存在任務管理器(taskmanager)的內存中,當作checkpointing時,會將狀態快照寫入文件。保存在文件系統;少許的元數據會保存在JM的內存中,默認配置爲提供異步快照,以免在寫入狀態檢查點時阻塞處理管道;使用場景:狀態比較大,窗口比較長、大的KV狀態;須要作HA的場景。
rocksdbstatebackend經過文件系統的URL來設置;此種方式KV state須要由rockdb數據庫來管理和保存數據,該數據庫保存在taskmanager的數據目錄中;rockdb是一個高性能的K-V數據庫;數據會先放到內存中,在必定條件下觸發寫到磁盤文件上;在checkpoint時,整個rockdb數據庫的數據會快照一份,而後存到配置文件系統中;同時flink將溢寫最小的元數據存儲在Jobmanager的內存或zookeeper中;默認配置爲執行異步快照;使用場景:很是大的狀態,長窗口,大的KV狀態、須要HA的場景;它是目前惟一可用於支持有狀態流處理應用的增量檢查點。
rocksdbstatebackend可以持有的狀態的多少隻取決於可以使用的磁盤的大小,容許使用很是大的狀態,但吞吐量會受限。
Checkpoint的使用:程序的運行過程當中會每隔env.enablecheckpointing(5000)時間產生一個checkpoint快照點,當程序失敗,咱們重啓時,能夠指明從哪一個快照點進行恢復。
Savepoint由用戶建立、擁有和刪除;通常是有計劃的進行手動備份和恢復;好比說有時候flink版本更新或者更改流處理邏輯,更改並行度等等,這個時候咱們每每須要關閉一下流,這就須要咱們將流中的狀態進行存儲;從概念上講,savepoint的生成和恢復成本可能更高,而且更多地關注可移植性和對前面提到的做業更改的支持;保存命令:flink savepoint jobid target_directory。
Flink處於分佈式執行的目的,將operator的subtask連接在一塊兒造成task(相似於spark中的管道);每個task在一個線程中執行;將operator連接成task時很是有效的優化:它能夠減小與線程間的切換戶數據緩衝的開銷,並在下降延遲的同事提升總體吞吐量。
Flink默認會將多個operator進行串聯,造成任務鏈;咱們也能夠禁用任務鏈讓每一個operator造成一個task。
操做鏈其實就是相似spark的pipeline管道模式,一個task能夠執行同一個窄依賴中的算子操做。
Taskmanager是一個JVM進程,並會以獨立的線程來執行一個task或多個subtask;爲了控制一個taskmanager能接受多少個task,flink提出了task slot的概念。
每一個task slot表明了taskmanager的一個固定大小的資源子集,會平均分配資源;將資源你slot化意味着來自不一樣job的task不會爲了內存而競爭,而是每一個task都擁有必定數量的內存儲備;這裏不會涉及到CPU的隔離,slot目前僅僅用來隔離task的內存。
每一個taskmanager有一個slot,也就意味着每一個task運行在獨立的JVM中;每一個taskmanager有多個slot的話,也就是說多個task運行在同一個JVM中;同一個JVM中的task能夠共享TCP鏈接和心跳消息,能夠減小數據的網絡傳輸,也能共享溢寫數據結構,必定程度上減小了每一個task的消耗。
(1)、flink提供記錄歷史任務運行狀況的服務;
(2)、鏈接器:flink中內置了溢寫基本數據源和接收器,Apache Kafka (source/sink);Elasticsearch (sink);Hadoop FileSystem (sink);RabbitMQ (source/sink);Apache NiFi (source/sink);Apache Cassandra (sink);Amazon Kinesis Streams (source/sink);Twitter Streaming API (source);這些鏈接器中,當啓動了flink的容錯機制以後,它分別可以保證不一樣的語義。
(3)、flink exactly once的實現須要兩個階段的提交;須要實現四種主要方法:開啓一個事務;在預提交階段,將本次事務的數據緩存起來,同時開啓一個新事務執行屬於下一個checkpoint的寫入操做;在commit階段,咱們以原子性的方式將上一階段的數據整整的寫入到存儲系統;一旦異常終止事務,程序的處理過程。
(4)、flink的CEP:是flink的複雜處理庫;它容許用戶快速檢測數據流中的復瑣事件模式;首先須要用戶建立定義一個個pattern,而後經過鏈表將由前日後邏輯關係的pattern串在一塊兒,構成模式匹配的邏輯表達;在事件流中藥應用模式匹配,必須實現呢適當的equals和hashcode方法,由於flinkCEP用它們來比較和匹配事件。
(5)、flink支持table API以及批處理狀況下的SQL語句和流處理狀況下sql運距查詢分析
(1)、flink消費kafka數據起始offset配置:1.flinkKafkaConsumer.setStartFromEarliest()--從topic的最先offset位置開始處理數據,若是kafka中保存有消費者組的消費位置將被忽略;flinkKafkaConsumer.setStartFromLatest()--從topic的最新offset位置開始處理數據,若是kafka中保存有消費者組的消費位置將被忽略;flinkKafkaConsumer.setStartFromTimestamp(…)--從指定的時間戳(毫秒)開始消費數據,Kafka中每一個分區中數據大於等於設置的時間戳的數據位置將被當作開始消費的位置。若是kafka中保存有消費者組的消費位置將被忽略;flinkKafkaConsumer.setStartFromGroupOffsets()--默認的設置。根據代碼中設置的group.id設置的消費者組,去kafka中或者zookeeper中找到對應的消費者offset位置消費數據。若是沒有找到對應的消費者組的位置,那麼將按照auto.offset.reset設置的策略讀取offset。
(2)、flink消費kafka數據,消費者offset提交配置:Flink提供了消費kafka數據的offset如何提交給Kafka或者zookeeper(kafka0.8以前)的配置;注意,Flink並不依賴提交給Kafka或者zookeeper中的offset來保證容錯。提交的offset只是爲了外部來查詢監視kafka數據消費的狀況;配置offset的提交方式取決因而否爲job設置開啓checkpoint。能夠使用env.enableCheckpointing(5000)來設置開啓checkpoint;關閉checkpoint:如何禁用了checkpoint,那麼offset位置的提交取決於Flink讀取kafka客戶端的配置,enable.auto.commit配置是否開啓自動提交offset;auto.commit.interval.ms決定自動提交offset的週期;開啓checkpoint:若是開啓了checkpoint,那麼當checkpoint保存狀態完成後,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致,能夠經過配置setCommitOffsetsOnCheckpoints(boolean)來配置是否將checkpoint中的offset提交到kafka中(默認是true)。若是使用這種方式,那麼properties中配置的kafka offset自動提交參數enable.auto.commit和週期提交參數auto.commit.interval.ms參數將被忽略。