Kafka 使用ExecutorService 多線程消費

前言:

Apache Kafka 做爲當下最經常使用消息中間件之一。給到個人需求是須要咱們處理大量的消息(若是單線程處理過多消息會出現性能瓶頸)。java

如何使用Java的ExecutorService框架來建立線程池處理大量消息?apache

1.建立一個能夠從topic中poll()消息後傳遞到線程池以進行進一步處理。bootstrap

2.建立工做線程,以執行每條消息的進一步處理。session

1.topic消息傳遞到ThreadPoolExecutorService

/** kafka 消息處理*/
public class KafkaProcessor {
    private final KafkaConsumer<String, String> myConsumer;
    private ExecutorService executor;
    private static final Properties KAFKA_PROPERTIES = new Properties();
   
   //基礎的kafka配置~
   static {
        KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
        KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
        KAFKA_PROPERTIES.put("enable.auto.commit", "true");
        KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
        KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
        KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public KafkaProcessor() {
        this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置
        this.myConsumer.subscribe(Arrays.asList("test")); //訂閱topic=test
    }
    public void init(int numberOfThreads) {
      //建立一個線程池 
     /** * public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) *corePoolSize : 核心線程數,一旦建立將不會再釋放。若是建立的線程數尚未達到指定的核心線 程數量,將會繼續建立新的核心線程,直到達到最大核心線程數後,核心線程數將不在增長;若是沒有空閒的核心線程,同時又未達到最大線程數,則將繼續建立非核心線程;若是核心線程數等於最大線程數,則當核心線程都處於激活狀態時,任務將被掛起,等待空閒線程來執行。 *maximumPoolSize : 最大線程數,容許建立的最大線程數量。若是最大線程數等於核心線程數,則沒法建立非核心線程;若是非核心線程處於空閒時,超過設置的空閒時間,則將被回收,釋放佔用的資源。 *keepAliveTime : 也就是當線程空閒時,所容許保存的最大時間,超過這個時間,線程將被釋放銷燬,但只針對於非核心線程。 *unit : 時間單位,TimeUnit.SECONDS等。 *workQueue : 任務隊列,存儲暫時沒法執行的任務,等待空閒線程來執行任務。 *threadFactory : 線程工程,用於建立線程。 *handler : 當線程邊界和隊列容量已經達到最大時,用於處理阻塞時的程序 */

      executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔時間段進行消息拉取
        for (final ConsumerRecord<String, String> record : records) {
        executor.submit(new KafkaRecordHandler(record)); 
        }
      }
}
    //別忘記線程池的關閉!
    public void shutdown() {
        if (myConsumer != null) {
        myConsumer.close();
        }
        if (executor != null) {
        executor.shutdown();
        }
        try {
          if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
          executor.shutdownNow();
          }
        }catch (InterruptedException e) {
        executor.shutdownNow();
        }
}
}複製代碼

2.建立工做線程

// 建立消息線程進行處理
public class KafkaRecordHandler implements Runnable {

    private ConsumerRecord<String, String> record;

    public KafkaRecordHandler(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() { 
      //業務操做...
        System.out.println("value = "+record.value());
        System.out.println("Thread id = "+ Thread.currentThread().getId());
    }
}複製代碼

3.Using ?

//消費測試
public class ConsumerTest {
    public static void main(String[] args) {
      KafkaProcessor processor = new KafkaProcessor();
      try {
          processor.init(5);//指定相應的線程數!
      }catch (Exception exp) {
          processor.shutdown();
      }
    }
}複製代碼

4.總結

可能並不適合全部方案,按需定製方案。框架

相關文章
相關標籤/搜索