Java進階專題(二十一) 消息中間件架構體系(3)-- Kafka研究

前言

Kafka 是一款分佈式消息發佈和訂閱系統,具備高性能、高吞吐量的特色而被普遍應用與大數據傳輸場景。它是由 LinkedIn 公司開發,使用 Scala 語言編寫,以後成爲 Apache 基金會的一個頂級項目。kafka 提供了相似 JMS 的特性,可是在設計和實現上是徹底不一樣的,並且他也不是 JMS 規範的實現。java

Kafka簡介

kafka產生背景

kafka 做爲一個消息系統,早起設計的目的是用做 LinkedIn 的活動流(Activity Stream)和運營數據處理管道(Pipeline)。活動流數據是全部的網站對用戶的使用狀況作分析的時候要用到的最常規的部分,活動數據包括頁面的訪問量(PV)、被查看內容方面的信息以及搜索內容。這種數據一般的處理方式是先把各類活動以日誌的形式寫入某種文件,而後週期性的對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO 使用率、請求時間、服務日誌等)。node

Kafka應用場景

因爲 kafka 具備更好的吞吐量、內置分區、冗餘及容錯性的優勢(kafka 每秒能夠處理幾十萬消息),讓 kafka 成爲了一個很好的大規模消息處理應用的解決方案。linux

因此在企業級應用長,主要會應用於以下幾個方面算法

行爲跟蹤:kafka 能夠用於跟蹤用戶瀏覽頁面、搜索及其餘行爲。經過發佈-訂閱模式實時記錄到對應的 topic中,經過後端大數據平臺接入處理分析,並作更進一步的實時處理和監控
日誌收集:日誌收集方面,有不少比較優秀的產品,好比 Apache Flume,不少公司使用kafka 代理日誌聚合。日誌聚合表示從服務器上收集日誌文件,而後放到一個集中的平臺(文件服務器)進行處理。在實際應用開發中,咱們應用程序的 log 都會輸出到本地的磁盤上,排查問題的話經過 linux 命令來搞定,若是應用程序組成了負載均衡集羣,而且集羣的機器有幾十臺以上,那麼想經過日誌快速定位到問題,就是很麻煩的事情了。因此通常都會作一個日誌統一收集平臺管理 log 日誌用來快速查詢重要應用的問題。因此不少公司的套路都是把應用日誌幾種到 kafka 上,而後分別導入到 es 和 hdfs 上,用來作實時檢索分析和離線統計數據備份等。而另外一方面,kafka 自己又提供了很好的 api 來集成日誌而且作日誌收集spring

kafka架構

一個典型的 kafka 集羣包含若干 Producer(能夠是應用節點產生的消息,也能夠是經過Flume 收集日誌產生的事件),若干個 Broker(kafka 支持水平擴展)、若干個 Consumer Group,以及一個 zookeeper 集羣。kafka 經過 zookeeper 管理集羣配置及服務協同。docker

Producer 使用 push 模式將消息發佈到 broker,consumer 經過監聽使用 pull 模式從broker 訂閱並消費消息。多個 broker 協同工做,producer 和 consumer 部署在各個業務邏輯中。三者經過zookeeper 管理協調請求和轉發。這樣就組成了一個高性能的分佈式消息發佈和訂閱系統。圖上有一個細節是和其餘 mq 中間件不一樣的點,producer 發送消息到 broker的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數據。而不是 broker 把數據主動發送給 consumershell

名詞解釋:數據庫

Topicapache

Kafka將消息分門別類,每一類的消息稱之爲一個主題(Topic)。json

Producer

發佈消息的對象稱之爲主題生產者(Kafka topic producer)

Consumer

訂閱消息並處理髮布的消息的對象稱之爲主題消費者(consumers)

Broker

已發佈的消息保存在一組服務器中,稱之爲Kafka集羣。集羣中的每個服務器都是一個代理(Broker)。 消費者能夠訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。

Topic和Log

Topic是發佈的消息的類別名,一個topic能夠有零個,一個或多個消費者訂閱該主題的消息。

對於每一個topic,Kafka集羣都會維護一個分區log,就像下圖中所示:

每個分區都是一個順序的、不可變的消息隊列, 而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。

Kafka集羣保持全部的消息,直到它們過時(不管消息是否被消費)。實際上消費者所持有的僅有的元數據就是這個offset(偏移量),也就是說offset由消費者來控制:正常狀況當消費者消費消息的時候,偏移量也線性的的增長。可是實際偏移量由消費者控制,消費者能夠將偏移量重置爲更早的位置,從新讀取消息。能夠看到這種設計對消費者來講操做自如,一個消費者的操做不會影響其它消費者對此log的處理。

再說說分區。Kafka中採用分區的設計有幾個目的。一是能夠處理更多的消息,不受單臺服務器的限制。Topic擁有多個分區意味着它能夠不受限的處理更多的數據。第二,分區能夠做爲並行處理的單元,稍後會談到這一點。

分佈式

Log的分區被分佈到集羣中的多個服務器上。每一個服務器處理它分到的分區。 根據配置每一個分區還能夠複製到其它服務器做爲備份容錯。 每一個分區有一個leader,零或多個follower。Leader處理此分區的全部的讀寫請求,而follower被動的複製數據。若是leader宕機,其它的一個follower會被推舉爲新的leader。 一臺服務器可能同時是一個分區的leader,另外一個分區的follower。 這樣能夠平衡負載,避免全部的請求都只讓一臺或者某幾臺服務器處理。

生產者

生產者往某個Topic上發佈消息。生產者也負責選擇發佈到Topic上的哪個分區。最簡單的方式從分區列表中輪流選擇。也能夠根據某種算法依照權重選擇分區。開發者負責如何選擇分區的算法。

消費者

一般來說,消息模型能夠分爲兩種, 隊列和發佈-訂閱式。 隊列的處理方式是 一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記本身。 一個發佈在Topic上消息被分發給此消費者組中的一個消費者。 假如全部的消費者都在一個組中,那麼這就變成了queue模型。 假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。 更通用的, 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者, 一個組內多個消費者能夠用來擴展性能和容錯。正以下圖所示:

2個kafka集羣託管4個分區(P0-P3),2個消費者組,消費組A有2個消費者實例,消費組B有4個。

正像傳統的消息系統同樣,Kafka保證消息的順序不變。 再詳細扯幾句。傳統的隊列模型保持消息,而且保證它們的前後順序不變。可是, 儘管服務器保證了消息的順序,消息仍是異步的發送給各個消費者,消費者收到消息的前後順序不能保證了。這也意味着並行消費將不能保證消息的前後順序。用過傳統的消息系統的同窗確定清楚,消息的順序處理很讓人頭痛。若是隻讓一個消費者處理消息,又違背了並行處理的初衷。 在這一點上Kafka作的更好,儘管並無徹底解決上述問題。 Kafka採用了一種分而治之的策略:分區。 由於Topic分區中消息只能由消費者組中的惟一一個消費者處理,因此消息確定是按照前後順序進行處理的。可是它也僅僅是保證Topic的一個分區順序處理,不能保證跨分區的消息前後處理順序。 因此,若是你想要順序的處理Topic的全部消息,那就只提供一個分區。

Docker搭建kafka

下載如下三個鏡像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker pull sheepkiller/kafka-manager

kafka-manager是kafka的可視化管理工具

啓動容器

docker run -d --name zookeeper --publish 2181:2181 \--volume /etc/localtime:/etc/localtime \--restart=always \wurstmeister/zookeeper
docker run -d --name kafka --publish 9082:9092 \--link zookeeper:zookeeper \--env KAFKA_BROKER_ID=100 \--env HOST_IP=127.0.0.1 \--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \--env KAFKA_ADVERTISED_HOST_NAME=192.168.1.108 \--env KAFKA_ADVERTISED_PORT=9082 \--restart=always \--volume /etc/localtime:/etc/localtime \wurstmeister/kafka
docker run -d --name kafka-manager \--link zookeeper:zookeeper \--link kafka:kafka -p 9001:9000 \--restart=always \--env ZK_HOSTS=zookeeper:2181 \sheepkiller/kafka-manager

訪問

http://127.0.0.1:9001

添加Cluster

查看界面

搭建完畢,頁面其餘功能本身摸索下

Kafka快速入門

//如下Spring Boot應用程序將三個消息發送到一個主題,接收它們,而後中止:
@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("myTopic", "foo1");
        this.template.send("myTopic", "foo2");
        this.template.send("myTopic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }

}

Kafka進階

通訊原理

消息是 kafka 中最基本的數據單元,在 kafka 中,一條消息由 key、 value 兩部分構成,在發送一條消息時,咱們能夠指定這個 key,那麼 producer 會根據 key 和 partition 機制來判斷當前這條消息應該發送並存儲到哪一個 partition 中。咱們能夠根據須要進行擴展 producer 的 partition 機制。

消息默認的分發機制

默認狀況下,kafka 採用的是 hash 取模的分區算法。若是Key 爲 null,則會隨機分配一個分區。這個隨機是在這個參數」metadata.max.age.ms」的時間範圍內隨機選擇一個。對於這個時間段內,若是 key 爲 null,則只會發送到惟一的分區。這個值值哦默認狀況下是 10 分鐘更新一次。

關於 Metadata ,這個以前沒講過,簡單理解就是T opic/Partition 和 broker 的映射關係,每個 topic 的每個 partition,須要知道對應的 broker 列表是什麼, leader是誰、 follower 是誰。這些信息都是存儲在 Metadata 這個類裏面。

消費端如何消費指定的分區

//經過下面的代碼,就能夠消費指定該 topic 下的 0 號分區。其餘分區的數據就沒法接收
//消費指定分區的時候,不須要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費指定的分區
TopicPartition topicPartition=new 
TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartit
ion));

消費原理

在實際生產過程當中,每一個 topic 都會有多個 partitions,多個 partitions 的好處在於,一方面可以對 broker 上的數據進行分片有效減小了消息的容量從而提高 io 性能。另一方面,爲了提升消費端的消費能力,通常會經過多個consumer 去消費同一個 topic ,也就是消費端的負載均衡機制,也就是咱們接下來要了解的,在多個partition 以及多個 consumer 的狀況下,消費者是如何消費消息的同時,在上一節課,咱們講了, kafka 存在 consumer group的 概 念 , 也 就是 group.id 同樣 的 consumer ,這些consumer 屬於一個 consumer group,組內的全部消費者協調在一塊兒來消費訂閱主題的全部分區。固然每個分區只能由同一個消費組內的 consumer 來消費,那麼同一個consumer group 裏面的 consumer 是怎麼去分配該消費哪一個分區裏的數據的呢?以下圖所示, 3 個分區, 3 個消費者,那麼哪一個消費者消分哪一個分區?

分區分配策略

在 kafka 中,存在兩種分區分配策略,一種是 Range(默認)、另 一 種 另 一 種 還 是 RoundRobin ( 輪 詢 )。 經過partition.assignment.strategy 這個參數來設置。

Range strategy(範圍分區)

Range 策略是對每一個主題而言的,首先對同一個主題裏面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設咱們有 10 個分區,3 個消費者,排完序的分區將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C3-0。而後將 partitions 的個數除於消費者線程的總數來決定每一個消費者線程消費幾個分區。若是除不盡,那麼前面幾個消費者線程將會多消費一個分區。在咱們的例子裏面,咱們有 10 個分區,3 個消費者線程, 10 / 3 = 3,並且除不盡,那麼消費者線程 C1-0 將會多消費一個分區,因此最後分區分配的結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6 分區
C3-0 將消費 7, 8, 9 分區

假如咱們有 11 個分區,那麼最後分區分配的結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6, 7 分區
C3-0 將消費 8, 9, 10 分區

假如咱們有 2 個主題(T1 和 T2),分別有 10 個分區,那麼最後分區分配的結果看起來是這樣的:
C1-0 將消費 T1 主題的 0, 1, 2, 3 分區以及 T2 主題的 0, 1, 2, 3 分區
C2-0 將消費 T1 主題的 4, 5, 6 分區以及 T2 主題的 4, 5, 6 分區
C3-0 將消費 T1 主題的 7, 8, 9 分區以及 T2 主題的 7, 8, 9 分區
能夠看出,C1-0 消費者線程比其餘消費者線程多消費了 2 個分區,這就是 Range strategy 的一個很明顯的弊端

RoundRobin strategy(輪詢分區)

輪詢分區策略是把全部 partition 和全部 consumer 線程都列出來,而後按照 hashcode 進行排序。最後經過輪詢算法分配 partition 給消費線程。若是全部 consumer 實例的訂閱是相同的,那麼 partition 會均勻分佈。

在咱們的例子裏面,假如按照 hashCode 排序完的 topicpartitions 組依次爲 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,咱們的消費者線程排序爲 C1-0, C1-1, C2-0, C2-1,最後分區分配的結果爲:
C1-0 將消費 T1-5, T1-2, T1-6 分區;
C1-1 將消費 T1-3, T1-1, T1-9 分區;
C2-0 將消費 T1-0, T1-4 分區;
C2-1 將消費 T1-8, T1-7 分區;

使用輪詢分區策略必須知足兩個條件

  1. 每一個主題的消費者實例具備相同數量的流
  2. 每一個消費者訂閱的主題必須是相同的

何時會觸發這個策略呢?

當出現如下幾種狀況時,kafka 會進行一次分區分配操做,
也就是 kafka consumer 的 rebalance

  1. 同一個 consumer group 內新增了消費者
  2. 消費者離開當前所屬的 consumer group,好比主動停機或者宕機
  3. topic 新增了分區(也就是分區數量發生了變化)
    kafka consuemr 的 rebalance 機制規定了一個 consumergroup 下的全部 consumer 如何達成一致來分配訂閱 topic的每一個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式, 也就是說,除了這兩種以外,咱們還能夠建立本身的分配機制。

何時會觸發這個策略呢?

當出現如下幾種狀況時,kafka 會進行一次分區分配操做,也就是 kafka consumer 的 rebalance

  1. 同一個 consumer group 內新增了消費者
  2. 消費者離開當前所屬的 consumer group,好比主動停機或者宕機
  3. topic 新增了分區(也就是分區數量發生了變化)kafka consuemr 的 rebalance 機制規定了一個 consumergroup 下的全部 consumer 如何達成一致來分配訂閱 topic的每一個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式, 也就是說,除了這兩種以外,咱們還能夠建立本身的分配機制。

誰來執行 Rebalance 以及管理 consumer 的 group 呢?

Kafka 提供了一個角色: coordinator 來執行對於 consumer group 的管理,Kafka 提供了一個角色:coordinator 來執行對於 consumer group 的管理,當 consumer group 的第一個 consumer 啓動的時候,它會去和 kafka server 肯定誰是它們組的 coordinator。以後該 group 內的全部成員都會和該 coordinator 進行協調通訊

如何肯定 coordinator

consumer group 如何肯定本身的 coordinator 是誰呢, 消費 者 向 kafka 集 羣 中 的 任 意 一 個 broker 發 送 一 個GroupCoordinatorRequest 請求,服務端會返回一個負載最 小 的 broker 節 點 的 id , 並 將 該 broker 設 置 爲coordinator

JoinGroup 的過程

在 rebalance 以前,須要保證 coordinator 是已經肯定好了的,整個 rebalance 的過程分爲兩個步驟,Join 和 Syncjoin: 表示加入到 consumer group 中,在這一步中,全部的成員都會向 coordinator 發送 joinGroup 的請求。一旦全部成員都發送了 joinGroup 請求,那麼 coordinator 會選擇一個 consumer 擔任 leader 角色,並把組成員信息和訂閱信息發送消費者

protocol_metadata: 序列化後的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator 會選擇一個座位 leader,對應的就是 member_id
member_metadata 對應消費者的訂閱信息
members:consumer group 中所有的消費者的訂閱信息
generation_id: 年代信息,相似於以前講解 zookeeper 的時候的 epoch 是同樣的,對於每一輪 rebalance ,
generation_id 都會遞增。主要用來保護 consumer group。隔離無效的 offset 提交。也就是上一輪的 consumer 成員沒法提交 offset 到新的 consumer group 中。

Synchronizing Group State 階段

完成分區分配以後,就進入了 Synchronizing Group State階段,主要邏輯是向 GroupCoordinator 發送SyncGroupRequest 請求,而且處理 SyncGroupResponse響應,簡單來講,就是 leader 將消費者對應的 partition 分配方案同步給 consumer group 中的全部 consumer

每一個消費者都會向 coordinator 發送 syncgroup 請求,不過只有 leader 節點會發送分配方案,其餘消費者只是打打醬油而已。當 leader 把方案發給 coordinator 之後,coordinator 會把結果設置到 SyncGroupResponse 中。這樣全部成員都知道本身應該消費哪一個分區。
➢ consumer group 的分區分配方案是在客戶端執行的!Kafka 將這個權利下放給客戶端主要是由於這樣作能夠有更好的靈活性

如何保存消費端的消費位置

什麼是 offset

前面在講partition 的時候,提到過 offset, 每一個 topic能夠劃分多個分區(每一個 Topic 至少有一個分區),同一topic 下的不一樣分區包含的消息是不一樣的。每一個消息在被添加到分區時,都會被分配一個 offset(稱之爲偏移量),它是消息在此分區中的惟一編號, kafka 經過 offset 保證消息在分區內的順序, offset 的順序不跨分區,即 kafka 只保證在同一個分區內的消息是有序的; 對於應用層的消費來講,每次消費一個消息而且提交之後,會保存當前消費到的最近的一個 offset。那麼 offset 保存在哪裏?

offset 在哪裏維護?

在 kafka 中,提供了一個__consumer_offsets_的一個topic ,把 offset 信 息 寫 入 到 這 個 topic 中。
_consumer_offsets——按保存了每一個 consumer group某一時刻提交的 offset 信息。 consumer_offsets 默認有50 個分區。

消息的存儲原理

消息的保存路徑

消息發送端發送消息到 broker 上之後,消息是如何持久化的呢?那麼接下來去分析下消息的存儲

首先咱們須要瞭解的是, kafka 是使用日誌文件的方式來保存生產者和發送者的消息,每條消息都有一個 offset 值來表示它在分區中的偏移量。 Kafka 中存儲的通常都是海量的消息數據,爲了不日誌文件過大,Log 並非直接對應在一個磁盤上的日誌文件,而是對應磁盤上的一個目錄,這個目錄的明明規則是<topic_name>_<partition_id>好比建立一個名爲 firstTopic 的 topic,其中有 3 個 partition,那麼在 kafka 的數據目錄(/tmp/kafka-log)中就有 3 個目錄,firstTopic-0~3

多個分區在集羣中的分配

若是咱們對於一個 topic,在集羣中建立多個 partition,那麼 partition 是如何分佈的呢?
1.將全部 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上

瞭解到這裏的時候,你們再結合前面講的消息分發策略,就應該能明白消息發送到 broker 上,消息會保存到哪一個分區中,而且消費端應該消費哪些分區的數據了。

消息寫入的性能

咱們如今大部分企業仍然用的是機械結構的磁盤,若是把消息以隨機的方式寫入到磁盤,那麼磁盤首先要作的就是尋址,也就是定位到數據所在的物理地址,在磁盤上就要找到對應的柱面、磁頭以及對應的扇區;這個過程相對內存來講會消耗大量時間,爲了規避隨機讀寫帶來的時間消耗, kafka 採用順序寫的方式存儲數據。即便是這樣,可是頻繁的 I/O 操做仍然會形成磁盤的性能瓶頸,因此 kafka還有一個性能策略

頁緩存

順序寫入是Kafka高吞吐量的一個緣由,固然即便採用的是磁盤的順序寫入,那麼也是沒有辦法和內存相比的。由於爲了再一次提升Kakfa的吞吐量,Kafka採用了Memory Mapped Files
(後面簡稱mmap)也被翻譯成內存映射文件 ,它的工做原理是直接利用操做系統的page cache 來實現文件到物理內存的直接映射,完成映射以後你對物理內存的操做會被同步到硬盤上(操做系統在適當的時候)。
操做系統自己有一層緩存,叫作page cache,是在內存裏的緩存,咱們也能夠稱之爲os cache,意思就是操做系統本身管理的緩存。你在寫入磁盤文件的時候,能夠直接寫入這個os cache裏,也就是僅僅寫入內存中,接下來由操做系統本身決定何時把os cache裏的數據真的刷入磁
盤文件中(每5秒檢查一次是否須要將頁緩存數據同步到磁盤文件)。僅僅這一個步驟,就能夠將磁盤文件寫性能提高不少了,由於其實這裏至關因而在寫內存,不是在寫磁盤.

零拷貝

消息從發送到落地保存,broker 維護的消息日誌自己就是文件目錄,每一個文件都是二進制保存,生產者和消費者使用相同的格式來處理。在消費者獲取消息時,服務器先從硬盤讀取數據到內存,而後把內存中的數據原封不動的經過 socket 發送給消費者。雖然這個操做描述起來很簡單,但實際上經歷了不少步驟。

▪ 操做系統將數據從磁盤讀入到內核空間的頁緩存
▪ 應用程序將數據從內核空間讀入到用戶空間緩存中
▪ 應用程序將數據寫回到內核空間到 socket 緩存中
▪ 操做系統將數據從 socket 緩衝區複製到網卡緩衝區,以便將數據經網絡發出

這個過程涉及到 4 次上下文切換以及 4 次數據複製,而且有兩次複製操做是由 CPU 完成。可是這個過程當中,數據徹底沒有進行變化,僅僅是從磁盤複製到網卡緩衝區。

經過「零拷貝」技術,能夠去掉這些不必的數據複製操做,同時也會減小上下文切換次數。現代的 unix 操做系統提供一個優化的代碼路徑,用於將數據從頁緩存傳輸到 socket;在 Linux 中,是經過 sendfile 系統調用來完成的。Java 提供了訪問這個系統調用的方法: FileChannel.transferTo API

使用 sendfile,只須要一次拷貝就行,容許操做系統將數據直接從頁緩存發送到網絡上。因此在這個優化的路徑中,只有最後一步將數據拷貝到網卡緩存中是須要的

消息的文件存儲機制

前面咱們知道了一個 topic 的多個 partition 在物理磁盤上的保存路徑,那麼咱們再來分析日誌的存儲方式。經過以下命令找到對應 partition 下的日誌內容

[root@localhost ~]# ls /tmp/kafka-logs/firstTopic-1/00000000000000000000.index 00000000000000000000.log  00000000000000000000.timeindex   leader-epochcheckpoint

kafka 是經過分段的方式將 Log 分爲多個 LogSegment,LogSegment 是一個邏輯上的概念,一個 LogSegment 對應磁盤上的一個日誌文件和一個索引文件,其中日誌文件是用來記錄消息的。索引文件是用來保存消息的索引。那麼這個 LogSegment 是什麼呢?

LogSegment

假設 kafka 以 partition 爲最小存儲單位,那麼咱們能夠想象當 kafka producer 不斷髮送消息,必然會引發 partition文件的無線擴張,這樣對於消息文件的維護以及被消費的消息的清理帶來很是大的挑戰,因此 kafka 以 segment 爲單位又把 partition 進行細分。每一個 partition 至關於一個巨型文件被平均分配到多個大小相等的 segment 數據文件中(每一個 segment 文件中的消息不必定相等),這種特性方便已經被消費的消息的清理,提升磁盤的利用率。
➢ log.segment.bytes=107370 ( 設置分段大小 ), 默認是1gb,咱們把這個值調小之後,能夠看到日誌分段的效果
➢ 抽取其中 3 個分段來進行分析

segment file 由 2 大部分組成,分別爲 index file 和 data file,此 2 個文件一一對應,成對出現,後綴".index"和「.log」分別表示爲 segment 索引文件、數據文件.segment 文件命名規則:partion 全局的第一個 segment從 0 開始,後續每一個 segment 文件名爲上一個 segment文件最後一條消息的 offset 值進行遞增。數值最大爲 64 位long 大小,20 位數字字符長度,沒有數字用 0 填。

segment 中 index 和 log 的對應關係

從全部分段中,找一個分段進行分析爲了提升查找消息的性能,爲每個日誌文件添加 2 個索引索引文件: OffsetIndex 和 TimeIndex,分別對應.index以及.timeindex, TimeIndex 索引文件格式:它是映射時間戳和相對offset

查 看 索 引 內 容 : sh  kafka-run-class.sh 
kafka.tools.DumpLogSegments  --files  /tmp/kafkalogs/test-0/00000000000000000000.index  --print-datalog

如圖所示,index 中存儲了索引以及物理偏移量。 log 存儲了消息的內容。索引文件的元數據執行對應數據文件中
message 的物理偏移地址。舉個簡單的案例來講,以[4053,80899]爲例,在 log 文件中,對應的是第 4053 條記錄,物理偏移量( position )爲 80899. position 是ByteBuffer 的指針位置

在 partition 中如何經過 offset 查找 message

  1. 根據 offset 的值,查找 segment 段中的 index 索引文件。因爲索引文件命名是以上一個文件的最後一個offset 進行命名的,因此,使用二分查找算法可以根據offset 快速定位到指定的索引文件。

  2. 找到索引文件後,根據 offset 進行定位,找到索引文件中的符合範圍的索引。(kafka 採用稀疏索引的方式來提升查找性能)

  3. 獲得 position 之後,再到對應的 log 文件中,從 position出開始查找 offset 對應的消息,將每條消息的 offset 與目標 offset 進行比較,直到找到消息

好比說,咱們要查找 offset=2490 這條消息,那麼先找到00000000000000000000.index, 而後找到[2487,49111]這個索引,再到 log 文件中,根據 49111 這個 position 開始查找,比較每條消息的 offset 是否大於等於 2490。最後查找到對應的消息之後返回

日誌清除策略

前面提到過,日誌的分段存儲,一方面可以減小單個文件內容的大小,另外一方面,方便 kafka 進行日誌清理。日誌的清理策略有兩個

  1. 根據消息的保留時間,當消息在 kafka 中保存的時間超過了指定的時間,就會觸發清理過程
  2. 根據 topic 存儲的數據大小,當 topic 所佔的日誌文件大小大於必定的閥值,則能夠開始刪除最舊的消息。 kafka會啓動一個後臺線程,按期檢查是否存在能夠刪除的消息

經過 log.retention.bytes 和 log.retention.hours 這兩個參數來設置,當其中任意一個達到要求,都會執行刪除。默認的保留時間是:7 天

日誌壓縮策略

Kafka 還提供了「日誌壓縮(Log Compaction)」功能,經過這個功能能夠有效的減小日誌文件的大小,緩解磁盤緊張的狀況,在不少實際場景中,消息的 key 和 value 的值之間的對應關係是不斷變化的,就像數據庫中的數據會不斷被修改同樣,消費者只關心 key 對應的最新的 value。 所以,咱們能夠開啓 kafka 的日誌壓縮功能,服務端會在後臺啓動啓動 Cleaner 線程池,按期將相同的 key 進行合併,只保留最新的 value 值。日誌的壓縮原理是

partition 的高可用副本機制

咱們已經知道 Kafka 的每一個 topic 均可以分爲多個 Partition,而且多個 partition 會均勻分佈在集羣的各個節點下。雖然這種方式可以有效的對數據進行分片,可是對於每一個partition 來講,都是單點的,當其中一個 partition 不可用的時候,那麼這部分消息就沒辦法消費。因此 kafka 爲了提升 partition 的可靠性而提供了副本的概念(Replica) ,經過副本機制來實現冗餘備份。每一個分區能夠有多個副本,而且在副本集合中會存在一個leader 的副本,全部的讀寫請求都是由 leader 副原本進行處理。剩餘的其餘副本都作爲 follower 副本,follower 副本 會 從 leader 副 本 同 步 消 息 日 志 。 這 個 有 點 類 似zookeeper 中 leader 和 follower 的概念,可是具體的時間方式仍是有比較大的差別。因此咱們能夠認爲,副本集會存在一主多從的關係。
通常狀況下,同一個分區的多個副本會被均勻分配到集羣中的不一樣 broker 上,當 leader 副本所在的 broker 出現故障後,能夠從新選舉新的 leader 副本繼續對外提供服務。經過這樣的副本機制來提升 kafka 集羣的可用性。

副本分配算法

將全部 N Broker 和待分配的 i 個 Partition 排序.
將第 i 個 Partition 分配到第(i mod n)個 Broker 上.
將第 i 個 Partition 的第 j 個副本分配到第((i + j) mod n)個Broker 上.

kafka 副本機制中的幾個概念

Kafka 分區下有可能有不少個副本(replica)用於實現冗餘,從而進一步實現高可用。副本根據角色的不一樣可分爲 3 類:
leader 副本:響應 clients 端讀寫請求的副本
follower 副本:被動地備份 leader 副本中的數據,不能響應 clients 端讀寫請求。
ISR 副本:包含了 leader 副本和全部與 leader 副本保持同步的 follower 副本——如何斷定是否與 leader 同步後面會提到每一個 Kafka 副本對象都有兩個重要的屬性:LEO 和HW。注意是全部的副本,而不僅是 leader 副本。
LEO:即日誌末端位移(log end offset),記錄了該副本底層日誌(log)中下一條消息的位移值。注意是下一條消息!也就是說,若是 LEO=10,那麼表示該副本保存了 10 條消息,位移值範圍是[0, 9]。另外, leader LEO 和follower LEO 的更新是有區別的。咱們後面會詳細說
HW:即上面提到的水位值。對於同一個副本對象而言,其
HW 值不會大於 LEO 值。小於等於 HW 值的全部消息都被認爲是「 已備份」 的(replicated )。同理, leader 副本和follower 副本的 HW 更新是有區別的

副本協同機制

剛剛提到了,消息的讀寫操做都只會由 leader 節點來接收和處理。follower 副本只負責同步數據以及當 leader 副本所在的 broker 掛了之後,會從 follower 副本中選取新的leader。

請求首先由 Leader 副本處理,以後 follower 副本會從leader 上拉取寫入的消息,這個過程會有必定的延遲,致使 follower 副本中保存的消息略少於 leader 副本,可是隻要沒有超出閾值均可以容忍。可是若是一個 follower 副本出現異常,好比宕機、網絡斷開等緣由長時間沒有同步到消息,那這個時候, leader 就會把它踢出去。 kafka 經過 ISR集合來維護一個分區副本信息

ISR
ISR 表示目前「可用且消息量與 leader 相差很少的副本集合,這是整個副本集合的一個子集」。怎麼去理解可用和相差很少這兩個詞呢?具體來講,ISR 集合中的副本必須知足兩個條件

  1. 副本所在節點必須維持着與 zookeeper 的鏈接
  2. 副本最後一條消息的 offset 與 leader 副本的最後一條消息的 offset 之 間 的 差 值 不 能 超 過 指 定 的 閾值(replica.lag.time.max.ms)replica.lag.time.max.ms:若是該 follower 在此時間間隔內一直沒有追上過 leader 的全部消息,則該 follower 就會被剔除 isr 列表
    ➢ ISR 數 據 保 存 在 Zookeeper 的/brokers/topics/ /partitions/ /state 節點中

HW&LEO

關於 follower 副本同步的過程當中,還有兩個關鍵的概念,HW(HighWatermark)和 LEO(Log End Offset). 這兩個參數跟 ISR 集合緊密關聯。 HW 標記了一個特殊的 offset,當消費者處理消息的時候,只能拉去到 HW 以前的消息, HW以後的消息對消費者來講是不可見的。也就是說,取partition 對應 ISR 中最小的 LEO 做爲 HW,consumer 最多隻能消費到 HW 所在的位置。每一個 replica 都有 HW,leader 和 follower 各自維護更新本身的 HW 的狀態。一條消息只有被 ISR 裏的全部 Follower 都從 Leader 複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何 Follower 複製就宕機了,而形成數據丟失(Consumer 沒法消費這些數據)。而對於Producer 而言,它能夠選擇是否等待消息 commit,這能夠經過 acks 來設置。這種機制確保了只要 ISR 有一個或以上的 Follower,一條被 commit 的消息就不會丟失。

數據的同步過程

瞭解了副本的協同過程之後,還有一個最重要的機制,就是數據的同步過程。它須要解決

  1. 怎麼傳播消息
  2. 在向消息發送端返回 ack 以前須要保證多少個 Replica
    已經接收到這個消息

數據的處理過程是
Producer 在 發 布 消 息 到 某 個 Partition 時 ,先經過ZooKeeper 找到該 Partition 的 Leader 【 get /brokers/topics/ /partitions/2/state】,而後不管該Topic 的 Replication Factor 爲多少(也即該 Partition 有多少個 Replica), Producer 只將該消息發送到該 Partition 的Leader。 Leader 會將該消息寫入其本地 Log。每一個 Follower都從 Leader pull 數據。這種方式上, Follower 存儲的數據順序與 Leader 保持一致。 Follower 在收到該消息並寫入其Log 後,向 Leader 發送 ACK。一旦 Leader 收到了 ISR 中的全部 Replica 的 ACK,該消息就被認爲已經 commit 了,Leader 將增長 HW(HighWatermark)而且向 Producer 發送ACK。

初始狀態

初始狀態下,leader 和 follower 的 HW 和 LEO 都是 0,leader 副本會保存 remote LEO,表示全部 follower LEO,也會被初始化爲 0,這個時候,producer 沒有發送消息。follower 會不斷地個 leader 發送 FETCH 請求,可是由於沒有數據,這個請求會被 leader 寄存,當在指定的時間以後會 強 制 完 成 請 求 , 這 個 時 間 配 置 是(replica.fetch.wait.max.ms),若是在指定時間內 producer有消息發送過來,那麼 kafka 會喚醒 fetch 請求,讓 leader繼續處理

這裏會分兩種狀況,第一種是 leader 處理完 producer 請求以後,follower 發送一個 fetch 請求過來、第二種是follower 阻塞在 leader 指定時間以內,leader 副本收到producer 的請求。這兩種狀況下處理方式是不同的。先來看第一種狀況

1、follower 的 fetch 請求是當 leader 處理消息之後執行的

leader 處理完 producer 請求以後,follower 發送一個fetch 請求過來 。狀態圖以下

leader 副本收到請求之後,會作幾件事情

  1. 把消息追加到 log 文件,同時更新 leader 副本的 LEO
  2. 嘗試更新 leader HW 值。這個時候因爲 follower 副本尚未發送 fetch 請求,那麼 leader 的 remote LEO 仍然是 0。leader 會比較本身的 LEO 以及 remote LEO 的值發現最小值是 0,與 HW 的值相同,因此不會更新 HW

follower fetch 消息

follower 發送 fetch 請求,leader 副本的處理邏輯是:

  1. 讀取 log 數據、更新 remote LEO=0(follower 尚未寫入這條消息,這個值是根據 follower 的 fetch 請求中的offset 來肯定的)
  2. 嘗試更新 HW,由於這個時候 LEO 和 remoteLEO 仍是不一致,因此仍然是 HW=0
  3. 把消息內容和當前分區的 HW 值發送給 follower 副本follower 副本收到 response 之後
  4. 將消息寫入到本地 log,同時更新 follower 的 LEO
  5. 更新 follower HW,本地的 LEO 和 leader 返回的 HW進行比較取小的值,因此仍然是 0第一次交互結束之後, HW 仍然仍是 0,這個值會在下一次follower 發起 fetch 請求時被更新

follower 發第二次 fetch 請求,leader 收到請求之後

  1. 讀取 log 數據
  2. 更新 remote LEO=1, 由於此次 fetch 攜帶的 offset 是1.
  3. 更新當前分區的 HW,這個時候 leader LEO 和 remoteLEO 都是 1,因此 HW 的值也更新爲 1
  4. 把數據和當前分區的 HW 值返回給 follower 副本,這個時候若是沒有數據,則返回爲空

follower 副本收到 response 之後

  1. 若是有數據則寫本地日誌,而且更新 LEO
  2. 更新 follower 的 HW 值 到目前爲止,數據的同步就完成了,意味着消費端可以消費 offset=0 這條消息。

2、follower 的 fetch 請求是直接從阻塞過程當中觸發

前面說過,因爲 leader 副本暫時沒有數據過來,因此follower 的 fetch 會被阻塞,直到等待超時或者 leader 接收到新的數據。當 leader 收到請求之後會喚醒處於阻塞的fetch 請求。處理過程基本上和前面說的一直

  1. leader 將消息寫入本地日誌,更新 Leader 的 LEO
  2. 喚醒 follower 的 fetch 請求
  3. 更新 HWkafka 使用 HW 和 LEO 的方式來實現副本數據的同步,自己是一個好的設計,可是在這個地方會存在一個數據丟失的問題,固然這個丟失只出如今特定的背景下。咱們回想一下, HW 的值是在新的一輪 FETCH 中才會被更新。咱們分析下這個過程爲何會出現數據丟失

數據丟失問題

問題描述

前提: min.insync.replicas=1 的時候。 ->設定 ISR 中的最小副本數是多少,默認值爲 1, 當且僅當 acks 參數設置爲-1(表示須要全部副本確認) 時,此參數才生效. 表達的含義是,至少須要多少個副本同步才能表示消息是提交的因此,當 min.insync.replicas=1 的時候一旦消息被寫入 leader 端 log 即被認爲是「已提交」,而延遲一輪 FETCH RPC 更新 HW 值的設計使得 follower HW值是異步延遲更新的,假若在這個過程當中 leader 發生變動,那麼成爲新 leader 的 follower 的 HW 值就有多是過時的,使得 clients 端認爲是成功提交的消息被刪除。

數據丟失的解決方案

在 kafka0.11.0.0 版本之後,提供了一個新的解決方案,使用 leader epoch 來解決這個問題, leader epoch 其實是一對之(epoch,offset), epoch 表示 leader 的版本號,從 0開始,當 leader 變動過 1 次時 epoch 就會+1,而 offset 則對應於該 epoch 版本的 leader 寫入第一條消息的位移。好比說(0,0) ; (1,50); 表示第一個 leader 從 offset=0 開始寫消息,一共寫了 50 條,第二個 leader 版本號是 1,從 50 條處開始寫消息。這個信息保存在對應分區的本地磁盤文件中,文 件 名 爲 : /tml/kafka-log/topic/leader-epochcheckpointleader broker 中會保存這樣的一個緩存,並按期地寫入到一個 checkpoint 文件中。
當 leader 寫 log 時它會嘗試更新整個緩存——若是這個leader 首次寫消息,則會在緩存中增長一個條目;不然就不作更新。而每次副本從新成爲 leader 時會查詢這部分緩存,獲取出對應 leader 版本的 offset

如何處理全部的 Replica 不工做的狀況

在 ISR 中至少有一個 follower 時,Kafka 能夠確保已經commit 的數據不丟失,但若是某個 Partition 的全部Replica 都宕機了,就沒法保證數據不丟失了

  1. 等待 ISR 中的任一個 Replica「活」過來,而且選它做爲Leader
  2. 選擇第一個「活」過來的 Replica(不必定是 ISR 中的)做爲 Leader這就須要在可用性和一致性當中做出一個簡單的折衷。若是必定要等待 ISR 中的 Replica「活」過來,那不可用的時間就可能會相對較長。並且若是 ISR 中的全部 Replica 都沒法「活」過來了,或者數據都丟失了,這個 Partition 將永遠不可用。

選擇第一個 「 活 」 過來的 Replica 做爲 Leader ,而這個Replica 不是 ISR 中的 Replica,那即便它並不保證已經包含了全部已 commit 的消息,它也會成爲 Leader 而做爲consumer 的數據源(前文有說明,全部讀寫都由 Leader完成)。

ISR 的設計原理

​ 在全部的分佈式存儲中,冗餘備份是一種常見的設計方式,而經常使用的模式有同步複製和異步複製,按照 kafka 這個副本模型來講若是採用同步複製,那麼須要要求全部能工做的 Follower 副本都複製完,這條消息纔會被認爲提交成功,一旦有一個follower 副本出現故障,就會致使 HW 沒法完成遞增,消息就沒法提交,消費者就獲取不到消息。這種狀況下,故障的Follower 副本會拖慢整個系統的性能,設置致使系統不可用.
​ 若是採用異步複製, leader 副本收到生產者推送的消息後,就認爲次消息提交成功。follower 副本則異步從 leader 副本同步。這種設計雖然避免了同步複製的問題,可是假設全部follower 副本的同步速度都比較慢他們保存的消息量遠遠落後於 leader 副本。而此時 leader 副本所在的 broker 忽然宕機,則會從新選舉新的 leader 副本,而新的 leader 副本中沒有原來 leader 副本的消息。這就出現了消息的丟失。
​ kafka 權衡了同步和異步的兩種策略,採用 ISR 集合,巧妙解決了兩種方案的缺陷:當 follower 副本延遲太高, leader 副本則會把該 follower 副本提出 ISR 集合,消息依然能夠快速提交。當 leader 副本所在的 broker 忽然宕機,會優先將 ISR 集合中follower 副本選舉爲 leader,新 leader 副本包含了 HW 以前的所有消息,這樣就避免了消息的丟失。

Kafka順序性保證

Kafka保證消息順序性的特色以下所示:
topic中的數據分割爲一個或多個partition。每一個topic至少有一個partition。在單個partition中的數據是有序的,若是消息分散在不一樣的partition,Kafka 沒法保證其順序性。但只須要確保要求順序性的若干消息發送到同一個partiton,便可知足其順序性。而且在進行消息消費的時候,須要確保消費者是進行單線程消費。

要保證若干消息發送到同一個partiton中,那麼咱們就須要在發送消息的時候指定一個分區的id,那麼這樣的話消息就被髮送到同一個分區中。

// 發送消息到指定的分區,保證分區的消息順序性
public static void sendMessageToDestPartition() {
   for(int x = 0; x < 5; x++) {
       // Kafka消息的異步發送
       String msg = "Kakfa環境測試...." + x;
       kafkaTemplate.send("test",0, "order",
msg).addCallback((obj) ->{
           LOGGER.info("send msg to kafka broker success ---> {} ",
((SendResult)obj).getProducerRecord().value());
      } , (t) ->{
           t.printStackTrace();
      });
       LOGGER.info("send msg to local cache success ---> {} ", msg);
  }
}

消費者進行指定分區的消費:

@KafkaListener(topicPartitions =
{@org.springframework.kafka.annotation.TopicPartition(topic = "test",
partitions = "0")} , groupId = "test.demo")
public void consumerOrderMessageHandler(String msg, KafkaConsumer
consumer) {
   LOGGER.info("consumer topic is : {} , msg is ----> {} ", "test",
msg);
   consumer.commitAsync();
}

Kafka解決消息重複保證

生產者消息重複

問題描述
生產者發送的消息沒有收到正確的broke響應,致使producer重試。producer發出一條消息,broker落盤之後由於網絡等種種緣由發送端獲得一個發送失敗的響應或者網絡中斷,而後producer收到一個可恢復的Exception重試消息致使消息重複。

解決方案

解決方案:
一、啓動kafka的冪等性
二、retries=0,不重試(可能會丟消息,適用於吞吐量指標重要性高於數據丟失,例如:日誌收集)所謂冪等性,就是對接口的屢次調用所產生的結果和調用一次是一致的。生產者在進行重試的時候有可能會重複寫入消息,而使用Kafka的冪等性功能就能夠避免這種狀況。
開啓冪等性的方式比較簡單,咱們只須要設置enable.idempotence參數爲true就能夠了。以下所示:

spring:
 kafka:
   producer:
     bootstrap-servers: 192.168.23.131:9092
     acks: all
     retries: 2
     properties: {'max.in.flight.requests.per.connection': 1,"enable.idempotence":true}

若是使用冪等性,而且咱們顯示的指定了retries,acks,max.in.flight.requests.per.connection這幾個參數,那麼就對這幾個參數的配置是有要求的:
retries的值必須是大於0,若是設置不對就會拋出以下異常:

Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

max.in.flight.requests.per.connection的值不能大於5,若是設置不對就會拋出以下異常:

Caused by: org.apache.kafka.common.config.ConfigException: Must set
max.in.flight.requests.per.connection to at most 5 to use the idempotent
producer.

acks的取值須要設置爲-1/all,若是設置不對就會拋出以下異常:

Caused by: org.apache.kafka.common.config.ConfigException: Must set acks
to all in order to use the idempotent producer. Otherwise we cannot
guarantee idempotence.

冪等性原理介紹:
爲了實現生產者冪等性,Kafka爲此引入了producer id(PID) 和序列號(sequence number)這兩個概念,每一個新的生產者實例在初始化的時候都會被分配一個PID ,這個PID對用戶而言是徹底透明的。對於每一個PID,消息發送到的每個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將<PID , 分區>對應的序號的值加1。
broker端會在內存中爲每一對<PID , 分區>維護一個序列號。對於收到的每一條消息,會存在這樣的幾種情
況:
一、SN_new = SN_old + 1時,broker纔會接收它。
二、SN_new < SN_old + 1,那麼說明消息被重複寫入,broker能夠直接將其丟棄。
三、SN_new > SN_old + 1,那麼說明中間有數據還沒有寫入,出現了亂序,暗示可能有消息丟失,對應的生產者會拋出OutOfOrderSquenceException,這個異常時一個嚴重的異常。冪等性不能跨分區實現。

相關知識
冪等性並不能跨多個分區運做,好比咱們如今要想發送3個消息,當第二個消息發送完畢之後程序報錯了,這樣第三個消息就沒有發送成功,當下一次在調用這個方法發送數據的時候,就會致使消息重複發送(失去了冪等性)。而事務能夠彌補這個缺憾,事務能夠保證對多個分區寫入操做的原子性。操做的原子性是指多個操做要麼所有成功,要麼所有失敗,不存在部分紅功部分失敗的可能。爲了實現事務,應用程序必須提供惟一的transactionalId,這個參數經過客戶

端程序來進行設定。以下所示:

spring.kafka.producer.transaction-id-prefix=order_tx.   # 表示開啓事務機制

而且事務機制的使用須要冪等性的支持,因此咱們還須要開啓冪等性:enable.idempotence = true。若是沒有開啓冪等性的支持,就會報錯,以下所示:

Caused by: org.apache.kafka.common.config.ConfigException: Cannot set a transactional.id without also enabling idempotence.

事務機制實現的兩種方式:

一、第一種方式

// 事務消息發送的第一種方式
public static void sendTransactionMessageMethod01() {
    // 發送事務消息
   kafkaTemplate.executeInTransaction((operations) ->{
       // 發送消息
       kafkaTemplate.send("itcast", 0, "order", "事務消息----> 1")
;
       kafkaTemplate.send("itcast", 0, "order", "事務消息----> 2")
;
       // 產生異常代碼
       int a = 1 / 0;
       kafkaTemplate.send("itcast", 0, "order", "事務消息----> 3")
;
       // 返回true,表示發送成功
       return true;
  }) ;
   LOGGER.info("send transaction message to local cache success ");
}

二、第二種方式

@Transactional(rollbackFor = RuntimeException.class)
public void sendTransactionMessage() {
   // 發送消息
   kafkaTemplate.send("itcast", 0, "order", "事務消息----> 1") ;
   kafkaTemplate.send("itcast", 0, "order", "事務消息----> 2") ;
   // 產生異常代碼
   int a = 1 / 0;
   kafkaTemplate.send("itcast", 0, "order", "事務消息----> 3") ;
}

消費端消息重複

一、根本緣由
數據消費完沒有及時提交offset到broker。
解決方案
一、取消自動自動提交
每次消費完或者程序退出時手動提交。
二、下游作冪等
通常的解決方案是讓下游作冪等。

Kafka爲何快

分區管理

​ Kafka能夠將主題劃分爲多個分區(Partition),會根據分區規則選擇把消息存儲到哪一個分區中,只要分區
規則設置的合理,那麼全部的消息將會被均勻的分佈到不一樣的分區中,這樣就實現了負載均衡和水平擴展。另外,多個訂閱者能夠從一個或者多個分區中同時消費數據,以支撐海量數據處理能力。順便說一句,因爲消息是以追加的方法存儲到分區中的,多個分區順序寫磁盤的總效率要比隨機寫內存還要高(引用Apache Kafka – A High Throughput Distributed Messaging System的觀點),是Kafka高吞吐率的重要保證之一。

分區副本機制

因爲Producer和Consumer都只會與Leader角色的分區副本相連,因此kafka須要以集羣的組織形式提供主題下的消息高可用。kafka支持主備複製,因此消息具有高可用和持久性。
一個分區能夠有多個副本,這些副本保存在不一樣的broker上。每一個分區的副本中都會有一個做爲Leader。當
一個broker失敗時,Leader在這臺broker上的分區都會變得不可用,kafka會自動移除Leader,再其餘副本中選一個做爲新的Leader。在一般狀況下,增長分區能夠提供kafka集羣的吞吐量。然而,也應該意識到集羣的總分區數或是單臺服務器上的分區數過多,會增長不可用及延遲的風險。

分區leader選舉

能夠預見的是,若是某個分區的Leader掛了,那麼其它跟隨者將會進行選舉產生一個新的leader,以後全部的讀寫就會轉移到這個新的Leader上,在kafka中,其不是採用常見的多數選舉的方式進行副本的Leader選舉,而是會在Zookeeper上針對每一個T opic維護一個稱爲ISR(in-sync replica,已同步的副本)的集合,顯然還有一些副本沒有來得及同步。只有這個ISR列表裏面的纔有資格成爲leader(先使用ISR裏面的第一個,若是不行依次類推,由於ISR裏面的是同步副本,消息是最完整且各個節點都是同樣的)。經過ISR,kafka能夠容忍的失敗數比較高。
假設某個topic有f+1個副本,kafka能夠容忍f個不可用,固然若是所有ISR裏面的副本都不可用,也能夠選擇其餘可用的副本,只是存在數據的不一致。

分區從新分配

咱們往已經部署好的Kafka集羣裏面添加機器是最正常不過的需求,並且添加起來很是地方便,咱們須要作
的事是從已經部署好的Kafka節點中複製相應的配置文件,而後把裏面的
broker id修改爲全局惟一的,最後啓動這個節點便可將它加入到現有Kafka集羣中。可是問題來了,新添加
的Kafka節點並不會自動地分配數據,因此沒法分擔集羣的負載,除非我
們新建一個topic。可是如今咱們想手動將部分分區移到新添加的Kafka節點上,Kafka內部提供了相關的工具
來從新分佈某個topic的分區。
具體的實現步驟以下所示:
一、好比某一個主題的分區信息以下所示:

二、給某一個分區在添加一個新的分區

bin/kafka-topics.sh --alter --zookeeper 172.19.0.61:2181 --topic demo
--partitions 4

添加完畢之後,分區的信息以下所示:

這樣會致使3個Broker上有從新維護了更多的分區節點。

三、再次建立一個kafka的容器

docker run -di --network=host --name=kafka_04 -v
/etc/localtime:/etc/localtime --privileged=true
wurstmeister/kafka:latest /bin/bash

查看itheima主題的分區狀況,以下所示

和以前沒有任何的變化。

三、修改集羣配置文件

broker.id=3                       # 表示broker的編號,如
果集羣中有多個broker,則每一個broker的編號須要設置的不一樣
port=9095                      # 端口號
listeners=PLAINTEXT://192.168.23.131:9095       # brokder對外提供的服
務入口地址
log.dirs=/tmp/kafka-logs                # 設置存放消息日誌文件
的地址
zookeeper.connect=172.19.0.61:2181,172.19.0.62:2181,172.19.0.63:2181
# Kafka所需Zookeeper集羣地址,Zookeeper和Kafka都安裝本機

四、從新分配

如今咱們須要將原先分佈在broker 1-3節點上的分區從新分佈到broker 1-4節點上,藉助kafkareassignpartitions.sh工具生成reassign plan,不過咱們先得按照要求定義一個文件,裏面說明哪些topic須要從新分區,文件內容以下:

demo@Server-node:/mnt/d/kafka-cluster/kafka-1$ cat reassign.json
{"topics":[{"topic":"demo"}],
"version":1
}

而後使用 kafka-reassign-partitions.sh 工具生成reassign plan

bin/kafka-reassign-partitions.sh --zookeeper 172.19.0.61:2181 --topics-to-move-json-file
reassign.json --broker-list "0,1,2,3" --generate

命令會輸出兩個字符串,以下所示:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"demo","partition":2,"replicas":
[2,1,0],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":1,"replicas":[0,2,1],"log_dirs":
["any","any","any"]},{"topic":"itheima","partition":0,"replicas":
[1,0,2],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":3,"replicas":[1,2,0],"log_dirs":
["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"demo","partition":2,"replicas":
[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":1,"replicas":[2,0,1],"log_dirs":
["any","any","any"]},{"topic":"demo","partition":3,"replicas":
[0,2,3],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":0,"replicas":[1,3,0],"log_dirs":
["any","any","any"]}]}

第一個JSON內容爲當前的分區副本分配狀況,第二個爲從新分配的候選方案,注意這裏只是生成一份可行性的方案,並無真正執行重分配的動做。
咱們將第二個JSON內容保存到名爲result.json文件裏面(文件名不重要,文件格式也不必定要以json爲結尾,只要保證內容是json便可),而後執行這些reassign plan。以下所示:

執行分配策略

bin/kafka-reassign-partitions.sh --zookeeper 172.19.0.61:2181 --reassignment-json-file result.json --execute

執行完畢之後,看出分區信息:

修改副本因子

場景
修改副本因子的使用場景也不少,好比在建立主題時填寫了錯誤的副本因子數而須要修改,再好比運行一段時間以後想要經過增長副本因子數來提升容錯性和可靠性。修改副本所以也是經過kafka-reassign-partitions.sh腳本實現的。仔細觀察咱們剛纔對itheima進行分區從新分配之後的結果:

{"version":1,"partitions":[{"topic":"demo","partition":2,"replicas":
[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":1,"replicas":[2,0,1],"log_dirs":
["any","any","any"]},{"topic":"itheima","partition":3,"replicas":
[0,2,3],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":0,"replicas":[1,3,0],"log_dirs":
["any","any","any"]}]}

經過觀察JSON內容裏的replicas都是3個副本。咱們能夠更改指定分區數的副本,具體的實現以下所示:

一、建立一個json文件,定義指定分區的副本數據

{
"version":1,
"partitions":[
        {"topic":"demo","partition":2,"replicas":[0,1]}
]
}

二、而後執行腳本文件

bash-4.4# bin/kafka-reassign-partitions.sh --zookeeper 172.19.0.61:2181
--reassignment-json-file replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"demo","partition":2,"replicas":
[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":1,"replicas":[2,0,1],"log_dirs":
["any","any","any"]},{"topic":"itheima","partition":0,"replicas":
[1,3,0],"log_dirs":["any","any","any"]},
{"topic":"demo","partition":3,"replicas":[0,2,3],"log_dirs":
["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

執行完畢之後,咱們再次看出demo的分區副本狀況,以下所示:

相關文章
相關標籤/搜索