Apache Kafka 是一個快速、可擴展的、高吞吐、可容錯的分佈式發佈訂閱消息系統。其具備高吞吐量、內置分區、支持數據副本和容錯的特性,適合在大規模消息處理場景中使用。java
筆者以前在物聯網公司工做,其中 Kafka 做爲物聯網 MQ 選型的事實標準,這裏優先給你們搭建 Kafka 集羣環境。因爲 Kafka 的安裝須要依賴 Zookeeper,對 Zookeeper 還不瞭解的小夥伴能夠在 這裏 先認識下 Zookeeper。web
Kafka 能解決什麼問題呢?先說一下消息隊列常見的使用場景吧,其實場景有不少,可是比較核心的有 3 個:解耦、異步、削峯。spring
Kafka 部分名詞解釋以下:docker
配合上一節的 Zookeeper 環境,計劃搭建一個 3 節點的集羣。宿主機 IP 爲 192.168.124.5
。bootstrap
docker-compose-kafka-cluster.ymlbash
version: '3.7'
networks:
docker_net:
external: true
services:
kafka1:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka1
ports:
- "9093:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主機映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 綁定發佈訂閱的端口。修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka1/docker.sock:/var/run/docker.sock"
- "./kafka/kafka1/data/:/kafka"
networks:
- docker_net
kafka2:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka2
ports:
- "9094:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主機映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka2/docker.sock:/var/run/docker.sock"
- "./kafka/kafka2/data/:/kafka"
networks:
- docker_net
kafka3:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka3
ports:
- "9095:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主機映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka3/docker.sock:/var/run/docker.sock"
- "./kafka/kafka3/data/:/kafka"
networks:
- docker_net
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: unless-stopped
container_name: kafka-manager
hostname: kafka-manager
ports:
- "9000:9000"
links: # 鏈接本compose文件建立的container
- kafka1
- kafka2
- kafka3
external_links: # 鏈接本compose文件之外的container
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主機IP
TZ: CST-8
networks:
- docker_net
複製代碼
執行如下命令啓動服務器
docker-compose -f docker-compose-kafka-cluster.yml up -d
複製代碼
能夠看到 kafka 集羣已經啓動成功。併發
細心的小夥伴發現上邊的配置除了 kafka 外還有一個 kafka-manager 模塊。它是 kafka 的可視化管理模塊。由於 kafka 的元數據、配置信息由 Zookeeper 管理,這裏咱們在 UI 頁面作下相關配置。app
1. 訪問 http:localhost:9000,按圖示添加相關配置less
2. 配置後咱們能夠看到默認有一個 topic(__consumer_offsets),3 個 brokers。該 topic 分 50 個 partition,用於記錄 kafka 的消費偏移量。
1. 首先觀察下根目錄
kafka 基於 zookeeper,kafka 啓動會將元數據保存在 zookeeper 中。查看 zookeeper 節點目錄,會發現多了不少和 kafka 相關的目錄。結果以下:
➜ docker zkCli -server 127.0.0.1:2183
Connecting to 127.0.0.1:2183
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2183(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]
複製代碼
2. 查看咱們映射的 kafka 目錄,新版本的 kafka 偏移量再也不存儲在 zk 中,而是在 kafka 本身的環境中。
咱們節選了部分目錄(包含 2 個 partition)
├── kafka1
│ ├── data
│ │ └── kafka-logs-c4e2e9edc235
│ │ ├── __consumer_offsets-1
│ │ │ ├── 00000000000000000000.index // segment索引文件
│ │ │ ├── 00000000000000000000.log // 數據文件
│ │ │ ├── 00000000000000000000.timeindex // 消息時間戳索引文件
│ │ │ └── leader-epoch-checkpoint
...
│ │ ├── __consumer_offsets-7
│ │ │ ├── 00000000000000000000.index
│ │ │ ├── 00000000000000000000.log
│ │ │ ├── 00000000000000000000.timeindex
│ │ │ └── leader-epoch-checkpoint
│ │ ├── cleaner-offset-checkpoint
│ │ ├── log-start-offset-checkpoint
│ │ ├── meta.properties
│ │ ├── recovery-point-offset-checkpoint
│ │ └── replication-offset-checkpoint
│ └── docker.sock
複製代碼
結果與 Kafka-Manage 顯示結果一致。圖示的文件是一個 Segment,00000000000000000000.log 表示 offset 從 0 開始,隨着數據不斷的增長,會有多個 Segment 文件。
➜ docker docker exec -it kafka1 /bin/bash # 進入容器
bash-4.4# cd /opt/kafka/ # 進入安裝目錄
bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主題列表
__consumer_offsets
bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主題
Created topic test.
複製代碼
說明: --replication-factor 副本數; --partitions 分區數; replication<=broker(必定); 有效消費者數<=partitions 分區數(必定);
新建主題後, 再次查看映射目錄, 由圖可見,partition 在 3 個 broker 上均勻分佈。
bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
>msg1
>msg2
>msg3
>msg4
>msg5
>msg6
複製代碼
bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
msg1
msg3
msg2
msg4
msg6
msg5
複製代碼
--from-beginning 表明從頭開始消費
查看消費者組
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
KafkaManagerOffsetCache
console-consumer-86137
複製代碼
消費組偏移量
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache
複製代碼
查看 topic 詳情
bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
複製代碼
查看.log 數據文件
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true
| offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true
| offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true
| offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6
複製代碼
這裏須要看下本身的文件路徑是什麼,別直接 copy 個人哦
查看.index 索引文件
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
offset: 0 position: 0
複製代碼
查看.timeindex 索引文件
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1583317546421
複製代碼
筆者 SpringBoot 版本是 2.2.2.RELEASE
pom.xml 添加依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
複製代碼
生產者配置
@Configuration
public class KafkaProducerConfig {
/** * producer配置 * @return */
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 指定多個kafka集羣多個地址 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
// 重試次數,0爲不啓用重試機制
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// acks=0 把消息發送到kafka就認爲發送成功
// acks=1 把消息發送到kafka leader分區,而且寫入磁盤就認爲發送成功
// acks=all 把消息發送到kafka leader分區,而且leader分區的副本follower對消息進行了同步就職務發送成功
props.put(ProducerConfig.ACKS_CONFIG,"all");
// 生產者空間不足時,send()被阻塞的時間,默認60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批處理大小,單位爲字節
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量發送,延遲爲1毫秒,啓用該功能能有效減小生產者發送消息次數,從而提升併發量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生產者可使用的總內存字節來緩衝等待發送到服務器的記錄
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是說send的消息大小不能超過這個限制, 默認1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 客戶端id
props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.topinfo");
// 鍵的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 壓縮消息,支持四種類型,分別爲:none、lz四、gzip、snappy,默認爲none。
// 消費者默認支持解壓,因此壓縮設置在生產者,消費者無需設置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}
/** * producer工廠配置 * @return */
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/** * Producer Template 配置 */
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
複製代碼
消費者配置
@Configuration
public class KafkaConsumerConfig {
private static final String GROUP0_ID = "group0";
private static final String GROUP1_ID = "group1";
/** * 1. setAckMode: 消費者手動提交ack * * RECORD: 每處理完一條記錄後提交。 * BATCH(默認): 每次poll一批數據後提交一次,頻率取決於每次poll的調用頻率。 * TIME: 每次間隔ackTime的時間提交。 * COUNT: 處理完poll的一批數據後而且距離上次提交處理的記錄數超過了設置的ackCount就提交。 * COUNT_TIME: TIME和COUNT中任意一條知足即提交。 * MANUAL: 手動調用Acknowledgment.acknowledge()後,而且處理完poll的這批數據後提交。 * MANUAL_IMMEDIATE: 手動調用Acknowledgment.acknowledge()後當即提交。 * * 2. factory.setConcurrency(3); * 此處設置的目的在於:假設 topic test 下有 0、一、2三個 partition,Spring Boot中只有一個 @KafkaListener() 消費者訂閱此 topic,此處設置併發爲3, * 啓動後 會有三個不一樣的消費者分別訂閱 p0、p一、p2,本地實際有三個消費者線程。 * 而 factory.setConcurrency(1); 的話 本地只有一個消費者線程, p0、p一、p2被同一個消費者訂閱。 * 因爲 一個partition只能被同一個消費者組下的一個消費者訂閱,對於只有一個 partition的topic,即便設置 併發爲3,也只會有一個消費者,多餘的消費者沒有 partition能夠訂閱。 * * 3. factory.setBatchListener(true); * 設置批量消費 ,每一個批次數量在Kafka配置參數ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置, * 限制的是 一次批量接收的最大條數,而不是 等到達到最大條數才接收,這點容易被誤解。 * 實際測試時,接收是實時的,當生產者大量寫入時,一次批量接收的消息數量爲 配置的最大條數。 */
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 設置消費者工廠
factory.setConsumerFactory(consumerFactory());
// 設置爲批量消費,每一個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
// 消費者組中線程數量,消費者數量<=partition數量,即便配置的消費者數量大於partition數量,多餘消費者沒法消費到數據。
factory.setConcurrency(4);
// 拉取超時時間
factory.getContainerProperties().setPollTimeout(3000);
// 手動提交
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> map = consumerConfigs();
map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
// @Bean
// KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory1() {
// ConcurrentKafkaListenerContainerFactory<Integer, String>
// factory = new ConcurrentKafkaListenerContainerFactory<>();
// // 設置消費者工廠
// factory.setConsumerFactory(consumerFactory1());
// // 設置爲批量消費,每一個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
// factory.setBatchListener(true);
// // 消費者組中線程數量,消費者數量<=partition數量,即便配置的消費者數量大於partition數量,多餘消費者沒法消費到數據。
// factory.setConcurrency(3);
// // 拉取超時時間
// factory.getContainerProperties().setPollTimeout(3000);
// // 手動提交
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// return factory;
// }
//
// public ConsumerFactory<Integer, String> consumerFactory1() {
// Map<String, Object> map = consumerConfigs();
// map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID);
// return new DefaultKafkaConsumerFactory<>(consumerConfigs());
// }
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// Kafka地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
// 是否自動提交offset偏移量(默認true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 批量消費
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
// 消費者組
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-default");
// 自動提交的頻率(ms)
// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超時設置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 鍵的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量規則設置:
// (1)、earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
// (2)、latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
// (3)、none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
}
複製代碼
主題配置
@Configuration
public class KafkaTopicConfig {
/** * 定義一個KafkaAdmin的bean,能夠自動檢測集羣中是否存在topic,不存在則建立 */
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
// 指定多個kafka集羣多個地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
return new KafkaAdmin(configs);
}
/** * 建立 Topic */
@Bean
public NewTopic topicinfo() {
// 建立topic,須要指定建立的topic的"名稱"、"分區數"、"副本數量(副本數數目設置要小於Broker數量)"
return new NewTopic("test", 3, (short) 2);
}
}
複製代碼
消費者服務
@Slf4j
@Service
public class KafkaConsumerService {
// /**
// * 單條消費
// * @param message
// */
// @KafkaListener(id = "id0", topics = {Constant.TOPIC}, containerFactory="kafkaListenerContainerFactory")
// public void kafkaListener0(String message){
// log.info("consumer:group0 --> message:{}", message);
// }
//
// @KafkaListener(id = "id1", topics = {Constant.TOPIC}, groupId = "group1")
// public void kafkaListener1(String message){
// log.info("consumer:group1 --> message:{}", message);
// }
// /**
// * 監聽某個 Topic 的某個分區示例,也能夠監聽多個 Topic 的分區
// * 爲何找不到group2呢?
// * @param message
// */
// @KafkaListener(id = "id2", groupId = "group2", topicPartitions = { @TopicPartition(topic = Constant.TOPIC, partitions = { "0" }) })
// public void kafkaListener2(String message) {
// log.info("consumer:group2 --> message:{}", message);
// }
//
// /**
// * 獲取監聽的 topic 消息頭中的元數據
// * @param message
// * @param topic
// * @param key
// */
// @KafkaListener(id = "id3", topics = Constant.TOPIC, groupId = "group3")
// public void kafkaListener(@Payload String message,
// @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
// @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// Long threadId = Thread.currentThread().getId();
// log.info("consumer:group3 --> message:{}, topic:{}, partition:{}, key:{}, threadId:{}", message, topic, partition, key, threadId);
// }
//
// /**
// * 監聽 topic 進行批量消費
// * @param messages
// */
// @KafkaListener(id = "id4", topics = Constant.TOPIC, groupId = "group4")
// public void kafkaListener(List<String> messages) {
// for(String msg:messages){
// log.info("consumer:group4 --> message:{}", msg);
// }
// }
//
// /**
// * 監聽topic並手動提交偏移量
// * @param messages
// * @param acknowledgment
// */
// @KafkaListener(id = "id5", topics = Constant.TOPIC,groupId = "group5")
// public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) {
// for(String msg:messages){
// log.info("consumer:group5 --> message:{}", msg);
// }
// // 觸發提交offset偏移量
// acknowledgment.acknowledge();
// }
//
// /**
// * 模糊匹配多個 Topic
// * @param message
// */
// @KafkaListener(id = "id6", topicPattern = "test.*",groupId = "group6")
// public void annoListener2(String message) {
// log.error("consumer:group6 --> message:{}", message);
// }
/** * 完整consumer * @return */
@KafkaListener(id = "id7", topics = {Constant.TOPIC}, groupId = "group7")
public boolean consumer4(List<ConsumerRecord<?, ?>> data) {
for (int i=0; i<data.size(); i++) {
ConsumerRecord<?, ?> record = data.get(i);
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
Long threadId = Thread.currentThread().getId();
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("consumer:group7 --> message:{}, topic:{}, partition:{}, key:{}, offset:{}, threadId:{}", message.toString(), record.topic(), record.partition(), record.key(), record.offset(), threadId);
}
}
return true;
}
}
複製代碼
生產者服務
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
/** * producer 同步方式發送數據 * @param topic topic名稱 * @param key 通常用業務id,相同業務在同一partition保證消費順序 * @param message producer發送的數據 */
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// 默認輪詢partition
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
// // 根據key進行hash運算,再將運算結果寫入到不一樣partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
// // 第二個參數爲partition,當partition和key同時設置時partition優先。
// kafkaTemplate.send(topic, 0, key, message);
// // 組裝消息
// Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
// .setHeader(KafkaHeaders.MESSAGE_KEY, key)
// .setHeader(KafkaHeaders.TOPIC, topic)
// .setHeader(KafkaHeaders.PREFIX,"kafka_")
// .build();
// kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS);
// // 組裝消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
/** * producer 異步方式發送數據 * @param topic topic名稱 * @param message producer發送的數據 */
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
// 設置異步發送消息獲取發送結果後執行的動做
ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
};
// 將listenableFutureCallback與異步發送消息對象綁定
future.addCallback(listenableFutureCallback);
}
public void test(String topic, Integer partition, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
kafkaTemplate.send(topic, partition, key, message).get(10, TimeUnit.SECONDS);
}
}
複製代碼
web 測試
@RestController
public class KafkaProducerController {
@Autowired
private KafkaProducerService producerService;
@GetMapping("/sync")
public void sendMessageSync(@RequestParam String topic) throws InterruptedException, ExecutionException, TimeoutException {
producerService.sendMessageSync(topic, null, "同步發送消息測試");
}
@GetMapping("/async")
public void sendMessageAsync(){
producerService.sendMessageAsync("test","異步發送消息測試");
}
@GetMapping("/test")
public void test(@RequestParam String topic, @RequestParam(required = false) Integer partition, @RequestParam(required = false) String key, @RequestParam String message) throws InterruptedException, ExecutionException, TimeoutException {
producerService.test(topic, partition, key, message);
}
}
複製代碼
若是您以爲寫的還不錯,請關注公衆號 【當我趕上你】, 您的支持是我最大的動力。