寫這篇文章的原由是因爲以前的一篇關於Kafka
異常消費,當時爲了解決問題不得不使用臨時的方案。html
總結起來歸根結底仍是對Kafka不熟悉致使的,加上平時工做的須要,以後就花些時間看了Kafka
相關的資料。java
談到Kafka
就不得不提到MQ,是屬於消息隊列的一種。做爲一種基礎中間件在互聯網項目中有着大量的使用。git
一種技術的產生天然是爲了解決某種需求,一般來講是如下場景:github
- 須要跨進程通訊:B系統須要A系統的輸出做爲輸入參數。
- 當A系統的輸出能力遠遠大於B系統的處理能力。
針對於第一種狀況有兩種方案:算法
RPC
遠程調用,A直接調用B。MQ
,A發佈消息到MQ
,B訂閱該消息。當咱們的需求是:A調用B實時響應,而且實時關心響應結果則使用RPC
,這種狀況就得使用同步調用。shell
反之當咱們並不關心調用以後的執行結果,而且有可能被調用方的執行很是耗時,這種狀況就很是適合用MQ
來達到異步調用目的。apache
好比常見的登陸場景就只能用同步調用的方式,由於這個過程須要實時的響應結果,總不能在用戶點了登陸以後排除網絡緣由以外再額外的等幾秒吧。bootstrap
但相似於用戶登陸須要獎勵積分的狀況則使用MQ
會更好,由於登陸並不關係積分的狀況,只須要發個消息到MQ
,處理積分的服務訂閱處理便可,這樣還能夠解決積分系統故障帶來的雪崩效應。bash
MQ
還有一個基礎功能則是限流削峯,這對於大流量的場景若是將請求直接調用到B系統則很是有可能使B系統出現不可用的狀況。這種場景就很是適合將請求放入MQ
,不但能夠利用MQ
削峯還儘量的保證系統的高可用。網絡
本次重點討論下Kafka
。
簡單來講Kafka
是一個支持水平擴展,高吞吐率的分佈式消息系統。
Kafka
的經常使用知識:
Topic
:生產者和消費者的交互都是圍繞着一個Topic
進行的,一般來講是由業務來進行區分,由生產消費者協商以後進行建立。
Partition
(分區):是Topic
下的組成,一般一個Topic
下有一個或多個分區,消息生產以後會按照必定的算法負載到每一個分區,因此分區也是Kafka
性能的關鍵。當發現性能不高時即可考慮新增分區。
結構圖以下:
Topic
Kafka
的安裝官網有很是詳細的講解。這裏談一下在平常開發中常見的一些操做,好比建立Topic
:
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic `test`複製代碼
建立了三個分區的test
主題。
使用
sh bin/kafka-topics.sh --list --zookeeper localhost:2181複製代碼
能夠列出全部的Topic
。
使用kafka
官方所提供的Java API
來進行消息生產,實際使用中編碼實現更爲經常使用:
/** Kafka生產者 * @author crossoverJie */
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
/** * 消費配置文件 */
private static String consumerProPath;
public static void main(String[] args) throws IOException {
// set up the producer
consumerProPath = System.getProperty("product_path");
KafkaProducer<String, String> producer = null;
try {
FileInputStream inputStream = new FileInputStream(new File(consumerProPath));
Properties properties = new Properties();
properties.load(inputStream);
producer = new KafkaProducer<String, String>(properties);
} catch (IOException e) {
LOGGER.error("load config error", e);
}
try {
// send lots of messages
for (int i=0 ;i<100 ; i++){
producer.send(new ProducerRecord<String, String>(
"topic_optimization", i+"", i+""));
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
} finally {
producer.close();
}
}
}複製代碼
再配合如下啓動參數便可發送消息:
-Dproduct_path=/xxx/producer.properties複製代碼
以及生產者的配置文件:
#集羣地址,能夠多個
bootstrap.servers=10.19.13.51:9094
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true複製代碼
具體的配置說明詳見此處:kafka.apache.org/0100/docume…
流程很是簡單,其實就是一些API
的調用。
消息發完以後能夠經過如下命令查看隊列內的狀況:
sh kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --group group1複製代碼
lag
即是隊列裏的消息數量。
有了生產者天然也少不了消費者,這裏首先針對單線程消費:
/** * Function:kafka官方消費 * * @author crossoverJie * Date: 2017/10/19 01:11 * @since JDK 1.8 */
public class KafkaOfficialConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOfficialConsumer.class);
/** * 日誌文件地址 */
private static String logPath;
/** * 主題名稱 */
private static String topic;
/** * 消費配置文件 */
private static String consumerProPath ;
/** * 初始化參數校驗 * @return */
private static boolean initCheck() {
topic = System.getProperty("topic") ;
logPath = System.getProperty("log_path") ;
consumerProPath = System.getProperty("consumer_pro_path") ;
if (StringUtil.isEmpty(topic) || logPath.isEmpty()) {
LOGGER.error("system property topic ,consumer_pro_path, log_path is required !");
return true;
}
return false;
}
/** * 初始化kafka配置 * @return */
private static KafkaConsumer<String, String> initKafkaConsumer() {
KafkaConsumer<String, String> consumer = null;
try {
FileInputStream inputStream = new FileInputStream(new File(consumerProPath)) ;
Properties properties = new Properties();
properties.load(inputStream);
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
} catch (IOException e) {
LOGGER.error("加載consumer.props文件出錯", e);
}
return consumer;
}
public static void main(String[] args) {
if (initCheck()){
return;
}
int totalCount = 0 ;
long totalMin = 0L ;
int count = 0;
KafkaConsumer<String, String> consumer = initKafkaConsumer();
long startTime = System.currentTimeMillis() ;
//消費消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() <= 0){
continue ;
}
LOGGER.debug("本次獲取:"+records.count());
count += records.count() ;
long endTime = System.currentTimeMillis() ;
LOGGER.debug("count=" +count) ;
if (count >= 10000 ){
totalCount += count ;
LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
totalMin += (endTime-startTime) ;
startTime = System.currentTimeMillis() ;
count = 0 ;
}
LOGGER.debug("end totalCount={},min={}",totalCount,totalMin);
/*for (ConsumerRecord<String, String> record : records) { record.value() ; JsonNode msg = null; try { msg = mapper.readTree(record.value()); } catch (IOException e) { LOGGER.error("消費消息出錯", e); } LOGGER.info("kafka receive = "+msg.toString()); }*/
}
}
}複製代碼
配合如下啓動參數:
-Dlog_path=/log/consumer.log -Dtopic=test -Dconsumer_pro_path=consumer.properties複製代碼
其中採用了輪詢的方式獲取消息,而且記錄了消費過程當中的數據。
消費者採用的配置:
bootstrap.servers=192.168.1.2:9094
group.id=group1
# 自動提交
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152複製代碼
爲了簡便我採用的是自動提交offset
。
談到offset
就必須得談談Kafka的消息存放機制.
Kafka
的消息不會由於消費了就會當即刪除,全部的消息都會持久化到日誌文件,並配置有過時時間,到了時間會自動刪除過時數據,而且不會管其中的數據是否被消費過。
因爲這樣的機制就必須的有一個標誌來代表哪些數據已經被消費過了,offset(偏移量)
就是這樣的做用,它相似於指針指向某個數據,當消費以後offset
就會線性的向前移動,這樣一來的話消息是能夠被任意消費的,只要咱們修改offset
的值便可。
消費過程當中還有一個值得注意的是:
同一個consumer group(group.id相等)下只能有一個消費者能夠消費,這個剛開始確實會讓不少人踩坑。
針對於單線程消費實現起來天然是比較簡單,可是效率也是要大打折扣的。
爲此我作了一個測試,使用以前的單線程消費120009條數據的結果以下:
那麼換成多線程消費怎麼實現呢?
咱們能夠利用partition
的分區特性來提升消費能力,單線程的時候等因而一個線程要把全部分區裏的數據都消費一遍,若是換成多線程就可讓一個線程只消費一個分區,這樣效率天然就提升了,因此線程數coreSize<=partition
。
首先來看下入口:
public class ConsumerThreadMain {
private static String brokerList = "localhost:9094";
private static String groupId = "group1";
private static String topic = "test";
/**
* 線程數量
*/
private static int threadNum = 3;
public static void main(String[] args) {
ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}複製代碼
其中的ConsumerGroup
類:
public class ConsumerGroup {
private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);
/**
* 線程池
*/
private ExecutorService threadPool;
private List<ConsumerCallable> consumers ;
public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("consumer-pool-%d").build();
threadPool = new ThreadPoolExecutor(threadNum, threadNum,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
consumers = new ArrayList<ConsumerCallable>(threadNum);
for (int i = 0; i < threadNum; i++) {
ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
/**
* 執行任務
*/
public void execute() {
long startTime = System.currentTimeMillis() ;
for (ConsumerCallable runnable : consumers) {
Future<ConsumerFuture> future = threadPool.submit(runnable) ;
}
if (threadPool.isShutdown()){
long endTime = System.currentTimeMillis() ;
LOGGER.info("main thread use {} Millis" ,endTime -startTime) ;
}
threadPool.shutdown();
}
}複製代碼
最後真正的執行邏輯ConsumerCallable
:
public class ConsumerCallable implements Callable<ConsumerFuture> {
private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);
private AtomicInteger totalCount = new AtomicInteger() ;
private AtomicLong totalTime = new AtomicLong() ;
private AtomicInteger count = new AtomicInteger() ;
/**
* 每一個線程維護KafkaConsumer實例
*/
private final KafkaConsumer<String, String> consumer;
public ConsumerCallable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
//自動提交位移
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public ConsumerFuture call() throws Exception {
boolean flag = true;
int failPollTimes = 0 ;
long startTime = System.currentTimeMillis() ;
while (flag) {
// 使用200ms做爲獲取超時時間
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() <= 0){
failPollTimes ++ ;
if (failPollTimes >= 20){
LOGGER.debug("達到{}次數,退出 ",failPollTimes);
flag = false ;
}
continue ;
}
//獲取到以後則清零
failPollTimes = 0 ;
LOGGER.debug("本次獲取:"+records.count());
count.addAndGet(records.count()) ;
totalCount.addAndGet(count.get()) ;
long endTime = System.currentTimeMillis() ;
if (count.get() >= 10000 ){
LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
totalTime.addAndGet(endTime-startTime) ;
startTime = System.currentTimeMillis() ;
count = new AtomicInteger();
}
LOGGER.debug("end totalCount={},min={}",totalCount,totalTime);
/*for (ConsumerRecord<String, String> record : records) {
// 簡單地打印消息
LOGGER.debug(record.value() + " consumed " + record.partition() +
" message with offset: " + record.offset());
}*/
}
ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(),totalTime.get()) ;
return consumerFuture ;
}
}複製代碼
理一下邏輯:
其實就是初始化出三個消費者實例,用於三個線程消費。其中加入了一些統計,最後也是消費120009條數據結果以下。
因爲是並行運行,可見消費120009條數據能夠提升2秒左右,當數據以更高的數量級提高後效果會更加明顯。
但這也有一些弊端:
Kafka
的知識點仍是較多,Kafka
的使用也遠不這些。以後會繼續分享一些關於Kafka
監控等相關內容。
我的博客:crossoverjie.top。