歡迎關注筆者微信公衆號: 小哈學Java, 每日持續推送 Java 領域乾貨文章,月送福利哦!!php
我的網站: www.exception.site/springboot/…java
Kafka 是 Apache 基金會開源的一個分佈式發佈 - 訂閱消息中間件,流處理平臺。 它起源於 LinkedIn,由 Scala 和 Java兩種語言編寫而成。於 2011 年成爲 Apache 項目,2012 成爲 Apache 基金會下頂級項目。git
Kafka 專爲分佈式高吞吐系統而設計。相比較其餘消息中間件,如 RabbitMq 等,Kafka 具備更好的吞吐量,內置分區,複製和固有的容錯能力,使得它很是適合應用在大數據領域。另外,Kafka 還支持離線、在線消費消息。github
接下來,小哈爲你們演示一下,在 Linux 系統中,採用最簡單的單機安裝方式, 由於本文着重點仍是介紹 Spring Boot 2.x 快速集成整合 Kafka.web
訪問 Kafka 官網 kafka.apache.org/downloads,下載 tgz 包, 這裏演示版本爲最新的 2.3.0 版本。spring
下載下來事後,放置到指定位置,執行命令解壓:apache
tar -zxvf kafka_2.11-2.3.0.tgz
複製代碼
解壓完成後,進入 Kafka 目錄下:json
cd kafka_2.11-2.3.0
複製代碼
經過 bin 目錄下的 zookeeper-server-start.sh
啓動腳本,來啓動 zk 單節點實例:bootstrap
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
複製代碼
經過 bin 目錄下的 kafka-server-start.sh
來啓動 :數組
bin/kafka-server-start.sh config/server.properties
複製代碼
注意:Kafka 默認使用 9092 端口,注意關閉防火牆,阿里雲服務器的話,記得添加安全組。
新建一個 Spring Boot 2.x Web 工程。
小哈這裏完整的 maven 依賴以下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>site.exception</groupId>
<artifactId>spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 阿里巴巴 fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
複製代碼
修改 application.yml 文件,添加 kafka 相關配置:
spring:
kafka:
# 指定 kafka 地址,我這裏在本地,直接就 localhost, 若外網地址,注意修改【PS: 能夠指定多個】
bootstrap-servers: localhost:9092
consumer:
# 指定 group_id
group-id: group_id
auto-offset-reset: earliest
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
複製代碼
auto.offset.reset
配置有3個值能夠設置,分別以下:
offset
時,從提交的 offset
開始消費;無提交的 offset
時,從頭開始消費;offset
時,從提交的 offset
開始消費;無提交的 offset
時,消費新產生的該分區下的數據;topic
各分區都存在已提交的 offset
時,從 offset
後開始消費;只要有一個分區不存在已提交的 offset
,則拋出異常;默認建議用 earliest
, 設置該參數後 kafka出錯後重啓,找到未消費的offset能夠繼續消費。
而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啓kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就無論了。
none 這個設置沒有用過,兼容性太差,常常出問題。
模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其餘服務消費:
/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 訂單實體類 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/** * 訂單id */
private long orderId;
/** * 訂單號 */
private String orderNum;
/** * 訂單建立時間 */
private LocalDateTime createTime;
}
複製代碼
新建一個 KafkaProvider
消息提供者類,源碼以下:
/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 消息提供者 **/
@Component
@Slf4j
public class KafkaProvider {
/** * 消息 TOPIC */
private static final String TOPIC = "xiaoha";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 構建一個訂單類
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 發送消息,訂單類的 json 做爲消息體
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 監聽回調
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("## Send message fail ...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("## Send message success ...");
}
});
}
}
複製代碼
消息發送出去了,固然就須要一個消費者,消費者拿到消息後,再作相關的業務處理,這裏,小哈僅僅是打印消息體。
添加 KafkaConsumer
消費者類:
/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 消息消費者 **/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "xiaoha", groupId = "group_id")
public void consume(String message) {
log.info("## consume message: {}", message);
}
}
複製代碼
經過 @KafkaListener
註解,咱們能夠指定須要監聽的 topic
以及 groupId
, 注意,這裏的 topics
是個數組,意味着咱們能夠指定多個 topic
,如:@KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id")
。
注意:消息發佈者的 TOPIC 須要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。
新建單元測試,功能測試消息發佈,以及消費。
/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 發送 1000 個消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
複製代碼
發送 1000 個消息,看消息是否可以被正常發佈與消費,控制檯日誌以下:
能夠發現,1000 個消息被成功發送,且被正常消費。
咱們再驗證下 Kafka 的 topic 列表,看 xiaoha
這個topic
是否正常被建立, 執行 bin
目錄下查看 topic
列表的 kafka-topics.sh
腳本:
bin/kafka-topics.sh --list --zookeeper localhost:2181
複製代碼
好了,大功告成!
小哈今天主要和你們分享了,如何安裝單機版的 kafka 環境、如何在 Spring Boot 2.x 中快速集成消息中間件 Kafka,以及演示了相關示例代碼來發布消息、消費消息,但願你們看完事後有所收穫,下期見!