The diagrams in this article are screenshots taken from PPT slides; if you have suggestions or corrections, please contact me.
Suppose we are building a message-notification module that lets users subscribe to notifications/messages sent by other users. The module is built on Apache Kafka: publishers write messages to the Kafka cluster through the Producer API, and subscribers read them through the Consumer API. The initial architecture looks like this:
However, as the number of users grows, so does the volume of notifications. Eventually a threshold is reached where the Producer writes messages faster than the Consumer can consume them, and unconsumed messages pile up in the Broker. Even though Kafka persists unconsumed messages very reliably, its retention limits (time, partition size, message key) mean that messages that are never consumed will eventually be deleted for good. From a business point of view, a notification system with high latency is close to useless. A multi-threaded Consumer model is therefore necessary.
There are two types of multi-threaded Consumer models:

1. Model 1: multiple consumers, with each thread owning its own KafkaConsumer instance (the consumergroup package below).
2. Model 2: a single KafkaConsumer whose records are handed off to a pool of worker threads (the consumerthread package below).

The advantages and disadvantages of the two approaches are compared below:
| Model | Advantages | Disadvantages |
|---|---|---|
| Model 1 (one KafkaConsumer per thread) | 1. A Consumer Group is easy to implement.<br>2. Per-partition ordering is easier to preserve. | 1. The number of consumers cannot exceed the number of partitions, otherwise the extra consumers are never used.<br>2. Every consumer needs its own TCP connection to the brokers, which costs a lot of system resources. |
| Model 2 (one KafkaConsumer plus a thread pool) | 1. Easier to scale horizontally, because processing capacity is added by enlarging the thread pool. | 1. Per-partition ordering is harder to guarantee. For example, if two messages from the same partition are handed to two pool threads, those threads must synchronize explicitly (see the sketch after this table). |
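To make that last drawback concrete, here is a minimal sketch of one way to keep per-partition ordering in Model 2: route every record of a partition to the same single-threaded executor, so records of one partition are processed serially while different partitions still run in parallel. This is not part of the accompanying GitHub project; the class name `PartitionOrderedDispatcher`, the `handle` method and the worker count are made up for illustration.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical sketch: preserve per-partition ordering in Model 2 by
 * dispatching every record of a partition to the same single-threaded executor.
 */
public class PartitionOrderedDispatcher {
    private final ExecutorService[] workers;

    public PartitionOrderedDispatcher(int workerCount) {
        workers = new ExecutorService[workerCount];
        for (int i = 0; i < workerCount; i++) {
            // one thread per worker, so tasks submitted to the same worker run strictly in order
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void dispatch(final ConsumerRecord<String, String> record) {
        // same partition -> same worker -> in-order processing
        int index = record.partition() % workers.length;
        workers[index].submit(new Runnable() {
            @Override
            public void run() {
                handle(record);
            }
        });
    }

    private void handle(ConsumerRecord<String, String> record) {
        // placeholder for real business logic
        System.out.println("Partition:" + record.partition() + ",Offset:" + record.offset() + ",Value:" + record.value());
    }
}
```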
In the project, the consumergroup package contains the code for Model 1, the consumerthread package contains the code for Model 2, and ProducerThread is the producer code. The project's pom.xml is as follows:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.randy</groupId>
    <artifactId>kafka_multithread_consumer_model</artifactId>
    <packaging>war</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>kafka_multithread_consumer_model Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>kafka_multithread_consumer_model</finalName>
    </build>
</project>
```
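Model 1 only works if the HelloWorld topic has at least as many partitions as consumer threads (three in the examples below). The original post does not show how the topic was created; one possible way, sketched here under the assumption that creating it programmatically with the AdminClient shipped in kafka-clients 0.11 is acceptable and that 3 partitions with replication factor 1 are wanted, is:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

/**
 * Hypothetical helper: create the HelloWorld topic with 3 partitions so that
 * the 3 consumer threads of Model 1 can each be assigned a partition.
 */
public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "Server2:9092");

        AdminClient adminClient = AdminClient.create(properties);
        try {
            // topic name, partition count and replication factor are assumptions matching the examples below
            NewTopic topic = new NewTopic("HelloWorld", 3, (short) 1);
            adminClient.createTopics(Collections.singleton(topic)).all().get(); // block until the topic exists
        } finally {
            adminClient.close();
        }
    }
}
```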
- ProducerThread.java is a producer thread that sends messages to the Broker.
- ConsumerThread.java is a consumer thread used to consume messages.
- ConsumerGroup.java creates a group of consumer threads.
- ConsumerGroupMain.java is the entry point.
3.4.1 ProducerThread.java
```java
package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  11:41
 * Comment :
 */
public class ProducerThread implements Runnable {
    private final Producer<String, String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers, String topic) {
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String, String>(properties);
    }

    private static Properties buildKafkaProperty(String brokers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true) {
            String sendMsg = "Producer message number:" + String.valueOf(++i);
            kafkaProducer.send(new ProducerRecord<String, String>(topic, sendMsg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                        // recordMetadata is null when the send failed, so do not touch it
                        return;
                    }
                    System.out.println("Producer Message: Partition:" + recordMetadata.partition() + ",Offset:" + recordMetadata.offset());
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}
```
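A side note on the notification use case: the record above is sent without a key, so messages are spread over partitions without regard to the user they belong to. If each user's notifications must keep their order, one option (not part of the original code; `userId` is a made-up placeholder) is to use the user id as the message key inside the run() loop above, so that all of a user's messages land on the same partition:

```java
// Hypothetical variant of the send call inside run(): key the record by user id
// so that all notifications for the same user go to the same partition and keep their order.
String userId = "user-42"; // assumed identifier, not part of the original example
kafkaProducer.send(new ProducerRecord<String, String>(topic, userId, sendMsg), new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
            return;
        }
        System.out.println("Producer Message: Partition:" + recordMetadata.partition() + ",Offset:" + recordMetadata.offset());
    }
});
```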
3.4.2 ConsumerThread.java
```java
package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  12:03
 * Comment :
 */
public class ConsumerThread implements Runnable {
    // KafkaConsumer is not thread-safe, so each thread must own its own instance;
    // the field must not be static, otherwise all threads would end up sharing the last consumer created
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                System.out.println("Consumer Message:" + item.value() + ",Partition:" + item.partition() + ",Offset:" + item.offset());
            }
        }
    }
}
```
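The consumer above relies on enable.auto.commit=true, so offsets can be committed before the polled records have actually been processed by downstream code, which may lose messages after a crash. If that matters for the notification scenario, a common alternative (an assumption on my part, not what the GitHub project does) is to turn auto commit off and commit manually after each batch. A minimal sketch of a reworked run(), assuming buildKafkaProperty sets enable.auto.commit to "false":

```java
@Override
public void run() {
    // assumes properties.put("enable.auto.commit", "false") in buildKafkaProperty
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> item : consumerRecords) {
            System.out.println("Consumer Message:" + item.value() + ",Partition:" + item.partition() + ",Offset:" + item.offset());
        }
        if (!consumerRecords.isEmpty()) {
            // commit offsets only after every record of this batch has been handled
            kafkaConsumer.commitSync();
        }
    }
}
```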
3.4.3 ConsumerGroup.java
```java
package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  14:09
 * Comment :
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers, String groupId, String topic, int consumerNumber) {
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for (int i = 0; i < consumerNumber; i++) {
            ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start() {
        for (ConsumerThread item : consumerThreadList) {
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}
```
3.4.4 ConsumerGroupMain.java
```java
package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  14:18
 * Comment :
 */
public class ConsumerGroupMain {

    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        ConsumerGroup consumerGroup = new ConsumerGroup(brokers, groupId, topic, consumerNumber);
        consumerGroup.start();
    }
}
```
- ConsumerThreadHandler.java processes the messages handed to the consumer.
- ConsumerThread.java initializes the consumer and dispatches records to a thread pool.
- ConsumerThreadMain.java is the entry point.
3.5.1 ConsumerThreadHandler.java
```java
package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:29
 * Comment :
 */
public class ConsumerThreadHandler implements Runnable {
    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:" + consumerRecord.value() + ",Partition:" + consumerRecord.partition() + ",Offset:" + consumerRecord.offset());
    }
}
```
3.5.2 ConsumerThread.java
```java
package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:42
 * Comment :
 */
public class ConsumerThread {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Threadpool of consumers
    private ExecutorService executor;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber) {
        executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```
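The start() method above never returns, and neither the consumer nor the pool is ever closed. A possible shutdown path, sketched here as an assumption rather than something the repository implements, is to expose a shutdown() method that calls KafkaConsumer.wakeup() from another thread, catch the resulting org.apache.kafka.common.errors.WakeupException in the poll loop, and then close the consumer and the executor:

```java
// Hypothetical additions to the ConsumerThread class above (not in the GitHub repository).
// Requires: import org.apache.kafka.common.errors.WakeupException;

// wakeup() is the only KafkaConsumer method that is safe to call from another thread;
// it makes a blocked poll() throw a WakeupException.
public void shutdown() {
    consumer.wakeup();
}

// start() could wrap its poll loop like this to exit cleanly:
public void start(int threadNumber) {
    executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    } catch (WakeupException e) {
        // expected when shutdown() is called, nothing to do
    } finally {
        consumer.close();
        executor.shutdown(); // stop accepting new tasks and let in-flight handlers finish
    }
}
```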
3.5.3 ConsumerThreadMain.java
```java
package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:49
 * Comment :
 */
public class ConsumerThreadMain {

    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
        consumerThread.start(consumerNumber);
    }
}
```
This article presented two different consumer models, each with its own pros and cons. All of the code has been pushed to https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ; if you have questions or spot mistakes, please point them out.