大數據相關面試題

kafka
1 什麼是kafka
Kafka是分佈式發佈-訂閱消息系統,它最初是由LinkedIn公司開發的,以後成爲Apache項目的一部分,Kafka是一個分佈式,可劃分的,冗餘備份的持久性的日誌服務,它主要用於處理流式數據。html

2 爲何要使用 kafka,爲何要使用消息隊列
緩衝和削峯:上游數據時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗餘,kafka在中間能夠起到一個緩衝的做用,把消息暫存在kafka中,下游服務就能夠按照本身的節奏進行慢慢處理。
解耦和擴展性:項目開始的時候,並不能肯定具體需求。消息隊列能夠做爲一個接口層,解耦重要的業務流程。只須要遵照約定,針對數據編程便可獲取擴展能力。
冗餘:能夠採用一對多的方式,一個生產者發佈消息,能夠被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用。
健壯性:消息隊列能夠堆積請求,因此消費端業務即便短期死掉,也不會影響主要業務的正常進行。
異步通訊:不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。java

3.Kafka中的ISR、AR又表明什麼?ISR的伸縮又指什麼
ISR:In-Sync Replicas 副本同步隊列
AR:Assigned Replicas 全部副本
ISR是由leader維護,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。git

4.kafka中的broker 是幹什麼的
broker 是消息的代理,Producers往Brokers裏面的指定Topic中寫消息,Consumers從Brokers裏面拉取指定Topic的消息,而後進行業務處理,broker在中間起到一個代理保存消息的中轉站。github

5.kafka中的 zookeeper 起到什麼做用,能夠不用zookeeper麼
zookeeper 是一個分佈式的協調組件,早期版本的kafka用zk作meta信息存儲,consumer的消費狀態,group的管理以及 offset的值。考慮到zk自己的一些因素以及整個架構較大機率存在單點問題,新版本中逐漸弱化了zookeeper的做用。新的consumer使用了kafka內部的group coordination協議,也減小了對zookeeper的依賴,
可是broker依然依賴於ZK,zookeeper 在kafka中還用來選舉controller 和 檢測broker是否存活等等。數據庫

6.kafka follower如何與leader同步數據
Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。徹底同步複製要求All Alive Follower都複製完,這條消息纔會被認爲commit,這種複製方式極大的影響了吞吐率。而異步複製方式下,Follower異步的從Leader複製數據,數據只要被Leader寫入log就被認爲已經commit,這種狀況下,若是leader掛掉,會丟失數據,kafka使用ISR的方式很好的均衡了確保數據不丟失以及吞吐率。Follower能夠批量的從Leader複製數據,並且Leader充分利用磁盤順序讀以及send file(zero copy)機制,這樣極大的提升複製性能,內部批量寫磁盤,大幅減小了Follower與Leader的消息量差。apache

7.什麼狀況下一個 broker 會從 isr中踢出去
leader會維護一個與其基本保持同步的Replica列表,該列表稱爲ISR(in-sync Replica),每一個Partition都會有一個ISR,並且是由leader動態維護 ,若是一個follower比一個leader落後太多,或者超過必定時間未發起數據複製請求,則leader將其重ISR中移除 。編程

8.kafka 爲何那麼快
Cache Filesystem Cache PageCache緩存
順序寫 因爲現代的操做系統提供了預讀和寫技術,磁盤的順序寫大多數狀況下比隨機寫內存還要快。
Zero-copy 零拷技術減小拷貝次數
Batching of Messages 批量量處理。合併小的請求,而後以流的方式進行交互,直頂網絡上限。
Pull 拉模式 使用拉模式進行消息的獲取消費,與消費端處理能力相符。小程序

9.kafka producer如何優化打入速度
增長線程
提升 batch.size
增長更多 producer 實例
增長 partition 數
設置 acks=-1 時,若是延遲增大:能夠增大 num.replica.fetchers(follower 同步數據的線程數)來調解;
跨數據中心的傳輸:增長 socket 緩衝區設置以及 OS tcp 緩衝區設置。vim

10.kafka producer 打數據,ack 爲 0, 1, -1 的時候表明啥, 設置 -1 的時候,什麼狀況下,leader 會認爲一條消息 commit了
1(默認) 數據發送到Kafka後,通過leader成功接收消息的的確認,就算是發送成功了。在這種狀況下,若是leader宕機了,則會丟失數據。
0 生產者將數據發送出去就無論了,不去等待任何返回。這種狀況下數據傳輸效率最高,可是數據可靠性確是最低的。
-1 producer須要等待ISR中的全部follower都確認接收到數據後纔算一次發送完成,可靠性最高。當ISR中全部Replica都向Leader發送ACK時,leader才commit,這時候producer才能認爲一個請求中的消息都commit了。數組

11.kafka unclean 配置表明啥,會對 spark streaming 消費有什麼影響
unclean.leader.election.enable 爲true的話,意味着非ISR集合的broker 也能夠參與選舉,這樣有可能就會丟數據,spark streaming在消費過程當中拿到的 end offset 會忽然變小,致使 spark streaming job掛掉。若是unclean.leader.election.enable參數設置爲true,就有可能發生數據丟失和數據不一致的狀況,Kafka的可靠性就會下降;而若是unclean.leader.election.enable參數設置爲false,Kafka的可用性就會下降。

12.若是leader crash時,ISR爲空怎麼辦
kafka在Broker端提供了一個配置參數:unclean.leader.election,這個參數有兩個值:
true(默認):容許不一樣步副本成爲leader,因爲不一樣步副本的消息較爲滯後,此時成爲leader,可能會出現消息不一致的狀況。
false:不容許不一樣步副本成爲leader,此時若是發生ISR列表爲空,會一直等待舊leader恢復,下降了可用性。

13.kafka的message格式是什麼樣的
一個Kafka的Message由一個固定長度的header和一個變長的消息體body組成
header部分由一個字節的magic(文件格式)和四個字節的CRC32(用於判斷body消息體是否正常)構成。
當magic的值爲1的時候,會在magic和crc32之間多一個字節的數據:attributes(保存一些相關屬性,
好比是否壓縮、壓縮格式等等);若是magic的值爲0,那麼不存在attributes屬性
body是由N個字節構成的一個消息體,包含了具體的key/value消息

14.kafka中consumer group 是什麼概念
一樣是邏輯上的概念,是Kafka實現單播和廣播兩種消息模型的手段。同一個topic的數據,會廣播給不一樣的group;同一個group中的worker,只有一個worker能拿到這個數據。換句話說,對於同一個topic,每一個group均可以拿到一樣的全部數據,可是數據進入group後只能被其中的一個worker消費。group內的worker可使用多線程或多進程來實現,也能夠將進程分散在多臺機器上,worker的數量一般不超過partition的數量,且兩者最好保持整數倍關係,由於Kafka在設計時假定了一個partition只能被一個worker消費(同一group內)。

15.Kafka中的消息是否會丟失和重複消費?
要肯定Kafka的消息是否丟失或重複,從兩個方面分析入手:消息發送和消息消費。
一、消息發送
Kafka消息發送有兩種方式:同步(sync)和異步(async),默認是同步方式,可經過producer.type屬性進行配置。Kafka經過配置request.required.acks屬性來確認消息的生產:
0---表示不進行消息接收是否成功的確認;
1---表示當Leader接收成功時確認;
-1---表示Leader和Follower都接收成功時確認;
綜上所述,有6種消息生產的狀況,下面分狀況來分析消息丟失的場景:
(1)acks=0,不和Kafka集羣進行消息接收確認,則當網絡異常、緩衝區滿了等狀況時,消息可能丟失;
(2)acks=一、同步模式下,只有Leader確認接收成功後但掛掉了,副本沒有同步,數據可能丟失;

二、消息消費
Kafka消息消費有兩個consumer接口,Low-level API和High-level API:
Low-level API:消費者本身維護offset等值,能夠實現對Kafka的徹底控制;
High-level API:封裝了對parition和offset的管理,使用簡單;
若是使用高級接口High-level API,可能存在一個問題就是當消息消費者從集羣中把消息取出來、並提交了新的消息offset值後,還沒來得及消費就掛掉了,那麼下次再消費時以前沒消費成功的消息就「詭異」的消失了;
解決辦法:
針對消息丟失:同步模式下,確認機制設置爲-1,即讓消息寫入Leader和Follower以後再確認消息發送成功;異步模式下,爲防止緩衝區滿,能夠在配置文件設置不限制阻塞超時時間,當緩衝區滿時讓生產者一直處於阻塞狀態;
針對消息重複:將消息的惟一標識保存到外部介質中,每次消費時判斷是否處理過便可。
消息重複消費及解決參考:https://www.javazhiyin.com/22910.html

16.爲何Kafka不支持讀寫分離?
在 Kafka 中,生產者寫入消息、消費者讀取消息的操做都是與 leader 副本進行交互的,從 而實現的是一種主寫主讀的生產消費模型。
Kafka 並不支持主寫從讀,由於主寫從讀有 2 個很明 顯的缺點:
(1)數據一致性問題。數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間 窗口會致使主從節點之間的數據不一致。某一時刻,在主節點和從節點中 A 數據的值都爲 X, 以後將主節點中 A 的值修改成 Y,那麼在這個變動通知到從節點以前,應用讀取從節點中的 A 數據的值並不爲最新的 Y,由此便產生了數據不一致的問題。
(2)延時問題。相似 Redis 這種組件,數據從寫入主節點到同步至從節點中的過程須要經 歷網絡→主節點內存→網絡→從節點內存這幾個階段,整個過程會耗費必定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它須要經歷網絡→主節點內存→主節點磁盤→網絡→從節 點內存→從節點磁盤這幾個階段。對延時敏感的應用而言,主寫從讀的功能並不太適用。

17.Kafka中是怎麼體現消息順序性的?
kafka每一個partition中的消息在寫入時都是有序的,消費時,每一個partition只能被每個group中的一個消費者消費,保證了消費時也是有序的。
整個topic不保證有序。若是爲了保證topic整個有序,那麼將partition調整爲1.

18.消費者提交消費位移時提交的是當前消費到的最新消息的offset仍是offset+1?
offset+1

19.kafka如何實現延遲隊列?
Kafka並無使用JDK自帶的Timer或者DelayQueue來實現延遲的功能,而是基於時間輪自定義了一個用於實現延遲功能的定時器(SystemTimer)。JDK的Timer和DelayQueue插入和刪除操做的平均時間複雜度爲O(nlog(n)),並不能知足Kafka的高性能要求,而基於時間輪能夠將插入和刪除操做的時間複雜度都降爲O(1)。時間輪的應用並不是Kafka獨有,其應用場景還有不少,在Netty、Akka、Quartz、Zookeeper等組件中都存在時間輪的蹤跡。
底層使用數組實現,數組中的每一個元素能夠存放一個TimerTaskList對象。TimerTaskList是一個環形雙向鏈表,在其中的鏈表項TimerTaskEntry中封裝了真正的定時任務TimerTask.
Kafka中究竟是怎麼推動時間的呢?Kafka中的定時器藉助了JDK中的DelayQueue來協助推動時間輪。具體作法是對於每一個使用到的TimerTaskList都會加入到DelayQueue中。Kafka中的TimingWheel專門用來執行插入和刪除TimerTaskEntry的操做,而DelayQueue專門負責時間推動的任務。再試想一下,DelayQueue中的第一個超時任務列表的expiration爲200ms,第二個超時任務爲840ms,這裏獲取DelayQueue的隊頭只須要O(1)的時間複雜度。若是採用每秒定時推動,那麼獲取到第一個超時的任務列表時執行的200次推動中有199次屬於「空推動」,而獲取到第二個超時任務時有須要執行639次「空推動」,這樣會無端空耗機器的性能資源,這裏採用DelayQueue來輔助以少許空間換時間,從而作到了「精準推動」。Kafka中的定時器真可謂是「知人善用」,用TimingWheel作最擅長的任務添加和刪除操做,而用DelayQueue作最擅長的時間推動工做,相輔相成。
參考:https://blog.csdn.net/u013256816/article/details/80697456

20.Kafka中的事務是怎麼實現的?
參考:https://blog.csdn.net/u013256816/article/details/89135417

21.Kafka中有那些地方須要選舉?這些地方的選舉策略又有哪些?
https://blog.csdn.net/yanshu2012/article/details/54894629

  1. 消息列隊的特色:
    • 生產者消費者模式
    • 先進先出(FIFO)順序保證
    • 可靠性保證
    – 本身不丟數據
    – 消費者不丟數據:「至少一次,嚴格一次」
    • 至少一次就是可能會有兩次,會重
    • 嚴格一次機制就會負責一點

4.消息列隊常見場景
• 系統之間解耦合
– queue模型
– publish-subscribe模型
• 峯值壓力緩衝
• 異步通訊

5.kafka 的架構
• producer:消息生產者
• consumer:消息消費者
• broker:kafka集羣的server,負責處理消息讀、寫請求,存儲消息
• topic:消息隊列/分類
• Queue裏面有生產者消費者模型
• broker就是代理,在kafka cluster這一層這裏,其實裏面是有不少個broker
• topic就至關於queue
• 圖裏沒有畫其實還有zookeeper,這個架構裏面有些元信息是存在zookeeper上面的,整個集羣的管理也和zookeeper有很大的關係
• 一個topic分紅多個partition
• 每一個partition內部消息強有序,其中的每一個消息都有一個序號叫offset
• 一個partition只對應一個broker,一個broker能夠管多個partition
• 消息不通過內存緩衝,直接寫入文件
• 根據時間策略刪除,而不是消費完就刪除
• producer本身決定往哪一個partition寫消息,能夠是輪詢的負載均衡,或者是基於hash的partition策略

• 接下來咱們看kafka是怎麼生產消息,消費消息,和怎麼存儲消息的,來看它精髓的地方
• kafka裏面的消息是有topic來組織的,簡單的咱們能夠想象爲一個隊列,一個隊列就是一個topic,而後它把每一個topic又分爲不少個partition,這個是爲了作並行的,在每一個partition裏面是有序的,至關於有序的隊列,其中每一個消息都有個序號,好比0到12,從前面讀日後面寫,
• 一個partition對應一個broker,一個broker能夠管多個partition,好比說,topic有6個partition,有兩個broker,那每一個broker就管3個partition
• 這個partition能夠很簡單想象爲一個文件,當數據發過來的時候它就往這個partition上面append,追加就行,kafka和不少消息系統不同,不少消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在kafka裏面沒有一個消費完這麼個概念,只有過時這樣一個概念,這個模型帶來了不少個好處,這個咱們後面再討論一下
•這裏producer本身決定往哪一個partition裏面去寫,這裏有一些的策略,譬如若是hash就不用多個partition之間去join數據了
kafka 的消息存儲和生產消費模型
• consumer本身維護消費到哪一個offset
• 每一個consumer都有對應的group
• group內是queue消費模型
– 各個consumer消費不一樣的partition
– 所以一個消息在group內只消費一次
• group間是publish-subscribe消費模型
– 各個group各自獨立消費,互不影響
– 所以一個消息只被每一個group消費一次

kafka 有哪些特 點
• 消息系統的特色:生產者消費者模型,FIFO
• 高性能:單節點支持上千個客戶端,百MB/s吞吐
• 持久性:消息直接持久化在普通磁盤上且性能好
• 分佈式:數據副本冗餘、流量負載均衡、可擴展
• 很靈活:消息長時間持久化+Client維護消費狀態
• 消息系統基本的特色是保證了,有基本的生產者消費者模型,partition內部是FIFO的,partition之間呢不是FIFO的,固然咱們能夠把topic設爲一個partition,這樣就是嚴格的FIFO
• 接近網卡的極限
• 直接寫到磁盤裏面去,就是直接append到磁盤裏面去,這樣的好處是直接持久化,數據不會丟,第二個好處是順序寫,而後消費數據也是順序的讀,因此持久化的同時還能保證順序,比較好,由於磁盤順序讀比較好
• 分佈式,數據副本,也就是同一份數據能夠到不一樣的broker上面去,也就是當一份數據,磁盤壞掉的時候,數據不會丟失,好比3個副本,就是在3個機器磁盤都壞掉的狀況下數據纔會丟,在大量使用狀況下看這樣是很是好的,負載均衡,可擴展,在線擴展,不須要停服務的
• 消費方式很是靈活,第一緣由是消息持久化時間跨度比較長,一天或者一星期等,第二消費狀態本身維護消費到哪一個地方了,Queue的模型,發佈訂閱(廣播)的模型,還有回滾的模型

• ZeroMQ是一個socket的通訊庫,它是以庫的形式提供的,因此說你須要寫程序來實現消息系統,它只管內存和通訊那一塊,持久化也得本身寫,仍是那句話它是用來實現消息隊列的一個庫,其實在storm裏面呢,storm0.9以前,那些spout和bolt,bolt和bolt之間那些底層的通訊就是由ZeroMQ來通訊的,它並非一個消息隊列,就是一個通訊庫,在0.9以後呢,由於license的緣由,ZeroMQ就由Netty取代了,Netty自己就是一個網絡通訊庫嘛,因此說更合適是在通訊庫這一層,不該該是MessageQueue這一層

• Kafka,的亮點,天生是分佈式的,不須要你在上層作分佈式的工做,另外有較長時間持久化,前面基本消費就幹掉了,另外在長時間持久化下性能還比較高,順序讀和順序寫,另外還經過sendFile這樣0拷貝的技術直接從文件拷貝到網絡,減小內存的拷貝,還有批量讀批量寫來提升網絡讀取文件的性能,最後一點是比較輕量和靈活

• 消費狀態誰來維護Client vs.Server

• 有人可能會說kafka寫磁盤,會不會是瓶頸,其實不會並且是很是好的,爲何是很是好的,由於kafka寫磁盤是順序的,因此不斷的往前產生,不斷的日後寫,kafka還用了sendFile的0拷貝技術,提升速度,並且還用到了批量讀寫,一批批往裏寫,64K爲單位,100K爲單位,每一次網絡傳輸量不會特別小,RTT(RTT:Round-TripTime往返時間)的開銷就會微不足道,對文件的操做不會是很小的IO,也會是比較大塊的IO
storm+kafka 有什麼好 處
• 知足獲取輸入。產生輸出數據的基本需求
• kafka的分佈式、產生輸出數據的基本需求
• kafka的分佈式、高性能和storm吻合
• pub-sub模型可讓多個storm業務共享輸入數據
• kafka靈活消費的模式能配合storm實現不丟不重(exactly-once)的處理模型
• exactly-once,精準一次,這種模型在不少時候也是頗有用的
理解零拷 貝
• 從WIKI的定義中,咱們看到「零拷貝」是指計算機操做的過程當中,CPU不須要爲數據在內存之間的拷貝消耗資源。而它一般是指計算機在網絡上發送文件時,不須要將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中傳輸到網絡的方式。
• Non-Zero Copy方式:

Zero Copy方式:

從上圖中能夠清楚的看到,Zero Copy的模式中,避免了數據在用戶空間和內存空間之間的拷貝,從而提升了系統的總體性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都實現了零拷貝的功能,而在Netty中也經過在FileRegion中包裝了NIO的FileChannel.transferTo()方法實現了零拷貝。

Storm簡介(一)
Storm是 Twitter開源的一個分佈式的實時計算系統,用於數據的實時分析,持續計算,分佈式RPC等等。
官網地址http://storm-project.net/
源碼地址:https:/github.com/nathanmarz/storm
實時計算須要解決一些什麼問題
最顯而易見的就是實時推薦系統,好比咱們在淘寶等電商購物網站去買東西,咱們會在網頁旁邊或者底端看到與本身所須要商品相關的系列產品。這就是使用相似 storn實時計算去作的,咱們很是熟悉的 Hadoop只是作離線的數據分析,沒法作到實時分析計算。
好比車流量實時的計算,天天咱們北京市的交通狀況很是的擁擠,咱們能夠利用stom爲咱們實時計算每個路段的擁擠度等相關路況信息。
再好比咱們很是熟悉的股票,那麼股票系統也是一種實時計算的機制,利用stom徹底能夠實現。
Storm簡介(二)
實現一個實時計算系統
低延遲:都說了是實時計算系統了,延遲是必定要低的。高性能:可使用幾臺普通的服務器創建環境,結餘成本
分佈式:Stom很是適合於分佈式場景,大數據的實時計算;你的數據和計算單機就能搞定,那麼不用考慮這些複雜的問題了。咱們所說的是單機搞不定的狀況。
可擴展:伴隨着業務的發展,咱們的數據量、計算量可能會愈來愈大,因此但願這個系統是可擴展的。
容錯:這是分佈式系統中通用問題,一個節點掛了不能影響個人應用,Storm能夠輕鬆作到在節點掛了的時候實現任務轉移,而且在節點重啓的時候(也就是從新投入生產環境時,自動平衡任務)
可靠性:可靠的消息處理。Storm保證每一個消息至少能獲得一次完整處理。任務失敗時,它會負責從消息源重試消息。
快速:系統的設計保證了消息能獲得快速的處理,使用 ZeroMQ做爲其底層消息隊列。
本地模式:Storm有一個「本地模式」,能夠在處理過程當中徹底模擬stom集羣。這讓你能夠快速進行開發和單元測試
Storm體系結構(一)
首先咱們拿 Hadoop和stom進行一個簡單的對比:

storm是一個開源的分佈式實時計算系統,能夠簡單、可靠的處理大量的數據流。Storm有不少使用場景:如實時分析,在線機器學習,持續計算,分佈式RPC,ETL等等。Stom支持水平擴展,具備高容錯性,保證每一個消息都會獲得處理,並且處理速度很快(在一個小集羣中,每一個結點每秒能夠處理數以百萬計的消息)。Storm的部署和運維都很便捷,並且更爲重要的是可使用任意編程詢言來開發應用。
Storm體系結構(二)
Storm架構結構圖

Storm體系結構(三)
Nimbus主節點:
主節點一般運行一個後臺程序—Nimbus,用於響應分佈在集羣中的節點,分配任務和監測故障。這個很相似亍 Hadoop中的 Job Tracker.
Supervisor工做節點:
工做節點一樣會運行一個後臺程序—supervisor,用於收聽工做指派並基於要求運行工做進程。每一個工做節點都是 topology中一個子集的實現。而Nimbus和 Supervisor之間的協調則經過 Zookeeper系統戒者集羣。
Zookeeper
Zookeeper是完成 Supervisor和 Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝到stom中的「topology」.topology則是一組由 Spouts(數據源)和Bots(數據操做)經過 Stream Groupings運行鏈接的圖。下面對出現的術語進行更深入的解析。
Topology(拓撲)
storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。一個 topology是 spouts和bos組成的圖,經過 stream groupings將圖中的 spouts和bots鏈接起來,以下圖:

Storm Hello World (一)

Storm Hello world(二)
咱們首先回顧下storm的組件,安裝這個流程去寫咱們的java代碼:

Storm Hello world(三)
首先編寫咱們的數據源類:Spout。可使用倆種方式:
繼承 BaseRichSpout類
實現 IRichSpout接口
重點須要幾個方法進行重寫或實現:open、nextTuple、declareOutputFields
繼續編寫咱們的數據處理類:Bolt。可使用倆種方式:
繼承 BaseBasicBolt類
實現 IRichBolt接口
重點須要幾個方法進行重寫或實現:execute、declareOutputFields
最後咱們編寫主函數(Topology)去進行提交一個任務。
在使用 Topology的時候,Storm框架爲咱們提供了倆種模式:本地模式和集羣模式
本地模式:(無需stom集羣,直接在jaa中便可運行,通常用於測試和開發階段)執行運行main函數便可。
集羣模式:(須要Stom集羣,把實現的java程序打包,而後 Topology進行提交)須要把應用打成jar,使用stom命令把 Topology提交到集羣中去
Storm Hello World(四)
提交topology命令:storm jar storm01.jar bhz.topology.PWTopology1
查看任務命令:storm list
另外倆個 supervisor節點jps顯示:
最後咱們能夠看下倆個工做節點的 usr/local/temp下的文件信息是否有內容
Storm APl
Topology(拓撲)
Stream grouping(流分組、數據的分發方式)
Spout(噴口、消息源)
Bolt(螺栓、處理器)
Worker(工做進程)
Executor(執行器、Task的線程)
Task(具體的執行任務)
Configuration(配置)
Storm拓撲配置(一)
工做進程、並行度、任務數設置:
咱們首先設置了2個工做進程(也就是2個jvm)
而後咱們設置了 spout的並行度爲2(產生2個執行器和2個任務)
第一個bolt的並行度爲2而且指定任務數爲4(產生2個執行器和4個任務)
第二個bolt的並行度爲6(產生6個執行器和6個任務)
所以:該拓撲程序共有倆個工做進程(worker),2+2+6=10個執行器
(executor),2+4+6=12個任務(task)。每一個工做進程能夠領取到12/2=6個任務。默認狀況下一個執行器執行一個任務,但若是指定了任務的數目。則任務會平均分配到執行器中。
Storm什麼是拓撲?(二)
咱們在使用storm進行流式計算的時候,都必需要在Main函數裏面創建所謂的「拓撲」,拓撲是什麼?
拓撲是一個有向圖的計算。(也就是說在計算的過程當中是有流向的去處理業務邏輯,節點之間的鏈接顯示數據該如何進入下一個節點,他們是進行鏈接傳遞的)
拓撲運行很簡單,只須要使用 storm命令,把一個jar提交給 nimbus節點,numbus就會把任務分配給具體的子節點(supervisor)去工做。
咱們建立拓撲很是簡單:
第一,構建 TopologyBuilder對象
第二,設置 Spout(噴口)數據源對象(能夠設置多個)
第三,設置Bolt(螺栓)數據處理對象(能夠設置多個)
第四,構建 Config對象
第五,提交拓撲
Storm流分組(一)
Stream Grouping:爲每一個bolt指定應該接受哪一個流做爲輸入,流分組定義瞭如何在bolt的任務直接進行分發。

Storm流分組(二)☆☆☆
Shuffle Grouping隨機分組:保證每一個bot接收到的 tuple數目相同
Fields Grouping按字段分組:好比按 userid來分組,具備一樣 userid的 tuple會被分到相同的Bots,而不一樣的 userid則會被分配到不一樣的Bolts。
All Grouping廣播發送:對於每個 tuple,全部的Bots都會收到。
Global Grouping:全局分組:這個 tuple被分配到stom中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
Non Grouping無分組:假設你不關心流式如何分組的煤科院使用這種方式,目前這種分組和隨機分組是同樣的效果,不一樣的是Stom會把這個Bolt放到Bolt的訂閱者的同一個線程中執行。
Direct Grouping直接分組:這種分組意味着消息的發送者指定由消息接收者的哪一個task處理這個消息。只有被聲明爲 Direct stream的消息流能夠聲明這種分組方法並且這種消息tupe必須使用 emitDirect方法來發射。消息處理者能夠經過TopologyContext來獲取處理它的消息的 taskid(Outputcollector.emit,方法也會返回 taskid)
本地分組:若是目標bo在同一工做進程存在一個或多個任務,元祖會隨機分配給執行任務,不然該分組方式與隨機分組方式是同樣的。
storm流分組(三)

Storm WorldCount
咱們以一個統計單詞的小程序來講明問題。(storm02)

上面的示意圖中有4個組件,分別爲一個 spout和3個bolt,當數據源 spout.取得數據(能夠是一個句子,裏面包含多個單詞)之後,發送給 SolitBolt進行切分,而後由 CountBolt進行統計結果,最終由ReportBolt記錄結果
Storm Spout的可靠性(一)
Spout是 Storm數據流的入口,在設計拓撲時,一件很重要的事情就是須要考慮消息的可靠性,若是消息不能被處理而丟失是很嚴重的問題。
咱們繼續做實驗,以一個傳遞消息而且實時處理的例子,來講明這個問題。
新建 maven項目(storm03)
經過示例咱們知道,若是在第一個bolt處理的時候出現異常,咱們可讓整個數據進行重發,可是若是在第二個bolt處理的時候出現了異常,那麼咱們也會讓對應的整個spout裏的數據重發,這樣就會出現事務的問題,咱們就須要進行判斷或者是進行記錄
若是是數據入庫的話,能夠與原ID進行比對。
將一批數據定義惟一的ID入庫(冪等性判斷事物)
若是是事務的話在編寫代碼時,儘可能就不要進行拆分 tuple
或者使用 storm的 Trident框架
Storm Spout的可靠性(三)
下圖是 spout處理可靠性的示意圖:當 spout發送一個消息時,分配給倆個bolt分別處理,那麼在最後一個bolt接受的時候會作異或運算

RPC介紹

調用客戶端句柄;執行傳送參數
調用本地系統內核發送網絡
消息消息傳送到遠程主機
服務器句柄獲得消息並取得參數
執行遠程過程
執行的過程將結果返回服務器句柄
服務器句柄返回結果,調用遠程系統內核
消息傳回本地主機
客戶句柄由內核接收消息
客戶接收句柄返回的數據
Storm DRPC介紹
分佈式RPc(distributed RPc,DRPc)
Storm裏面引入DRPC主要是利用 storm的實時計算能力來並行化cPU密集型(CPU intensive)的計算任務。DRPc的 storm topology以函數的參數流做爲輸入,而把這些函數調用的返回值做爲 topology的輸出流。
DRPc其實不能算是 storm自己的一個特性,它是經過組合stom的原語stream、spout、bolt、topology而成的一種模式(pattern)。原本應該把DRPc單獨打成一個包的,可是DRPC實在是太有用了,因此咱們把它和storm捆綁在一塊兒。
Distributed RPC是經過一個」DRPC Server」來實現
DRPC Server的總體工做過程以下:
1接收一個RPC請求
2)發送請求到 storm topology
3)從 storm topology接收結果。
4)把結果發回給等待的客戶端。
Storm DRPC配置和示例
Storm提供了一個稱做 LinearDRPCTopologyBuilder的 Topology builder,它把實現DRPc的幾乎全部步驟都自簡化了。
相關代碼地址https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/BasicDRPCTopology.java
實現DRPC步驟:(新建 maven項目,storm04)
1須要修改配置文件內容爲(分別修改每臺機器配置):
vim /usr/local/apache-storm-0.9.2/conf/storm.yaml
drpc.servers:
-"192.168.1.114"
2須要啓動stom的drpc:服務,命令:storm drpc&
3把相應的 topology代碼上傳到stom服務器上
storm jar storm04.jar bhz.drpc1.BasicDRPCTopology exc
4在本地調用遠程 topology便可。
Storm DRPC實例場景
咱們繼續看下一個示例:
主要使用 storm的並行計算能力來進行,咱們在微博、論壇進行轉發帖子的時候,是對u進行轉發,分析給粉絲(關注個人人),那麼每個人的粉絲(關注者可能會有重複的狀況),這個例子就是統計一下帖子(ur)的轉發人數。
相關代碼地址:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java實現步驟以下:第一,獲取當前轉發帖子的人。第二,獲取當前人的粉絲(關注者)。第三,進行粉絲去重。第四,統計人數第五,最後使用drpc遠程調用 topology返回執行結果。Storm Trident介紹Trident是在stom基礎上,一個以實時計算爲目標的高度抽象。它在提供處理大吞吐量數據能力(每秒百萬次消息)的同時,也提供了低延時分佈式查詢和有狀態流式處理的能力。若是你對Pig和 Cascading這種高級批處理工具很瞭解的話,那麼應該很容易理解 Trident,由於他們之間不少的概念和思想都是相似的。Trident提供了 joins,aggregations,grouping,functions,以及 filters等能力。除此以外,Trident還提供了一些與門的原語,從而在基於數據庫戒者其餘存儲的前提下來應付有狀態的遞增式處理。Trident也提供致性(consistent)、有且僅有一次(exactly-once)等語義,這使得咱們在使用 trident toplogy時變得容易。咱們首先熟悉下 Trident的概念:"Stream"是 Trident中的核心數據模型,它被當作一系列的 batch來處理。在Storm集羣的節點之間,一個 stream被劃分紅不少 partition(分區),對流的操做(operation)是在每一個 partition上並行執行的。state Query、partition Persist.、poe(filter、partitionAggregate、對每一個 partition的局部操做包括:function新建 maven工程(storm05)Storm Trident FunctionStorm Trident FilterStorm Trident projectionStorm Trident operationStorm Trident aggregateBatch和 Scout與 Transactiona(一)Trident提供了下面的語義來實現有且有一次被處理的目標一、Tuples是被分紅小的集合(一組 tuple被稱爲一個 batch)被批量處理的。二、每一批 tuples被給定一個惟1D做爲事務ID(txid),當這一個 batch被重發時,tid不變。三、batch和 batch之間的狀態更新時嚴格順序的。好比說 batch3的狀態的更新必需要等到 batch2的狀態更新成功以後才能夠進行。有了這些定義,你的狀態實現能夠檢測到當前 batch是否之前處理過,並根據不一樣的狀況進行不一樣的處理,這個處理取決於你的輸入 spout。有三種不一樣類型的能夠容錯的 sqout:一、non-transactional(無事務支持的 spout)二、transactional(事務支持的 spout)三、opaque transactional(不透明事務支持的 spout)Batch和 Scout與 Transactiona(二)transactional sqout實現一、重發操做:二、重發結果:opaque transactional sqout實現實現ITridentspout接口最通用的AP能夠支持 transactional or opaque transactional語義實現IBatchSpou接口:一個 non-transactional spout實現IPArtitioned Tridentspout接口:一個 transactional spout實現IOpaquePartitioned Tridentspout接口:一個opaque transactional spoutStorm與 KafKaKafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。這種動做(網頁瀏覽,搜索和其餘用戶的行動是在現代網絡上的許多社會功能的一個關鍵因素。這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。對於像 Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過 Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。

相關文章
相關標籤/搜索