本文源碼:GitHub·點這裏 || GitEE·點這裏java
版本:kafka2.11,zookeeper3.4
注意:這裏zookeeper3.4也是基於集羣模式部署。node
tar -zxvf kafka_2.11-0.11.0.0.tgz mv kafka_2.11-0.11.0.0 kafka2.11
建立日誌目錄git
[root@en-master kafka2.11]# mkdir logs
注意:以上操做須要同步到集羣下其餘服務上。github
vim /etc/profile export KAFKA_HOME=/opt/kafka2.11 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile
[root@en-master /opt/kafka2.11/config]# vim server.properties -- 核心修改以下 # 惟一編號 broker.id=0 # 開啓topic刪除 delete.topic.enable=true # 日誌地址 log.dirs=/opt/kafka2.11/logs # zk集羣 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
注意:broker.id安裝集羣服務個數編排便可,集羣下不能重複。bootstrap
# 啓動命令 [root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties # 中止命令 [root@node02 kafka2.11]# bin/kafka-server-stop.sh # 進程查看 [root@node02 kafka2.11]# jps
注意:這裏默認啓動了zookeeper集羣服務,而且集羣下的kafka分別啓動。vim
建立topicapp
bin/kafka-topics.sh --zookeeper zk01:2181 \ --create --replication-factor 3 --partitions 1 --topic one-topic
參數說明:異步
查看topic列表分佈式
bin/kafka-topics.sh --zookeeper zk01:2181 --list
修改topic分區ide
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
查看topic
bin/kafka-topics.sh --zookeeper zk01:2181 \ --describe --topic one-topic
發送消息
bin/kafka-console-producer.sh \ --broker-list 192.168.72.133:9092 --topic one-topic
消費消息
bin/kafka-console-consumer.sh \ --bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
刪除topic
bin/kafka-topics.sh --zookeeper zk01:2181 \ --delete --topic first
Kafka集羣中有一個broker會被選舉爲Controller,Controller依賴Zookeeper環境,管理集羣broker的上下線,全部topic的分區副本分配和leader選舉等工做。
Kafka中間件的Producer攔截器主要用於實現消息發送的自定義控制邏輯。用戶能夠在消息發送前以及回調邏輯執行前有機會對消息作一些自定義,好比消息修改等,發送狀態監控等,用戶能夠指定多個攔截器按順序執行攔截。
核心方法
注意:這裏說的攔截器是針對消息發送流程。
定義方式:實現ProducerInterceptor接口便可。
攔截器一:在onSend方法中,對攔截的消息進行修改。
@Component public class SendStartInterceptor implements ProducerInterceptor<String, String> { private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor"); @Override public void configure(Map<String, ?> configs) { LOGGER.info("configs..."); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 修改消息內容 return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), "onSend:{" + record.value()+"}"); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { LOGGER.info("onAcknowledgement..."); } @Override public void close() { LOGGER.info("SendStart close..."); } }
攔截器二:在onAcknowledgement方法中,判斷消息是否發送成功。
@Component public class SendOverInterceptor implements ProducerInterceptor<String, String> { private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor"); @Override public void configure(Map<String, ?> configs) { LOGGER.info("configs..."); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { LOGGER.info("record...{}", record.value()); return record ; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception != null){ LOGGER.info("Send Fail...exe-msg",exception.getMessage()); } LOGGER.info("Send success..."); } @Override public void close() { LOGGER.info("SendOver close..."); } }
加載攔截器:基於一個KafkaProducer配置Bean,加入攔截器。
@Configuration public class KafkaConfig { @Bean public Producer producer (){ Properties props = new Properties(); // 省略其餘配置... // 添加攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor"); interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); return new KafkaProducer<>(props) ; } }
@RestController public class SendMsgWeb { @Resource private KafkaProducer<String,String> producer ; @GetMapping("/sendMsg") public String sendMsg (){ producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue")); return "success" ; } }
基於上述自定義Bean類型,進行消息發送,關注攔截器中打印日誌信息。
說明:該過程基於上述案例producer.send方法追蹤的源碼執行流程,源碼中的過程相對清楚,涉及的核心流程以下。
Producer發送消息採用的是異步發送的方式,消息發送過程以下:
絮叨一句:讀這些中間件的源碼,不只能開闊思惟,也會讓本身意識到平時寫的代碼可能真的叫搬磚。
Kafka中消息是以topic進行標識分類,生產者面向topic生產消息,topic分區(partition)是物理上的存儲,基於消息日誌文件的方式。
Kafka支持消息的事務控制
Producer事務
跨分區跨會話的事務原理,引入全局惟一的TransactionID,並將Producer得到的PID和TransactionID綁定。Producer重啓後能夠經過正在進行的TransactionID得到原來的PID。
Kafka基於TransactionCoordinator組件管理Transaction,Producer經過和TransactionCoordinator交互得到TransactionID對應的任務狀態。TransactionCoordinator將事務狀態寫入Kafka的內部Topic,即便整個服務重啓,進行中的事務狀態能夠獲得恢復。
Consumer事務
Consumer消息消費,事務的保證強度很低,沒法保證消息被精確消費,由於同一事務的消息可能會出現重啓後已經被刪除的狀況。
GitHub·地址 https://github.com/cicadasmile/data-manage-parent GitEE·地址 https://gitee.com/cicadasmile/data-manage-parent
推薦關聯閱讀:數據源管理系列