Kafka簡單入門

什麼是kafka

Apache Kafka® 是 一個分佈式流處理平臺apache

上面是官網的介紹,和通常的消息處理系統相比,不一樣之處在於:編程

  1. kafka是一個分佈式系統,易於向外擴展
  2. 它同時爲發佈和訂閱提供高吞吐量
  3. 它支持多訂閱者,當失敗時能自動平衡消費者
  4. 消息的持久化

和其餘的消息系統之間的對比:安全

對比指標 kafka activemq rabbitmq rocketmq
背景 Kafka 是LinkedIn 開發的一個高性能、分佈式的消息系統,普遍用於日誌收集、流式數據處理、在線和離線消息分發等場景 ActiveMQ是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件, 爲應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通訊。 RabbitMQ是一個由erlang開發的AMQP協議(Advanced Message Queue )的開源實現。 RocketMQ是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給Apache基金會,已經於2016年11月成爲 Apache 孵化項目
開發語言 Java、Scala Java Erlang Java
協議支持 本身實現的一套 JMS協議 AMQP JMS、MQTT
持久化 支持 支持 支持 支持
producer容錯 在kafka中提供了acks配置選項, acks=0 生產者在成功寫入悄息以前不會等待任何來自服務器的響應 acks=1 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應 acks=all 只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應,這種模式最安全 發送失敗後便可重試 有ack模型。 ack模型可能重複消息 ,事務模型保證徹底一致 和kafka相似
吞吐量 kafka具備高的吞吐量,內部採用消息的批量處理,zero-copy機制,數據的存儲和獲取是本地磁盤順序批量操做,具備O(1)的複雜度,消息處理的效率很高 rabbitMQ在吞吐量方面稍遜於kafka,他們的出發點不同,rabbitMQ支持對消息的可靠的傳遞,支持事務,不支持批量的操做;基於存儲的可靠性的要求存儲能夠採用內存或者硬盤。 kafka在topic數量很少的狀況下吞吐量比rocketMq高,在topic數量多的狀況下rocketMq比kafka高
負載均衡 kafka採用zookeeper對集羣中的broker、consumer進行管理,能夠註冊topic到zookeeper上;經過zookeeper的協調機制,producer保存對應topic的broker信息,能夠隨機或者輪詢發送到broker上;而且producer能夠基於語義指定分片,消息發送到broker的某分片上 rabbitMQ的負載均衡須要單獨的loadbalancer進行支持 NamerServer進行負載均衡

架構圖:bash

使用實例:

producer:

public class Producer extends Thread {

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }

    class DemoCallBack implements Callback {

        private final long startTime;
        private final int key;
        private final String message;

        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        /**
         * A callback method the user can implement to provide asynchronous handling of request completion. This method will
         * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
         * non-null.
         *
         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
         *                  occurred.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                        "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                                "), " +
                                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }
}
複製代碼

consumer:

public class Consumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            consumer.subscribe(Collections.singletonList(this.topic));
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1).getSeconds());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }
}
複製代碼

properties:

public class KafkaProperties {
    public static final String TOPIC = "topic1";
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final String TOPIC2 = "topic2";
    public static final String TOPIC3 = "topic3";
    public static final String CLIENT_ID = "SimpleConsumerDemoClient";

    private KafkaProperties() {}
}
複製代碼

相關名詞:

  1. Producer :消息生產者,向Broker發送消息的客戶端
  2. Consumer :消息消費者,從Broker讀取消息的客戶端,消費者<=消息的分區數量
  3. broker :消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker能夠組成一個Kafka集羣
  4. topic : 主題,Kafka根據topic對消息進行歸類,發佈到Kafka集羣的每條消息都須要指定一個topic
  5. Partition : 分區,物理上的概念,一個topic能夠分爲多個partition,每一個partition內部是有序的,kafka默認根據key%partithon肯定消息發送到具體的partition
  6. ConsumerGroup : 每一個Consumer屬於一個特定的Consumer Group,一條消息能夠發送到多個不一樣的Consumer Group,可是一個Consumer Group中只能有一個Consumer可以消費該消息

Topic 和 Partition

  • 一個Topic中的消息會按照指定的規則(默認是key的hash值%分區的數量,固然你也能夠自定義),發送到某一個分區上面;
  • 每個分區都是一個順序的、不可變的消息隊列,而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的
  • 消費者所持有的元數據就是這個偏移量,也就是消費者在這個log(分區)中的位置。這個偏移量由消費者控制:正常狀況當消費者消費消息的時候,偏移量也線性的的增長

Consumer 和 Partition

  • 一般來說,消息模型能夠分爲兩種, 隊列和發佈-訂閱式。隊列的處理方式 是一個消費者組從隊列的一端拉取數據,這個數據消費完就沒了。在發佈-訂閱模型中,消息被廣播給全部的消費者,接受到消息的消費者都能處理此消息。在Kafka模型中抽象出來了:消費者組(consumer group)
  • 消費者組(consumer group):每一個組中有若干個消費者,若是全部的消費者都在一個組中,那麼這個就變成了隊列模型;若是笑消費者在不一樣的組中,這就成了發佈-訂閱模型
  • 一個分區裏面的數據只會由一個分組中的消費者處理,同分組的其餘消費者不會重複處理
  • 消費者組中的消費者數量<=分區數量,若是大於分區數量,多出來的消費者會處於收不到消息的狀態,形成沒必要要的浪費。

最後

小尾巴走一波,歡迎關注個人公衆號,不按期分享編程、投資、生活方面的感悟:)服務器