Kafka如何作到1秒處理1500萬條消息

來源:51CTO技術棧(ID:blog51cto)git

Apache Kafka是一款流行的分佈式數據流平臺,它已經普遍地被諸如New Relic(數據智能平臺)、Uber、Square(移動支付公司)等大型公司用來構建可擴展的、高吞吐量的、高可靠的實時數據流系統。github

例如,在New Relic的生產環境中,Kafka羣集每秒可以處理超過1500萬條消息,並且其數據聚合率接近1Tbps。可見,Kafka大幅簡化了對於數據流的處理,所以它也得到了衆多應用開發人員和數據管理專家的青睞。算法

然而,在大型系統中Kafka的應用會比較複雜。若是你的Consumers沒法跟上數據流的話,各類消息每每在未被查看以前就已經消失掉了。apache

同時,它在自動化數據保留方面的限制,高流量的發佈+訂閱(publish-subscribe,pub/sub)模式等,可能都會影響到系統的性能。能夠絕不誇張地說,若是那些存放着數據流的系統沒法按需擴容、或穩定性不可靠的話,估計你們常常會寢食難安。緩存

爲了減小上述複雜性,我在此分享New Relic公司爲Kafka集羣在應對高吞吐量方面的20項最佳實踐。網絡

我將從以下四個方面進行展開:架構

Partitions(分區)負載均衡

Consumers(消費者)socket

Producers(生產者)分佈式

Brokers(代理)

1、快速瞭解Kafka的概念與架構
Kafka是一種高效的分佈式消息系統。在性能上,它具備內置的數據冗餘度與彈性,也具備高吞吐能力和可擴展性。

在功能上,它支持自動化的數據保存限制,可以以「流」的方式爲應用提供數據轉換,以及按照「鍵-值(key-value)」的建模關係「壓縮」數據流。

要了解各類最佳實踐,首先須要熟悉以下關鍵術語:

Message(消息)

Kafka中的一條記錄或數據單位。每條消息都有一個鍵和對應的一個值,有時還會有可選的消息頭。

Producer(生產者)

Producer將消息發佈到Kafka的topics上。Producer決定向topic分區的發佈方式,如:輪詢的隨機方法、或基於消息鍵(key)的分區算法。

Broker(代理)

Kafka以分佈式系統或集羣的方式運行,那麼羣集中的每一個節點稱爲一個Broker。

Topic(主題)

Topic是那些被髮布的數據記錄或消息的一種類別。消費者經過訂閱Topic來讀取寫給它們的數據。

Topic Partition(主題分區)

不一樣的Topic被分爲不一樣的分區,而每一條消息都會被分配一個Offset,一般每一個分區都會被複制至少一到兩次。

每一個分區都有一個Leader和存放在各個Follower上的一到多個副本(即:數據的副本),此法可防止某個Broker的失效。

羣集中的全部Broker均可以做爲Leader和Follower,可是一個Broker最多隻能有一個Topic Partition的副本。Leader可被用來進行全部的讀寫操做。

Offset(偏移量)

單個分區中的每一條消息都被分配一個Offset,它是一個單調遞增的整型數,可用來做爲分區中消息的惟一標識符。

Consumer(消費者)

Consumer經過訂閱Topic partition,來讀取Kafka的各類Topic消息。而後,消費類應用處理會收到消息,以完成指定的工做。

Consumer group(消費組)

Consumer能夠按照Consumer group進行邏輯劃分。Topic Partition被均衡地分配給組中的全部Consumers。

所以,在同一個Consumer group中,全部的Consumer都以負載均衡的方式運做。

換言之,同一組中的每個Consumer都能羣組看到分配給他的相應分區的全部消息。若是某個Consumer處於「離線」狀態的話,那麼該分區將會被分配給同組中的另外一個Consumer。這就是所謂的「再均衡(rebalance)」。

固然,若是組中的Consumer多於分區數,則某些Consumer將會處於閒置的狀態。

相反,若是組中的Consumer少於分區數,則某些Consumer會得到來自一個以上分區的消息。

Lag(延遲)

當Consumer的速度跟不上消息的產生速度時,Consumer就會由於沒法從分區中讀取消息,而產生延遲。

延遲表示爲分區頭後面的Offset數量。從延遲狀態(到「追遇上來」)恢復正常所須要的時間,取決於Consumer每秒可以應對的消息速度。

其公式以下:time=messages/(consume rate per second - produce rate per second)

1針對Partitions
1)瞭解分區的數據速率,以確保提供合適的數據保存空間

此處所謂「分區的數據速率」是指數據的生成速率。換言之,它是由「平均消息大小」乘以「每秒消息數」得出的數據速率決定了在給定時間內,所能保證的數據保存空間的大小(以字節爲單位)。

若是你不知道數據速率的話,則沒法正確地計算出知足基於給定時間跨度的數據,所須要保存的空間大小。

同時,數據速率也可以標識出單個Consumer在不產生延時的狀況下,所須要支持的最低性能值。

2)除非有其餘架構上的須要,不然在寫Topic時請使用隨機分區

在進行大型操做時,各個分區在數據速率上的良莠不齊是很是難以管理的。

其緣由來自於以下三個方面:

首先,「熱」(有較高吞吐量)分區上的Consumer勢必會比同組中的其餘Consumer處理更多的消息,所以極可能會致使出如今處理上和網絡上的瓶頸。

其次,那些爲具備最高數據速率的分區,所配置的最大保留空間,會致使Topic中其餘分區的磁盤使用量也作相應地增加。

第三,根據分區的Leader關係所實施的最佳均衡方案,比簡單地將Leader關係分散到全部Broker上,要更爲複雜。在同一Topic中,「熱」分區會「承載」10倍於其餘分區的權重。

有關Topic Partition的使用,能夠參閱《Kafka Topic Partition的各類有效策略》

參考連接:

https://blog.newrelic.com/eng...

2針對Consumers
3)若是Consumers運行的是比Kafka 0.10還要舊的版本,那麼請立刻升級

在0.8.x版中,Consumer使用Apache ZooKeeper來協調Consumer group,而許多已知的Bug會致使其長期處於再均衡狀態,或是直接致使再均衡算法的失敗(咱們稱之爲「再均衡風暴」)。

所以在再均衡期間,一個或多個分區會被分配給同一組中的每一個Consumer。

而在再均衡風暴中,分區的全部權會持續在各個Consumers之間流轉,這反而阻礙了任何一個Consumer去真正獲取分區的全部權。

4)調優Consumer的套接字緩衝區(socket buffers),以應對數據的高速流入

在Kafka的0.10.x版本中,參數receive.buffer.bytes的默認值爲64KB。而在Kafka的0.8.x版本中,參數socket.receive.buffer.bytes的默認值爲100KB。

這兩個默認值對於高吞吐量的環境而言都過小了,特別是若是Broker和Consumer之間的網絡帶寬延遲積(bandwidth-delay product)大於局域網(local areanetwork,LAN)時。

對於延遲爲1毫秒或更多的高帶寬的網絡(如10Gbps或更高),請考慮將套接字緩衝區設置爲8或16MB。

若是內存不足,也至少考慮設置爲1MB。固然,也能夠設置爲-1,它會讓底層操做系統根據網絡的實際狀況,去調整緩衝區的大小。

可是,對於須要啓動「熱」分區的Consumers來講,自動調整可能不會那麼快。

5)設計具備高吞吐量的Consumers,以便按需實施背壓(back-pressure)

一般,咱們應該保證系統只去處理其能力範圍內的數據,而不要超負荷「消費」,進而致使進程中斷「掛起」,或出現Consume group的溢出。

若是是在Java虛擬機(JVM)中運行,Consumers應當使用固定大小的緩衝區,並且最好是使用堆外內存(off-heap)。

請參見Disruptor模式:

http://lmax-exchange.github.i...

固定大小的緩衝區可以阻止Consumer將過多的數據拉到堆棧上,以致於JVM花費掉其全部的時間去執行垃圾回收,進而沒法履行其處理消息的本質工做。

6)在JVM上運行各類Consumers時,請警戒垃圾回收對它們可能產生的影響

例如,長時間垃圾回收的停滯,可能致使ZooKeeper的會話被丟棄、或Consumer group處於再均衡狀態。

對於Broker來講也如此,若是垃圾回收停滯的時間太長,則會產生集羣掉線的風險。

3針對Producers
7)配置Producer,以等待各類確認

籍此Producer可以獲知消息是否真正被髮送到了Broker的分區上。在Kafka的0.10.x版本上,其設置是Acks;而在0.8.x版本上,則爲request.required.acks。

Kafka經過複製,來提供容錯功能,所以單個節點的故障、或分區Leader關係的更改不會影響到系統的可用性。

若是沒有用Acks來配置Producer(或稱「fireand forget」)的話,則消息可能會悄然丟失。

8)爲各個Producer配置Retries

其默認值爲3,固然是很是低的。不過,正確的設定值取決於你的應用程序,即:就那些對於數據丟失零容忍的應用而言,請考慮設置爲Integer.MAX_VALUE(有效且最大)。

這樣將可以應對Broker的Leader分區出現沒法馬上響應Produce請求的狀況。

9)爲高吞吐量的Producer,調優緩衝區的大小

特別是buffer.memory和batch.size(以字節爲單位)。因爲batch.size是按照分區設定的,而Producer的性能和內存的使用量,均可以與Topic中的分區數量相關聯。

所以,此處的設定值將取決於以下幾個因素:

Producer數據速率(消息的大小和數量);

要生成的分區數;

可用的內存量。

請記住,將緩衝區調大並不老是好事,若是Producer因爲某種緣由而失效了(例如,某個Leader的響應速度比確認還要慢),那麼在堆內內存(on-heap)中的緩衝的數據量越多,其須要回收的垃圾也就越多。

10)檢測應用程序,以跟蹤諸如生成的消息數、平均消息大小、以及已使用的消息數等指標

4針對Brokers
11)在各個Brokers上,請壓縮Topics所需的內存和CPU資

日誌壓縮須要各個Broker上的堆棧(內存)和CPU週期都能成功地配合實現,而若是讓那些失敗的日誌壓縮數據持續增加的話,則會給Brokers分區帶來風險。

請參見:

https://kafka.apache.org/docu...

你能夠在Broker上調整log.cleaner.dedupe.buffer.size和log.cleaner.threads這兩個參數,可是請記住,這兩個值都會影響到各個Brokers上的堆棧使用。

若是某個Broker拋出OutOfMemoryError異常,那麼它將會被關閉、並可能形成數據的丟失。

而緩衝區的大小和線程的計數,則取決於須要被清除的Topic Partition數量、以及這些分區中消息的數據速率與密鑰的大小。

對於Kafka的0.10.2.1版本而言,經過ERROR條目來監控日誌清理程序的日誌文件,是檢測其線程可能出現問題的最可靠方法。

12)經過網絡吞吐量來監控Brokers

請監控發向(transmit,TX)和收向(receive,RX)的流量,以及磁盤的I/O、磁盤的空間和CPU的使用率,並且容量規劃是維護羣集總體性能的關鍵步驟。

13)在羣集的各個Brokers之間分配分區的Leader關係

Leader一般會須要大量的網絡I/O資源。例如,當咱們將複製因子(replication factor)配置爲三、並運行起來時。

Leader必須首先獲取分區的數據,而後將兩套副本發送給另兩個Followers,進而再傳輸到多個須要該數據的Consumers上。

所以在該例子中,單個Leader所使用的網絡I/O,至少是Follower的四倍。並且,Leader還可能須要對磁盤進行讀操做,而Follower只需進行寫操做。

14)不要忽略監控Brokers的in-sync replica(ISR)shrinks、under-replicatedpartitions和unpreferred leaders

這些都是集羣中潛在問題的跡象。例如,單個分區頻繁出現ISR收縮,則暗示着該分區的數據速率超過了Leader的能力,已沒法爲Consumer和其餘副本線程提供服務了。

15)按需修改Apache Log4j的各類屬性

詳細內容能夠參考:

https://github.com/apache/kaf...

Kafka的Broker日誌記錄會耗費大量的磁盤空間,可是咱們卻不能徹底關閉它。

由於有時在發生事故以後,須要重建事件序列,那麼Broker日誌就會是咱們最好的、甚至是惟一的方法。

16)禁用Topic的自動建立,或針對那些未被使用的Topics創建清除策略

例如,在設定的x天內,若是未出現新的消息,你應該考慮該Topic是否已經失效,並將其從羣集中予以刪除。此舉可避免花時間去管理羣集中被額外建立的元數據。

17)對於那些具備持續高吞吐量的Brokers,請提供足夠的內存,以免它們從磁盤子系統中進行讀操做

咱們應儘量地直接從操做系統的緩存中直接獲取分區的數據。然而,這就意味着你必須確保本身的Consumers可以跟得上「節奏」,而對於那些延遲的Consumer就只能強制Broker從磁盤中讀取了。

18)對於具備高吞吐量服務級別目標(service level objectives,SLOs)的大型羣集,請考慮爲Brokers的子集隔離出不一樣的Topic

至於如何肯定須要隔離的Topics,則徹底取決於本身的業務須要。例如,你有一些使用相同羣集的聯機事務處理(multipleonline transaction processing,OLTP)系統。

那麼將每一個系統的Topics隔離到不一樣Brokers子集中,則可以有助於限制潛在事件的影響半徑。

19)在舊的客戶端上使用新的Topic消息格式。應當代替客戶端,在各個Brokers上加載額外的格式轉換服務

固然,最好仍是要儘可能避免這種狀況的發生

20)不要錯誤地認爲在本地主機上測試好Broker,就能表明生產環境中的真實性能了

要知道,若是使用複製因子爲1,並在環回接口上對分區所作的測試,是與大多數生產環境大相徑庭的。

在環回接口上網絡延遲幾乎能夠被忽略的,而在不涉及到複製的狀況下,接收Leader確認所需的時間則一樣會出現巨大的差別。

2、總結但願上述各項建議可以有助於你們更有效地去使用Kafka。若是你想提升本身在Kafka方面的專業知識,請進一步查閱Kafka配套文檔中的「操做」部分,其中包含了有關操做羣集等實用信息

相關文章
相關標籤/搜索