Kafka學習筆記之掃盲

本文使用的Kafka版本0.11html

先思考些問題:java

  • 我想分析一下用戶行爲(pageviews),以便我能設計出更好的廣告位git

  • 我想對用戶的搜索關鍵詞進行統計,分析出當前的流行趨勢。這個頗有意思,在經濟學上有個長裙理論,就是說,若是長裙的銷量高了,說明經濟不景氣了,由於姑娘們沒錢買各類絲襪了。github

  • 有些數據,我以爲存數據庫浪費,直接存硬盤又怕到時候操做效率低。數據庫

這個時候,咱們就能夠用到分佈式消息系統了。雖然上面的描述更偏向於一個日誌系統,但確實kafka在實際應用中被大量的用於日誌系統。
這些場景都有一個共同點:數據是由上游模塊產生,上游模塊,使用上游模塊的數據計算、統計、分析,這個時候就可使用消息系統,尤爲是分佈式消息系統! apache

Kafka是一個分佈式消息系統,由linkedin使用scala編寫. Kafka的動態擴容是經過Zookeeper來實現的。
Zookeeper是一種在分佈式系統中被普遍用來做爲:分佈式狀態管理、分佈式協調管理、分佈式配置管理、和分佈式鎖服務的集羣。kafka增長和減小服務器都會在Zookeeper節點上觸發相應的事件。編程

相關概念

一、 AMQP協議(Advanced Message Queuing Protocol,高級消息隊列協議)
AMQP是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。bootstrap

二、 一些基本的概念緩存

  • 消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序安全

  • 生產者:(Producer) :向broker發佈消息的應用程序

  • AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,因此對於kafka一個broker就是一個應用程序的實例

  • 主題(Topic):一個主題相似新聞中的體育、娛樂、教育等分類概念,在實際工程中一般一個業務一個主題。

  • 分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區能夠看做是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。

生產者生產(push)消息、kafka集羣、消費者獲取(pull)消息這樣一種架構,kafka集羣中的消息,是經過Topic(主題)來進行組織的. 生產者能夠選擇本身喜歡的序列化方法對消息內容編碼。
kafka分區是提升kafka性能的關鍵所在,當你發現你的集羣性能不高時,經常使用手段就是增長Topic的分區,分區裏面的消息是按照重新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。

Kafka架構


簡化圖以下:

咱們看上面的圖,咱們把broker的數量減小,只有一臺。如今假設咱們按照上圖進行部署:

  • Server-1 broker其實就是kafka的server,由於producer和consumer都要去連它。Broker主要仍是作存儲用。

  • Server-2是zookeeper的server端,在這裏你能夠先想象,它維持了一張表,記錄了各個節點的IP、端口等信息(之後還會講到,它裏面還存了kafka的相關信息)。

  • Server-三、四、5他們的共同之處就是都配置了zkClient,這之間的鏈接都是須要zookeeper來進行分發的。

  • Server-1和Server-2的關係,他們能夠放在一臺機器上,也能夠分開放,zookeeper也能夠配集羣。目的是防止某一臺掛了。

kafka和JMS(Java Message Service)實現(activeMQ)不一樣的是:即便消息被消費,消息仍然不會被當即刪除.日誌文件將會根據broker中的配置要求,保留必定的時間以後刪除;好比log文件保留2天,那麼兩天後,文件會被清除,不管其中的消息是否被消費.但kafka並無提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用做爲"常規"的消息系統,在必定程度上,沒法確保消息的發送與接收絕對可靠(好比,消息重發,消息發送丟失等)

對於consumer而言,它須要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可使用任意順序消費消息,它只須要將offset重置爲任意值..(offset將會保存在zookeeper中)

kafka集羣幾乎不須要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;所以producer和consumer的實現很是輕量級,它們能夠隨意離開,而不會對集羣形成額外的影響.

partitions的目的有多個.最根本緣由是kafka基於文件存儲.經過分區,能夠將日誌內容分散到多個上,來避免文件尺寸達到單機磁盤的上限;能夠將一個topic切分多任意多個partitions.此外越多的partitions意味着能夠容納更多的consumer,有效提高併發消費的能力.

每一個consumer屬於一個consumer group;反過來講,每一個group中能夠有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每一個group中的一個consumer消費(而不是該group下的全部consumer,必定要注意這點).

  • 若是全部的consumer都具備相同的group,這種狀況和queue模式很像;消息將會在consumers之間負載均衡.

  • 若是全部的consumer都具備不一樣的group,那這就是"發佈-訂閱";消息將會廣播給全部的消費者.

在kafka中,一個partition中的消息只會被group中的一個consumer消費;每一個group中consumer消息消費互相獨立;咱們能夠認爲一個group是一個"訂閱"者,一個Topic中的每一個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer能夠消費多個partitions中的消息

注意:kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的自己特性。

在分佈式方面:

  • broker的部署是一種no central master的概念,而且每一個節點都是同等的,節點的增長和減小都不須要改變任何配置。

  • producer和consumer經過zookeeper去發現topic,而且經過zookeeper來協調生產和消費的過程。

  • producer、consumer和broker均採用TCP鏈接,通訊基於NIO實現。Producer和consumer能自動檢測broker的增長和減小。

使用場景:

  • 常規消息系統。

  • kafka能夠做爲"網站活性跟蹤"的最佳工具;能夠將網頁/用戶操做等信息發送到kafka中.並實時監控,或者離線統計分析等

  • kafka的特性決定它很是適合做爲"日誌收集中心";application能夠將操做日誌"批量""異步"的發送到kafka集羣中,而不是保存在本地或者DB中;kafka能夠批量提交消息/壓縮消息等,這對producer端而言,幾乎感受不到性能的開支.此時consumer端可使hadoop等其餘系統化的存儲和分析系統.

簡單說下整個系統運行的順序:

  • 1.啓動zookeeper的server

  • 2.啓動kafka的server

  • 3.Producer若是生產了數據,會先經過zookeeper找到broker,而後將數據存放進broker

  • 4.Consumer若是要消費數據,會先經過zookeeper找對應的broker,而後消費。

本地單擊測試環境啓動順序:

  • 1.啓動zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties &

  • 2.啓動kafka server: bin/kafka-server-start.sh ../config/server.properties &

  • 3.Kafka爲咱們提供了一個console來作連通性測試,
    先建立一個topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ,你能夠運行bin/kafka-topics.sh --list --zookeeper localhost:2181來檢查是否建立成功和topic列表

運行producer(默認broker端口9092):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 這是至關於開啓了一個producer的命令行。

  • 4.接下來運行consumer,新啓一個terminal:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

  • 5.執行完consumer的命令後,你能夠在producer的terminal中輸入信息,立刻在consumer的terminal中就會出現你輸的信息。有點兒像一個通訊客戶端。

配置項:

http://kafka.apache.org/docum...
必要配置項:

  • broker.id

  • log.dirs

  • zookeeper.connect

編程

APIDOC:http://kafka.apache.org/0110/...
官方github例子: https://github.com/apache/kaf...

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
</dependency>

首先貼一下官方例子:

Producer:

public class MyKafkaProducer {

    public static void main(String[] args) {
        /**
         * 這個例子中,每次調用都會建立一個Producer實例,但此處只是爲了演示方便,實際使用中,請將Producer做爲單例使用,它是線程安全的。

         * 從Kafka 0.11 開始,KafkaProducer支持兩種額外的模式:冪等(idempotent)與事務(transactional)。冪等使得以前的at least once變成exactly once傳送
         * 冪等Producer的重試再也不會致使重複消息。事務容許應用程序以原子方式將消息發送到多個分區(和主題!)

         * 開啓idempotence冪等:props.put("enable.idempotence", true);設置以後retries屬性自動被設爲Integer.MAX_VALUE;;acks屬性自動設爲all;;max.inflight.requests.per.connection屬性自動設爲1.其他同樣。

         * 開啓事務性: props.put("transactional.id", "my-transactional-id");一旦這個屬性被設置,那麼冪等也會自動開啓。而後使用事務API操做便可
         */
    }
    private static void send(){
        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("enable.idempotence", true);//開啓idempotence冪等 extract-once
//         props.put("acks", "all");//acks配置控制請求被認爲完成的條件
//         props.put("retries", 0);重試次數
//         props.put("batch.size", 16384);
//         props.put("linger.ms", 1);
//         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         Producer<String, String> producer = new KafkaProducer<>(props);
         for (int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

         producer.close();
    }
    private static void sendInTx(){
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("transactional.id", "my-transactional-id");//要啓用事務,必須配置一個惟一的事務id

         /**
          * http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
          * KafkaProducer類是線程安全的,能夠在多線程之間共享。
          */
         Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

         producer.initTransactions();

         try {
             producer.beginTransaction();
             for (int i = 0; i < 100; i++){
                 // send()是異步的,會當即返回,內部是緩存到producer的buffer中,以便於生產者能夠批量提交, 你也能夠傳遞一個回調send(ProducerRecord<K,V> record, Callback callback)
                 producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
             }
             producer.commitTransaction();
         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
             //沒法恢復的異常,咱們只能關閉producer 
             producer.close();
         } catch (KafkaException e) {
             // 可恢復的異常,終止事務而後重試便可。
             producer.abortTransaction();
         }
         producer.close();
    }
}

發送完以後,咱們能夠用bin目錄下的kafka-console-consumer來看發送的結果(固然如今用的topic是test)。能夠用命令:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Consumer:

/**
 *與producer不一樣,Kafka consumer不是線程安全的。
 */
public class MyKafkaConsumer {
    /**
     * 經過配置enable.auto.commit,auto.commit.interval.ms來按期自動提交消費的偏移量
     */
    private  void recieveByAutoCommitOffset(){
        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
//         consumer.wakeup();
    }
    /**
     * 手動提交消費的偏移量,這樣用戶能夠控制記錄什麼時候被視爲已消費,從而提交其偏移量。 當消息的消耗與一些處理邏輯相結合時,這是有用的,由於在完成處理以前不該將消息視爲已消費。
     */
    private void recieveByManualCommitOffset(){
        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "false");//手動提交offset
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         final int minBatchSize = 200;
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 buffer.add(record);
             }
             if (buffer.size() >= minBatchSize) {
//                 insertIntoDb(buffer); 執行相關邏輯
                 consumer.commitSync();//提交offset
                 buffer.clear();
             }
         }
    }
}

Streams:

public class MyKafkaStreams {
    public void test(){
        Map<String, Object> props = new HashMap<>();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         StreamsConfig config = new StreamsConfig(props);

         KStreamBuilder builder = new KStreamBuilder();
         builder.stream("my-input-topic").mapValues(value -> value.toString()+"!!!").to("my-output-topic");

         KafkaStreams streams = new KafkaStreams(builder, config);
         streams.start();
    }
}

注意點:

  • 將producer寫成單例模式,有助於減小zookeeper端佔用的資源。Producer自身是線程安全的類,只要封裝得當就能最恰當的發揮好producer的做用。(ZkClient去鏈接zookeeper的server時候都會建立sendThread和eventThread兩個線程,其中sendThread主要用於client與server端之間的網絡鏈接,真正的處理線程由eventThread來執行。Zookeeper是一個分佈式的協調框架,而分佈式應用中常常會出現動態的增長或刪除節點的操做,因此爲了實時瞭解分佈式整個節點的數量和基本信息,就有必要維護一個長鏈接的線程與服務端保持鏈接。另外zookeeper鏈接時佔用的時間也比較長,若是每次生產數據時都鏈接發起一次鏈接勢必形成了大量時間的耗費。)

  • kafka是將消息按照topic的形式存儲,一個topic會按照partition存在同一個文件夾下,目錄在config/server.properties中指定:

# The directory under which to store log files
log.dir=/tmp/kafka-logs

在消息系統中都會有這樣一個問題存在,數據消費狀態這個信息到底存哪裏。是存在consumer端,仍是存在broker端。對於這樣的爭論,通常會出現三種狀況:

  • At most once :消息一旦發出就立馬標記已消費,不會再有第二發生即便失敗了,缺點是容易丟失消息。

  • At least once :消息至少發送一次,若是消息未能接受成功,可能會重發,直到接收成功.

  • Exactly once :每一個消息僅發生一次,並且一次就能確保到達。這是理想狀態。(kafka0.11支持冪等以後,在開啓冪等的狀況下,就是這種模式)

at most once: 消費者fetch消息,而後保存offset,而後處理消息;當client保存offset以後,可是在消息處理過程當中出現了異常,致使部分消息未能繼續處理.那麼此後"未處理"的消息將不能被fetch到,這就是"atmost once".

at least once: 消費者fetch消息,而後處理消息,而後保存offset.若是消息處理成功以後,可是在保存offset階段zookeeper異常致使保存操做未能執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是"at least once",緣由offset沒有及時的提交給zookeeper,zookeeper恢復正常仍是以前offset狀態.

logback-kafka集成例子

https://github.com/xbynet/log...

參考:

http://kafka.apache.org/docum...
http://kafka.apache.org/intro...
https://my.oschina.net/ielts0...
http://blog.csdn.net/my_bai/a...
http://www.infoq.com/cn/artic...
http://www.cnblogs.com/likehu...
http://www.cnblogs.com/likehu...
https://www.iteblog.com/archi...

相關文章
相關標籤/搜索