什麼業務場景,這個業務場景有個什麼技術挑戰,若是不用MQ可能會很麻煩,可是你如今用了MQ以後帶給了你不少的好處。消息隊列的常見使用場景,其實場景有不少,可是比較核心的有3個:解耦、異步、削峯。linux
A系統發送個數據到BCD三個系統,接口調用發送,那若是E系統也要這個數據呢?那若是C系統如今不須要了呢?如今A系統又要發送第二種數據了呢?並且A系統要時時刻刻考慮BCDE四個系統若是掛了咋辦?要不要重發?我要不要把消息存起來?算法
你須要去考慮一下你負責的系統中是否有相似的場景,就是一個系統或者一個模塊,調用了多個系統或者模塊,互相之間的調用很複雜,維護起來很麻煩。可是其實這個調用是不須要直接同步調用接口的,若是用MQ給他異步化解耦,也是能夠的,你就須要去考慮在你的項目裏,是否是能夠運用這個MQ去進行系統的解耦。數據庫
A系統接收一個請求,須要在本身本地寫庫,還須要在BCD三個系統寫庫,本身本地寫庫要30ms,BCD三個系統分別寫庫要300ms、450ms、200ms。最終請求總延時是30 + 300 + 450 + 200 = 980ms,接近1s,異步後,BCD三個系統分別寫庫的時間,A系統就再也不考慮了。api
天天0點到16點,A系統風平浪靜,每秒併發請求數量就100個。結果每次一到16點~23點,每秒併發請求數量忽然會暴增到1萬條。可是系統最大的處理能力就只能是每秒鐘處理1000個請求啊。怎麼辦?須要咱們進行流量的削峯,讓系統能夠平緩的處理突增的請求。數組
優勢上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峯。緩存
缺點呢? 網絡
系統引入的外部依賴越多,越容易掛掉,原本你就是A系統調用BCD三個系統的接口就行了,ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ掛了怎麼辦?MQ掛了,整套系統崩潰了,業務也就停頓了。session
硬生生加個MQ進來,怎麼保證消息沒有重複消費?怎麼處理消息丟失的狀況?怎麼保證消息傳遞的順序性? 架構
A系統處理完了直接返回成功了,人都覺得你這個請求就成功了;可是問題是,要是BCD三個系統那裏,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,你這數據就不一致了。併發
因此消息隊列實際是一種很是複雜的架構,你引入它有不少好處,可是也得針對它帶來的壞處作各類額外的技術方案和架構來規避掉。
消息發送端應用的消息重複發送,有如下幾種狀況。
l 消息發送端發送消息給消息中間件,消息中間件收到消息併成功存儲,而這時消息中間件出現了問題,致使應用端沒有收到消息發送成功的返回於是進行重試產生了重複。
l 消息中間件由於負載高響應變慢,成功把消息存儲到消息存儲中後,返回「成功」這個結果時超時。
l 消息中間件將消息成功寫入消息存儲,在返回結果時網絡出現問題,致使應用發送端重試,而重試時網絡恢復,由此致使重複。
能夠看到,經過消息發送端產生消息重複的主要緣由是消息成功進入消息存儲後,由於各類緣由使得消息發送端沒有收到「成功」的返回結果,而且又有重試機制,於是致使重複。
消息到達了消息存儲,由消息中間件進行向外的投遞時產生重複,有如下幾種狀況。
l 消息被投遞到消息接收者應用進行處理,處理完畢後應用出問題了,消息中間件不知道消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢後網絡出現問題了,消息中間件沒有收到消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理時間比較長,消息中間件由於消息超時會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢後消息中間件出問題了,沒能收到消息結果並處理,會再次投遞
l 消息被投遞到消息接收者應用進行處理,處理完畢後消息中間件收到結果可是遇到消息存儲故障,沒能更新投遞狀態,會再次投遞。
能夠看到,在投遞過程當中產生的消息重複接收主要是由於消息接收者成功處理完消息後,消息中間件不能及時更新投遞狀態形成的。
那麼有什麼辦法能夠解決呢?主要是要求消息接收者來處理這種重複的狀況,也就是要求消息接收者的消息處理是冪等操做。
對於消息接收端的狀況,冪等的含義是採用一樣的輸入屢次調用處理函數,獲得一樣的結果。例如,一個SQL操做
update stat_table set count= 10 where id =1
這個操做屢次執行,id等於1的記錄中的 count字段的值都爲10,這個操做就是冪等的,咱們不用擔憂這個操做被重複。
再來看另一個SQL操做
update stat_table set count= count +1 where id= 1;
這樣的SQL操做就不是冪等的,一旦重複,結果就會產生變化。
所以應對消息重複的辦法是,使消息接收端的處理是一個冪等操做。這樣的作法下降了消息中間件的總體複雜性,不過也給使用消息中間件的消息接收端應用帶來了必定的限制和門檻。
多版本併發控制,樂觀鎖的一種實現,在生產者發送消息時進行數據更新時須要帶上數據的版本號,消費者去更新時須要去比較持有數據的版本號,版本號不一致的操做沒法成功。例如博客點贊次數自動+1的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每個version只有一次執行成功的機會,一旦失敗了生產者必須從新獲取數據的最新版本號再次發起更新。
利用數據庫表單的特性來實現冪等,經常使用的一個思路是在表上構建惟一性索引,保證某一類數據一旦執行完畢,後續一樣的請求再也不重複處理了(利用一張日誌表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日誌表中,那麼就再也不處理這條消息。)
以電商平臺爲例子,電商平臺上的訂單id就是最適合的token。當用戶下單時,會經歷多個環節,好比生成訂單,減庫存,減優惠券等等。每個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操做並緩存結果,而對已經執行過的id,則直接返回以前的執行結果,不作任何操做。這樣能夠在最大程度上避免操做的重複執行問題,緩存起來的執行結果也能用於事務的控制等。
要保證消息的可靠性,除了消息的持久化,還包括兩個方面,一是生產者發送的消息能夠被ActiveMQ收到,二是消費者收到了ActiveMQ發送的消息。
非持久化又不在事務中的消息,可能會有消息的丟失。爲保證消息能夠被ActiveMQ收到,咱們應該採用事務消息或持久化消息。
對消息的確認有4種機制
一、 AUTO_ACKNOWLEDGE = 1 自動確認
二、 CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
三、 DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
四、 SESSION_TRANSACTED = 0 事務提交併確認
ACK_MODE描述了Consumer與broker確認消息的方式(時機),好比當消息被Consumer接收以後,Consumer將在什麼時候確認消息。因此ack_mode描述的不是producer於broker之間的關係,而是customer於broker之間的關係。
對於broker而言,只有接收到ACK指令,纔會認爲消息被正確的接收或者處理成功了,經過ACK,能夠在consumer與Broker之間創建一種簡單的「擔保」機制.
自動確認
「同步」(receive)方法返回message給消息時會當即確認。
在"異步"(messageListener)方式中,將會首先調用listener.onMessage(message),若是onMessage方法正常結束,消息將會正常確認。若是onMessage方法異常,將致使消費者要求ActiveMQ重發消息。
客戶端手動確認,這就意味着AcitveMQ將不會「自做主張」的爲你ACK任何消息,開發者須要本身擇機確認。
咱們能夠在當前消息處理成功以後,當即調用message.acknowledge()方法來"逐個"確認消息,這樣能夠儘量的減小因網絡故障而致使消息重發的個數;固然也能夠處理多條消息以後,間歇性的調用acknowledge方法來一次確認多條消息,減小ack的次數來提高consumer的效率,不過須要自行權衡。
相似於AUTO_ACK確認機制,爲自動批量確認而生,並且具備「延遲」確認的特色,ActiveMQ會根據內部算法,在收到必定數量的消息自動進行確認。在此模式下,可能會出現重複消息,何時?當consumer故障重啓後,那些還沒有ACK的消息會從新發送過來。
當session使用事務時,就是使用此模式。當決定事務中的消息能夠確認時,必須調用session.commit()方法,commit方法將會致使當前session的事務中全部消息當即被確認。在事務開始以後的任什麼時候機調用rollback(),意味着當前事務的結束,事務中全部的消息都將被重發。固然在commit以前拋出異常,也會致使事務的rollback。
生產者將數據發送到RabbitMQ的時候,可能數據就在半路給搞丟了,由於網絡啥的問題,都有可能。此時能夠選擇用RabbitMQ提供的事務功能,就是生產者發送數據以前開啓RabbitMQ事務(channel.txSelect),而後發送消息,若是消息沒有成功被RabbitMQ接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務(channel.txRollback),而後重試發送消息;若是收到了消息,那麼能夠提交事務(channel.txCommit)。可是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,由於太耗性能。
因此通常來講,若是要確保RabbitMQ的消息別丟,能夠開啓confirm模式,在生產者那裏設置開啓confirm模式以後,你每次寫的消息都會分配一個惟一的id,而後若是寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。若是RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息id的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。
事務機制和cnofirm機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是confirm機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息RabbitMQ接收了以後會異步回調你一個接口通知你這個消息接收到了。
因此通常在生產者這塊避免數據丟失,都是用confirm機制的。
就是RabbitMQ本身弄丟了數據,這個你必須開啓RabbitMQ的持久化,就是消息寫入以後會持久化到磁盤,哪怕是RabbitMQ本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,RabbitMQ還沒持久化,本身就掛了,可能致使少許數據會丟失的,可是這個機率較小。
設置持久化有兩個步驟,第一個是建立queue和交換器的時候將其設置爲持久化的,這樣就能夠保證RabbitMQ持久化相關的元數據,可是不會持久化queue裏的數據;第二個是發送消息的時候將消息的deliveryMode設置爲2,就是將消息設置爲持久化的,此時RabbitMQ就會將消息持久化到磁盤上去。必需要同時設置這兩個持久化才行,RabbitMQ哪怕是掛了,再次重啓,也會從磁盤上重啓恢復queue,恢復這個queue裏的數據。
並且持久化能夠跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者ack了,因此哪怕是在持久化到磁盤以前,RabbitMQ掛了,數據丟了,生產者收不到ack,你也是能夠本身重發的。
哪怕是你給RabbitMQ開啓了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,可是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ掛了,就會致使內存裏的一點點數據會丟失。
RabbitMQ若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,RabbitMQ認爲你都消費了,這數據就丟了。
這個時候得用RabbitMQ提供的ack機制,簡單來講,就是你關閉RabbitMQ自動ack,能夠經過一個api來調用就行,而後每次你本身代碼裏確保處理完的時候,再程序裏ack一把。這樣的話,若是你還沒處理完,不就沒有ack?那RabbitMQ就認爲你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,消息是不會丟的。
惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。
你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。
生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。
而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了
這塊比較常見的一個場景,就是kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。
因此此時通常是要求起碼設置以下4個參數:
給這個topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本。
在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧。
在producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了。
在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了。
若是按照上述的思路設置了ack=all,必定不會丟,要求是,你的leader接收到消息,全部的follower都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。
從根本上說,異步消息是不該該有順序依賴的。在MQ上估計是無法解決。要實現嚴格的順序消息,簡單且可行的辦法就是:保證生產者 - MQServer - 消費者是一對一對一的關係。
1、經過高級特性consumer獨有消費者(exclusive consumer)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
當在接收信息的時候,有多個獨佔消費者的時候,只有一個獨佔消費者能夠接收到消息。
獨佔消息就是在有多個消費者同時消費一個queue時,能夠保證只有一個消費者能夠消費消息,這樣雖然保證了消息的順序問題,不過也帶來了一個問題,就是這個queue的全部消息將只會在這一個主消費者上消費,其餘消費者將閒置,達不到負載均衡分配,而實際業務咱們可能更多的是這樣的場景,好比一個訂單會發出一組順序消息,咱們只要求這一組消息是順序消費的,而訂單與訂單之間又是能夠並行消費的,不須要順序,由於順序也沒有任何意義,有沒有辦法作到呢?能夠利用activemq的另外一個高級特性之messageGroup
2、利用Activemq的高級特性:messageGroups
Message Groups特性是一種負載均衡的機制。在一個消息被分發到consumer以前,broker首先檢查消息JMSXGroupID屬性。若是存在,那麼broker會檢查是否有某個consumer擁有這個message group。若是沒有,那麼broker會選擇一個consumer,並將它關聯到這個message group。此後,這個consumer會接收這個message group的全部消息,直到:Consumer被關閉。Message group被關閉,經過發送一個消息,並設置這個消息的JMSXGroupSeq爲-1
bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上圖所示,同一個queue中,擁有相同JMSXGroupID的消息將發往同一個消費者,解決順序問題,不一樣分組的消息又能被其餘消費者並行消費,解決負載均衡的問題。
若是有順序依賴的消息,要保證消息有一個hashKey,相似於數據庫表分區的的分區key列。保證對同一個key的消息發送到相同的隊列。A用戶產生的消息(包括建立消息和刪除消息)都按A的hashKey分發到同一個隊列。只須要把強相關的兩條消息基於相同的路由就好了,也就是說通過m1和m2的在路由表裏的路由是同樣的,那天然m1會優先於m2去投遞。並且一個queue只對應一個consumer。
一個topic,一個partition,一個consumer,內部單線程消費
rabbitmq,rabbitmq是能夠設置過時時間的,就是TTL,若是消息在queue中積壓超過必定的時間,而又沒有設置死信隊列機制,就會被rabbitmq給清理掉,這個數據就沒了。
ActiveMQ則經過更改配置,支持消息的定時發送。
發生了線上故障,幾千萬條數據在MQ裏積壓好久。是修復consumer的問題,讓他恢復消費速度,而後等待幾個小時消費完畢?這是個解決方案。不過有時候咱們還會進行臨時緊急擴容。
一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條。1000多萬條,因此若是積壓了幾百萬到上千萬的數據,即便消費者恢復了,也須要大概1小時的時間才能恢復過來。
通常這個時候,只能操做臨時緊急擴容了,具體操做步驟和思路以下:
先修復consumer的問題,確保其恢復消費速度,而後將現有cnosumer都停掉。
新建一個topic,partition是原來的10倍,臨時創建好原先10倍或者20倍的queue數量。而後寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費以後不作耗時的處理,直接均勻輪詢寫入臨時創建好的10倍數量的queue。
接着臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據。
這種作法至關因而臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據。
等快速消費完積壓數據以後,再恢復原先部署架構,從新用原先的consumer機器來消費消息。
Kafka中每一個Topic都包含一個或多個Partition,不一樣Partition可位於不一樣節點。同時Partition在物理上對應一個本地文件夾,每一個Partition包含一個或多個Segment,每一個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,能夠把一個Partition看成一個很是長的數組,可經過這個「數組」的索引(offset)去訪問其數據。
一方面,因爲不一樣Partition可位於不一樣機器,所以能夠充分利用集羣優點,實現機器間的並行處理。另外一方面,因爲Partition在物理上對應一個文件夾,即便多個Partition位於同一個節點,也可經過配置讓同一節點上的不一樣Partition置於不一樣的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優點。
利用多磁盤的具體方法是,將不一樣磁盤mount到不一樣目錄,而後在server.properties中,將log.dirs設置爲多目錄(用逗號分隔)。Kafka會自動將全部Partition儘量均勻分配到不一樣目錄也即不一樣目錄(也即不一樣disk)上。
Partition是最小併發粒度,Partition個數決定了可能的最大並行度。。
- RDBMS的讀寫分離即爲典型的Master-Slave方案
- 同步複製可保證強一致性但會影響可用性
- 異步複製可提供高可用性但會下降一致性
- 主要用於去中心化的分佈式系統中。
- N表明總副本數,W表明每次寫操做要保證的最少寫成功的副本數,R表明每次讀至少要讀取的副本數
- 當W+R>N時,可保證每次讀取的數據至少有一個副本擁有最新的數據
- 多個寫操做的順序難以保證,可能致使多副本間的寫操做順序不一致。Dynamo經過向量時鐘保證最終一致性
- Google的Chubby,Zookeeper的原子廣播協議(Zab),RAFT等
Kafka的數據複製是以Partition爲單位的。而多個備份間的數據複製,經過Follower向Leader拉取數據完成。從一這點來說,Kafka的數據複製方案接近於上文所講的Master-Slave方案。不一樣的是,Kafka既不是徹底的同步複製,也不是徹底的異步複製,而是基於ISR的動態複製方案。
ISR,也即In-sync Replica。每一個Partition的Leader都會維護這樣一個列表,該列表中,包含了全部與之同步的Replica(包含Leader本身)。每次數據寫入時,只有ISR中的全部Replica都複製完,Leader纔會將其置爲Commit,它才能被Consumer所消費。
這種方案,與同步複製很是接近。但不一樣的是,這個ISR是由Leader動態維護的。若是Follower不能緊「跟上」Leader,它將被Leader從ISR中移除,待它又從新「跟上」Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會將最新的ISR持久化到Zookeeper中。
因爲Leader可移除不能及時與之同步的Follower,故與同步複製相比可避免最慢的Follower拖慢總體速度,也即ISR提升了系統可用性。
ISR中的全部Follower都包含了全部Commit過的消息,而只有Commit過的消息纔會被Consumer消費,故從Consumer的角度而言,ISR中的全部Replica都始終處於同步狀態,從而與異步複製方案相比提升了數據一致性。
ISR可動態調整,極限狀況下,能夠只包含Leader,極大提升了可容忍的宕機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。
Kafka的整個設計中,Partition至關於一個很是長的數組,而Broker接收到的全部消息順序寫入這個大數組中。同時Consumer經過Offset順序消費這些數據,而且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。
因爲磁盤有限,不可能保存全部數據,實際上做爲消息系統Kafka也不必保存全部數據,須要刪除舊的數據。而這個刪除過程,並不是經過使用「讀-寫」模式去修改文件,而是將Partition分爲多個Segment,每一個Segment對應一個物理文件,經過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操做。
在存儲機制上,使用了Log Structured Merge Trees(LSM) 。
注:Log Structured Merge Trees(LSM),谷歌 「BigTable」 的論文,中提出,LSM是當前被用在許多產品的文件結構策略:HBase, Cassandra, LevelDB, SQLite,Kafka。LSM被設計來提供比傳統的B+樹或者ISAM更好的寫操做吞吐量,經過消去隨機的本地更新操做來達到這個目標。這個問題的本質仍是磁盤隨機操做慢,順序讀寫快。這二種操做存在巨大的差距,不管是磁盤仍是SSD,並且快至少三個數量級。
使用Page Cache的好處以下
- I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提升性能
- I/O Scheduler會嘗試將一些寫操做從新按順序排好,從而減小磁盤頭的移動時間
- 充分利用全部空閒內存(非JVM內存)。若是使用應用層Cache(即JVM堆內存),會增長GC負擔
- 讀操做可直接在Page Cache內進行。若是消費和生產速度至關,甚至不須要經過物理磁盤(直接經過Page Cache)交換數據
- 若是進程重啓,JVM內的Cache會失效,但Page Cache仍然可用
Broker收到數據後,寫磁盤時只是將數據寫入Page Cache,並不保證數據必定徹底寫入磁盤。從這一點看,可能會形成機器宕機時,Page Cache內的數據未寫入磁盤從而形成數據丟失。可是這種丟失只發生在機器斷電等形成操做系統不工做的場景,而這種場景徹底能夠由Kafka層面的Replication機制去解決。若是爲了保證這種狀況下數據不丟失而強制將Page Cache中的數據Flush到磁盤,反而會下降性能。也正因如此,Kafka雖然提供了flush.messages和flush.ms兩個參數將Page Cache中的數據強制Flush到磁盤,可是Kafka並不建議使用。
若是數據消費速度與生產速度至關,甚至不須要經過物理磁盤交換數據,而是直接經過Page Cache交換數據。同時,Follower從Leader Fetch數據時,也可經過Page Cache完成。
注:Page Cache,又稱pcache,其中文名稱爲頁高速緩衝存儲器,簡稱頁高緩。page cache的大小爲一頁,一般爲4K。在linux讀寫文件時,它用於緩存文件的邏輯內容,從而加快對磁盤上映像和數據的訪問。 是Linux操做系統的一個特點。
Broker的log.dirs配置項,容許配置多個文件夾。若是機器上有多個Disk Drive,可將不一樣的Disk掛載到不一樣的目錄,而後將這些目錄都配置到log.dirs裏。Kafka會盡量將不一樣的Partition分配到不一樣的目錄,也即不一樣的Disk上,從而充分利用了多Disk的優點。
Kafka中存在大量的網絡數據持久化到磁盤(Producer到Broker)和磁盤文件經過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的總體吞吐量。
傳統模式下的四次拷貝與四次上下文切換
以將磁盤文件經過網絡發送爲例。傳統模式下,通常使用以下僞代碼所示的方法先將文件數據讀入內存,而後經過Socket將內存中的數據發送出去。
buffer = File.readSocket.send(buffer)
這一過程實際上發生了四次數據拷貝。首先經過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),而後應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接着用戶程序經過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最後經過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨着四次上下文切換。
而Linux 2.4+內核經過sendfile系統調用,提供了零拷貝。數據經過DMA拷貝到內核態Buffer後,直接經過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減小數據拷貝外,由於整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,所以大大提升了性能。
從具體實現來看,Kafka的數據傳輸經過Java NIO的FileChannel的transferTo和transferFrom方法實現零拷貝。
注: transferTo和transferFrom並不保證必定能使用零拷貝。其實是否能使用零拷貝與操做系統相關,若是操做系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會經過這樣的系統調用充分利用零拷貝的優點,不然並不能經過這兩個方法自己實現零拷貝。
批處理是一種經常使用的用於提升I/O性能的方式。對Kafka而言,批處理既減小了網絡傳輸的Overhead,又提升了寫磁盤的效率。
Kafka 的send方法並不是當即將消息發送出去,而是經過batch.size和linger.ms控制實際發送頻率,從而實現批量發送。
因爲每次網絡傳輸,除了傳輸消息自己之外,還要傳輸很是多的網絡協議自己的一些內容(稱爲Overhead),因此將多條消息合併到一塊兒傳輸,可有效減小網絡傳輸的Overhead,進而提升了傳輸效率。
Kafka從0.7開始,即支持將數據壓縮後再傳輸給Broker。除了能夠將每條消息單獨壓縮而後傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一塊兒壓縮後傳輸。數據壓縮的一個基本原理是,重複數據越多壓縮效果越好。所以將整個Batch的數據一塊兒壓縮能更大幅度減少數據量,從而更大程度提升網絡傳輸效率。
Broker接收消息後,並不直接解壓縮,而是直接將消息以壓縮後的形式持久化到磁盤。Consumer Fetch到數據後再解壓縮。所以Kafka的壓縮不只減小了Producer到Broker的網絡傳輸負載,同時也下降了Broker磁盤操做的負載,也下降了Consumer與Broker間的網絡傳輸量,從而極大得提升了傳輸效率,提升了吞吐量。
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器便可。所以用戶能夠經過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減小實際網絡傳輸和磁盤存儲的數據規模,從而提升吞吐率。這裏要注意,若是使用的序列化方法太慢,即便壓縮比很是高,最終的效率也不必定高。