kafka java producer consumer實踐

java提供了方便的API進行kafka消息處理。簡單總結一下:html

學習參考:http://www.itnose.net/st/6095038.htmljava

POM配置(關於LOG4J的配置參看 http://www.cnblogs.com/huayu0815/p/5341712.htmlapache

<dependencies>
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.10</artifactId>
          <version>0.8.2.0</version>
          <exclusions>
              <exclusion>
                  <groupId>log4j</groupId>
                  <artifactId>log4j</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
              </exclusion>
          </exclusions>
      </dependency>
      <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.2</version>
      </dependency>
      <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-access</artifactId>
          <version>1.1.2</version>
      </dependency>
      <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.2</version>
      </dependency>
      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
          <version>1.7.7</version>
      </dependency>
  </dependencies>

  

PRODUCERapi

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer {

    Producer<String, String> producer ;


    /*#指定kafka節點列表,用於獲取metadata,沒必要所有指定
    metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092
            # 指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到對應分區
    #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner

    # 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
    compression.codec=none

    # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultEncoder,即byte[]
    serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
    # serializer.class=kafka.serializer.DefaultEncoder
    # serializer.class=kafka.serializer.StringEncoder
    # 若是要壓縮消息,這裏指定哪些topic要壓縮消息,默認empty,表示不壓縮。
            #compressed.topics=

            ########### request ack ###############
            # producer接收消息ack的時機.默認爲0.
    # 0: producer不會等待broker發送ack
    # 1: 當leader接收到消息以後發送ack
    # 2: 當全部的follower都同步消息成功後發送ack.
            request.required.acks=0
            # 在向producer發送ack以前,broker容許等待的最大時間
    # 若是超時,broker將會向producer發送一個error ACK.意味着上一次消息由於某種
    # 緣由未能成功(好比follower未能同步成功)
    request.timeout.ms=10000
            ########## end #####################


            # 同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步能夠提升發送吞吐量,
            # 也意味着消息將會在本地buffer中,並適時批量發送,可是也可能致使丟失未發送過去的消息
    producer.type=sync
    ############## 異步發送 (如下四個異步參數可選) ####################
            # 在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker,默認爲5000ms
    # 此值和batch.num.messages協同工做.
            queue.buffering.max.ms = 5000
            # 在async模式下,producer端容許buffer的最大消息量
    # 不管如何,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積
    # 此時,若是消息的條數達到閥值,將會致使producer端阻塞或者消息被拋棄,默認爲10000
    queue.buffering.max.messages=20000
            # 若是是異步,指定每次批量發送數據量,默認爲200
    batch.num.messages=500
            # 當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後
    # 阻塞必定時間後,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
    # 此時producer能夠繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間
    # -1: 無阻塞超時限制,消息不會被拋棄
    # 0:當即清空隊列,消息被拋棄
    queue.enqueue.timeout.ms=-1
            ################ end ###############

            # 當producer接收到error ACK,或者沒有接收到ACK時,容許消息重發的次數
    # 由於broker並無完整的機制來避免消息重複,因此當網絡異常時(好比ACK丟失)
    # 有可能致使broker接收到重複的消息,默認值爲3.
            message.send.max.retries=3


            # producer刷新topic metada的時間間隔,producer須要知道partition leader的位置,以及當前topic的狀況
    # 所以producer須要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會當即刷新
    # (好比topic失效,partition丟失,leader失效等),此外也能夠經過此參數來配置額外的刷新機制,默認值600000
    topic.metadata.refresh.interval.ms=60000*/

    public Producer<String, String> getClient() {
        if (producer == null) {
            Properties props = new Properties() ;
            //此處配置的是kafka的端口
            props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:9092");

            //配置value的序列化類
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "async");
            //配置key的序列化類
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "0");
            ProducerConfig config = new ProducerConfig(props) ;
            producer = new Producer<>(config) ;
        }
        return producer ;
    }

    public void shutdown(){
        if (producer != null) {
            producer.close();
        }
    }

    public static void main(String[] args) throws CloneNotSupportedException {
        KafkaProducer kafkaProducer = new KafkaProducer() ;
        for (int i=0 ; i< 10; i ++) {
            kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic1","topic1_" + i + "_測試"));
            kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic2","topic2_" + i + "_測試"));
        }
        kafkaProducer.shutdown();
    }
}

 總結:緩存

一、producer每次new的時候,會自動建立線程池bash

二、producer在調用send方法時候,纔會真正創建socket鏈接。服務器

  鏈接過程以下:網絡

  1>、經過metadata.broker.list獲取對應的brokers全量信息(metadata.broker.list給的broker的ip和端口只要保證一個是可用的便可,無需所有列出。不過開發過程當中,通常所有列出)。session

      2>、根據zookeeper的註冊信息獲取topic的分區信息多線程

  3>、創建client和broker的socket鏈接

三、send結束後,直接關閉socket鏈接。

四、每次send會從新創建鏈接

五、client會自動獲取topic的分區信息,所以kafka rebalance的時候,是不受影響的

 

CONSUMER

consumer api官方有兩種,通常稱爲:high-level Consumer API 和 SimpleConsumer API 。

第一種高度抽象的Consumer API,它使用起來簡單、方便,可是對於某些特殊的需求咱們可能要用到第二種更底層的API,先簡單介紹下第二種API可以幫助咱們作哪些事情
  • 一個消息讀取屢次
  • 在一個處理過程當中只消費Partition其中的一部分消息
  • 添加事務管理機制以保證消息被處理且僅被處理一次

  使用第二種的弊端:

  • 必須在程序中跟蹤offset值
  • 必須找出指定Topic Partition中的lead broker
  • 必須處理broker的變更

我主要嘗試了一下第一種也是大多數狀況下使用的API。

使用high-level Consumer api,有兩種用法:單個消費者和多個消費者

單消費者:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaSingleConsumer {

    /**
     * # zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki)
     # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
     zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka
     # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間才能檢查到而且觸發從新負載均衡
     zookeeper.session.timeout.ms=5000
     zookeeper.connection.timeout.ms=10000
     # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息
     zookeeper.sync.time.ms=2000

     #指定消費組
     group.id=xxx
     # 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息
     # 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true
     auto.commit.enable=true
     # 自動更新時間。默認60 * 1000
     auto.commit.interval.ms=1000

     # 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察
     conusmer.id=xxx

     # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生
     client.id=xxxx
     # 最大取多少塊緩存到消費者(默認10)
     queued.max.message.chunks=50
     # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
     # 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊
     # "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點,
     # 此值用於控制,註冊節點的重試次數.
     rebalance.max.retries=5
     # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk
     # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存
     fetch.min.bytes=6553600
     # 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer
     fetch.wait.max.ms=5000
     socket.receive.buffer.bytes=655360

     # 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
     # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
     auto.offset.reset=smallest
     # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,即byte[]
     derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
     */

    public static void main(String args[]) {
        String topic = "topic1" ;
        Properties props = new Properties();
        props.put("zookeeper.connect", "xxx.xxx.xxx:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ;

        Map<String, Integer> topicMap = new HashMap<>();
        // Define single thread for topic
        topicMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);
        for (KafkaStream<byte[], byte[]> stream : streamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message from Single Topic :: "     + new String(consumerIte.next().message()));
        }
        if (consumer != null)
            consumer.shutdown();
    }
}

多消費者

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaMultiConsumer {

    /**
     * # zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki)
     # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
     zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka
     # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間才能檢查到而且觸發從新負載均衡
     zookeeper.session.timeout.ms=5000
     zookeeper.connection.timeout.ms=10000
     # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息
     zookeeper.sync.time.ms=2000

     #指定消費組
     group.id=xxx
     # 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息
     # 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true
     auto.commit.enable=true
     # 自動更新時間。默認60 * 1000
     auto.commit.interval.ms=1000

     # 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察
     conusmer.id=xxx

     # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生
     client.id=xxxx
     # 最大取多少塊緩存到消費者(默認10)
     queued.max.message.chunks=50
     # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
     # 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊
     # "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點,
     # 此值用於控制,註冊節點的重試次數.
     rebalance.max.retries=5
     # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk
     # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存
     fetch.min.bytes=6553600
     # 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer
     fetch.wait.max.ms=5000
     socket.receive.buffer.bytes=655360

     # 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
     # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
     auto.offset.reset=smallest
     # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,即byte[]
     derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
     */

    public static void main(String args[]) {
        String topic = "topic1" ;
        int threadCount = 3;
        Properties props = new Properties();
        props.put("zookeeper.connect", "xxx.xxx.xxx.xxx:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ;

        Map<String, Integer> topicMap = new HashMap<>();
        // Define single thread for topic
        topicMap.put(topic, 3);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);
        int count = 0;
        for (final KafkaStream<byte[], byte[]> stream : streamList) {
            final String threadNumber = "Thread" + count ;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext()) {
                        System.out.println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message()));
                    }
                }
            });
            count++ ;
        }
    }


}

總結:

一、KAFKA容許多個consumer group,每一個group容許多個consumer。不一樣group之間共享信息(相似發佈-訂閱模式),同一個group之間的多個consumer只會消費消息一次(相似生產-消費者模式)。

二、對同一個topic啓動多個java consumer線程,在zookeeper上能夠看到多個信息:  

[zk:xxx.xxx.xxx.xxx:2181(CONNECTED) 120] ls /consumers/testgroup/ids             
[testgroup_xxx-1459926903849-fea50e90, testgroup_xxx-1459926619712-8d1caf90]

三、若是多線程方式啓動consumer,能夠看到不一樣的consumer綁定到不一樣的topic patition上

[zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 121] get /consumers/testgroup/owners/topic1/1
testgroup_xxx-1459926619712-8d1caf90-1
cZxid = 0x2000006e2
ctime = Wed Apr 06 03:15:04 EDT 2016
mZxid = 0x2000006e2
mtime = Wed Apr 06 03:15:04 EDT 2016
pZxid = 0x2000006e2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x153413bc26e007e
dataLength = 44
numChildren = 0
[zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 122] get /consumers/testgroup/owners/topic1/0
testgroup_xxx-1459926619712-8d1caf90-0
cZxid = 0x2000006e3
ctime = Wed Apr 06 03:15:04 EDT 2016
mZxid = 0x2000006e3
mtime = Wed Apr 06 03:15:04 EDT 2016
pZxid = 0x2000006e3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x153413bc26e007e
dataLength = 44
numChildren = 0

四、對於啓動多個consumer進程或是以多線程方式啓動單個consumer進程,區別僅僅在與zookeeper上註冊的consumer信息是多個或是一個「ls /consumers/testgroup/ids 」,可是對於消息的消費而言,都遵照消費只消費一次,同一個分區只會綁定一個consumer信息。

五、若是某個消費者掛掉的話,consumer和partition的綁定信息會從新分配,儘量的保證負載平衡

六、若是consumer的數量大於分區數量,會形成多餘的那部分線程沒法獲取消息,不斷 Got ping response for sessionid: 0x153413bc26e0082 after 2ms。是一種資源的浪費

若是多臺服務器都啓動consumer進程,最好根據分區數合理分配consumer進程中,消費線程的數量

 

更底層的細節問題,後期遇到再繼續調研,先會用,明白大體原理!

相關文章
相關標籤/搜索