消息隊列之 Kafka

Kafka 特色

Kafka 最先是由 LinkedIn 公司開發一種分佈式的基於發佈/訂閱的消息系統,以後成爲 Apache 的頂級項目。主要特色以下:html

1. 同時爲發佈和訂閱提供高吞吐量

Kafka 的設計目標是以時間複雜度爲 O(1) 的方式提供消息持久化能力,即便對TB 級以上數據也能保證常數時間的訪問性能。即便在很是廉價的商用機器上也能作到單機支持每秒 100K 條消息的傳輸。java

2. 消息持久化

將消息持久化到磁盤,所以可用於批量消費,例如 ETL 以及實時應用程序。經過將數據持久化到硬盤以及 replication 防止數據丟失。node

3. 分佈式

支持 Server 間的消息分區及分佈式消費,同時保證每一個 partition 內的消息順序傳輸。這樣易於向外擴展,全部的producer、broker 和 consumer 都會有多個,均爲分佈式的。無需停機便可擴展機器。apache

4. 消費消息採用 pull 模式

消息被處理的狀態是在 consumer 端維護,而不是由 server 端維護,broker 無狀態,consumer 本身保存 offset。bootstrap

5. 支持 online 和 offline 的場景。

同時支持離線數據處理和實時數據處理。後端

Kafka 中的基本概念

1. Broker

Kafka 集羣中的一臺或多臺服務器統稱爲 Broker緩存

2. Topic

每條發佈到 Kafka 的消息都有一個類別,這個類別被稱爲 Topic 。(物理上不一樣 Topic 的消息分開存儲。邏輯上一個 Topic 的消息雖然保存於一個或多個broker上,但用戶只需指定消息的 Topic 便可生產或消費數據而沒必要關心數據存於何處)安全

3. Partition

Topic 物理上的分組,一個 Topic 能夠分爲多個 Partition ,每一個 Partition 是一個有序的隊列。Partition 中的每條消息都會被分配一個有序的 id(offset)bash

4. Producer

消息和數據的生產者,能夠理解爲往 Kafka 發消息的客戶端服務器

5. Consumer

消息和數據的消費者,能夠理解爲從 Kafka 取消息的客戶端

6. Consumer Group

每一個 Consumer 屬於一個特定的 Consumer Group(可爲每一個 Consumer 指定Group Name,若不指定 Group Name 則屬於默認的 Group)。 這是 Kafka 用來實現一個 Topic 消息的廣播(發給全部的 Consumer )和單播(發給任意一個 Consumer )的手段。一個 Topic 能夠有多個 Consumer Group。Topic 的消息會複製(不是真的複製,是概念上的)到全部的 Consumer Group,但每一個 Consumer Group 只會把消息發給該 Consumer Group 中的一個 Consumer。若是要實現廣播,只要每一個 Consumer 有一個獨立的 Consumer Group 就能夠了。若是要實現單播只要全部的 Consumer 在同一個 Consumer Group 。用 Consumer Group 還能夠將 Consumer 進行自由的分組而不須要屢次發送消息到不一樣的 Topic 。

Kafka 安裝

Mac 用戶用 HomeBrew 來安裝,安裝前要先更新 brew

brew update
複製代碼

接着安裝 kafka

brew install kafka
複製代碼

安裝完成以後能夠查看 kafka 的配置文件

cd /usr/local/etc/kafka
複製代碼

kafka 配置文件
個人電腦經過 HomeBrew 安裝的 kafka 位置在 /usr/local/Cellar/kafka/0.11.0.1/bin ,能夠看到 HomeBrew 安裝下來的 kafka 的版本已是 0.11.0.1 的了。

kafka 須要用到 zookeeper,HomeBrew 安裝kafka 的時候會同時安裝 zookeeper。下面先啓動 zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
複製代碼

接着啓動 kafka

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties
複製代碼

建立 topic,設置 partition 數量爲2,topic 的名字叫 test-topic,下面的例子都用這個 topic

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
複製代碼

查看建立的 topic

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --list --zookeeper localhost:2181
複製代碼

Kafka 命令行測試

發送消息

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-producer --broker-list localhost:9092 --topic test-topic
複製代碼

消費消息

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
複製代碼

刪除 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic
複製代碼

若是 kafka 啓動時加載的配置文件中 server.properties 沒有配置delete.topic.enable=true,那麼此時的刪除並非真正的刪除,而是把 topic 標記爲:marked for deletion

查看全部 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --zookeeper localhost:2181 --list 
複製代碼

物理刪除 topic

登陸zookeeper客戶端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli
找到topic所在的目錄:ls /brokers/topics
找到要刪除的topic,執行命令:rmr /brokers/topics/test-topic 便可,此時topic被完全刪除
複製代碼

Java 客戶端訪問

1. maven工程的pom文件中添加依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>
複製代碼

2. 消息生產者

package org.study.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class ProducerSample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址
        props.put("bootstrap.servers", "localhost:9092");//用於創建與 kafka 集羣鏈接的 host/port 組。
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "test-topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));

        producer.close();
    }

}
複製代碼

3. 消息消費者

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {

    public static void main(String[] args) {
        String topic = "test-topic";// topic name

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//用於創建與 kafka 集羣鏈接的 host/port 組。
        props.put("group.id", "testGroup1");// Consumer Group Name
        props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自動提交
        props.put("auto.commit.interval.ms", "1000");// 自動提交 offset 到 zookeeper 的時間間隔,時間是毫秒
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }

    }
}
複製代碼

4. 啓動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
複製代碼

5. 啓動 kafka 服務器

kafka-server-start /usr/local/etc/kafka/server.properties
複製代碼

6. 運行 Consumer

先運行 Consumer ,這樣當生產者發送消息的時候能在消費者後端看到消息記錄。

7. 運行 Producer

運行 Producer,發佈幾條消息,在 Consumer 的控制檯能看到接收的消息

Consumer 控制檯

Kafka 集羣配置

kafka 的集羣配置通常有三種,即: single node - single broker ,single node - multiple broker ,multiple node - multiple broker

前兩種實際上官網有介紹

single node - single broker

單節點單 broker

1. 啓動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
複製代碼

2. 啓動 kafka broker

kafka-server-start /usr/local/etc/kafka/server.properties
複製代碼

3. 建立一個 kafka topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker
複製代碼

4. 啓動 producer 發送信息

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker
複製代碼

broker-list 和 topic 這兩個參數是必須的,broker-list 指定要鏈接的 broker 的地址,格式爲 node_address:port 。topic 是必須的,由於須要發送消息給訂閱了該 topic 的 consumer group 。 如今能夠在命令行裏輸入一些信息,每一行會被做爲一個消息。

發送消息

5. 啓動 consumer 消費消息

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker
複製代碼

在不一樣的終端窗口裏分別啓動 zookeeper、broker、producer、consumer 後,在 producer 終端裏輸入消息,消息就會在 consumer 終端中顯示了。

消息顯示

single node - multiple broker

單節點多 broker

1. 啓動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
複製代碼

2. 啓動broker

若是須要在單個節點(即一臺機子)上面啓動多個 broker(這裏做爲例子啓動三個 broker),須要準備多個server.properties文件便可,因此須要複製 /usr/local/etc/kafka/server.properties 文件。由於須要爲每一個 broker 指定單獨的屬性配置文件,其中 broker.id 、 port 、 log.dir 這三個屬性必須是不一樣的。

新建一個 kafka-example 目錄和三個存放日誌的目錄

mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3
複製代碼

複製 /usr/local/etc/kafka/server.properties 文件三份

cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties
複製代碼

在 broker1 的配置文件 server-1.properties 中,相關要修改的參數爲:

broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1
複製代碼

broker2 的配置文件 server-2.properties 中,相關要修改的參數爲:

broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2
複製代碼

broker3 的配置文件 server-3.properties 中,相關要修改的參數爲:

broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3
複製代碼

啓動每一個 broker

cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties
複製代碼

3. 建立 topic

建立一個名爲 topic-singlenode-multiplebroker 的topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker
複製代碼

4. 啓動 producer 發送信息

若是一個 producer 須要鏈接多個 broker 則須要傳遞參數 broker-list

kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker
複製代碼

5. 啓動 consumer 消費消息

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker
複製代碼

單節點多 broker 消費消息

multiple node - multiple broker

多節點多 broker
在多節點多 broker 集羣中,每一個節點都須要安裝 Kafka,且全部的 broker 都鏈接到同一個 zookeeper 。這裏 zookeeper 固然也是能夠配置成集羣方式的,具體步驟參見我以前寫的 搭建 zookeeper 集羣

1. Kafka 的集羣配置

broker.id=1  #當前機器在集羣中的惟一標識
port=9093 #當前 kafka 對外提供服務的端口,默認是 9092
host.name=192.168.121.101 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #消息存放的目錄,這個目錄能夠配置爲逗號分割的表達式
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #設置 zookeeper 集羣的鏈接端口

num.network.threads=3 #這個是 borker 進行網絡處理的線程數
num.io.threads=5 #這個是 borker 進行 IO 處理的線程數
socket.send.buffer.bytes=102400 #發送緩衝區的大小,數據先回存儲到緩衝區了到達必定的大小後在發送能提升性能
socket.receive.buffer.bytes=102400 #接收緩衝區的大小,當數據到達必定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向 kafka 請求消息或者向 kafka 發送消息的請求的最大數,這個值不能超過 jvm 的堆棧大小
num.partitions=1 #默認的分區數,一個 topic 默認1個分區數
log.retention.hours=24 #默認消息的最大持久化時間,24小時
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka 保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是由於 kafka 的消息是以追加的形式落地到文件,當超過這個值的時候,kafka 會新建一個文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去檢查上面配置的 log 失效時間(log.retention.hours=24 ),到目錄查看是否有過時的消息若是有則刪除
log.cleaner.enable=false #是否啓用 log 壓縮,通常不用啓用,啓用的話能夠提升性能
複製代碼

因爲是多節點多 broker 的,因此每一個 broker 的配置文件 server.properties 都要按以上說明修改

2. producer 的配置修改

kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker
複製代碼

3. consumer 的配置修改

kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker
複製代碼

Kafka 高可靠性配置

Kafka 提供了很高的數據冗餘彈性,對於須要數據高可靠性的場景能夠增長數據冗餘備份數(replication.factor),調高最小寫入副本數的個數(min.insync.replicas)等等,可是這樣會影響性能。反之,性能提升而可靠性則下降,用戶須要自身業務特性在彼此之間作一些權衡性選擇。

要保證數據寫入到 Kafka 是安全的、高可靠的,須要以下的配置:

1. topic 的配置

replication.factor>=3,即副本數至少是3個2<=min.insync.replicas<=replication.factor

2. broker 的配置

leader 的選舉條件 unclean.leader.election.enable=false

3. producer 的配置

request.required.acks=-1,producer.type=sync

Kafka 高吞吐量的祕訣

消息中間件從功能上看就是寫入數據、讀取數據兩大類,優化也能夠從這兩方面來看。

爲了優化寫入速度 Kafak 採用如下技術:

1. 順序寫入

磁盤大多數都仍是機械結構(SSD不在討論的範圍內),若是將消息以隨機寫的方式存入磁盤,就須要按柱面、磁頭、扇區的方式尋址,緩慢的機械運動(相對內存)會消耗大量時間,致使磁盤的寫入速度與內存寫入速度差好幾個數量級。爲了規避隨機寫帶來的時間消耗,Kafka 採起了順序寫的方式存儲數據,以下圖所示:

順序寫
每條消息都被append 到該 partition 中,屬於順序寫磁盤,所以效率很是高。 但這種方法有一個缺陷:沒有辦法刪除數據。因此Kafka是不會刪除數據的,它會把全部的數據都保留下來,每一個消費者(Consumer)對每一個 Topic 都有一個 offset 用來表示讀取到了第幾條數據。
消費消息
上圖中有兩個消費者,Consumer1 有兩個 offset 分別對應 Partition0、Partition1(假設每個 Topic 一個 Partition )。Consumer2 有一個 offset 對應Partition2 。這個 offset 是由客戶端 SDK 保存的,Kafka 的 Broker 徹底無視這個東西的存在,通常狀況下 SDK 會把它保存到 zookeeper 裏面。 若是不刪除消息,硬盤確定會被撐滿,因此 Kakfa 提供了兩種策略來刪除數據。一是基於時間,二是基於 partition 文件大小,具體配置能夠參看它的配置文檔。 即便是順序寫,過於頻繁的大量小 I/O 操做同樣會形成磁盤的瓶頸,因此 Kakfa 在此處的處理是把這些消息集合在一塊兒批量發送,這樣減小對磁盤 I/O 的過分操做,而不是一次發送單個消息。

2. 內存映射文件

即使是順序寫入硬盤,硬盤的訪問速度仍是不可能追上內存。因此 Kafka 的數據並非實時的寫入硬盤,它充分利用了現代操做系統分頁存儲來利用內存提升I/O效率。Memory Mapped Files (後面簡稱mmap)也被翻譯成內存映射文件,在64位操做系統中通常能夠表示 20G 的數據文件,它的工做原理是直接利用操做系統的 Page 來實現文件到物理內存的直接映射。完成映射以後對物理內存的操做會被同步到硬盤上(由操做系統在適當的時候)。 經過 mmap 進程像讀寫硬盤同樣讀寫內存,也沒必要關心內存的大小,有虛擬內存爲咱們兜底。使用這種方式能夠獲取很大的 I/O 提高,由於它省去了用戶空間到內核空間複製的開銷(調用文件的 read 函數會把數據先放到內核空間的內存中,而後再複製到用戶空間的內存中) 但這樣也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數據並無被真正的寫到硬盤,操做系統會在程序主動調用 flush 的時候才把數據真正的寫到硬盤。因此 Kafka 提供了一個參數—— producer.type 來控制是否是主動 flush,若是Kafka 寫入到 mmap 以後就當即 flush 而後再返回 Producer 叫同步(sync);若是寫入 mmap 以後當即返回,Producer 不調用 flush ,就叫異步(async)。

3. 標準化二進制消息格式

爲了不無效率的字節複製,尤爲是在負載比較高的狀況下影響是顯著的。爲了不這種狀況,Kafka 採用由 Producer,Broker 和 Consumer 共享的標準化二進制消息格式,這樣數據塊就能夠在它們之間自由傳輸,無需轉換,下降了字節複製的成本開銷。

而在讀取速度的優化上 Kafak 採起的主要是零拷貝

零拷貝(Zero Copy)的技術:

傳統模式下咱們從硬盤讀取一個文件是這樣的

文件傳輸到 Socket 的常規方式
(1) 操做系統將數據從磁盤讀到內核空間的頁緩存區

(2) 應用將數據從內核空間讀到用戶空間的緩存中

(3) 應用將數據寫會內核空間的套接字緩存中

(4)操做系統將數據從套接字緩存寫到網卡緩存中,以便將數據經網絡發出

這樣作明顯是低效的,這裏有四次拷貝,兩次系統調用。 針對這種狀況 Unix 操做系統提供了一個優化的路徑,用於將數據從頁緩存區傳輸到 socket。在 Linux 中,是經過 sendfile 系統調用來完成的。Java提供了訪問這個系統調用的方法:FileChannel.transferTo API。這種方式只須要一次拷貝:操做系統將數據直接從頁緩存發送到網絡上,在這個優化的路徑中,只有最後一步將數據拷貝到網卡緩存中是須要的。

零拷貝方式傳輸到 Socket
這個技術其實很是廣泛,The C10K problem 裏面也有很詳細的介紹,Nginx 也是用的這種技術,稍微搜一下就能找到不少資料。

Kafka 速度的祕訣在於它把全部的消息都變成一個的文件。經過 mmap 提升 I/O 的速度,寫入數據的時候是末尾添加因此速度最優;讀取數據的時候配合sendfile 直接暴力輸出。因此單純的去測試 MQ 的速度沒有任何意義,Kafka 的這種暴力的作法已經脫了 MQ 的底褲,更像是一個暴力的數據傳送器。

相關文章
相關標籤/搜索