專爲實時而構建:使用Apache Kafka進行大數據消息傳遞 第2部分

在Apache Kafka簡介的前半部分,您使用Kafka開發了幾個小規模的生產者/消費者應用程序。從這些練習中,您應該熟悉Apache Kafka消息傳遞系統的基礎知識。在下半部分,您將學習如何使用分區來分佈負載並橫向擴展應用程序,天天處理多達數百萬條消息。您還將瞭解Kafka如何使用消息偏移來跟蹤和管理複雜的消息處理,以及如何在消費者失敗時保護您的Apache Kafka消息傳遞系統免於失敗。咱們將從第1部分開發用於發佈 - 訂閱和點對點用例的示例應用程序html

Apache Kafka中的分區

Kafka中的topic能夠細分爲分區。例如,在建立名爲Demo的topic時,您能夠將其配置爲具備三個分區。服務器將建立三個日誌文件,每一個文件分區一個。當生產者向topic發佈消息時,它將爲該消息分配分區ID。而後,服務器將消息僅附加到該分區的日誌文件中。java

若是您隨後啓動了兩個消費者,則服務器可能會將分區1和2分配給第一個消費者,將分區3分配給第二個消費者。每一個消費者只能從其分配的分區中讀取。您能夠在圖1中看到爲三個分區配置的Demo的topic。git

爲了擴展這個場景,想象一下有兩個代理的Kafka集羣,它位於兩臺機器中。分區演示tpoic時,您將其配置爲具備兩個分區和兩個副本。對於此類配置,Kafka服務器會將兩個分區分配給羣集中的兩個broker。每一個broker都是其中一個分區的領導者。github

當生產者發佈消息時,它將轉到分區領導者。領導者將獲取消息並將其附加到本地計算機上的日誌文件中。第二個broker會被動地將該提交日誌複製到本身的機器上。若是分區負責人發生故障,第二個broker將成爲新的領導者並開始提供客戶端請求。以一樣的方式,當消費者向分區發送請求時,該請求將首先發送給分區領導者,分區領導者將返回所請求的消息。數據庫

分區的好處

考慮分區基於Kafka的消息傳遞系統的好處:apache

  1. 可伸縮性:在只有一個分區的系統中,發佈到topic的消息存儲在一個日誌文件中,該文件存在於一臺計算機上。topic的消息數必須適合單個提交日誌文件,而且存儲的消息大小永遠不會超過該計算機的磁盤空間。經過對topic進行分區,您能夠經過將消息存儲在羣集中的不一樣計算機上來擴展系統。例如,若是您想爲演示主題存儲30千兆字節(GB)的消息,您能夠構建一個由三臺計算機組成的Kafka集羣,每臺計算機具備10 GB的磁盤空間。而後,您將topic配置爲具備三個分區。
  2. 服務器負載平衡:擁有多個分區可以讓您在broker之間傳播消息請求。例如,若是您的topic每秒處理100萬條消息,則能夠將其劃分爲100個分區,並將100個broker添加到羣集中。每一個broker都是單個分區的領導者,負責每秒響應10,000個客戶端請求。
  3. 消費者負載平衡:與服務器負載平衡相似,在不一樣機器上託管多個消費者能夠分散消費者負載。假設您但願從具備100個分區的topic每秒消耗100萬條消息。您能夠建立100個消費者並並行運行它們。Kafka服務器將爲每一個消費者分配一個分區,每一個消費者將並行處理10,000個消息。因爲Kafka僅將每一個分區分配給一個消費者,所以在分區內將按順序使用每一個消息。

兩種分區方式

生產者負責決定消息將進入的分區。生產者有兩種控制這種分配的選擇:編程

  • 自定義分區程序:您能夠建立實現該org.apache.kafka.clients.producer.Partitioner接口的類。此自定義Partitioner將實現業務邏輯以肯定發送消息的位置。
  • DefaultPartitioner:若是您不建立自定義分區程序類,則默認狀況下將使用該類org.apache.kafka.clients.producer.internals.DefaultPartitioner。對於大多數狀況,默認分區程序足夠好,提供三個選項:
  • 手動:建立ProducerRecord時,使用重載的構造函數new ProducerRecord(topicName, partitionId,messageKey,message)指定分區ID。
  • 散列(局部敏感):建立ProducerRecord時,經過調用指定messageKey的構造方法 new ProducerRecord(topicName,messageKey,message)DefaultPartitioner將使用messageKey的散列來確保相同messageKey的全部消息都轉到同一個生產者。這是最簡單也是最經常使用的方法。
  • 噴塗(隨機負載平衡):若是您不想控制哪些分區消息,只需調用new ProducerRecord(topicName, message)以建立您的ProducerRecord。在這種狀況下,分區程序將以循環方式向全部分區發送消息,從而確保平衡的服務器負載。

對Apache Kafka應用程序進行分區

對於第1部分中的簡單生產者/消費者示例,咱們使用了 DefaultPartitioner。如今咱們將嘗試建立自定義分區程序。對於此示例,咱們假設咱們有一個零售網站,消費者可使用該網站在世界任何地方訂購產品。根據使用狀況,咱們知道大多數消費者都在美國或印度。咱們但願對咱們的應用程序進行分區,以便未來自美國或印度的訂單發送給各自的消費者,而來自其餘任何地方的訂單將轉發給第三個消費者。api

首先,咱們將建立一個實現org.apache.kafka.clients.producer.Partitioner接口的CountryPartitioner。咱們必須實現如下方法:bash

  1. 當咱們使用配置屬性初始化類時,Kafka將調用configure()。此方法初始化特定於應用程序業務邏輯的函數,例如鏈接到數據庫。在這種狀況下,咱們須要一個至關通用的分區器做爲屬性。而後,咱們可使用將消息流映射到分區。未來咱們可使用這種格式來改變哪些國家/地區得到本身的分區。PartitionerMapcountryNameconfigProperties.put("partitions.0","USA")
  2. Producer爲每一個消息調用partition()方法。在這種狀況下,咱們將使用它來讀取消息並從消息中解析國家/地區的名稱。若是國家的名稱在countryToPartitionMap,它將返回存儲在MappartitionId若是沒有,它將散列國家的值並使用它來計算它應該去哪一個分區。
  3. 咱們調用close()來關閉分區程序。使用此方法可確保在關閉期間清除初始化期間獲取的任何資源。

請注意,當Kafka調用configure()時,Kafka生成器會將咱們爲生成器配置的全部屬性傳遞給Partitioner類。咱們必須只讀取那些以partitions.開頭的屬性,解析它們以獲取partitionId並存儲ID到 countryToPartitionMap服務器

如下是咱們的Partitioner界面自定義實現。

清單1. CountryPartitioner

public class CountryPartitioner implements Partitioner {
        private static Map<String,Integer> countryToPartitionMap;

        public void configure(Map<String, ?> configs) {
            System.out.println("Inside CountryPartitioner.configure " + configs);
            countryToPartitionMap = new HashMap<String, Integer>();
            for(Map.Entry<String,?> entry: configs.entrySet()){
                if(entry.getKey().startsWith("partitions.")){
                    String keyName = entry.getKey();
                    String value = (String)entry.getValue();
                    System.out.println( keyName.substring(11));
                    int paritionId = Integer.parseInt(keyName.substring(11));
                    countryToPartitionMap.put(value,paritionId);
                }
            }
        }

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                             Cluster cluster) {
            List partitions = cluster.availablePartitionsForTopic(topic);
            String valueStr = (String)value;
            String countryName = ((String) value).split(":")[0];
            if(countryToPartitionMap.containsKey(countryName)){
                //If the country is mapped to particular partition return it
                return countryToPartitionMap.get(countryName);
            }else {
                //If no country is mapped to particular partition distribute between remaining partitions
                int noOfPartitions = cluster.topics().size();
                return  value.hashCode()%noOfPartitions + countryToPartitionMap.size() ;
            }
        }

        public void close() {}
    }
    複製代碼

Producer清單2(下面)中的類與第1部分中的簡單生成器很是類似,其中兩個更改以粗體標記:

  1. 咱們使用等於值ProducerConfig.PARTITIONER_CLASS_CONFIG的鍵設置config屬性,該值匹配咱們CountryPartitioner類的徹底限定名。咱們還設置countryNamepartitionId,從而映射了咱們想要傳遞給CountryPartitioner的屬性。
  2. 咱們將實現org.apache.kafka.clients.producer.Callback接口的類的實例做爲producer.send()方法的第二個參數傳遞。一旦成功發佈消息(附加了RecordMetadata對象),Kafka客戶端將調用onCompletion()其方法。咱們將可以使用此對象來找出發送消息的分區,以及分配給已發佈消息的偏移量。

清單2.分區生產者

public class Producer {
    private static Scanner in;
    public static void main(String[] argv)throws Exception {
        if (argv.length != 1) {
            System.err.println("Please specify 1 parameters ");
            System.exit(-1);
        }
        String topicName = argv[0];
        in = new Scanner(System.in);
        System.out.println("Enter message(type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

            configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
        configProperties.put("partition.1","USA");
        configProperties.put("partition.2","India");
        
        org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
        String line = in.nextLine();
        while(!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
;
                }
            });
            line = in.nextLine();
        }
        in.close();
        producer.close();
    }
}
複製代碼

爲消費者分配分區

Kafka服務器保證僅將分區分配給一個消費者,從而保證消息的消耗順序。您能夠手動分配分區或自動分配分區。

若是您的業務邏輯須要更多控制,那麼您將須要手動分配分區。在這種狀況下,您將使用KafkaConsumer.assign(<listOfPartitions>)將每一個消費者感興趣的分區列表傳遞給Kakfa服務器。

自動分配分區是默認和最多見的選擇。在這種狀況下,Kafka服務器將爲每一個使用者分配一個分區,並將從新分配分區以擴展新的使用者。

假設您正在建立一個包含三個分區的新topic。當您爲新topic啓動第一個消費者時,Kafka會將全部三個分區分配給同一個消費者。若是您隨後啓動第二個消費者,Kafka將從新分配全部分區,將一個分區分配給第一個下發者,將剩餘的兩個分區分配給第二個消費者。若是添加第三個消費者,Kafka將再次從新分配分區,以便爲每一個消費者分配一個分區。最後,若是您啓動第四個和第五個消費者,那麼三個消費者將擁有一個分配的分區,但其餘消費者將不會收到任何消息。若是最初的三個分區之一出現故障,Kafka將使用相同的分區邏輯將該消費者的分區從新分配給其餘消費者。

咱們將爲示例應用程序使用自動分配。咱們的大部分消費者代碼都與第1部分中的簡單消費者代碼相同。惟一的區別是咱們將一個實例ConsumerRebalanceListener做爲第二個參數傳遞給咱們的KafkaConsumer.subscribe()方法。Kafka每次爲此消費者分配或撤銷分區時都會調用此類的方法。咱們將覆蓋ConsumerRebalanceListeneronPartitionsRevoked()onPartitionsAssigned()方法,並打印今後訂閱者分配或撤消的分區列表。

清單3.分區的使用者

private static class ConsumerThread extends Thread {
     private String topicName;
     private String groupId;
     private KafkaConsumer<String, String> kafkaConsumer;

     public ConsumerThread(String topicName, String groupId) {
         this.topicName = topicName;
         this.groupId = groupId;
     }

     public void run() {
         Properties configProperties = new Properties();
         configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

         //Figure out where to start processing messages from
         kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
         kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                 System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
             }
             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
             }
         });
         //Start processing messages
         try {
             while (true) {
                 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                 for (ConsumerRecord<String, String> record : records)
                     System.out.println(record.value());
             }
         } catch (WakeupException ex) {
             System.out.println("Exception caught " + ex.getMessage());
         } finally {
             kafkaConsumer.close();
             System.out.println("After closing KafkaConsumer");
         }
     }

     public KafkaConsumer<String, String> getKafkaConsumer() {
         return this.kafkaConsumer;
     }
}
   複製代碼

測試您的Apache Kafka應用程序

咱們已準備好運行並測試生產者/消費者應用程序的當前迭代。如前所述,您可使用清單1到清單3中的代碼,或者在GitHub上下載完整的源代碼

  1. 經過調用:編譯並建立一個JAR mvn compile assembly:single
  2. 建立一個以三個分區和一個複製因子命名的主題part-demo <KAFKA_HOME>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
  3. 啓動生產者: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
  4. 啓動三個消費者,而後觀察控制檯以查看每次啓動使用者的新實例時如何分配和撤消分區: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
  5. 在生產者控制檯中鍵入一些消息,並驗證消息是否路由到正確的使用者: USA: First order India: First order USA: Second order France: First order

圖2顯示了分區主題中的生產者/消費者輸出。

可以將單個主題劃分爲多個部分是Kafka可擴展性的關鍵。經過分區,您能夠水平擴展消息傳遞基礎結構,同時還能夠維護每一個分區內的順序 接下來,咱們將瞭解Kafka如何使用消息偏移來跟蹤和管理複雜的消息傳遞方案。

管理message偏移

我在第1部分中提到,每當生產者發佈消息時,Kafka服務器就會爲該消息分配一個偏移量。消費者可以經過設置或重置消息偏移來控制它想要消費的消息。在開發消費者時,您有兩種管理偏移的選項:自動和手動。

兩種類型的偏移

當您在Kafka客戶端中啓動使用者時,它將讀取您的ConsumerConfig.AUTO_OFFSET_RESET_CONFIG(auto.offset.reset)配置值。若是該配置設置爲

最先,
則消費者將以該topic可用的最小偏移量開始。在向Kafka提出的第一個請求中,消費者會說:給我這個分區中的全部消息,其偏移量大於可用的最小值。它還將指定批量大小。Kafka服務器將以指定大小的批量返回全部匹配的消息。

消費者跟蹤它處理的最後一條消息的偏移量,所以它將始終請求偏移量高於最後一個偏移量的消息。當消費者正常運行時,此設置有效,但若是消費者崩潰,或者您想中止維護,會發生什麼?在這種狀況下,您但願使用者記住上次處理的消息的偏移量,以便它能夠從第一個未處理的消息開始。

爲了確保消息持久性,Kafka使用兩種類型的偏移:

當前偏移量
用於跟蹤消費者正常工做時消耗的消息。該
偏移
還跟蹤最後的消息抵消,但它發送信息到服務器kafka永久儲存。

若是消費者因爲某種緣由而關閉或被關閉,它能夠向Kafka服務器查詢

最後提交的偏移量
並恢復消息消費,就好像沒有丟失同樣。就其自己而言,Kafka broker將此信息存儲在一個名爲 __consumer_offsets的topic中。此數據將複製到多個broker,以便broker不會丟失偏移量。

提交偏移數據

您能夠選擇提交偏移數據的頻率。若是您常常提交,則會受到性能損失。另外一方面,若是消費者確實失敗了,那麼從新處理和消費的消息就會減小。您的另外一個選擇是減小提交(以得到更好的性能),但在發生故障時從新處理更多消息。在任何一種狀況下,消費者都有兩種提交偏移的選項:

  1. 自動提交:您能夠設置auto.commit爲true並使用以毫秒爲單位的值設置auto.commit.interval.ms屬性。啓用此功能後,Kafka使用者將提交poll()調用而收到的最後一條消息的偏移量。該poll()調用在auto.commit.interval.ms後發出。
  2. 手動提交:您能夠隨時調用KafkaConsumercommitSync()commitAsync()方法。當您發出調用時,使用者將獲取在poll()期間收到的最後一條消息的偏移量並將其提交給Kafka服務器。

手動偏移的三個用例

讓咱們考慮三種使用狀況,您不但願使用Kafka的默認偏移管理基礎架構。相反,您將手動肯定要從哪一個消息開始。

  1. 從頭開始:在此用例中,您將捕獲Kafka中的數據庫更改。第一份數據是完整數據; 此後,您只會得到值已更改的列(更改的增量)。在這種狀況下,您始終須要從頭開始閱讀topic中的全部消息,以構建記錄的完整狀態。要解決這種狀況,您能夠將消費者配置爲經過調用kafkaConsumer.seekToBeginning(topicPartition)方法從頭開始讀取。請記住,默認狀況下,Kafka將刪除超過七天的消息,所以您須要爲此用例配置更高的log.retention.hours值。
  2. 轉到最後:如今讓咱們假設您經過實時分析交易來構建股票推薦應用程序。最糟糕的狀況發生,您的消費者應用程序崩潰。在這種狀況下,你已經使用過了kafkaConsumer.seekToEnd(topicPartition) 來配置偏移量以忽略停機期間的消息。相反,消費者將開始處理重啓之時發生的消息
  3. 從給定的偏移開始:最後,假設您剛剛在生產環境中發佈了新版本的生產者。在觀看它產生一些消息後,您意識到它正在生成錯誤消息。你修復了生產者並從新開始。您不但願消費者使用這些錯誤消息,所以您能夠經過調用kafkaConsumer.seek(topicPartition, startingOffset)手動將偏移量設置爲生成的第一個良好消息。

消費者應用程序中的手動偏移

咱們迄今爲止開發的消費者代碼每5秒自動提交一次記錄。如今讓咱們更新消費者以獲取手動設置偏移消耗的第三個參數。

若是使用等於0的最後一個參數的值,則使用者將假定您要從頭開始,所以它將爲每一個分區調用一個kafkaConsumer.seekToBeginning()方法。若是傳遞值-1,則會假定您要忽略現有消息,而且僅消費在從新啓動使用者後發佈的消息。在這種狀況下,它將爲每一個分區調用kafkaConsumer.seekToEnd()。最後,若是指定除0或-1之外的任何值,則會假定您已指定了消費者要從中開始的偏移量; 例如,若是您將第三個值傳遞爲5,那麼在從新啓動時,使用者將使用偏移量大於5的消息。爲此,它將調用kafkaConsumer.seek(<topicname>, <startingoffset>)

清單4.向使用者添加第三個參數

private static class ConsumerThread extends Thread{
    private String topicName;
    private String groupId;
    private long startingOffset;
    private KafkaConsumer<String,String> kafkaConsumer;

    public ConsumerThread(String topicName, String groupId, long startingOffset){
        this.topicName = topicName;
        this.groupId = groupId;
        this.startingOffset=startingOffset;
    }
    public void run() {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //Figure out where to start processing messages from
        kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
        kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                while(topicPartitionIterator.hasNext()){
                    TopicPartition topicPartition = topicPartitionIterator.next();
                    System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                    if(startingOffset ==0){
                        System.out.println("Setting offset to beginning");
                        kafkaConsumer.seekToBeginning(topicPartition);
                    }else if(startingOffset == -1){
                        System.out.println("Setting it to the end ");
                        kafkaConsumer.seekToEnd(topicPartition);
                    }else {
                        System.out.println("Resetting offset to " + startingOffset);
                        kafkaConsumer.seek(topicPartition, startingOffset);
                    }
                }
            }
        });
        //Start processing messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }

            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            kafkaConsumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
    public KafkaConsumer<String,String> getKafkaConsumer(){
        return this.kafkaConsumer;
    }
}
複製代碼

代碼準備就緒後,您能夠經過執行如下命令對其進行測試:

java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
    複製代碼

Kafka客戶端應該打印偏移量爲0的全部消息,或者您能夠更改最後一個參數的值以在消息隊列中跳轉。

Apache Kafka中的消費者羣體

傳統的消息傳遞用例能夠分爲兩種主要類型:點對點和發佈 - 訂閱。在

點對點
場景中,一個消費者使用一條消息。當消息中繼銀行交易時,只有一個消費者應該經過更新銀行帳戶進行響應。在
發佈 - 訂閱
方案中,多個消費者將使用單個消息但對其做出不一樣的響應。當Web服務器出現故障時,您但願將警報發送給編程爲以不一樣方式響應的消費者。

隊列
是指點對點場景,其中消息僅由一個消費者使用。
主題
是指發佈 - 訂閱方案,其中每一個消費者都使用消息。Kafka沒有爲隊列和主題用例定義單獨的API; 相反,當您啓動消費者時,您須要指定 ConsumerConfig.GROUP_ID_CONFIG屬性。

若是您對多個消費者使用相同的GROUP_ID_CONFIG消息,Kafka將假設它們都是單個組的一部分,而且它將僅向一個消費者傳遞消息。若是你在不一樣的group.id中啓動兩個消費者,Kafka將假設它們不相關,所以每一個消費者將得到它本身的消息副本。

回想一下清單3中的分區使用者將groupId其做爲第二個參數。如今咱們將使用該groupId參數爲消費者實現隊列和主題用例。

  1. 建立一個以group-test兩個分區命名的主題: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
  2. 啓動一個可用於將消息發佈到group-test剛剛建立的主題的生產者: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
  3. 啓動三個聽取發佈到group-test主題的消息的消費者。使用group1你的價值group id。這將爲您提供三個消費者group1 java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
  4. 啓動第四個消費者,但此次改變了group idto 的值group2。這將爲您提供三個消費者group1和一個消費者group2 java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
  5. 返回生產者控制檯並開始鍵入消息。您發佈的每條新消息都應在group2消費者窗口中出現一次,並在三個group1消費者窗口中出現一次,如圖3所示。

第2部分的結論

大數據消息系統的早期用例須要批處理,例如運行夜間ETL過程或按期將數據從RDBMS移動到NoSQL數據存儲區。在過去幾年中,對實時處理的需求增長,特別是對於欺詐檢測和應急響應系統。Apache Kafka是爲這些類型的實時場景而構建的。

Apache Kafka是一個很好的開源產品,但確實有一些限制; 例如,您沒法在主題到達目標以前從主題內部查詢數據,也不能跨多個地理位置分散的羣集複製數據。您能夠將MapR Streams(商業產品)與Kafka API結合使用,以實現這些和其餘更復雜的發佈 - 訂閱方案。

原文連接:www.javaworld.com/article/306…

查看更多文章

公衆號:銀河系1號

聯繫郵箱:public@space-explore.com

(未經贊成,請勿轉載)

相關文章
相關標籤/搜索