Kafka從入門到進階

1. Apache Kafka是一個分佈式流平臺java

1.1 流平臺有三個關鍵功能:web

  1. 發佈和訂閱流記錄,相似於一個消息隊列或企業消息系統
  2. 以一種容錯的持久方式存儲記錄流
  3. 在流記錄生成的時候就處理它們

1.2 Kafka一般用於兩大類應用:spring

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。數據庫

  1. 構建實時流數據管道,在系統或應用程序之間可靠地獲取數據
  2. 構建對數據流進行轉換或輸出的實時流媒體應用程序

1.3 有幾個特別重要的概念:apache

Kafka is run as a cluster on one or more servers that can span multiple datacenters.bootstrap

The Kafka cluster stores streams of records in categories called topics.服務器

Each record consists of a key, a value, and a timestamp.app

Kafka做爲集羣運行在一個或多個能夠跨多個數據中心的服務器上負載均衡

從這句話表達了三個意思:maven

  1. Kafka是以集羣方式運行的
  2. 集羣中能夠只有一臺服務器,也有可能有多臺服務器。也就是說,一臺服務器也是一個集羣,多臺服務器也能夠組成一個集羣
  3. 這些服務器能夠跨多個數據中心

Kafka集羣按分類存儲流記錄,這個分類叫作主題

這句話表達瞭如下幾個信息:

  1. 流記錄是分類存儲的,也就說記錄是歸類的
  2. 咱們稱這種分類爲主題
  3. 簡單地來說,記錄是按主題劃分歸類存儲的

每一個記錄由一個鍵、一個值和一個時間戳組成

1.4 Kafka有四個核心API:

  • Producer API :容許應用發佈一條流記錄到一個或多個主題
  • Consumer API :容許應用訂閱一個或多個主題,並處理流記錄
  • Streams API :容許應用做爲一個流處理器,從一個或多個主題那裏消費輸入流,並將輸出流輸出到一個或多個輸出主題,從而有效地講輸入流轉換爲輸出流
  • Connector API :容許將主題鏈接到已經存在的應用或者數據系統,以構建並容許可重用的生產者或消費者。例如,一個關係型數據庫的鏈接器可能捕獲到一張表的每一次變動

(畫外音:我理解這四個核心API其實就是:發佈、訂閱、轉換處理、從第三方採集數據。)

在Kafka中,客戶端和服務器之間的通訊是使用簡單的、高性能的、與語言無關的TCP協議完成的。

2. Topics and Logs(主題和日誌)

一個topic是一個分類,或者說是記錄被髮布的時候的一個名字(畫外音:能夠理解爲記錄要被髮到哪兒去)。

在Kafka中,topic老是有多個訂閱者,所以,一個topic可能有0個,1個或多個訂閱該數據的消費者。

對於每一個主題,Kafka集羣維護一個分區日誌,以下圖所示:

Kafka從入門到進階

 

每一個分區都是一個有序的、不可變的記錄序列,並且記錄會不斷的被追加,一條記錄就是一個結構化的提交日誌(a structured commit log)。

分區中的每條記錄都被分配了一個連續的id號,這個id號被叫作offset(偏移量),這個偏移量惟一的標識出分區中的每條記錄。(PS:若是把分區比做數據庫表的話,那麼偏移量就是主鍵)

Kafka集羣持久化全部已發佈的記錄,不管它們有沒有被消費,記錄被保留的時間是能夠配置的。例如,若是保留策略被設置爲兩天,那麼在記錄發佈後的兩天內,可使用它,以後將其丟棄以釋放空間。在對數據大小方面,Kafka的性能是高效的,恆定常量級的,所以長時間存儲數據不是問題。

Kafka從入門到進階

 

事實上,惟一維護在每一個消費者上的元數據是消費者在日誌中的位置或者叫偏移量。偏移量是由消費者控制的:一般消費者在讀取記錄的時候會線性的增長它的偏移量,可是,事實上,因爲位置(偏移量)是由消費者控制的,全部它能夠按任意它喜歡的順序消費記錄。例如:一個消費者能夠重置到一個較舊的偏移量來從新處理以前已經處理過的數據,或者跳轉到最近的記錄並從「如今」開始消費。

這種特性意味着消費者很是廉價————他們能夠來來去去的消息而不會對集羣或者其它消費者形成太大影響。

日誌中的分區有幾個用途。首先,它們容許日誌的規模超出單個服務器的大小。每一個獨立分區都必須與宿主的服務器相匹配,但一個主題可能有多個分區,因此它能夠處理任意數量的數據。第二,它們做爲並行的單位——稍後再進一步。

畫外音:簡單地來講,日誌分區的做用有兩個:1、日誌的規模再也不受限於單個服務器;2、分區意味着能夠並行。

什麼意思呢?主題創建在集羣之上,每一個主題維護了一個分區日誌,顧名思義,日誌是分區的;每一個分區所在的服務器的資源(好比:CPU、內存、帶寬、磁盤等)是有限的,若是不分區(能夠理解爲等同於只有一個)的話,必然受限於這個分區所在的服務器,那麼多個分區的話就不同了,就突破了這種限制,服務器能夠隨便加,分區也能夠隨便加。

3. Distribution(分佈)

日誌的分區分佈在集羣中的服務器上,每一個服務器處理數據,而且分區請求是共享的。每一個分區被複制到多個服務器上以實現容錯,到底複製到多少個服務器上是能夠配置的。

Each partition is replicated across a configurable number of servers for fault tolerance.

每一個分區都有一個服務器充當「leader」角色,而且有0個或者多個服務器做爲「followers」。leader處理對這個分區的全部讀和寫請求,而followers被動的從leader那裏複製數據。若是leader失敗,followers中的其中一個會自動變成新的leader。每一個服務器充當一些分區的「leader」的同時也是其它分區的「follower」,所以在整個集羣中負載是均衡的。

也就是說,每一個服務器既是「leader」也是「follower」。咱們知道一個主題可能有多個分區,一個分區可能在一個服務器上也可能跨多個服務器,然而這並不覺得着一臺服務器上只有一個分區,是可能有多個分區的。每一個分區中有一個服務器充當「leader」,其他是「follower」。leader負責處理這個它做爲leader所負責的分區的全部讀寫請求,而該分區中的follow只是被動複製leader的數據。這個有點兒像HDFS中的副本機制。例如:分區-1有服務器A和B組成,A是leader,B是follower,有請求要往分區-1中寫數據的時候就由A處理,而後A把剛纔寫的數據同步給B,這樣的話正常請求至關於A和B的數據是同樣的,都有分區-1的所有數據,若是A宕機了,B成爲leader,接替A繼續處理對分區-1的讀寫請求。

須要注意的是,分區是一個虛擬的概念,是一個邏輯單元。

4. Producers(生產者)

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

生產者發佈數據到它們選擇的主題中。生產者負責選擇將記錄投遞到哪一個主題的哪一個分區中。要作這件事情,能夠簡單地用循環方式以到達負載均衡,或者根據一些語義分區函數(好比:基於記錄中的某些key)

5. Consumers(消費者)

消費者用一個消費者組名來標識它們本身(PS:至關於給本身貼一個標籤,標籤的名字是組名,以代表本身屬於哪一個組),而且每一條發佈到主題中的記錄只會投遞給每一個訂閱的消費者組中的其中一個消費者實例。消費者實例多是單獨的進程或者在單獨的機器上。

若是全部的消費者實例都使用相同的消費者組,那麼記錄將會在這些消費者之間有效的負載均衡。

若是全部的消費者實例都使用不一樣的消費者組,那麼每條記錄將會廣播給全部的消費者進程。

Kafka從入門到進階

 

上圖中其實那個Kafka Cluster換成Topic會更準確一些

一個Kafka集羣有2個服務器,4個分區(P0-P3),有兩個消費者組。組A中有2個消費者實例,組B中有4個消費者實例。

一般咱們會發現,主題不會有太多的消費者組,每一個消費者組是一個「邏輯訂閱者」(以消費者組的名義訂閱主題,而非以消費者實例的名義去訂閱)。每一個組由許多消費者實例組成,以實現可擴展性和容錯。這仍然是發佈/訂閱,只不過訂閱者是一個消費者羣體,而非單個進程。

在Kafka中,這種消費方式是經過用日誌中的分區除以使用者實例來實現的,這樣能夠保證在任意時刻每一個消費者都是排它的消費,即「公平共享」。Kafka協議動態的處理維護組中的成員。若是有心的實例加入到組中,它們將從組中的其它成員那裏接管一些分區;若是組中有一個實例死了,那麼它的分區將會被分給其它實例。

(畫外音:什麼意思呢?舉個例子,在上面的圖中,4個分區,組A有2個消費者,組B有4個消費者,那麼對A來說組中的每一個消費者負責4/2=2個分區,對組B來講組中的每一個消費者負責4/4=1個分區,並且同一時間消息只能被組中的一個實例消費。若是組中的成員數量有變化,則從新分配。)

Kafka只提供分區下的記錄的總的順序,而不提供主題下不一樣分區的總的順序。每一個分區結合按key劃分數據的能力排序對大多數應用來講是足夠的。然而,若是你須要主題下總的記錄順序,你能夠只使用一個分區,這樣作的作的話就意味着每一個消費者組中只能有一個消費者實例。

6. 保證

在一個高級別的Kafka給出下列保證:

  1. 被一個生產者發送到指定主題分區的消息將會按照它們被髮送的順序追加到分區中。也就是說,若是記錄M1和M2是被同一個生產者發送到同一個分區的,並且M1是先發送的,M2是後發送的,那麼在分區中M1的偏移量必定比M2小,而且M1出如今日誌中的位置更靠前。
  2. 一個消費者看到記錄的順序和它們在日誌中存儲的順序是同樣的。
  3. 對於一個副本因子是N的主題,咱們能夠容忍最多N-1個服務器失敗,而不會丟失已經提交給日誌的任何記錄。

7. Spring Kafka

Spring提供了一個「模板」做爲發送消息的高級抽象。它也經過使用@KafkaListener註釋和「監聽器容器」提供對消息驅動POJOs的支持。這些庫促進了依賴注入和聲明式的使用。

Kafka從入門到進階

 

7.1 純Java方式

1 package com.cjs.example.quickstart;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.kafka.clients.consumer.ConsumerRecord;
 5 import org.apache.kafka.clients.producer.ProducerConfig;
 6 import org.apache.kafka.common.serialization.IntegerDeserializer;
 7 import org.apache.kafka.common.serialization.IntegerSerializer;
 8 import org.apache.kafka.common.serialization.StringDeserializer;
 9 import org.apache.kafka.common.serialization.StringSerializer;
10 import org.springframework.kafka.core.*;
11 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
12 import org.springframework.kafka.listener.MessageListener;
13 import org.springframework.kafka.listener.config.ContainerProperties;
14 
15 import java.util.HashMap;
16 import java.util.Map;
17 
18 public class PureJavaDemo {
19 
20 /**
21 * 生產者配置
22 */
23 private static Map<String, Object> senderProps() {
24 Map<String, Object> props = new HashMap<>();
25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");
26 props.put(ProducerConfig.RETRIES_CONFIG, 0);
27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30 return props;
31 }
32 
33 /**
34 * 消費者配置
35 */
36 private static Map<String, Object> consumerProps() {
37 Map<String, Object> props = new HashMap<>();
38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");
39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello");
40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
45 return props;
46 }
47 
48 /**
49 * 發送模板配置
50 */
51 private static KafkaTemplate<Integer, String> createTemplate() {
52 Map<String, Object> senderProps = senderProps();
53 ProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
54 KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
55 return kafkaTemplate;
56 }
57 
58 /**
59 * 消息監聽器容器配置
60 */
61 private static KafkaMessageListenerContainer<Integer, String> createContainer() {
62 Map<String, Object> consumerProps = consumerProps();
63 ConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
64 ContainerProperties containerProperties = new ContainerProperties("test");
65 KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
66 return container;
67 }
68 
69 
70 public static void main(String[] args) throws InterruptedException {
71 String topic1 = "test"; // 主題
72 
73 KafkaMessageListenerContainer container = createContainer();
74 ContainerProperties containerProperties = container.getContainerProperties();
75 containerProperties.setMessageListener(new MessageListener<Integer, String>() {
76 @Override
77 public void onMessage(ConsumerRecord<Integer, String> record) {
78 System.out.println("Received: " + record);
79 }
80 });
81 container.setBeanName("testAuto");
82 
83 container.start();
84 
85 KafkaTemplate<Integer, String> kafkaTemplate = createTemplate();
86 kafkaTemplate.setDefaultTopic(topic1);
87 
88 kafkaTemplate.sendDefault(0, "foo");
89 kafkaTemplate.sendDefault(2, "bar");
90 kafkaTemplate.sendDefault(0, "baz");
91 kafkaTemplate.sendDefault(2, "qux");
92 
93 kafkaTemplate.flush();
94 container.stop();
95 
96 System.out.println("結束");
97 }
98 
99 }

運行結果:

Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo)
Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar)
Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz)
Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)

7.2 更簡單一點兒,用SpringBoot

1 package com.cjs.example.quickstart;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerRecord;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.boot.CommandLineRunner;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.kafka.core.KafkaTemplate;
10 
11 @Configuration
12 public class JavaConfigurationDemo {
13 
14 @KafkaListener(topics = "test")
15 public void listen(ConsumerRecord<String, String> record) {
16 System.out.println("收到消息: " + record);
17 }
18 
19 @Bean
20 public CommandLineRunner commandLineRunner() {
21 return new MyRunner();
22 }
23 
24 class MyRunner implements CommandLineRunner {
25 
26 @Autowired
27 private KafkaTemplate<String, String> kafkaTemplate;
28 
29 @Override
30 public void run(String... args) throws Exception {
31 kafkaTemplate.send("test", "foo1");
32 kafkaTemplate.send("test", "foo2");
33 kafkaTemplate.send("test", "foo3");
34 kafkaTemplate.send("test", "foo4");
35 }
36 }
37 }

application.properties配置

spring.kafka.bootstrap-servers=192.168.101.5:9092
spring.kafka.consumer.group-id=world

8. 生產者

1 package com.cjs.example.send;
 2 
 3 import org.apache.kafka.clients.producer.ProducerConfig;
 4 import org.apache.kafka.common.serialization.IntegerSerializer;
 5 import org.apache.kafka.common.serialization.StringSerializer;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.core.ProducerFactory;
11 
12 import java.util.HashMap;
13 import java.util.Map;
14 
15 @Configuration
16 public class Config {
17 
18 public Map<String, Object> producerConfigs() {
19 Map<String, Object> props = new HashMap<>();
20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");
21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
23 return props;
24 }
25 
26 public ProducerFactory<Integer, String> producerFactory() {
27 return new DefaultKafkaProducerFactory<>(producerConfigs());
28 }
29 
30 @Bean
31 public KafkaTemplate<Integer, String> kafkaTemplate() {
32 return new KafkaTemplate<Integer, String>(producerFactory());
33 }
34 
35 }
1 package com.cjs.example.send;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.boot.CommandLineRunner;
 5 import org.springframework.kafka.core.KafkaTemplate;
 6 import org.springframework.kafka.support.SendResult;
 7 import org.springframework.stereotype.Component;
 8 import org.springframework.util.concurrent.ListenableFuture;
 9 import org.springframework.util.concurrent.ListenableFutureCallback;
10 
11 @Component
12 public class MyCommandLineRunner implements CommandLineRunner {
13 
14 @Autowired
15 private KafkaTemplate<Integer, String> kafkaTemplate;
16 
17 public void sendTo(Integer key, String value) {
18 ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send("test", key, value);
19 listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
20 @Override
21 public void onFailure(Throwable throwable) {
22 System.out.println("發送失敗啦");
23 throwable.printStackTrace();
24 }
25 
26 @Override
27 public void onSuccess(SendResult<Integer, String> sendResult) {
28 System.out.println("發送成功," + sendResult);
29 }
30 });
31 }
32 
33 @Override
34 public void run(String... args) throws Exception {
35 sendTo(1, "aaa");
36 sendTo(2, "bbb");
37 sendTo(3, "ccc");
38 }
39 
40 
41 }

運行結果:

發送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37]
發送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38]
發送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]

9. 消費者@KafkaListener

1 package com.cjs.example.receive;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.kafka.clients.consumer.ConsumerRecord;
 5 import org.apache.kafka.common.serialization.IntegerDeserializer;
 6 import org.apache.kafka.common.serialization.StringDeserializer;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 import org.springframework.kafka.annotation.KafkaListener;
10 import org.springframework.kafka.annotation.TopicPartition;
11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12 import org.springframework.kafka.config.KafkaListenerContainerFactory;
13 import org.springframework.kafka.core.ConsumerFactory;
14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
15 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
17 import org.springframework.kafka.listener.config.ContainerProperties;
18 import org.springframework.kafka.support.Acknowledgment;
19 import org.springframework.kafka.support.KafkaHeaders;
20 import org.springframework.messaging.handler.annotation.Header;
21 import org.springframework.messaging.handler.annotation.Payload;
22 
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 
27 @Configuration
28 public class Config2 {
29 
30 @Bean
31 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
32 ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
33 factory.setConsumerFactory(consumerFactory());
34 factory.setConcurrency(3);
35 ContainerProperties containerProperties = factory.getContainerProperties();
36 containerProperties.setPollTimeout(2000);
37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
38 return factory;
39 }
40 
41 private ConsumerFactory<Integer,String> consumerFactory() {
42 return new DefaultKafkaConsumerFactory<>(consumerProps());
43 }
44 
45 private Map<String, Object> consumerProps() {
46 Map<String, Object> props = new HashMap<>();
47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");
48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha");
49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
52 return props;
53 }
54 
55 
56 @KafkaListener(topics = "test")
57 public void listen(String data) {
58 System.out.println("listen 收到: " + data);
59 }
60 
61 
62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
63 public void listen2(String data, Acknowledgment ack) {
64 System.out.println("listen2 收到: " + data);
65 ack.acknowledge();
66 }
67 
68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")})
69 public void listen3(ConsumerRecord<?, ?> record) {
70 System.out.println("listen3 收到: " + record.value());
71 }
72 
73 
74 @KafkaListener(id = "xyz", topics = "test")
75 public void listen4(@Payload String foo,
76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
79 @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
80 System.out.println("listen4 收到: ");
81 System.out.println(foo);
82 System.out.println(key);
83 System.out.println(partition);
84 System.out.println(topic);
85 System.out.println(offsets);
86 }
87 
88 }

9.1 Committing Offsets

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

若是enable.auto.commit設置爲true,那麼kafka將自動提交offset。若是設置爲false,則支持下列AckMode(確認模式)。

消費者poll()方法將返回一個或多個ConsumerRecords

  • RECORD :處理完記錄之後,當監聽器返回時,提交offset
  • BATCH :當對poll()返回的全部記錄進行處理完之後,提交偏offset
  • TIME :當對poll()返回的全部記錄進行處理完之後,只要距離上一次提交已通過了ackTime時間後就提交
  • COUNT :當poll()返回的全部記錄都被處理時,只要從上次提交以來收到了ackCount條記錄,就能夠提交
  • COUNT_TIME :和TIME以及COUNT相似,只要這兩個中有一個爲true,則提交
  • MANUAL :消息監聽器負責調用Acknowledgment.acknowledge()方法,此後和BATCH是同樣的
  • MANUAL_IMMEDIATE :當監聽器調用Acknowledgment.acknowledge()方法後當即提交

10. Spring Boot Kafka

10.1 application.properties

spring.kafka.bootstrap-servers=192.168.101.5:9092

10.2 發送消息

1 package com.cjs.example;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.kafka.core.KafkaTemplate;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.RestController;
 7 
 8 import javax.annotation.Resource;
 9 
10 @RestController
11 @RequestMapping("/msg")
12 public class MessageController {
13 
14 @Resource
15 private KafkaTemplate<String, String> kafkaTemplate;
16 
17 @RequestMapping("/send")
18 public String send(String topic, String key, String value) {
19 kafkaTemplate.send(topic, key, value);
20 return "ok";
21 }
22 
23 }

10.3 接收消息

1 package com.cjs.example;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerRecord;
 4 import org.springframework.kafka.annotation.KafkaListener;
 5 import org.springframework.kafka.annotation.KafkaListeners;
 6 import org.springframework.stereotype.Component;
 7 
 8 @Component
 9 public class MessageListener {
10 
11 /**
12 * 監聽訂單消息
13 */
14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup")
15 public void listenToOrder(String data) {
16 System.out.println("收到訂單消息:" + data);
17 }
18 
19 /**
20 * 監聽會員消息
21 */
22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")
23 public void listenToMember(ConsumerRecord<String, String> record) {
24 System.out.println("收到會員消息:" + record);
25 }
26 
27 /**
28 * 監聽全部消息
29 *
30 * 任意時刻,一條消息只會發給組中的一個消費者
31 *
32 * 消費者組中的成員數量不能超過度區數,這裏分區數是1,所以訂閱該主題的消費者組成員不能超過1
33 */
34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"),
35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")})
36 // public void listenToAll(String data) {
37 // System.out.println("啊啊啊");
38 // }
39 
40 }

11. pom.xml

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.cjs.example</groupId>
 <artifactId>cjs-kafka-example</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>
 <name>cjs-kafka-example</name>
 <description></description>
 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.0.4.RELEASE</version>
 <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 <java.version>1.8</java.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 </dependencies>
 <build>
 <plugins>
 <plugin>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-maven-plugin</artifactId>
 </plugin>
 </plugins>
 </build>
</project>

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

相關文章
相關標籤/搜索