kafka消費者客戶端

Kafka消費者

1.1 消費者與消費者組

消費者與消費者組之間的關係java

​ 每個消費者都隸屬於某一個消費者組,一個消費者組能夠包含一個或多個消費者,每一條消息只會被消費者組中的某一個消費者所消費。不一樣消費者組之間消息的消費是互不干擾的。node

爲何會有消費者組的概念正則表達式

​ 消費者組出現主要是出於兩個目的:apache

​ (1) 使總體的消費能力具有橫向的伸縮性。能夠適當增長消費者組中消費者的數量,來提升總體的消費能力。可是每個分區至多被消費者組的中一個消費者所消費,所以當消費者組中消費者數量超過度區數時,多出的消費者不會分配到任何一個分區。固然這是默認的分區分配策略,可經過partition.assignment.strategy進行配置。緩存

​ (2) 實現消息消費的隔離。不一樣消費者組之間消息消費互不干擾,從而實現發佈訂閱這種消息投遞模式。安全

注意:多線程

​ 消費者隸屬的消費者組能夠經過group.id進行配置。消費者組是一個邏輯上的概念,但消費者並非一個邏輯上的概念,它能夠是一個線程,也能夠是一個進程。同一個消費者組內的消費者能夠部署在同一臺機器上,也能夠部署在不一樣的機器上。併發

1.2 消費者客戶端開發

​ 一個正常的消費邏輯須要具有如下幾個步驟:負載均衡

  • 配置消費者客戶端參數及建立相應的消費者實例。異步

  • 訂閱主題

  • 拉取消息並消費

  • 提交消費位移

  • 關閉消費者實例

  public class KafkaConsumerAnalysis {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";
  public static final AtomicBoolean isRunning = new AtomicBoolean(true);

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      return prop;
  }


  public static void main(String[] args) {
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
       
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("topic = " + record.topic() + ", partition =" +                                               record.partition() + ", offset = " + record.offset());
          System.out.println("key = " + record.key() + ", value = " + record.value());
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }
  }
}

1.2.1 訂閱主題和分區

​ 先來講一下消費者訂閱消息的粒度:一個消費者能夠訂閱一個主題、多個主題、或者多個主題的特定分區。主要經過subsribe和assign兩個方法實現訂閱。

(1)訂閱一個主題:

​ public void subscribe(Collection<String> topics),當集合中有一個主題時。

(2)訂閱多個主題:

​ public void subscribe(Collection<String> topics),當集合中有多個主題時。

​ public void subscribe(Pattern pattern),經過正則表達式實現消費者主題的匹配。經過這種方式,若是在消息消費的過程當中,又添加了新的可以匹配到正則的主題,那麼消費者就能夠消費到新添加的主題。 consumer.subscribe(Pattern.compile("topic-.*"));

(3)多個主題的特定分區

​ public void assign(Collection<TopicPartition> partitions),能夠實現訂閱某些特定的主題分區。TopicPartition包括兩個屬性:topic(String)和partition(int)。

​ 若是事先不知道有多少分區該如何處理,KafkaConsumer中的partitionFor方法能夠得到指定主題分區的元數據信息:

​ public List<PartitionInfo> partitionsFor(String topic)

​ PartitionInfo的屬性以下:

  
public class PartitionInfo {
  private final String topic;//主題
  private final int partition;//分區
  private final Node leader;//分區leader
  private final Node[] replicas;//分區的AR
  private final Node[] inSyncReplicas;//分區的ISR
  private final Node[] offlineReplicas;//分區的OSR
}

​ 所以也能夠經過這個方法實現某個主題的所有訂閱。

​ 須要指出的是,subscribe(Collection)、subscirbe(Pattern)、assign(Collection)方法分別表明了三種不一樣的訂閱狀態:AUTO_TOPICS、AUTO_PATTREN和USER_ASSIGN,這三種方式是互斥的,消費者只能使用其中一種,不然會報出IllegalStateException。

​ subscirbe方法能夠實現消費者自動再平衡的功能。多個消費者的狀況下,能夠根據分區分配策略自動分配消費者和分區的關係,當消費者增長或減小時,也能實現負載均衡和故障轉移。

​ 如何實現取消訂閱:

​ consumer.unsubscribe()

1.2.2 反序列化

​ KafkaProducer端生產消息進行序列化,一樣消費者就要進行相應的反序列化。至關於根據定義的序列化格式的一個逆序提取數據的過程。

  
import com.gdy.kafka.producer.Company;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public Company deserialize(String topic, byte[] data) {
      if(data == null) {
          return null;
      }

      if(data.length < 8) {
          throw new SerializationException("size of data received by Deserializer is shorter than expected");
      }

      ByteBuffer buffer = ByteBuffer.wrap(data);
      int nameLength = buffer.getInt();
      byte[] nameBytes = new byte[nameLength];
      buffer.get(nameBytes);
      int addressLen = buffer.getInt();
      byte[] addressBytes = new byte[addressLen];
      buffer.get(addressBytes);
      String name,address;
      try {
          name = new String(nameBytes,"UTF-8");
          address = new String(addressBytes,"UTF-8");
      }catch (UnsupportedEncodingException e) {
          throw new SerializationException("Error accur when deserializing");
      }

      return new Company(name, address);
  }

  @Override
  public void close() {

  }
}

​ 實際生產中須要自定義序列化器和反序列化器時,推薦使用Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來包裝。

1.2.3 消息消費

​ Kafka中消息的消費是基於拉模式的,kafka消息的消費是一個不斷輪旋的過程,消費者須要作的就是重複的調用poll方法。

  
public ConsumerRecords<K, V> poll(final Duration timeout)

​ 這個方法須要注意的是,若是消費者的緩衝區中有可用的數據,則會當即返回,不然會阻塞至timeout。若是在阻塞時間內緩衝區仍沒有數據,則返回一個空的消息集。timeout的設置取決於應用程序對效應速度的要求。若是應用線程的位移工做是從Kafka中拉取數據並進行消費能夠將這個參數設置爲Long.MAX_VALUE。

​ 每次poll都會返回一個ConsumerRecords對象,它是ConsumerRecord的集合。對於ConsumerRecord相比於ProducerRecord多了一些屬性:

  
private final String topic;//主題
  private final int partition;//分區
  private final long offset;//偏移量
  private final long timestamp;//時間戳
  private final TimestampType timestampType;//時間戳類型
  private final int serializedKeySize;//序列化key的大小
  private final int serializedValueSize;//序列化value的大小
  private final Headers headers;//headers
  private final K key;//key
  private final V value;//value
  private volatile Long checksum;//CRC32校驗和

​ 另外咱們能夠按照分區維度對消息進行消費,經過ConsumerRecords.records(TopicPartiton)方法實現。

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    Set<TopicPartition> partitions = records.partitions();
    for (TopicPartition tp : partitions) {
        for (ConsumerRecord<String, String> record : records.records(tp)) {
              System.out.println(record.partition() + " ," + record.value());
        }
    }

​ 另外還能夠按照主題維度對消息進行消費,經過ConsumerRecords.records(Topic)實現。

  
for (String topic : topicList) {
      for (ConsumerRecord<String, String> record : records.records(topic)) {
              System.out.println(record.partition() + " ," + record.value());
      }
}

1.2.4 消費者位移提交

​ 首先要 明白一點,消費者位移是要作持久化處理的,不然當發生消費者崩潰或者消費者重平衡時,消費者消費位移沒法得到。舊消費者客戶端是將位移提交到zookeeper上,新消費者客戶端將位移存儲在Kafka內部主題_consumer_offsets中。

​ KafkaConsumer提供了兩個方法position(TopicPatition)和commited(TopicPartition)。

​ public long position(TopicPartition partition)-----得到下一次拉取數據的偏移量

​ public OffsetAndMetadata committed(TopicPartition partition)-----給定分區的最後一次提交的偏移量。

還有一個概念稱之爲lastConsumedOffset,這個指的是最後一次消費的偏移量。

​ 在kafka提交方式有兩種:自動提交和手動提交。

(1)自動位移提交

​ kafka默認狀況下采用自動提交,enable.auto.commit的默認值爲true。固然自動提交併非沒消費一次消息就進行提交,而是按期提交,這個按期的週期時間由auto.commit.intervals.ms參數進行配置,默認值爲5s,固然這個參數生效的前提就是開啓自動提交。

​ 自動提交會形成重複消費和消息丟失的狀況。重複消費很容易理解,由於自動提交實際是延遲提交,所以很容易形成重複消費,而後消息丟失是怎麼產生的?

(2)手動位移提交

​ 開始手動提交的須要配置enable.auto.commit=false。手動提交消費者偏移量,又可分爲同步提交和異步提交。

​ 同步提交:

​ 同步提交很簡單,調用commitSync() 方法:

  
while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            //consume message
            consumer.commitSync();
        }
}

​ 這樣,每消費一條消息,提交一個偏移量。固然可用過緩存消息的方式,實現批量處理+批量提交:

  
while (isRunning.get()) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
      }
      if (buffer.size() >= minBaches) {
          for (ConsumerRecord<String, String> record : records) {
              //consume message
          }
          consumer.commitSync();
          buffer.clear();
      }
}

​ 還能夠經過public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)這個方法實現按照分區粒度進行同步提交。

  
while (isRunning.get()) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
      for (ConsumerRecord record : partitionRecords) {
          //consume message
      }
      long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      consumer.commitSync(Collections.singletonMap(tp,new                                                               OffsetAndMetadata(lastConsumerOffset+1)));
  }
}

​ 異步提交:

​ commitAsync異步提交的時候消費者線程不會被阻塞,便可能在提交偏移量的結果還未返回以前,就開始了新一次的拉取數據操做。異步提交能夠提高消費者的性能。commitAsync有三個重載:

​ public void commitAsync()

​ public void commitAsync(OffsetCommitCallback callback)

​ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback )

​ 對照同步提交的方法參數,多了一個Callback回調參數,它提供了一個異步提交的回調方法,當消費者位移提交完成後回調OffsetCommitCallback的onComplement方法。以第二個方法爲例:

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> record : records) {
      //consume message
  }
  consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
          if (e == null) {
              System.out.println(offsets);
          }else {
                e.printStackTrace();
          }
      }
});

1.2.5 控制和關閉消費

​ kafkaConsumer提供了pause()和resume() 方法分別實現暫停某些分區在拉取操做時返回數據給客戶端和恢復某些分區向客戶端返回數據的操做:

​ public void pause(Collection<TopicPartition> partitions)

​ public void resume(Collection<TopicPartition> partitions)

​ 優雅中止KafkaConsumer退出消費者循環的方式:

​ (1)不要使用while(true),而是使用while(isRunning.get()),isRunning是一個AtomicBoolean類型,能夠在其餘地方調用isRunning.set(false)方法退出循環。

​ (2)調用consumer.wakup()方法,wakeup方法是KafkaConsumer中惟一一個能夠從其餘線程裏安全調用的方法,會拋出WakeupException,咱們不須要處理這個異常。

​ 跳出循環後必定要顯示的執行關閉動做和釋放資源。

1.2.6 指定位移消費

KafkaConsumer可經過兩種方式實現實現不一樣粒度的指定位移消費。第一種是經過auto.offset.reset參數,另外一種經過一個重要的方法seek。

(1)auto.offset.reset

auto.offset.reset這個參數總共有三種可配置的值:latest、earliest、none。若是配置不在這三個值當中,就會拋出ConfigException。

latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,消費新產生的該分區下的數據

earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,從頭開始消費

none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset或位移越界,則拋出NoOffsetForPartitionException異常

消息的消費是經過poll方法進行的,poll方法對於開發者來講就是一個黑盒,沒法精確的掌控消費的起始位置。即便經過auto.offsets.reset參數也只能在找不到位移或者位移越界的狀況下粗粒度的從頭開始或者從末尾開始。所以,Kafka提供了另外一種更細粒度的消費掌控:seek。

(2)seek

seek能夠實現追前消費和回溯消費:

  
public void seek(TopicPartition partition, long offset)

能夠經過seek方法實現指定分區的消費位移的控制。須要注意的一點是,seek方法只能重置消費者分配到的分區的偏移量,而分區的分配是在poll方法中實現的。所以在執行seek方法以前須要先執行一次poll方法獲取消費者分配到的分區,可是並非每次poll方法都能得到數據,因此能夠採用以下的方法。

  
consumer.subscribe(topicList);
  Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();//獲取消費者分配到的分區,沒有獲取返回一個空集合
  }

  for (TopicPartition tp : assignment) {
      consumer.seek(tp, 10); //重置指定分區的位移
  }
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      //consume record
    }

若是對未分配到的分區執行了seek方法,那麼會報出IllegalStateException異常。

在前面咱們已經提到,使用auto.offsets.reset參數時,只有當消費者分配到的分區沒有提交的位移或者位移越界時,才能從earliest消費或者從latest消費。seek方法能夠彌補這一中狀況,實現任意狀況的從頭或從尾部消費。

   Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();
  }
  Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);//獲取指定分區的末尾位置
  for (TopicPartition tp : assignment) {
      consumer.seek;
  }

與endOffset對應的方法是beginningOffset方法,能夠獲取指定分區的起始位置。其實kafka已經提供了一個從頭和從尾消費的方法。

  
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

還有一種場景是這樣的,咱們並不知道特定的消費位置,卻知道一個相關的時間點。爲解決這種場景遇到的問題,kafka提供了一個offsetsForTimes()方法,經過時間戳來查詢分區消費的位移。

      Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
  for (TopicPartition tp : assignment) {
      timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
  }
//得到指定分區指定時間點的消費位移
  Map<TopicPartition, OffsetAndTimestamp> offsets =                                                                                   consumer.offsetsForTimes(timestampToSearch);
  for (TopicPartition tp : assignment) {
      OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
      if (offsetAndTimestamp != null) {
              consumer.seek(tp, offsetAndTimestamp.offset());
      }
  }

因爲seek方法的存在,使得消費者的消費位移能夠存儲在任意的存儲介質中,包括DB、文件系統等。

1.2.7 消費者的再均衡

再均衡是指分區的所屬權從一個消費者轉移到另外一消費者的行爲,它爲消費者組具有高可用伸縮性提升保障。不過須要注意的地方有兩點,第一是消費者發生再均衡期間,消費者組中的消費者是沒法讀取消息的。第二點就是消費者發生再均衡可能會引發重複消費問題,因此通常狀況下要儘可能避免沒必要要的再均衡。

KafkaConsumer的subscribe方法中有一個參數爲ConsumerRebalanceListener,咱們稱之爲再均衡監聽器,它能夠用來在設置發生再均衡動做先後的一些準備和收尾動做。

  public interface ConsumerRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions);
  void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

onPartitionsRevoked方法會在再均衡以前和消費者中止讀取消息以後被調用。能夠經過這個回調函數來處理消費位移的提交,以免重複消費。參數partitions表示再均衡前分配到的分區。

onPartitionsAssigned方法會在再均衡以後和消費者消費之間進行調用。參數partitons表示再均衡以後所分配到的分區。

  consumer.subscribe(topicList);
  Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  consumer.subscribe(topicList, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          consumer.commitSync(currentOffsets);//提交偏移量
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          //do something
      }
  });

  try {
      while (isRunning.get()) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              //process records
              //記錄當前的偏移量
              currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new                               OffsetAndMetadata( record.offset() + 1));
          }
          consumer.commitAsync(currentOffsets, null);
      }

      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }

1.2.8 消費者攔截器

消費者攔截器主要是在消費到消息或者提交消費位移時進行一些定製化的操做。消費者攔截器須要自定義實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口。

  public interface ConsumerInterceptor<K, V> extends Configurable {    
  public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  public void close();
}

onConsume方法是在poll()方法返回以前被調用,好比修改消息的內容、過濾消息等。若是onConsume方法發生異常,異常會被捕獲並記錄到日誌中,可是不會向上傳遞。

Kafka會在提交位移以後調用攔截器的onCommit方法,可使用這個方法來記錄和跟蹤消費的位移信息。

  
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
  private static final long EXPIRE_INTERVAL = 10 * 1000; //10秒過時
  @Override
  public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
      long now = System.currentTimeMillis();
      Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();

      for (TopicPartition tp : records.partitions()) {
          List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
          List<ConsumerRecord<String, String>> newTpRecords = records.records(tp);
          for (ConsumerRecord<String, String> record : tpRecords) {
              if (now - record.timestamp() < EXPIRE_INTERVAL) {//判斷是否超時
                  newTpRecords.add(record);
              }
          }
          if (!newRecords.isEmpty()) {
              newRecords.put(tp, newTpRecords);
          }


      }
      return new ConsumerRecords<>(newRecords);
  }

  @Override
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
      offsets.forEach((tp,offset) -> {
          System.out.println(tp + ":" + offset.offset());
      });
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}

使用這種TTL須要注意的是若是採用帶參數的位移提交方式,有可能提交了錯誤的位移,可能poll拉取的最大位移已經被攔截器過濾掉。

1.2.9 消費者的多線程實現

KafkaProducer是線程安全的,然而KafkaConsumer是非線程安全的。KafkaConsumer中的acquire方法用於檢測當前是否只有一個線程在操做,若是有就會拋出ConcurrentModifiedException。acuqire方法和咱們一般所說的鎖是不一樣的,它不會阻塞線程,咱們能夠把它看作是一個輕量級的鎖,它經過線程操做計數標記的方式來檢測是否發生了併發操做。acquire方法和release方法成對出現,分表表示加鎖和解鎖。

  //標記當前正在操做consumer的線程
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
//refcount is used to allow reentrant access by the thread who has acquired currentThread,
//大概能夠理解我加鎖的次數
private final AtomicInteger refcount = new AtomicInteger(0);
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get()&&!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
      throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
      refcount.incrementAndGet();
}

private void release() {
  if (refcount.decrementAndGet() == 0)
      currentThread.set(NO_CURRENT_THREAD);
}

kafkaConsumer中的每一個共有方法在調用以前都會執行aquire方法,只有wakeup方法是個意外。

KafkaConsumer的非線程安全並不意味着消費消息的時候只能以單線程的方式執行。能夠經過多種方式實現多線程消費。

(1)Kafka多線程消費第一種實現方式--------線程封鎖

所謂線程封鎖,就是爲每一個線程實例化一個KafkaConsumer對象。這種方式一個線程對應一個KafkaConsumer,一個線程(可就是一個consumer)能夠消費一個或多個分區的消息。這種消費方式的併發度受限於分區的實際數量。當線程數量超過度分區數量時,就會出現線程限制額的狀況。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class FirstMutiConsumerDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      int consumerThreadNum = 4;
      for (int i = 0; i < 4; i++) {
          new KafkaCoosumerThread(prop, topic).run();
      }
  }

  public static class KafkaCoosumerThread extends Thread {
  //每一個消費者線程包含一個KakfaConsumer對象。
      private KafkaConsumer<String, String> kafkaConsumer;
      public KafkaCoosumerThread(Properties prop, String topic) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          this.kafkaConsumer.subscribe(Arrays.asList(topic));
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record : records) {
                      //處理消息模塊
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }
}

這種實現方式和開啓多個消費進程的方式沒有本質的區別,優勢是每一個線程能夠按照順序消費消費各個分區的消息。缺點是每一個消費線程都要維護一個獨立的TCP鏈接,若是分區數和線程數都不少,那麼會形成不小的系統開銷。

(2)Kafka多線程消費第二種實現方式--------多個消費線程同時消費同一分區

多個線程同時消費同一分區,經過assign方法和seek方法實現。這樣就能夠打破原有消費線程個數不能超過度區數的限制,進一步提升了消費的能力,可是這種方式對於位移提交和順序控制的處理就會變得很是複雜。實際生產中不多使用。

(3)第三種實現方式-------建立一個消費者,records的處理使用多線程實現

通常而言,消費者經過poll拉取數據的速度至關快,而總體消費能力的瓶頸也正式在消息處理這一塊。基於此

考慮第三種實現方式。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThirdMutiConsumerThreadDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      KafkaConsumerThread consumerThread = new KafkaConsumerThread(prop, topic, Runtime.getRuntime().availableProcessors());
      consumerThread.start();
  }


  public static class KafkaConsumerThread extends Thread {
      private KafkaConsumer<String, String> kafkaConsumer;
      private ExecutorService executorService;
      private int threadNum;

      public KafkaConsumerThread(Properties prop, String topic, int threadNum) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          kafkaConsumer.subscribe(Arrays.asList(topic));
          this.threadNum = threadNum;
          executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  if (!records.isEmpty()) {
                      executorService.submit(new RecordHandler(records));
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }

  public static class RecordHandler implements Runnable {
      public final ConsumerRecords<String,String> records;
      public RecordHandler(ConsumerRecords<String, String> records) {
          this.records = records;
      }
       
      @Override
      public void run() {
          //處理records
      }
  }
}

KafkaConsumerThread類對應一個消費者線程,裏面經過線程池的方式調用RecordHandler處理一批批的消息。其中線程池採用的拒絕策略爲CallerRunsPolicy,當阻塞隊列填滿時,由調用線程處理該任務,以防止整體的消費能力跟不上poll拉取的速度。這種方式還能夠進行橫向擴展,經過建立多個KafkaConsumerThread實例來進一步提高總體的消費能力。

這種方式還能夠減小TCP鏈接的數量,可是對於消息的順序處理就變得困難了。這種方式須要引入一個共享變量Map<TopicPartition,OffsetAndMetadata> offsets參與消費者的偏移量提交。每個RecordHandler類在處理完消息後都將對應的消費位移保存到共享變量offsets中,KafkaConsumerThread在每一次poll()方法以後都要進讀取offsets中的內容並對其進行提交。對於offsets的讀寫要採用加鎖處理,防止出現併發問題。而且在寫入offsets的時候須要注意位移覆蓋的問題。針對這個問題,能夠將RecordHandler的run方法作以下改變:

  public void run() {
          for (TopicPartition tp : records.partitions()) {
              List<ConsumerRecord<String, String>> tpRecords = this.records.records(tp);
              //處理tpRecords
              long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
              synchronized (offsets) {
                  if (offsets.containsKey(tp)) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                  }else {
                      long positioin = offsets.get(tp).offset();
                      if(positioin < lastConsumedOffset + 1) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                      }
                  }
              }
          }
      }

對應的位移提交代碼也應該在KafkaConsumerThread的run方法中進行體現

  public void run() {
  try {
      while (true) {
          ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
          if (!records.isEmpty()) {
              executorService.submit(new RecordHandler(records));
              synchronized (offsets) {
                  if (!offsets.isEmpty()) {
                      kafkaConsumer.commitSync(offsets);
                      offsets.clear();
                    }
              }
          }
      }
  } catch (Exception e) {
      e.printStackTrace();
    }finally {
        kafkaConsumer.close();
      }
    }
}

其實這種方式並不完美,可能形成數據丟失。能夠經過更爲複雜的滑動窗口的方式進行改進。

1.2.10 消費者重要參數

  • fetch.min.bytes

    kafkaConsumer一次拉拉取請求的最小數據量。適當增長,會提升吞吐量,但會形成額外延遲。

  • fetch.max.bytes

    kafkaConsumer一次拉拉取請求的最大數據量,若是kafka一條消息的大小超過這個值,仍然是能夠拉取的。

  • fetch.max.wait.ms

    一次拉取的最長等待時間,配合fetch.min.bytes使用

  • max.partiton.fetch.bytes

    每一個分區裏返回consumer的最大數據量。

  • max.poll.records

    一次拉取的最大消息數

  • connection.max.idle.ms

    多久以後關閉限制的鏈接

  • exclude.internal.topics

    這個參數用於設置kafka中的兩個內部主題可否被公開:consumer_offsets和transaction_state。若是設爲true,可使用Pattren訂閱內部主題,若是是false,則沒有這種限制。

  • receive.buffer.bytes

    socket接收緩衝區的大小

  • send.buffer.bytes

    socket發送緩衝區的大小

  • request.timeout.ms

    consumer等待請求響應的最長時間。

  • reconnect.backoff.ms

    重試鏈接指定主機的等待時間。

  • max.poll.interval.ms

    配置消費者等待拉取時間的最大值,若是超過這個期限,消費者組將剔除該消費者,進行再平衡。

  • auto.offset.reset

    自動偏移量重置

  • enable.auto.commit

    是否容許偏移量的自動提交

  • auto.commit.interval.ms

    自動偏移量提交的時間間隔

相關文章
相關標籤/搜索