kafka介紹和使用

1.消息系統簡介

1.1爲何要用消息系統 ?

解耦 各位系統之間經過消息系統這個統一的接口交換數據,無須瞭解彼此的存在;
冗餘 部分消息系統具備消息持久化能力,可規避消息處理前丟失的風險;html

靈活性和消除峯值 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰;(節省資源)
可恢復性 系統中部分組件失效並不會影響整個系統,它恢復後仍然可從消息系統中獲取並處理數據;數據庫

順序保障 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性;
異步通訊 在不須要當即處理請求的場景下,能夠將請求放入消息系統,合適的時候再處理。apache

 

1.2.有哪些消息系統 ?

RabbitMQ Erlang編寫,支持多協議 AMQP,XMPP,SMTP,STOMP。支持負載均衡、數據持久化。同時支持Peer-to-Peer和發佈/訂閱模式;
Redis 基於Key-Value對的NoSQL數據庫,同時支持MQ功能,可作輕量級隊列服務使用。就入隊操做而言, Redis對短消息(小於10KB)的性能比RabbitMQ好,長消息的性能比RabbitMQ差;
ZeroMQ 輕量級,不須要單獨的消息服務器或中間件,應用程序自己扮演該角色,Peer-to-Peer。它實質上是 一個庫,須要開發人員本身組合多種技術,使用複雜度高;
ActiveMQ JMS實現,Peer-to-Peer,支持持久化、XA事務;api

MetaQ/RocketMQ 純Java實現,發佈/訂閱消息系統,支持本地事務和XA分佈式事務;
Kafka 高性能跨語言的分佈式發佈/訂閱消息系統,數據持久化,全分佈式,同時支持實時在線處理和離線數據處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。緩存

1.3.Kafka設計目標是什麼?

高吞吐率 在廉價的商用機器上單機可支持每秒100萬條消息的讀寫;
消息持久化 全部消息均被持久化到磁盤,無消息丟失,支持消息重放;
徹底分佈式 Producer,Broker,Consumer均支持水平擴展,同時適應在線流處理和離線批處理。安全

2.kafka簡介和架構

2.1.核心概念

       Broker:Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker;服務器

       Message:消息是Kafka中最基本的數據單元,主要有key和value構成;真正有效的是消息是value數據,key只是做爲消息路由分區使用;網絡

Topic:每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數 據而沒必要關心數據存於何處),強調的是kafka不保證topic消息有序;多線程

Partition:Parition是物理上的概念,每一個Topic包含一個或多個Partition;kafka只保證一個partiton是有序的;經過配置來設置partition中的文件大小和文件保留策略;
Producer:負責發佈消息到Kafka broker;
Consumer:消息消費者,向Kafka broker讀取消息的客戶端;架構

Consumer Group:官方稱爲邏輯上的訂閱者,每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group),消息的單播和多播都是基於消費組來實現的,消費組中的消費者不是越多越好,消費者數量超過度區數量時,回致使消費者分配不到資源,形成資源浪費;
Offset:每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每一個消息都有一個連續的序列號叫作offset,用於partition惟一標識一條消息。

topic的配置可參考:
http://kafka.apache.org/documentation.html#topic-config

2.2.kafka架構

       

         如上圖所示,一個典型的Kafka集羣中包含若干Producer,若干broker(broker數量越多,集羣吞吐率越高),若干Consumer Group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,在消費組發生變化時進行rebalance(新版本不依賴)。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。

3.kafka的客戶端設計

3.1.生產者設計

3.1.1.producer的使用

在Kafka老版本中,同步和異步都是分開成不一樣的方法來實現的,最新的都是由KafkaProducer來實現,經過掉future的get阻塞線程來實現同步。實際上二者底層實現相同,都是經過一步實現的。

3.1.2.producer發送消息的過程(0.10.2.1)

        主要是兩個線程的操做:

        主線程封裝消息成ProducerRecord對象,並調用append方法將消息追加RecordAccumulator中暫時存儲;
        Sender線程負責將消息構形成請求,並從RecordAccumulator取出消息消息並批量發送。

1 ProducerIntercptor對消息進行攔截;
2 Serialzer對key和value進行序列化;
3 Partitioner對消息選擇合適的分區;
4 RecordAccumulator收集消息,實現批量發送;
5 Sender從RecordAccumulator獲取消息;
6 構造ClientRequest;
7 將ClientRequest交給Network,準備發送;
8 Network將請求放入KafkaChannel的緩存;
9 發送請求;
10 收到響應,調用ClientRequest;
11 調用RecordBatch的回調函數,最終調用到每個消息上註冊的回調函數。

3.1.3.Product方法詳解

  主線程的send方法:

一、首先調用waitOnMetadata()方法確保該主題topic對應的元數據metadata是可用的;
二、計算剩餘等待時間remainingWaitMs;
三、根據record中topic、key,利用valueSerializer獲得序列化key:serializedKey;
四、根據record中topic、value,利用valueSerializer獲得序列化value:serializedValue;
五、調用partition()方法得到分區號partition;
六、計算序列化後的key、value及其offset、size所佔大小serializedSize;
七、調用ensureValidRecordSize()方法確保記錄大小serializedSize是有效的;
八、根據record中的topic和partition構造TopicPartition實例tp;
九、調用accumulator的append()方法添加記錄,得到記錄添加結果RecordAppendResult類型的result;
十、根據結果result的batchIsFull或newBatchCreated肯定是否執行sender的wakeup();
十一、返回result中的future。

3.1.4.Replication設計

當某個Topic的replication-factor爲N且N大於1時,每一個Partition都會有N個副本(Replication);
Replica的個數小於等於Broker數,即對每一個Partition而言每一個Broker上只會有一個Replica,所以 可用Broker ID表示Replication;
全部Partition的全部Replication默認狀況會均勻分佈到全部Broker上。

要解決的問題:

1:如何Propagate消息?

Producer在發佈消息到某個Partition時,先經過Zookeeper找到該Partition的Leader,而後不管該Topic的Replication Factor爲多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。

2:什麼時候Commit?

        一條消息只有被ISR裏的全部Follower都從Leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而形成數據丟失(Consumer沒法消費這些數據)。而對於Producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。 

3:如何處理Replica恢復?

Kafka producer的ack有3中機制,初始化producer時的producerconfig能夠經過配置request.required.acks不一樣的值來實現。
0:這意味着生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer並不知情,發出去的信息broker就收不到)。1:這意味着producer在leader已成功收到的數據並獲得確認後發送下一條message。此選項提供了更好的耐久性爲客戶等待服務器確認請求成功(被寫入死亡leader但還沒有複製將失去了惟一的消息)。-1:這意味着producer在follower副本確認接收到數據後纔算一次發送完成。 此選項提供最好的耐久性,咱們保證沒有信息將丟失,只要至少一個同步副本保持存活。

       三種機制,性能依次遞減 (producer吞吐量下降),數據健壯性則依次遞增。

4:如何處理Replica所有宕機

機器恢復,lead選舉。(目前都是動態配置);

1.等待ISR中的任一個Replica「活」過來,而且選它做爲Leader 2.選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader。

3.2.Consumer設計

3.2.1.建立一個消費者

消費者都是線程不安全的,若是發現多線程調用,直接拋異常。

consumer 採用 pull 模式從 broker 中讀取數據。

push 模式很難適應消費速率不一樣的消費者,由於消息發送速率是由 broker 決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則能夠根據 consumer 的消費能力以適當的速率消費消息。

對於 Kafka 而言,pull模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。

consumer group

        CG是kafka提供的可擴展且具備容錯性的消費者機制。組內能夠有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的全部消費者協調在一塊兒來消費訂閱主題(subscribed topics)的全部分區(partition)。kafka 的分配單位是 patition。每一個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),可是多個 group 能夠同時消費這個 partition。consumer group下訂閱的topic下的每一個分區只能分配給某個group下的一個consumer(固然該分區還能夠被分配給其餘group)

3.2.2.消費獲取

        傳遞保證語義有三個級別:At most once: 最多一次,消息可能會丟失,但不會重複傳遞,At least once: 至少一次,消息毫不會丟,可是可能會重複傳遞,Exactly once: 每一條消息只會被傳遞一次。

        Kafka服務器端並不會記錄消費者的消費位置,而是由消費者本身決定如何保存其消費的offset. 0.8.2版本以前消費者會將其消費位置記錄zookeeper中,在後面的新版本中,消費者爲了緩解zookeeper集羣的壓力,在Kafka服務器端添加了一個名字是__consusmer_offsets的內部topic,簡稱爲offset topic,他能夠用來保存消費者提交的offset,當出現消費者上線或者下線時會觸發消費者組的rebalance操做,對partitions從新進行分配,等待rebalance完成以後,消費者就能夠讀取offset topic中的記錄的offset,並今後offset開始繼續消費。你也能夠根據業務需求將offset存儲在別的存儲介質中,好比數據庫等

3.2.3.rebalance

觸發rebalance的時機

# 有新的消費者加入;
# 有消費者宕機或者下線;
# 消費者主動退出消費者組;
# 消費者組訂閱的topic出現分區數量變化;
# 消費者調用unsubscrible取消對某topic的訂閱。

1. 將目標 topic 下的全部 partirtion 排序,存於PT;
2. 對某 consumer group 下全部 consumer 排序,存於 CG,第 i 個consumer 記爲 Ci;
3. N=size(PT)/size(CG),向上取整;
4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始);
5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci。

4.kafka高性能之道

4.1.高效使用磁盤 

順序寫磁盤,順序寫磁盤性能高於隨機寫內存;

追加寫:數據不更新,不作數據級別的刪除,文件級別的刪除;

支持多目錄(多磁盤)。

4.2.零拷貝

這裏的零拷貝值得是cpu級別的拷貝,使用nio的調用操做系統的sendfile實現零拷貝,同時減小兩次上下文切換和1次系統調用。

傳統意義上的拷貝:

NIO拷貝:

4.3.批處理和壓縮

        kafka的生產者和消費者均支持批量處理數據,指定緩存的消息達到某個量的時候就發出去,或者緩存了固定的時間後就發送出去,如100條消息就發送,或者每5秒發送一次

這種策略將大大減小服務端的I/O次數;
       生產者支持將數據壓縮後發送給broker,從而減小網絡傳輸代價,目前支持:GZIP或Snappy格式。

4.4.Partition

經過partition實現了並行處理和水平擴展,partition也是kafka並行處理的最小單位;

partitiom能夠處在不一樣的機器上,充分利用多機資源;

同一節點上的partitiom能夠位於多個目錄下,若是節點下有多個磁盤,能夠充分利用多磁盤優點。

4.5.ISR 

ISR實現了可用性和一致性的動態平衡;

ISR容忍了更多節點的失敗;

可配置化replica crash處理策略。

相關文章
相關標籤/搜索