kafka生產者與消費者

package kafka;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import java.util.Properties;


public class Producer {

    Logger logger = Logger.getLogger("Producer");

    public KafkaProducer getKafkaProducer() {
        Properties kafkaProps = new Properties();
        /**
         * kafka生產者必選是三個屬性
         * bootstrap.servers 指定broker的地址清單
         * key.serializer 必須是一個實現org.apache.kafka.common.serialization.Serializer接口的類,將key序列化成字節數組。注意:key.serializer必須被設置,即便消息中沒有指定key
         * value.serializer  將value序列化成字節數組
         */

        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //其餘設置
        /*
        acks=0:若是設置爲0,生產者不會等待kafka的響應,高吞吐。消息會被馬上加到發送緩衝通道中,而且認爲已經發送成功。這種狀況下,不能保證kafka接收到了這條消息,retries配置不會生效,每條消息的偏移量都是1;

        acks=1:這個配置意味着kafka會把這條消息寫到本地日誌文件中,可是不會等待集羣中其餘機器的成功響應。這種狀況下,在寫入日誌成功後,集羣主機器掛掉,同時從機器還沒來得及寫的話,消息就會丟失掉。

        acks=all:這個配置意味着leader會等待全部的follower同步完成。這個確保消息不會丟失,除非kafka集羣中全部機器掛掉。這是最強的可用性保證,最安全模式,但延遲相對較長。

        (1)acks指定必需要有多少個partition副本收到消息,生產者纔會認爲消息的寫入是成功的。

              acks=0,生產者不須要等待服務器的響應,以網絡能支持的最大速度發送消息,吞吐量高,可是若是broker沒有收到消息,生產者是不知道的

              acks=1,leader partition收到消息,生產者就會收到一個來自服務器的成功響應

              acks=all,全部的partition都收到消息,生產者纔會收到一個服務器的成功響應

        (2)buffer.memory,設置生產者內緩存區域的大小,生產者用它緩衝要發送到服務器的消息。

        (3)compression.type,默認狀況下,消息發送時不會被壓縮,該參數能夠設置成snappy、gzip或lz4對發送給broker的消息進行壓縮

        (4)retries,生產者從服務器收到臨時性錯誤時,生產者重發消息的次數

        (5)batch.size,發送到同一個partition的消息會被先存儲在batch中,該參數指定一個batch能夠使用的內存大小,單位是byte。不必定須要等到batch被填滿才能發送

        (6)linger.ms,生產者在發送消息前等待linger.ms,從而等待更多的消息加入到batch中。若是batch被填滿或者linger.ms達到上限,就把batch中的消息發送出去

        (7)max.in.flight.requests.per.connection,生產者在收到服務器響應以前能夠發送的消息個數
         */
        kafkaProps.put("acks", "all");//
        return new KafkaProducer(kafkaProps);
    }

    /**
     * 同步發送
     *
     * @param topic
     * @param key
     * @param value
     * @param kafkaProducer
     */
    public void sendMsgSynchr(String topic, String key, String value, KafkaProducer kafkaProducer) {

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
        kafkaProducer.send(producerRecord);
    }

    /**
     * 異步發送
     *
     * @param topic
     * @param key
     * @param value
     * @param kafkaProducer
     */
    public void sendMsgAsynchr(String topic, String key, String value, KafkaProducer kafkaProducer) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
        kafkaProducer.send(producerRecord, new ProducerCallback());//發送消息時,傳遞一個回調對象,該回調對象必須實現org.apahce.kafka.clients.producer.Callback接口
    }

    private class ProducerCallback implements Callback {

        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {//若是Kafka返回一個錯誤,onCompletion方法拋出一個non null異常。
                e.printStackTrace();//對異常進行一些處理,這裏只是簡單打印出來
            }
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        KafkaProducer kafkaProducer = producer.getKafkaProducer();
        for (int i = 0; i < 100; i++) {
            String msg = "msg------" + i;
            System.out.println(msg);
            producer.sendMsgAsynchr("test_kafka", null, msg, kafkaProducer);
        }
    }
}
package kafka;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class Consumer {


    public KafkaConsumer getKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "groupid1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //其餘參數
        /*
        1:fetch.min.bytes,指定消費者從broker獲取消息的最小字節數,即等到有足夠的數據時才把它返回給消費者

        2:fetch.max.wait.ms,等待broker返回數據的最大時間,默認是500ms。fetch.min.bytes和fetch.max.wait.ms哪一個條件先獲得知足,就按照哪一種方式返回數據

        3:max.partition.fetch.bytes,指定broker從每一個partition中返回給消費者的最大字節數,默認1MB

        4:session.timeout.ms,指定消費者被認定死亡以前能夠與服務器斷開鏈接的時間,默認是3s

        5:auto.offset.reset,消費者在讀取一個沒有偏移量或者偏移量無效的狀況下(由於消費者長時間失效,包含偏移量的記錄已通過時並被刪除)該做何處理。默認是latest(消費者從最新的記錄開始讀取數據)。另外一個值是                    earliest(消費者從起始位置讀取partition的記錄)

        6:enable.auto.commit,指定消費者是否自動提交偏移量,默認爲true

        7:partition.assignment.strategy,指定partition如何分配給消費者,默認是Range。Range:把Topic的若干個連續的partition分配給消費者。RoundRobin:把Topic的全部partition逐個分配給消費者

        8:max.poll.records,單次調用poll方法可以返回的消息數量
         */
        return new KafkaConsumer(props);
    }

    public void getMsg(String topic, KafkaConsumer kafkaConsumer) {
        //2.訂閱Topic

        //建立一個只包含單個元素的列表,Topic的名字叫做customerCountries
        kafkaConsumer.subscribe(Collections.singletonList(topic));  //主題列表
        //支持正則表達式,訂閱全部與test相關的Topic
        //consumer.subscribe("test.*");
        //3.輪詢
        //消息輪詢是消費者的核心API,經過一個簡單的輪詢向服務器請求數據,一旦消費者訂閱了Topic,輪詢就會處理所欲的細節,包括羣組協調、partition再均衡、發送心跳
        //以及獲取數據,開發者只要處理從partition返回的數據便可。

//        try {
        while (true) {//消費者是一個長期運行的程序,經過持續輪詢向Kafka請求數據。在其餘線程中調用consumer.wakeup()能夠退出循環
            //在100ms內等待Kafka的broker返回數據.超市參數指定poll在多久以後能夠返回,無論有沒有可用的數據都要返回

            ConsumerRecords<String, String> records = kafkaConsumer.poll(100l);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
//        } finally {
//            //退出應用程序前使用close方法關閉消費者,網絡鏈接和socket也會隨之關閉,並當即觸發一次再均衡
//            kafkaConsumer.close();
//        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        KafkaConsumer kafkaConsumer = consumer.getKafkaConsumer();
        consumer.getMsg("test_kafka", kafkaConsumer);
    }
}
相關文章
相關標籤/搜索