kafka消息隊列

kafka自己就是LinkIn公司開發用於日誌系統的,因此其文件叫作log
php

  1. 點對對與發佈訂閱的區別java

    1.1 點對點模式,生產者發送一條消息到queue,只有一個消費者能收到。 python

    1.2 發佈訂閱模式 nginx

    發佈者發送到topic的消息,只有訂閱了topic的訂閱者纔會收到消息
    rabbitMQ中實現發佈訂閱模式
    當RabbitMQ須要支持多訂閱時,發佈者發送的消息經過路由同時寫到多個Queue,不一樣訂閱組消費此消息。
    kafka發佈訂閱模式
    Kafka只支持r息持久化,消費端爲拉模型,消費狀態和訂閱關係由客戶端端負責維護,消息消費完後不會當即刪除,會保留歷史消息。所以支持多訂閱時,消息只會存儲一份就能夠了。

  2. kafka背景介紹
    kafka是最初由Linkedin公司開發,使用Scala語言編寫,Kafka是一個分佈式、分區的、多副本的、多訂閱者的日誌系統(分佈式MQ系統),能夠用於web/nginx日誌,搜索日誌,監控日誌,訪問日誌等等。
    kafka目前支持多種客戶端語言:java,python,c++,php等等。
    c++

  3. kafka高吞吐量的設計
    數據磁盤持久化:消息不在內存中cache,直接寫入到磁盤,充分利用磁盤的順序讀寫性能。
    zero-copy:減小IO操做步驟。
    支持數據批量發送和拉取。
    支持數據壓縮。
    Topic劃分爲多個partition,提升並行處理能力。
    web

  4. kafka信息存儲
    Kafka中的Message是以topic爲基本單位的,不一樣的topic之間是相互獨立的。每一個topic又能夠分紅幾個不一樣的partition(在建立topic時指定的),每一個partition存儲一部分Message。關係以下圖
    apache

    partition是以文件的形式存儲在文件系統中,好比,建立了一個名爲t101的topic,其有5個partition,那麼在Kafka的數據目錄中(log.dirs指定的)中就有這樣4個目錄: t101-0,t101-1,t101-2,t101-3 ,其命名規則爲<topic_name>-<partition_id>,裏面存儲的分別就是這4個partition的數據。 建立命令以下:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic t101
    bootstrap

    新數據是添加在文件末尾,不論文件數據文件有多大,這個操做永遠都是O(1)的。
    查找某個offset的Message是順序查找的。所以,若是數據文件很大的話,查找的效率就低。
    爲解決查找效率低的問題,kafka採用分段和索引

4.1 數據文件分段
好比有100條Message,它們的offset是從0到99。假設將數據文件分紅5段,第一段爲0-19,第二段爲20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就能夠定位到該Message在哪一個段中。bash

4.2 爲數據文件建索引
數據文件分段使得能夠在一個較小的數據文件中查找對應offset的Message了,可是這依然須要順序掃描才能找到對應offset的Message。爲了進一步提升查找的效率,Kafka爲每一個分段後的數據文件創建了索引文件,文件名與數據文件的名字是同樣的,只是文件擴展名爲.index。
索引文件中包含若干個索引條目,每一個條目表示數據文件中一條Message的索引。索引包含兩個部分,分別爲相對offset和position。
服務器

#相對offset:由於數據文件分段之後,每一個數據文件的起始offset不爲0,相對offset表示這條Message相對於其所屬數據文件中最小的offset的大小。舉例,分段後的一個數據文件的offset是從20開始,那麼offset爲25的Message在index文件中的相對offset就是25-20=5。存儲相對offset可減小索引文件佔用的空間 。
#position,表示該條Message在數據文件中的絕對位置。只要打開文件並移動文件指針到這個position就能夠讀取對應的Message了。
index文件中並無爲數據文件中的每條Message創建索引,而是採用了稀疏存儲的方式,每隔必定字節的數據創建一條索引。這樣避免了索引文件佔用過多的空間,從而能夠將索引文件保留在內存中。缺點是沒有創建索引的Message也不能一次定位到其在數據文件的位置,從而須要作一次順序掃描,可是此次順序掃描的範圍就很小了。
4.3 查找message原理圖

1)首先是用二分查找肯定它是在哪一個LogSegment中,天然是在第一個Segment中。 2)打開這個Segment的index文件,也是用二分查找找到offset小於或者等於指定offset的索引條目中最大的那個offset。天然offset爲6的那個索引是咱們要找的,經過索引文件咱們知道offset爲6的Message在數據文件中的位置爲9807。
3)打開數據文件,從位置爲9807的那個地方開始順序掃描直到找到offset爲7的那條Message。

Kafka的Message存儲採用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性

  1. zookeeper在kafka中的做用
    其中5.五、5.六、5.7的是老版本的設計方式,新的版本偏移量已經不在存在zookeeper中。
    5.1 管理broker集羣
    Broker是分佈式部署而且相互之間相互獨立,可是須要有一個註冊系統可以將整個集羣中的Broker管理起來。
    在Zookeeper上會有一個專門用來進行Broker服務器列表記錄的節點:
    /brokers/ids
    每一個broker啓動的時候都會在zookeeper上進行註冊。
    Kafka使用了全局惟一的數字來指代每一個Broker服務器,不一樣的Broker必須使用不一樣的Broker ID進行註冊,建立完節點後,每一個Broker就會將本身的IP地址和端口信息記錄到該節點中去。其中,Broker建立的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
    5.2 管理topic信息
    在Kafka中,同一個Topic的消息會被分紅多個分區並將其分佈在多個Broker上,這些分區信息及與Broker的對應關係也都是由Zookeeper在維護,由專門的節點來記錄,如:
    /borkers/topics
    Kafka中每一個Topic都會以/brokers/topics/[topic]的形式被記錄,Broker服務器啓動後,會到對應Topic節點(/brokers/topics)上註冊本身的Brokerid並寫入針對該Topic的分區總數,一樣,這個分區節點也是臨時節點。
    5.3 生產者負載均衡

    因爲同一個Topic消息會被分區並將其分佈在多個Broker上,所以,生產者須要將消息合理地發送到這些分佈式的Broker上,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。
    (1) 四層負載均衡,根據生產者的IP地址和端口來爲其肯定一個相關聯的Broker。一般,一個生產者只會對應單個Broker,而後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每一個生產者不須要同其餘系統創建額外的TCP鏈接,只須要和Broker維護單個TCP鏈接便可。可是,其沒法作到真正的負載均衡,由於實際系統中的每一個生產者產生的消息量及每一個Broker的消息存儲量都是不同的,若是有些生產者產生的消息遠多於其餘生產者的話,那麼會致使不一樣的Broker接收到的消息總數差別巨大,同時,生產者也沒法實時感知到Broker的新增和刪除。
    (2) 使用Zookeeper進行負載均衡,因爲每一個Broker啓動時,都會完成Broker註冊過程,生產者會經過該節點的變化來動態地感知到Broker服務器列表的變動,實現動態的負載均衡機制。
    Kafka的生產者會對ZooKeeper上的「Broker的新增與減小」、「Topic的新增和減小」和「Broker和Topic關聯關係的變化」等事件註冊Watcher監聽
    經過ZooKeeper的Watcher通知可以讓生產者動態的獲取Broker和Topic的變化狀況。
    5.4 消費者負載均衡
    與生產者相似,Kafka中的消費者一樣須要進行負載均衡來實現多個消費者合理地從對應的Broker服務器上接收消息,每一個消費者分組包含若干消費者,每條消息都只會發送給分組中的一個消費者,不一樣的消費者分組消費本身特定的Topic下面的消息。

5.5 zookeeper記錄分區與消費者的關係
對於每一個消費者組 (Consumer Group),Kafka都會爲其分配一個全局惟一的Group ID,Group內部的全部消費者共享該ID。
同時,Kafka爲每一個消費者分配一個Consumer ID,一般採用"Hostname:UUID"形式表示。
在Kafka中,規定了每一個消息分區 只能被同組的一個消費者進行消費,所以,須要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關係,每一個消費者一旦肯定了對一個消息分區的消費權力,須要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

節點內容就是該消息分區上消費者的Consumer ID

5.6 消息消費進度Offset記錄
在消費者對指定消息分區進行消息消費的過程當中,須要定時地將分區消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啓或者其餘消費者從新接管該消息分區的消息消費後,可以從以前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑爲:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節點內容就是Offset的值

5.7 消費者註冊
消費者服務器在初始化啓動時加入消費者分組的步驟以下
註冊到消費者分組。每一個消費者服務器啓動時,都會到Zookeeper的指定節點下建立一個屬於本身的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點建立後,消費者就會將本身訂閱的Topic信息寫入該臨時節點。
對消費者分組中的消費者的變化註冊監聽。每一個消費者都須要關注所屬消費者分組中其餘消費者服務器的 變化狀況, 即對/consumers/[group_id]/ids節點註冊子節點變化的Watcher監聽,一旦發現消費者新增或減小,就觸發消費者的負載均衡。
對Broker服務器變化註冊監聽。消費者須要對/broker/ids/[0-N]中的節點進行監聽,若是發現Broker服務器列表發生變化,那麼就根據具體狀況來決定是否須要進行消費者負載均衡。
進行消費者負載均衡。爲了讓同一個Topic下不一樣分區的消息儘可能均衡地被多個消費者消費而進行消費者與消息 分區分配的過程,一般,對於一個消費者分組,若是組內的消費者服務器發生變動或Broker服務器發生變動,會發出消費者負載均衡。

  1. 新版本消費位移存儲
    老版本的消費位移信息是存儲的zookeeper 中的, 可是zookeeper 並不適合頻繁的寫入查詢操做, 因此在新版本的中消費位移信息存放在了__consumer_offsets內置topic中。
    能夠利用以下命令建立consumers group信息,建立group consumer_offsets_t105
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105

    查詢consumer_offsets_t105 在 __consumer_offsets topic 中存放的位移信息__consumer_offsets 默認分區50。經過以下公式便可獲取:Math.abs("consumer_offsets_t105".hashCode()) % 50。

    能夠計算得位移偏移量是存在partitionId等於44分區上。
    使用命令能夠查詢出消息的偏移信息。
    bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning
    能夠根據命令查詢消息的發佈狀況
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
    能夠看出消息偏移與消息的發佈的數據基本一致。

  2. kafka java調用
    7.1 生產者

    import java.util.Properties;
         import org.apache.kafka.clients.producer.KafkaProducer;
         import org.apache.kafka.clients.producer.Producer;
         import org.apache.kafka.clients.producer.ProducerRecord;
         public class ProducerDemo {
         
             public static void main(String[] args){
                 Properties properties = new Properties();
                 properties.put("bootstrap.servers", "localhost:9092");
                 properties.put("acks", "all");
                 properties.put("retries", 0);
                 properties.put("batch.size", 16384);
                 properties.put("linger.ms", 1);
                 properties.put("buffer.memory", 33554432);
                 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                 Producer<String, String> producer = null;
                 try {
                     producer = new KafkaProducer<String, String>(properties);
                     for (int i = 0; i < 100; i++) {
                         String msg = "Message " + i;
                         producer.send(new ProducerRecord<String, String>("t105", msg));
                         System.out.println("Sent:" + msg);
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
         
                 } finally {
                     producer.close();
                 }
         
             }
         }
        
        
    
    複製代碼
7.2 消費者

複製代碼
import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    
    public class ConsumerDemo {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", "group4");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "none");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("t105"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(            Duration.ofMillis(100)
                );
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = "+ record.partition() +"  offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }
    
        }
    }

參數auto.offset.reset有三個值latest, earliest, none。<br>
earliest: automatically reset the offset to the earliest offset<br>
latest:  automatically reset the offset to the latest offset<br>
none: hrow exception to the consumer if no previous offset is found for the consumer's group
複製代碼
  1. kafka零拷貝
    傳統的IO機制
    這一過程實際上發生了四次數據拷貝。首先經過系統調用將文件數據讀入到內核態 Buffer(DMA 拷貝),而後應用程序將內存態 Buffer 數據讀入到用戶態 Buffer(CPU 拷貝),接着用戶程序經過 Socket 發送數據時將用戶態 Buffer 數據拷貝到內核態 Buffer(CPU 拷貝),最後經過 DMA 拷貝將數據拷貝到 NIC Buffer。同時,還伴隨着四次上下文切換。

數據經過 DMA 拷貝到內核態 Buffer 後,直接經過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝。除了減小數據拷貝外,由於整個讀文件 - 網絡發送由一個 sendfile 調用完成,整個過程只有兩次上下文切換,所以大大提升了性能。

  1. Kafka的leader選舉機制
    只有leader 負責讀寫,follower只負責備份,若是leader宕機的話,Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,ISR中有f+1個節點,就能夠容許在f個節 點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR。所以若是leader宕了,直接從ISR中選擇一個follower就行。

若是全部的ISR副本都失敗了
此時有兩種方法可選,一種是等待ISR集合中的副本復活,一種是選擇任何一個當即可用的副本,而這個副本不必定是在ISR集合中。這兩種方法各有利弊,實際生產中按需選擇。
若是要等待ISR副本復活,雖然能夠保證一致性,但可能須要很長時間。而若是選擇當即可用的副本,則極可能該副本並不一致。

  1. kafka Stream
    一個流處理器從它所在的拓撲上游接收數據,經過Kafka Streams提供的流處理的基本方法, 如map()、filter()、join()以及聚合等方法,對數據進行處理,而後將處理以後的一個或者多個輸出結果發送給下游流處理器。

kafka的流實例參考
juejin.im/post/5cd50a…

  1. kafka壓縮

12. 經常使用命令 啓動 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &

建立一個topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic t106

describe topics bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic t106

往集羣中發消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t106

集羣消費消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic t106

驗證消息是否生產成功 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1

—————————

建立消費組消費消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105

查詢偏移量消息 bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning

//經過config文件訪問客戶端 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --group consumer_offsets_t105 --consumer.config config/consumer.properties

相關文章
相關標籤/搜索