Kafka 分佈式發佈-訂閱消息系統

1. Kafka 概述

1.1什麼是 Kafka

Apache Kafka 是分佈式發佈-訂閱消息系統(消息中間件)。它最初由 LinkedIn 公司開發,之後成爲 Apache 項目的一部分。Kafka 是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。html

 

簡單說明什麼是Kafka:java

舉個例子,生產者消費者,生產者生產雞蛋,消費者消費雞蛋,生產者生產一個雞蛋, 消費者就消費一個雞蛋,假設消費者消費雞蛋的時候噎住了(系統宕機了),生產者還在生 產雞蛋,那新生產的雞蛋就丟失了。再好比生產者很強勁(大交易量的狀況),生產者1秒鐘生產100 個雞蛋,消費者1 秒鐘只能吃50 個雞蛋,那要不了一會,消費者就吃不消了

(消息堵塞,最終致使系統超時),消費者拒絕再吃了,」雞蛋「又丟失了,這個時候咱們

放個籃子在它們中間,生產出來的雞蛋都放到籃子裏,消費者去籃子裏拿雞蛋,這樣雞蛋就 不會丟失了,都在籃子裏,而這個籃子就是」Kafka「。 雞蛋其實就是「數據流」,系統之間的交互都是經過「數據流」來傳輸的(就是tcp、http 什麼的),也稱爲報文,也叫「消息」。 消息隊列滿了,其實就是籃子滿了,」雞蛋「 放不下了,那趕忙多放幾個籃子,其實就是Kafka 的擴容。Kafka 就是例子中的"籃子"。

 

傳統消息中間件服務 RabbitMQ、Apache ActiveMQ 等。node

Apache Kafka 與傳統消息系統相比,有如下不一樣:算法

  1.它是分佈式系統,易於向外擴展;apache

  2.它同時爲發佈和訂閱提供高吞吐量;bootstrap

  3.它支持多訂閱者,當失敗時能自動平衡消費者;服務器

  4.它將消息持久化到磁盤,所以可用於批量消費,例如 ETL,以及實時應用程序。app

1.2Kafka 術語

術語tcp

解釋分佈式

Broker

Kafka 集羣包含一個或多個服務器,這種服務器被稱爲 broker

Topic

每條發佈到 Kafka 集羣的消息都有一個類別,這個類別被稱爲 Topic。(物

理上不一樣 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上但用戶只需指定消息的 Topic 便可生產或消費數據而沒必要關心數據存於何處)

Partition

Partition 是物理上的概念,每一個 Topic 包含一個或多個 Partition.

Producer

負責發佈消息到 Kafka broker

Consumer

消息消費者,向 Kafka broker 讀取消息的客戶端

Consumer Group

每一個 Consumer 屬於一個特定的 Consumer Group(可爲每一個 Consumer

指定 group name,若不指定 group name 則屬於默認的 group)

replica

partition  的副本,保障 partition  的高可用

leader

replica  中的一個角色, producer  和 consumer  只跟 leader  交互

follower

replica  中的一個角色,從 leader  中複製數據

controller

Kafka  集羣中的其中一個服務器,用來進行 leader election  以及各類

failover

小白理解:

  producer:生產者,就是它來生產「雞蛋」的。

  consumer:消費者,生出的「雞蛋」它來消費。

  topic把它理解爲標籤,生產者每生產出來一個雞蛋就貼上一個標籤(topic),消費者可不是誰生產的「雞蛋」都吃的,這樣不一樣的生產者生產出來的「雞蛋」,消費者就能夠選擇性的「吃」了。

  broker:就是籃子了。

若是從技術角度,topic標籤實際就是隊列,生產者把全部「雞蛋(消息)」都放到對應的隊列裏了,消費者到指定的隊列裏取。

 

2. Kafka 安裝

2.1下載

Apache kafka 官方: http://kafka.apache.org/downloads.html

Scala 2.11  - kafka_2.11-0.10.2.0.tgz (asc, md5)

2.1. Kafka 集羣安裝

  2.1.1. 安裝 JDK &配置 JAVA_HOME

  2.1.2. 安裝 Zookeeper

  參照 Zookeeper 官網搭建一個 ZK 集羣, 並啓動 ZK 集羣。

  2.1.3. 解壓 Kafka 安裝包

    2.1.3.1. 修改配置文件 config/server.properties

vi  server.properties
broker.id=0    //爲依次增加的:0、一、二、三、4,集羣中惟一id
log.dirs=/kafkaData/logs // Kafka 的消息數據存儲路徑zookeeper.connect=master:2181,slave1:2181,slave2:2181   //zookeeperServers   列表,各節點以逗號分開

Vi  zookeeper.properties
dataDir=/root/zkdata #指向你安裝的zk 的數據存儲目錄

#  將 Kafka server.properties    zookeeper.properties    文件拷貝到其餘節點機器
KAFKA_HOME/config>scp server.properties    zookeeper.properties xx:$PWD

    2.1.3.2. 啓動 Kafka

    在每臺節點上啓動:

    bin/kafka-server-start.sh -daemon config/server.properties &

    2.1.3.3. 測試集羣

      1-進入 kafka 根目錄,建立 Topic 名稱爲: test 的主題

    bin/kafka-topics.sh --create --zookeeper hadoop:2181,hadoop001:2181,hadoop002:2181 --replication-factor 3 --partitions 3 --topic testTopic

      2-列出已建立的 topic 列表

    bin/kafka-topics.sh --list --zookeeper localhost:2181

      3-查看 Topic 的詳細信息

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:

    Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

    第一行是對全部分區的一個描述,而後每一個分區對應一行,由於只有一個分區因此下面一行。

      leader:負責處理消息的讀和寫,leader 是從全部節點中隨機選擇的.

      replicas:列出了全部的副本節點,無論節點是否在服務中.

      isr:是正在服務中的節點.

    在例子中,節點 1 是做爲 leader 運行。

      4-模擬客戶端去發送消息

    bin/kafka-console-producer.sh --broker-list hadoop:9092,hadoop001:9092 --topic test

      5-模擬客戶端去接受消息

    bin/kafka-console-consumer.sh --bootstrap-server hadoop:9092 --from-beginning --topic hellotopic

      6-測試一下容錯能力.

    Kill -9 pid[leader 節點]

    另一個節點被選作了 leader,node 1 再也不出如今 in-sync 副本列表中: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:

    Topic: test Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

    雖然最初負責續寫消息的 leader down 掉了,但以前的消息仍是能夠消費的:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test

3. Kafka 客戶端開發

  3.1. Java Client

  3.1.1. 添加 pom.xml 依賴

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>

  3.1.2. Producer  生產者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
/**
 * kafka 生產端Api開發
 */
public class ProducerApi {
    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092");
        props.setProperty("key.serializer",StringSerializer.class.getName());
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        /**
         * 發送數據的時候是否須要應答
         * 取值範圍:
         * [all,-1,0,1]
         * 0:leader不作任何應答
         * 1:leader會給producer作出應答
         * all、-1:fllower->leader->producer
         * 默認值: 1
         */
        //props.setProperty("acks","1")
        /**
         * 自定義分區
         * 默認值:org.apache.kafaka.clients.producer.internals.DefaultPartitoner
         */
        //props.setProperty("partitioner.class","org.apache.kafaka.clients.producer.internals.DefaultPartitoner");

        //建立一個生產者的客戶端實例
        KafkaProducer<Object, Object> kafkaproducer = new KafkaProducer<>(props);
        int count=0;
        while (count<1000){
            int partitionNum=count%3;
            //封裝一條消息
            ProducerRecord record = new ProducerRecord("testTopic", partitionNum, "", count+"");
            //發送一條消息
            kafkaproducer.send(record);
            count++;
            Thread.sleep(1*1000);
        }
        //釋放
        kafkaproducer.close();
    }
}
View Code

  3.1.3. Consumer  消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
/**
 * 消費端Api開發
 */
public class ConsumerApi {
    public static void main(String[] args) {
        Properties config = new Properties();
        HashMap<String, Object> props = new HashMap<>();
        config.put("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092");
        config.put("key.deserializer",StringDeserializer.class.getName());
        config.put("value.deserializer",StringDeserializer.class.getName());
        config.put("group.id","day12_001");
        /**
         * 從哪一個位置開始獲取數據
         * 取值範圍:
         * [latest,earliest,none]
         * 默認值:
         * latest
         */
        config.put("auto.offset.reset","earliest");
        /**
         * 是否要自動遞交偏移量(offset)這條數據在某個分區所在位置的編號
         */
        config.put("enable.auto.commit",true);
        /**
         * 設置500毫秒遞交一次offset值
         */
        config.put("auto.commit.interval.ms",500);
        //建立一個客戶端實例
        KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(config);
        //訂閱主題
        kafkaConsumer.subscribe(Arrays.asList("testTopic"));

        while (true){
            //拉取數據,會從kafka因此分區下拉取數據
            ConsumerRecords<Object, Object> records = kafkaConsumer.poll(2000);
            Iterator<ConsumerRecord<Object, Object>> iterator = records.iterator();
            while (iterator.hasNext()){
                ConsumerRecord<Object, Object> record = iterator.next();
                System.out.println("record"+record);
            }
        }
    }
}
View Code

4. Kafka 原理

4.1. Kafka 的拓撲結構

如上圖所示,一個典型的 Kafka 集羣中包含若干 Producer,若干 broker(Kafka 支持水平擴展, 通常 broker 數量越多,集羣吞吐率越高),若干 Consumer Group,以及一個 Zookeeper 集羣。Kafka 經過 Zookeeper 管理集羣配置,選舉 leader。Producer 使用 push 模式將消息發佈 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息。 

4.2. Zookeeper 節點

4.3. Producer 發佈消息

  • producer  採用 push  模式將消息發佈到 broker,每條消息都被 append   partition

 中,屬於順序寫磁盤。

  主題是發佈記錄的類別或訂閱源名稱。Kafka的主題老是多用戶; 也就是說,一個主題能夠有零個,一個或多個消費者訂閱寫入它的數據。

對於每一個主題,Kafka羣集都維護一個以下所示的分區日誌:


  • producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪個 partition。

  1.指定了 partition,則直接使用;

  2.未指定 partition  但指定 key,經過對 key   value  進行 hash  選出一個 partition

  3.partition   key  都未指定,使用輪詢選出一個 partition  。

4.3.1. 寫數據流程

  1. producer 先從 zookeeper  "/brokers/.../state" 節點找到該 partition  leader
  2. producer 將消息發送給該 leader
  3. leader  將消息寫入本地 log
  4. followers   leader pull  消息,寫入本地 log   leader  發送 ACK
  5. leader  收到全部 ISR(in-sync replicas) 中的 replica   ACK  後向 producer  發送 ACK

 

4.4. Broker 存儲消息

4.4.1. 消息存儲方式

物理上把 topic  分紅一個或多個 partition(對應 server.properties  中的 num.partitions=3  配置),每一個 partition 物理上對應一個文件夾(該文件夾存儲該 partition 的全部消息和索引文件),以下:

4.4.2. 消息存儲策略

不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:

log.retention.hours=168 #基於時間

log.retention.bytes=1073741824 #基於大小 

4.5. Kafka log 的存儲解析

Partition 中的每條 Message offset 來表示它在這個 partition 中的偏移量,這個 offset 不是 Message partition 數據文件中的實際存儲位置,而是邏輯上一個值,它惟一肯定了

partition 中的一條 Message。所以,能夠認爲 offset partition Message id。partition

中的每條 Message 包含了如下三個屬性:

  offset

  MessageSize

  data

其中 offset long 型,MessageSize int32,表示 data 有多大,data message 的具體內容。

 

咱們來思考一下,若是一個partition 只有一個數據文件會怎麼樣?

1) 新數據是添加在文件末尾,不論文件數據文件有多大,這個操做永遠都是高效的。

2) 查找某個offset  的Message是順序查找的。所以,若是數據文件很大的話,查找的效率就低。

 

Kafka 是如何解決查找效率的的問題呢?有兩大法寶:1)  分段 2)  索引。

Ø 數據文件的分段

Kafka 解決查詢效率的手段之一是將數據文件分段,好比有 100 Message,它們的 offset 是從 0 99。假設將數據文件分紅 5 段,第一段爲 0-19,第二段爲 20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的 offset 命名。這樣在查找指定 offset Message 的時候,用二分查找就能夠定位到該 Message 在哪一個段中。

Ø 爲數據文件建索引

數據文件分段使得能夠在一個較小的數據文件中查找對應 offset Message 了,可是這依然須要順序掃描才能找到對應 offset Message。爲了進一步提升查找的效率,Kafka 爲每一個分段後的數據文件創建了索引文件,文件名與數據文件的名字是同樣的,只是文件擴展名爲.index。

索引文件中包含若干個索引條目,每一個條目表示數據文件中一條 Message 的索引。索引包含兩個部分,分別爲相對 offset position。

  1.相對 offset:由於數據文件分段之後,每一個數據文件的起始 offset 不爲 0,相對 offset 表示這條 Message 相對於其所屬數據文件中最小的 offset 的大小。舉例,分段後的一個數據文件的 offset 是從 20 開始,那麼 offset  25  Message  index 文件中的相對 offset 就是 25-20 = 5。存儲相對 offset 能夠減少索引文件佔用的空間。

  2.position,表示該條 Message 在數據文件中的絕對位置。只要打開文件並移動文件指針到這個 position 就能夠讀取對應的 Message 了。

index 文件中並無爲數據文件中的每條 Message 創建索引,而是採用了稀疏存儲的方式, 每隔必定字節的數據創建一條索引。這樣避免了索引文件佔用過多的空間,從而能夠將索引   文件保留在內存中。但缺點是沒有創建索引的 Message 也不能一次定位到其在數據文件的位置,從而須要作一次順序掃描,可是此次順序掃描的範圍就很小了。

 

  咱們以幾張圖來總結一下 Message 是如何在 Kafka 中存儲的,以及如何查找指定 offset 的Message 的。

  Message 是按照 topic 來組織,每一個 topic 能夠分紅多個的 partition,好比:有 5 partition的名爲爲 page_visits topic 的目錄結構爲:

partition 是分段的,每一個段叫 Segment,包括了一個數據文件和一個索引文件,下圖是某個partition 目錄下的文件:

能夠看到,這個 partition 4 Segment。圖示 Kafka 是如何查找 Message 的。

 

好比:要查找絕對 offset 7 Message:

首先是用二分查找肯定它是在哪一個 LogSegment 中,天然是在第一個 Segment 中。

打開這個 Segment index 文件,也是用二分查找找到 offset 小於或者等於指定 offset 的索引條目中最大的那個 offset。天然 offset 6 的那個索引是咱們要找的,經過索引文件咱們知道 offset 6 Message 在數據文件中的位置爲 9807。

打開數據文件,從位置爲9807 的那個地方開始順序掃描直到找到offset 爲7 的那條Message。這套機制是創建在 offset 是有序的。索引文件被映射到內存中,因此查找的速度仍是很快的。

 

一句話,Kafka Message 存儲採用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。

相關文章
相關標籤/搜索