在上一章中SpringBoot整合RabbitMQ,已經詳細介紹了消息隊列的做用,這一種咱們直接來學習SpringBoot如何整合kafka發送消息。html
kafka是用Scala和Java語言開發的,高吞吐量的分佈式消息中間件。高吞吐量使它在大數據領域具備自然的優點,被普遍用來記錄日誌。java
注1:圖中的紅色箭頭表示消息的流動過程,藍色表示分區備份,綠色表示kafka集羣註冊到zookeeper。git
注2:在kafka0.9版本以前,消費者消費消息的位置記錄在zookeeper中,在0.9版本以後,消費消息的位置記錄在kafka的一個topic上。github
kafka名詞簡介:redis
採用ack確認機制來保證消息的可靠性。spring
kafka在發送消息後會同步到其餘分區副本,等全部副本都接收到消息後,kafka纔會發送ack進行確認。採用這種模式的劣勢就是當其中一個副本宕機後,則消息生產者就不會收到kafka的ack。數據庫
kafka採用ISR來解決這個問題。apache
ISR:Leader維護的一個和leader保持同步的follower集合。bootstrap
當ISR中的folower完成數據同步以後,leader就會向follower發送ack,若是follower長時間未向leader同步數據,則該follower就會被踢出ISR,該時間閥值的設置參數爲replica.lag.time.max.ms
,默認時間爲10s,leader發生故障後,就會從ISR中選舉新的leader。緩存
注:本文所講的kafka版本爲0.11,在0.9版本之前成爲ISR還有一個條件,就是同步消息的條數。
ack參數配置
0:生產者不等待broker的ack。
1:leader分區接收到消息向生產者發送ack。
-1(all):ISR中的leader和follower同步成功後,向生產者發送ack。
假如leader中有10條消息,向兩個follower同步數據,follower A同步了8條,follower B同步了9條。這時候leader宕機了,follower A和follower B中的消息是不一致的,剩下兩個follower就會從新選舉出一個leader。
LEO(log end offset):每一個副本的最後一個offset
HW(high watermark):全部副本中最小的offset
爲了保證數據的一致性,全部的follower會將各自的log文件高出HW的部分截掉,而後再重新的leader中同步數據。
在kafka0.11版本中引入了一個新特性:冪等性。啓用冪等性後,ack默認爲-1。將生產者中的enable.idompotence
設置爲true,即啓用了冪等性。
開啓冪等性的Producer在初始化的時候會被分配一個PID,發往同一Partition的消息會附帶Sequence Number。Broker端會對<PID,Partition,SeqNumber>作緩存,當具備相同主鍵的消息提交時,Broker只會緩存一條。可是每次重啓PID就會發生變化,所以只能保證一次會話同一分區的消息不重複。
kafka有兩種分配策略,一種是RoundRobin,另外一種是Range
RoundRobin是按照消費者組以輪詢的方式去給消費者分配分區的方式,前提條件是消費者組中的消費者須要訂閱同一個topic。
Range是kafka默認的分配策略,它是經過當前的topic按照必定範圍來分配的,假若有3個分區,消費者組有兩個消費者,則消費者A去消費1和2分區,消費者B去消費3分區。
Kafka 0.9 版本以前,consumer默認將offset保存在zookeeper中,0.9 版本開始,offset保存在kafka的一個內置topic中,該topic爲_consumer_offsets
。
爲了實現跨分區會話的事務,須要引入一個全局惟一的Tracscation ID,並將Producer 得到的PID與之綁定。這樣當Producer重啓後就能夠經過正在進行的Transaction ID得到原來的PID。
爲了管理Transcation ID,kafka引入了一個新的組件Transcation Coordinator。Producer就是經過和Transcation Coordinator交互得到Transction ID對應的任務狀態。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
spring: kafka: # kafka服務地址 bootstrap-servers: 47.104.155.182:9092 producer: # 生產者消息key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生產者消息value序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 消費者組 group-id: test-consumer-group # 消費者消息value反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消費者消息value反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Component @Slf4j @KafkaListener(topics = {"first-topic"},groupId = "test-consumer-group") public class Consumer { @KafkaHandler public void receive(String message){ log.info("我是消費者,我接收到的消息是:"+message); } }
@RestController public class Producer { @Autowired private KafkaTemplate kafkaTemplate; @GetMapping("send") public void send(){ String message = "你好,我是Java旅途"; // 第一個參數 topic // 第二個參數 消息 kafkaTemplate.send("first-topic",message); } }
star
支持一下!spring-boot-route(一)Controller接收參數的幾種方式
spring-boot-route(二)讀取配置文件的幾種方式
spring-boot-route(五)整合Swagger生成接口文檔
spring-boot-route(六)整合JApiDocs生成接口文檔
spring-boot-route(七)整合jdbcTemplate操做數據庫
spring-boot-route(八)整合mybatis操做數據庫
spring-boot-route(九)整合JPA操做數據庫
spring-boot-route(十一)數據庫配置信息加密
spring-boot-route(十二)整合redis作爲緩存
spring-boot-route(十三)整合RabbitMQ
spring-boot-route(十五)整合RocketMQ
spring-boot-route(十六)使用logback生產日誌文件
spring-boot-route(十七)使用aop記錄操做日誌
spring-boot-route(十八)spring-boot-adtuator監控應用
spring-boot-route(十九)spring-boot-admin監控服務
spring-boot-route(二十)Spring Task實現簡單定時任務
spring-boot-route(二十一)quartz實現動態定時任務
spring-boot-route(二十二)實現郵件發送功能
這個系列的文章都是工做中頻繁用到的知識,學完這個系列,應付平常開發綽綽有餘。若是還想了解其餘內容,掃面下方二維碼告訴我,我會進一步完善這個系列的文章!