Kafka萬億級消息實戰

1、Kafka應用

本文主要總結當Kafka集羣流量達到 萬億級記錄/天或者十萬億級記錄/天  甚至更高後,咱們須要具有哪些能力才能保障集羣高可用、高可靠、高性能、高吞吐、安全的運行。html

這裏總結內容主要針對Kafka2.1.1版本,包括集羣版本升級、數據遷移、流量限制、監控告警、負載均衡、集羣擴/縮容、資源隔離、集羣容災、集羣安全、性能優化、平臺化、開源版本缺陷、社區動態等方面。本文主要是介紹核心脈絡,不作過多細節講解。下面咱們先來看看Kafka做爲數據中樞的一些核心應用場景。java

下圖展現了一些主流的數據處理流程,Kafka起到一個數據中樞的做用。node

接下來看看咱們Kafka平臺總體架構;git

1.1 版本升級

1.1.1  開源版本如何進行版本滾動升級與回退

官網地址:http://kafka.apache.orggithub

1.1.1.2 源碼改造如何升級與回退apache

因爲在升級過程當中,必然出現新舊代碼邏輯交替的狀況。集羣內部部分節點是開源版本,另一部分節點是改造後的版本。因此,須要考慮在升級過程當中,新舊代碼混合的狀況,如何兼容以及出現故障時如何回退。bootstrap

1.2 數據遷移

因爲Kafka集羣的架構特色,這必然致使集羣內流量負載不均衡的狀況,因此咱們須要作一些數據遷移來實現集羣不一樣節點間的流量均衡。Kafka開源版本爲數據遷移提供了一個腳本工具「bin/kafka-reassign-partitions.sh」,若是本身沒有實現自動負載均衡,可使用此腳本。api

開源版本提供的這個腳本生成遷移計劃徹底是人工干預的,當集羣規模很是大時,遷移效率變得很是低下,通常以天爲單位進行計算。固然,咱們能夠實現一套自動化的均衡程序,當負載均衡實現自動化之後,基本使用調用內部提供的API,由程序去幫咱們生成遷移計劃及執行遷移任務。須要注意的是,遷移計劃有指定數據目錄和不指定數據目錄兩種,指定數據目錄的須要配置ACL安全認證。緩存

官網地址:http://kafka.apache.org安全

1.2.1 broker間數據遷移

不指定數據目錄

//未指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
    ]
}

指定數據目錄

//指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
    ]
}

1.2.2 broker內部磁盤間數據遷移

生產環境的服務器通常都是掛載多塊硬盤,好比4塊/12塊等;那麼可能出如今Kafka集羣內部,各broker間流量比較均衡,可是在broker內部,各磁盤間流量不均衡,致使部分磁盤過載,從而影響集羣性能和穩定,也沒有較好的利用硬件資源。在這種狀況下,咱們就須要對broker內部多塊磁盤的流量作負載均衡,讓流量更均勻的分佈到各磁盤上。

1.2.3 併發數據遷移

當前Kafka開源版本(2.1.1版本)提供的副本遷移工具「bin/kafka-reassign-partitions.sh」在同一個集羣內只能實現遷移任務的串行。對於集羣內已經實現多個資源組物理隔離的狀況,因爲各資源組不會相互影響,可是卻不能友好的進行並行的提交遷移任務,遷移效率有點低下,這種不足直到2.6.0版本才得以解決。若是須要實現併發數據遷移,能夠選擇升級Kafka版本或者修改Kafka源碼。

1.2.4 終止數據遷移

當前Kafka開源版本(2.1.1版本)提供的副本遷移工具「bin/kafka-reassign-partitions.sh」在啓動遷移任務後,沒法終止遷移。當遷移任務對集羣的穩定性或者性能有影響時,將變得一籌莫展,只能等待遷移任務執行完畢(成功或者失敗),這種不足直到2.6.0版本才得以解決。若是須要實現終止數據遷移,能夠選擇升級Kafka版本或者修改Kafka源碼。

1.3 流量限制

1.3.1 生產消費流量限制

常常會出現一些突發的,不可預測的異常生產或者消費流量會對集羣的IO等資源產生巨大壓力,最終影響整個集羣的穩定與性能。那麼咱們能夠對用戶的生產、消費、副本間數據同步進行流量限制,這個限流機制並非爲了限制用戶,而是避免突發的流量影響集羣的穩定和性能,給用戶能夠更好的服務。

以下圖所示,節點入流量由140MB/s左右突增到250MB/s,而出流量則從400MB/s左右突增至800MB/s。若是沒有限流機制,那麼集羣的多個節點將有被這些異常流量打掛的風險,甚至形成集羣雪崩。

圖片生產/消費流量限制官網地址:點擊連接

對於生產者和消費者的流量限制,官網提供瞭如下幾種維度組合進行限制(固然,下面限流機制存在必定缺陷,後面在「Kafka開源版本功能缺陷」咱們將提到):

/config/users/<user>/clients/<client-id> //根據用戶和客戶端ID組合限流
/config/users/<user>/clients/<default>
/config/users/<user>//根據用戶限流 這種限流方式是咱們最經常使用的方式
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

在啓動Kafka的broker服務時須要開啓JMX參數配置,方便經過其餘應用程序採集Kafka的各項JMX指標進行服務監控。當用戶須要調整限流閾值時,根據單個broker所能承受的流量進行智能評估,無需人工干預判斷是否能夠調整;對於用戶流量限制,主要須要參考的指標包括如下兩個:

(1)消費流量指標:ObjectName:kafka.server:type=Fetch,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的出流量)、throttle-time(用戶在當前broker的出流量被限制時間)
(2)生產流量指標:ObjectName:kafka.server:type=Produce,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的入流量)、throttle-time(用戶在當前broker的入流量被限制時間)

1.3.2 follower同步leader/數據遷移流量限制

副本遷移/數據同步流量限制官網地址:連接

涉及參數以下:

//副本同步限流配置共涉及如下4個參數
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas

輔助指標以下:

(1)副本同步出流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

1.4 監控告警

關於Kafka的監控有一些開源的工具可用使用,好比下面這幾種:

Kafka Manager

Kafka Eagle

Kafka Monitor

KafkaOffsetMonitor;

咱們已經把Kafka Manager做爲咱們查看一些基本指標的工具嵌入平臺,然而這些開源工具不能很好的融入到咱們本身的業務系統或者平臺上。因此,咱們須要本身去實現一套粒度更細、監控更智能、告警更精準的系統。其監控覆蓋範圍應該包括基礎硬件、操做系統(操做系統偶爾出現系統進程hang住狀況,致使broker假死,沒法正常提供服務)、Kafka的broker服務、Kafka客戶端應用程序、zookeeper集羣、上下游全鏈路監控。

1.4.1 硬件監控

網絡監控:

核心指標包括網絡入流量、網絡出流量、網絡丟包、網絡重傳、處於TIME.WAIT的TCP鏈接數、交換機、機房帶寬、DNS服務器監控(若是DNS服務器異常,可能出現流量黑洞,引發大面積業務故障)等。

磁盤監控:

核心指標包括監控磁盤write、磁盤read(若是消費時沒有延時,或者只有少許延時,通常都沒有磁盤read操做)、磁盤ioutil、磁盤iowait(這個指標若是太高說明磁盤負載較大)、磁盤存儲空間、磁盤壞盤、磁盤壞塊/壞道(壞道或者壞塊將致使broker處於半死不活狀態,因爲有crc校驗,消費者將被卡住)等。

CPU監控:

監控CPU空閒率/負載,主板故障等,一般CPU使用率比較低不是Kafka的瓶頸。

內存/交換區監控:

內存使用率,內存故障。通常狀況下,服務器上除了啓動Kafka的broker時分配的堆內存之外,其餘內存基本所有被用來作PageCache。

緩存命中率監控:

因爲是否讀磁盤對Kafka的性能影響很大,因此咱們須要監控Linux的PageCache緩存命中率,若是緩存命中率高,則說明消費者基本命中緩存。

詳細內容請閱讀文章:《Linux Page Cache調優在Kafka中的應用》。

系統日誌:

咱們須要對操做系統的錯誤日誌進行監控告警,及時發現一些硬件故障。

1.4.2 broker服務監控

broker服務的監控,主要是經過在broker服務啓動時指定JMX端口,而後經過實現一套指標採集程序去採集JMX指標。(服務端指標官網地址

broker級監控:broker進程、broker入流量字節大小/記錄數、broker出流量字節大小/記錄數、副本同步入流量、副本同步出流量、broker間流量誤差、broker鏈接數、broker請求隊列數、broker網絡空閒率、broker生產延時、broker消費延時、broker生產請求數、broker消費請求數、broker上分佈leader個數、broker上分佈副本個數、broker上各磁盤流量、broker GC等。

topic級監控:topic入流量字節大小/記錄數、topic出流量字節大小/記錄數、無流量topic、topic流量突變(突增/突降)、topic消費延時。

partition級監控:分區入流量字節大小/記錄數、分區出流量字節大小/記錄數、topic分區副本缺失、分區消費延遲記錄、分區leader切換、分區數據傾斜(生產消息時,若是指定了消息的key容易形成數據傾斜,這嚴重影響Kafka的服務性能)、分區存儲大小(能夠治理單分區過大的topic)。

用戶級監控:用戶出/入流量字節大小、用戶出/入流量被限制時間、用戶流量突變(突增/突降)。

broker服務日誌監控:對server端打印的錯誤日誌進行監控告警,及時發現服務異常。

1.4.3.客戶端監控

客戶端監控主要是本身實現一套指標上報程序,這個程序須要實現

org.apache.kafka.common.metrics.MetricsReporter 接口。而後在生產者或者消費者的配置中加入配置項 metric.reporters,以下所示:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
//ClientMetricsReporter類實現org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...

客戶端指標官網地址:

http://kafka.apache.org/21/documentation.html#selector_monitoring

http://kafka.apache.org/21/documentation.html#common\_node\_monitoring

http://kafka.apache.org/21/documentation.html#producer_monitoring

http://kafka.apache.org/21/documentation.html#producer\_sender\_monitoring

http://kafka.apache.org/21/documentation.html#consumer_monitoring

http://kafka.apache.org/21/documentation.html#consumer\_fetch\_monitoring

客戶端監控流程架構以下圖所示:

1.4.3.1 生產者客戶端監控

維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集羣名稱、brokerIP;

指標:鏈接數、IO等待時間、生產流量大小、生產記錄數、請求次數、請求延時、發送錯誤/重試次數等。

1.4.3.2 消費者客戶端監控

維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集羣名稱、消費組、brokerIP、topic分區;

指標:鏈接數、io等待時間、消費流量大小、消費記錄數、消費延時、topic分區消費延遲記錄等。

1.4.4 Zookeeper監控

  1. Zookeeper進程監控;
  2. Zookeeper的leader切換監控;
  3. Zookeeper服務的錯誤日誌監控;

1.4.5 全鏈路監控

當數據鏈路很是長的時候(好比:業務應用->埋點SDk->數據採集->Kafka->實時計算->業務應用),咱們定位問題一般須要通過多個團隊反覆溝通與排查才能發現問題到底出如今哪一個環節,這樣排查問題效率比較低下。在這種狀況下,咱們就須要與上下游一塊兒梳理整個鏈路的監控。出現問題時,第一時間定位問題出如今哪一個環節,縮短問題定位與故障恢復時間。

1.5 資源隔離

1.5.1 相同集羣不一樣業務資源物理隔離

咱們對全部集羣中不一樣對業務進行資源組物理隔離,避免各業務之間相互影響。在這裏,咱們假設集羣有4個broker節點(Broker1/Broker2/Broker3/Broker4),2個業務(業務A/業務B),他們分別擁有topic分區分佈以下圖所示,兩個業務topic都分散在集羣的各個broker上,而且在磁盤層面也存在交叉。

試想一下,若是咱們其中一個業務異常,好比流量突增,致使broker節點異常或者被打掛。那麼這時候另一個業務也將受到影響,這樣將大大的影響了咱們服務的可用性,形成故障,擴大了故障影響範圍。

針對這些痛點,咱們能夠對集羣中的業務進行物理資源隔離,各業務獨享資源,進行資源組劃分(這裏把4各broker劃分爲Group1和Group2兩個資源組)以下圖所示,不一樣業務的topic分佈在本身的資源組內,當其中一個業務異常時,不會波及另一個業務,這樣就能夠有效的縮小咱們的故障範圍,提升服務可用性。

1.6 集羣歸類

咱們把集羣根據業務特色進行拆分爲日誌集羣、監控集羣、計費集羣、搜索集羣、離線集羣、在線集羣等,不一樣場景業務放在不一樣集羣,避免不一樣業務相互影響。

1.7 擴容/縮容

1.7.1 topic擴容分區

隨着topic數據量增加,咱們最初建立的topic指定的分區個數可能已經沒法知足數量流量要求,因此咱們須要對topic的分區進行擴展。擴容分區時須要考慮一下幾點:

必須保證topic分區leader與follower輪詢的分佈在資源組內全部broker上,讓流量分佈更加均衡,同時須要考慮相同分區不一樣副本跨機架分佈以提升容災能力;

當topic分區leader個數除以資源組節點個數有餘數時,須要把餘數分區leader優先考慮放入流量較低的broker。

1.7.2 broker上線

隨着業務量增多,數據量不斷增大,咱們的集羣也須要進行broker節點擴容。關於擴容,咱們須要實現如下幾點:

擴容智能評估:根據集羣負載,把是否須要擴容評估程序化、智能化;

智能擴容:當評估須要擴容後,把擴容流程以及流量均衡平臺化。

1.7.3 broker下線

某些場景下,咱們須要下線咱們的broker,主要包括如下幾個場景:

一些老化的服務器須要下線,實現節點下線平臺化;

服務器故障,broker故障沒法恢復,咱們須要下線故障服務器,實現節點下線平臺化;

有更優配置的服務器替換已有broker節點,實現下線節點平臺化。

1.8 負載均衡

咱們爲何須要負載均衡呢?首先,咱們來看第一張圖,下圖是咱們集羣某個資源組剛擴容後的流量分佈狀況,流量沒法自動的分攤到咱們新擴容後的節點上。那麼這個時候須要咱們手動去觸發數據遷移,把部分副本遷移至新節點上才能實現流量均衡。

下面,咱們來看一下第二張圖。這張圖咱們能夠看出流量分佈很是不均衡,最低和最高流量誤差數倍以上。這和Kafka的架構特色有關,當集羣規模與數據量達到必定量後,必然出現當問題。這種狀況下,咱們也須要進行負載均衡。

咱們再來看看第三張圖。這裏咱們能夠看出出流量只有部分節點突增,這就是topic分區在集羣內部不夠分散,集中分佈到了某幾個broker致使,這種狀況咱們也須要進行擴容分區和均衡。

咱們比較理想的流量分佈應該以下圖所示,各節點間流量誤差很是小,這種狀況下,既能夠加強集羣扛住流量異常突增的能力又能夠提高集羣總體資源利用率和服務穩定性,下降成本。

負載均衡咱們須要實現如下效果:

1)生成副本遷移計劃以及執行遷移任務平臺化、自動化、智能化;

2)執行均衡後broker間流量比較均勻,且單個topic分區均勻分佈在全部broker節點上;

3)執行均衡後broker內部多塊磁盤間流量比較均衡;

要實現這個效果,咱們須要開發一套本身的負載均衡工具,如對開源的 cruise control進行二次開發;此工具的核心主要在生成遷移計劃的策略,遷移計劃的生成方案直接影響到最後集羣負載均衡的效果。參考內容:

1. linkedIn/cruise-control

2. Introduction to Kafka Cruise Control

3. Cloudera Cruise Control REST API Reference

cruise control架構圖以下:

在生成遷移計劃時,咱們須要考慮如下幾點:

1)選擇核心指標做爲生成遷移計劃的依據,好比出流量、入流量、機架、單topic分區分散性等;

2)優化用來生成遷移計劃的指標樣本,好比過濾流量突增/突降/掉零等異常樣本;

3)各資源組的遷移計劃須要使用的樣本所有爲資源組內部樣本,不涉及其餘資源組,無交叉;

4)治理單分區過大topic,讓topic分區分佈更分散,流量不集中在部分broker,讓topic單分區數據量更小,這樣能夠減小遷移的數據量,提高遷移速度;

5)已經均勻分散在資源組內的topic,加入遷移黑名單,不作遷移,這樣能夠減小遷移的數據量,提高遷移速度;

6)作topic治理,排除長期無流量topic對均衡的干擾;

7)新建topic或者topic分區擴容時,應讓全部分區輪詢分佈在全部broker節點,輪詢後餘數分區優先分佈流量較低的broker;

8)擴容broker節點後開啓負載均衡時,優先把同一broker分配了同一大流量(流量大而不是存儲空間大,這裏能夠認爲是每秒的吞吐量)topic多個分區leader的,遷移一部分到新broker節點;

9)提交遷移任務時,同一批遷移計劃中的分區數據大小誤差應該儘量小,這樣能夠避免遷移任務中小分區遷移完成後長時間等待大分區的遷移,形成任務傾斜;

1.9 安全認證

是否是咱們的集羣全部人均可以隨意訪問呢?固然不是,爲了集羣的安全,咱們須要進行權限認證,屏蔽非法操做。主要包括如下幾個方面須要作安全認證:

(1)生產者權限認證;

(2)消費者權限認證;

(3)指定數據目錄遷移安全認證;

官網地址:http://kafka.apache.org

1.10 集羣容災

跨機架容災:

官網地址:http://kafka.apache.org

跨集羣/機房容災:若是有異地雙活等業務場景時,能夠參考Kafka2.7版本的MirrorMaker 2.0。

GitHub地址:https://github.com

精確KIP地址 :https://cwiki.apache.org

ZooKeeper集羣上Kafka元數據恢復:咱們會按期對ZooKeeper上的權限信息數據作備份處理,當集羣元數據異常時用於恢復。

1.11 參數/配置優化

broker服務參數優化:這裏我只列舉部分影響性能的核心參數。

num.network.threads
#建立Processor處理網絡請求線程個數,建議設置爲broker當CPU核心數*2,這個值過低常常出現網絡空閒過低而缺失副本。
 
num.io.threads
#建立KafkaRequestHandler處理具體請求線程個數,建議設置爲broker磁盤個數*2
 
num.replica.fetchers
#建議設置爲CPU核心數/4,適當提升能夠提高CPU利用率及follower同步leader數據當並行度。
 
compression.type
#建議採用lz4壓縮類型,壓縮能夠提高CPU利用率同時能夠減小網絡傳輸數據量。
 
queued.max.requests
#若是是生產環境,建議配置最少500以上,默認爲500。
 
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
#這幾個參數表示日誌數據刷新到磁盤的策略,應該保持默認配置,刷盤策略讓操做系統去完成,由操做系統來決定何時把數據刷盤;
#若是設置來這個參數,可能對吞吐量影響很是大;
 
auto.leader.rebalance.enable
#表示是否開啓leader自動負載均衡,默認true;咱們應該把這個參數設置爲false,由於自動負載均衡不可控,可能影響集羣性能和穩定;

生產優化:這裏我只列舉部分影響性能的核心參數。

linger.ms
#客戶端生產消息等待多久時間才發送到服務端,單位:毫秒。和batch.size參數配合使用;適當調大能夠提高吞吐量,可是若是客戶端若是down機有丟失數據風險;
 
batch.size
#客戶端發送到服務端消息批次大小,和linger.ms參數配合使用;適當調大能夠提高吞吐量,可是若是客戶端若是down機有丟失數據風險;
 
compression.type
#建議採用lz4壓縮類型,具有較高的壓縮比及吞吐量;因爲Kafka對CPU的要求並不高,因此,能夠經過壓縮,充分利用CPU資源以提高網絡吞吐量;
 
buffer.memory
#客戶端緩衝區大小,若是topic比較大,且內存比較充足,能夠適當調高這個參數,默認只爲33554432(32MB)
 
retries
#生產失敗後的重試次數,默認0,能夠適當增長。當重試超過必定次數後,若是業務要求數據準確性較高,建議作容錯處理。
 
retry.backoff.ms
#生產失敗後,重試時間間隔,默認100ms,建議不要設置太大或者過小。

除了一些核心參數優化外,咱們還須要考慮好比topic的分區個數和topic保留時間;若是分區個數太少,保留時間太長,可是寫入數據量很是大的話,可能形成如下問題:

1)topic分區集中落在某幾個broker節點上,致使流量副本失衡;

2)致使broker節點內部某幾塊磁盤讀寫超負載,存儲被寫爆;

1.11.1 消費優化

消費最大的問題,而且常常出現的問題就是消費延時,拉歷史數據。當大量拉取歷史數據時將出現大量讀盤操做,污染pagecache,這個將加劇磁盤的負載,影響集羣性能和穩定;

能夠怎樣減小或者避免大量消費延時呢?

1)當topic數據量很是大時,建議一個分區開啓一個線程去消費;

2)對topic消費延時添加監控告警,及時發現處理;

3)當topic數據能夠丟棄時,遇到超大延時,好比單個分區延遲記錄超過千萬甚至數億,那麼能夠重置topic的消費點位進行緊急處理;【此方案通常在極端場景才使用】

4)避免重置topic的分區offset到很早的位置,這可能形成拉取大量歷史數據;

1.11.2 Linux服務器參數優化

咱們須要對Linux的文件句柄、pagecache等參數進行優化。可參考《Linux Page Cache調優在Kafka中的應用》。

1.12.硬件優化

磁盤優化

在條件容許的狀況下,能夠採用SSD固態硬盤替換HDD機械硬盤,解決機械盤IO性能較低的問題;若是沒有SSD固態硬盤,則能夠對服務器上的多塊硬盤作硬RAID(通常採用RAID10),讓broker節點的IO負載更加均衡。若是是HDD機械硬盤,一個broker能夠掛載多塊硬盤,好比 12塊*4TB。

內存

因爲Kafka屬於高頻讀寫型服務,而Linux的讀寫請求基本走的都是Page Cache,因此單節點內存大一些對性能會有比較明顯的提高。通常選擇256GB或者更高。

網絡

提高網絡帶寬:在條件容許的狀況下,網絡帶寬越大越好。由於這樣網絡帶寬纔不會成爲性能瓶頸,最少也要達到萬兆網絡( 10Gb,網卡爲全雙工)才能具有相對較高的吞吐量。若是是單通道,網絡出流量與入流量之和的上限理論值是1.25GB/s;若是是雙工雙通道,網絡出入流量理論值均可以達到1.25GB/s。

網絡隔離打標:因爲一個機房可能既部署有離線集羣(好比HBase、Spark、Hadoop等)又部署有實時集羣(如Kafka)。那麼實時集羣和離線集羣掛載到同一個交換機下的服務器將出現競爭網絡帶寬的問題,離線集羣可能對實時集羣形成影響。因此咱們須要進行交換機層面的隔離,讓離線機器和實時集羣不要掛載到相同的交換機下。即便有掛載到相同交換機下面的,咱們也將進行網絡通行優先級(金、銀、銅、鐵)標記,當網絡帶寬緊張的時候,讓實時業務優先通行。

CPU

Kafka的瓶頸不在CPU,單節點通常有32核的CPU都足夠使用。

1.13.平臺化

如今問題來了,前面咱們提到不少監控、優化等手段;難道咱們管理員或者業務用戶對集羣全部的操做都須要登陸集羣服務器嗎?答案固然是否認的,咱們須要豐富的平臺化功能來支持。一方面是爲了提高咱們操做的效率,另一方面也是爲了提高集羣的穩定和下降出錯的可能。

配置管理

黑屏操做,每次修改broker的server.properties配置文件都沒有變動記錄可追溯,有時可能由於有人修改了集羣配置致使一些故障,卻找不到相關記錄。若是咱們把配置管理作到平臺上,每次變動都有跡可循,同時下降了變動出錯的風險。

滾動重啓

當咱們須要作線上變動時,有時候須要對集羣對多個節點作滾動重啓,若是到命令行去操做,那效率將變得很低,並且須要人工去幹預,浪費人力。這個時候咱們就須要把這種重複性的工做進行平臺化,提高咱們的操做效率。

集羣管理

集羣管理主要是把原來在命令行的一系列操做作到平臺上,用戶和管理員再也不須要黑屏操做Kafka集羣;這樣作主要有如下優勢:

提高操做效率;

操做出錯機率更小,集羣更安全;

全部操做有跡可循,能夠追溯;

集羣管理主要包括:broker管理、topic管理、生產/消費權限管理、用戶管理等

1.13.1 mock功能

在平臺上爲用戶的topic提供生產樣例數據與消費抽樣的功能,用戶能夠不用本身寫代碼也能夠測試topic是否可使用,權限是否正常;

在平臺上爲用戶的topic提供生產/消費權限驗證功能,讓用戶能夠明確本身的帳號對某個topic有沒有讀寫權限;

1.13.2 權限管理

把用戶讀/寫權限管理等相關操做進行平臺化。

1.13.3 擴容/縮容

把broker節點上下線作到平臺上,全部的上線和下線節點再也不須要操做命令行。

1.13.4 集羣治理

1)無流量topic的治理,對集羣中無流量topic進行清理,減小過多無用元數據對集羣形成的壓力;

2)topic分區數據大小治理,把topic分區數據量過大的topic(如單分區數據量超過100GB/天)進行梳理,看看是否須要擴容,避免數據集中在集羣部分節點上;

3)topic分區數據傾斜治理,避免客戶端在生產消息的時候,指定消息的key,可是key過於集中,消息只集中分佈在部分分區,致使數據傾斜;

4)topic分區分散性治理,讓topic分區分佈在集羣儘量多的broker上,這樣能夠避免因topic流量突增,流量只集中到少數節點上的風險,也能夠避免某個broker異常對topic影響很是大;

5)topic分區消費延時治理;通常有延時消費較多的時候有兩種狀況,一種是集羣性能降低,另一種是業務方的消費併發度不夠,若是是消費者併發不夠的化應該與業務聯繫增長消費併發。

1.13.5 監控告警

1)把全部指標採集作成平臺可配置,提供統一的指標採集和指標展現及告警平臺,實現一體化監控;

2)把上下游業務進行關聯,作成全鏈路監控;

3)用戶能夠配置topic或者分區流量延時、突變等監控告警;

1.13.6 業務大屏

業務大屏主要指標:集羣個數、節點個數、日入流量大小、日入流量記錄、日出流量大小、日出流量記錄、每秒入流量大小、每秒入流量記錄、每秒出流量大小、每秒出流量記錄、用戶個數、生產延時、消費延時、數據可靠性、服務可用性、數據存儲大小、資源組個數、topic個數、分區個數、副本個數、消費組個數等指標。

1.13.7 流量限制

把用戶流量如今作到平臺,在平臺進行智能限流處理。

1.13.8 負載均衡

把自動負載均衡功能作到平臺,經過平臺進行調度和管理。

1.13.9 資源預算

當集羣達到必定規模,流量不斷增加,那麼集羣擴容機器從哪裏來呢?業務的資源預算,讓集羣裏面的多個業務根據本身在集羣中當流量去分攤整個集羣的硬件成本;固然,獨立集羣與獨立隔離的資源組,預算方式能夠單獨計算。

1.14.性能評估

1.14.1 單broker性能評估

咱們作單broker性能評估的目的包括如下幾方面:

1)爲咱們進行資源申請評估提供依據;

2)讓咱們更瞭解集羣的讀寫能力及瓶頸在哪裏,針對瓶頸進行優化;

3)爲咱們限流閾值設置提供依據;

4)爲咱們評估何時應該擴容提供依據;

1.14.2 topic分區性能評估

1)爲咱們建立topic時,評估應該指定多少分區合理提供依據;

2)爲咱們topic的分區擴容評估提供依據;

1.14.3 單磁盤性能評估

1)爲咱們瞭解磁盤的真正讀寫能力,爲咱們選擇更合適Kafka的磁盤類型提供依據;

2)爲咱們作磁盤流量告警閾值設置提供依據;

1.14.4 集羣規模限制摸底

1)咱們須要瞭解單個集羣規模的上限或者是元數據規模的上限,探索相關信息對集羣性能和穩定性的影響;

2)根據摸底狀況,評估集羣節點規模的合理範圍,及時預測風險,進行超大集羣的拆分等工做;

1.15 DNS+LVS的網絡架構

當咱們的集羣節點達到必定規模,好比單集羣數百個broker節點,那麼此時咱們生產消費客戶端指定bootstrap.servers配置時,若是指定呢?是隨便選擇其中幾個broker配置仍是所有都配上呢?

其實以上作法都不合適,若是隻配置幾個IP,當咱們配置當幾個broker節點下線後,咱們當應用將沒法鏈接到Kafka集羣;若是配置全部IP,那更不現實啦,幾百個IP,那麼咱們應該怎麼作呢?

方案:採用DNS+LVS網絡架構,最終生產者和消費者客戶端只須要配置域名就能夠啦。須要注意的是,有新節點加入集羣時,須要添加映射;有節點下線時,須要從映射中踢掉,不然這批機器若是拿到其餘的地方去使用,若是端口和Kafka的同樣的話,原來集羣部分請求將發送到這個已經下線的服務器上來,形成生產環境重點故障。

2、開源版本功能缺陷

RTMP協議主要的特色有:多路複用,分包和應用層協議。如下將對這些特色進行詳細的描述。

2.1 副本遷移

沒法實現增量遷移;【咱們已經基於2.1.1版本源碼改造,實現了增量遷移】

沒法實現併發遷移;【開源版本直到2.6.0才實現了併發遷移】

沒法實現終止遷移;【咱們已經基於2.1.1版本源碼改造,實現了終止副本遷移】【開源版本直到2.6.0才實現了暫停遷移,和終止遷移有些不同,不會回滾元數據】

當指定遷移數據目錄時,遷移過程當中,若是把topic保留時間改短,topic保留時間針對正在遷移topic分區不生效,topic分區過時數據沒法刪除;【開源版本bug,目前尚未修復】

當指定遷移數據目錄時,當遷移計劃爲如下場景時,整個遷移任務沒法完成遷移,一直處於卡死狀態;【開源版本bug,目前尚未修復】

遷移過程當中,若是有重啓broker節點,那個broker節點上的全部leader分區沒法切換回來,致使節點流量所有轉移到其餘節點,直到全部副本被遷移完畢後leader纔會切換回來;【開源版本bug,目前尚未修復】。

在原生的Kafka版本中存在如下指定數據目錄場景沒法遷移完畢的狀況,此版本咱們也不決定修復次bug:
 
1.針對同一個topic分區,若是部分目標副本相比原副本是所屬broker發生變化,部分目標副本相比原副本是broker內部所屬數據目錄發生變化;
那麼副本所屬broker發生變化的那個目標副本能夠正常遷移完畢,目標副本是在broker內部數據目錄發生變化的沒法正常完成遷移;
可是舊副本依然能夠正常提供生產、消費服務,而且不影響下一次遷移任務的提交,下一次遷移任務只須要把此topic分區的副本列表所屬broker列表變動後提交依然能夠正常完成遷移,而且能夠清理掉以前未完成的目標副本;
 
這裏假設topic yyj1的初始化副本分佈狀況以下:
 
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}
]
}
//遷移場景1:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}
]
}
 
//遷移場景2:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}
]
}
針對上述的topic yyj1的分佈分佈狀況,此時若是咱們的遷移計劃爲「遷移場景1」或遷移場景2「,那麼都將出現有副本沒法遷移完畢的狀況。
可是這並不影響舊副本處理生產、消費請求,而且咱們能夠正常提交其餘的遷移任務。
爲了清理舊的未遷移完成的副本,咱們只須要修改一次遷移計劃【新的目標副本列表和當前分區已分配副本列表徹底不一樣便可】,再次提交遷移便可。
 
這裏,咱們依然以上述的例子作遷移計劃修改以下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}
]
}
這樣咱們就能夠正常完成遷移。

2.2 流量協議

限流粒度較粗,不夠靈活精準,不夠智能。

當前限流維度組合

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

存在問題

當同一個broker上有多個用戶同時進行大量的生產和消費時,想要讓broker能夠正常運行,那必須在作限流時讓全部的用戶流量閾值之和不超過broker的吞吐上限;若是超過broker上限,那麼broker就存在被打掛的風險;然而,即便用戶流量沒有達到broker的流量上限,可是,若是全部用戶流量集中到了某幾塊盤上,超過了磁盤的讀寫負載,也會致使全部生產、消費請求將被阻塞,broker可能處於假死狀態。

解決方案

(1)改造源碼,實現單個broker流量上限限制,只要流量達到broker上限當即進行限流處理,全部往這個broker寫的用戶均可以被限制住;或者對用戶進行優先級處理,放太高優先級的,限制低優先級的;

(2)改造源碼,實現broker上單塊磁盤流量上限限制(不少時候都是流量集中到某幾塊磁盤上,致使沒有達到broker流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達到上限,當即進行限流處理,全部往這個磁盤寫的用戶均可以被限制住;或者對用戶進行優先級處理,放太高優先級的,限制低優先級的;

(3)改造源碼,實現topic維度限流以及對topic分區的禁寫功能;

(4)改造源碼,實現用戶、broker、磁盤、topic等維度組合精準限流;

3、kafka發展趨勢

3.1 Kafka社區迭代計劃

3.2 逐步棄用ZooKeeper(KIP-500)

3.3 controller與broker分離,引入raft協議做爲controller的仲裁機制(KIP-630)

3.4 分層存儲(KIP-405)

3.5 能夠減小topic分區(KIP-694)

3.6 MirrorMaker2精確一次(KIP-656)

3.7 下載與各版本特性說明

3.8 Kafka全部KIP地址

4、如何貢獻社區

4.1 哪些點能夠貢獻

http://kafka.apache.org/contributing

4.2 wiki貢獻地址

https://cwiki.apache.org/confluence/dashboard.action#all-updates

4.3 issues地址

1)https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10444?filter=allopenissues

2)https://issues.apache.org/jira/secure/BrowseProjects.jspa?selectedCategory=all

4.4 主要committers

http://kafka.apache.org/committers

做者:vivo互聯網服務器團隊-Yang Yijun
相關文章
相關標籤/搜索