kafka最初是由linkedin開發的,是一個分佈式,分區的,多副本的,基於Zookeeper協調的分佈式日誌系統,固然它也能夠當作消息隊列來使用。
常見的能夠用於Web,nginx日誌,訪問日誌,消息服務等等。
因此kafka的應用場景主要有:日誌收集系統和消息系統。html
消費者生產者之間不想相互耦合,只要都遵循一樣的接口約束就行。nginx
這裏主要是爲了保證數據不會丟失,許多消息隊列採用"插入-獲取-刪除"的模式,在把一個消息從隊列中年刪除以前,須要系統明確指出這個消息已經被處理完畢,從而確保數據被安全地保存直到使用完畢。web
支持擴展spring
在訪問量劇增的狀況下,使用消息隊列可以使得關鍵組件頂住忽然的訪問壓力,使得應用仍然須要繼續發揮做用。apache
系統的一部分組件失效時,不會影響整個系統,即便一個處理消息的線程掛掉,加入隊列中的消息也能夠在系統恢復後被處理。json
Kafka保證一個Partition中的消息的有序性。bootstrap
經過一個緩衝層來幫助任務最高效率地執行,寫入隊列的處理儘量地傳遞。緩存
採用異步通訊機制,容許先把消息放入隊列,但並不當即處理,而是在須要的時候再去用它們。安全
Kafka集羣包括一個或者多個服務器,服務器節點稱爲broker。broker存儲topic的數據,若是某個topic有N個partition,集羣有N個broker,那麼每一個broker存儲該topic的一個partition,若是某個topic有N個partition,集羣有N+m個broker,那麼N個broker存儲該topic中的一個partition,剩下的m個broker不存儲該topic的partition數據。若是某個topic的broker數量比partition的數量少,那麼一個broker可能會存儲多個該topic的partition。
在實際生產中應該儘可能避免這種狀況發生,由於很容易形成kafka集羣數據不均衡。服務器
每條發佈到kafka的集羣消息都有一個類別,這個類別稱爲topic。
Topic的數據分割爲一個或者多個partition,每一個partition中的數據使用過個segment文件存儲。partition的數據是有序的,不一樣partition間的數據丟失了數據的順序,若是topic有多個partition,消費數據就不能保證數據的順序,在須要嚴格保證消息的消息順序的場景下,須要將partition數目須要1。
生產者
消費者
每一個Consumer屬於一個特定的ComsumerGroup,可爲每一個Consumer指定GroupName,不指定則爲默認。
每一個Partition有多個副本,其中有且僅有一個Leader,即負責讀寫數據的Partition。
Follower跟隨Leader,全部的寫請求都經過Leader路由,數據變動會廣播到全部的Follower。若是Leader失效,那麼Follower中會選舉出一個新的Leader。
本想繼續寫一寫kafka的架構,高可用設計和其中的一些特性的,可是我這兩天在看這些東西的時候發現這些仍是在一個demo的基礎上再去學習比較好,因此這些留在下一篇寫了。
安裝kafka和Zookeeper,kafka運行須要Zookeeper來支持,來進行心跳等機制,因此在運行kafka以前安裝好Zookeeper。網上帖子不少,就不細寫了,可是我這裏Zookeeper和kafka都是單實例的,並無配置集羣。
IDEA用SpringInitializer創建一個大工程,而後創建KafkaConsumer和KafkaProducer兩個module就好了。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> </dependencies>
生產者
server.port=8099 # kafka地址 spring.kafka.bootstrap-servers=127.0.0.1:9092 #寫入失敗的時候的重試次數 spring.kafka.producer.retries=0 # 每次批量發送消息的數量 spring.kafka.producer.batch-size=16384 # producer積累數據一次性發送,緩存大小到達這個值就發送數據 spring.kafka.producer.buffer-memory=33554432 #acks = 0 若是設置爲零,則生產者將不會等待來自服務器的任何確認,該記錄將當即添加到套接字緩衝區並視爲已發送。在這種狀況下,沒法保證服務器已收到記錄,而且重試配置將不會生效(由於客戶端一般不會知道任何故障),爲每條記錄返回的偏移量始終設置爲-1。 #acks = 1 這意味着leader會將記錄寫入其本地日誌,但無需等待全部副本服務器的徹底確認便可作出迴應,在這種狀況下,若是leader在確認記錄後當即失敗,但在將數據複製到全部的副本服務器以前,則記錄將會丟失。 #acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這至關於acks = -1的設置。 spring.kafka.producer.acks=1 # 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
消費者
server.port=8090 # kafka地址 spring.kafka.bootstrap-servers=127.0.0.1:9092 # 自動提交的時間間隔 spring.kafka.consumer.auto-commit-interval=1S # 指定消費者在讀取一個沒有偏移量的分區或者偏移量無效的分區的狀況下如何處理。 # latest在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據 # earliest在偏移量無效的狀況下,消費者將從起始位置讀取分區的記錄 spring.kafka.consumer.auto-offset-reset=earliest # 是否自動提交偏移量,爲了不出現重複數據和數據丟失,能夠把它設置爲false,而後手動提交偏移量 spring.kafka.consumer.enable-auto-commit=false # key的反序列化方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.listener.concurrency=5 spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.missing-topics-fatal=false
我這裏採用的就是簡單的StringSerializer和StringDeserializer,若是是傳遞對象,有兩種方式,一種是自定義解碼和編碼器,須要實現Serializer接口,另外一種就是用已有的格式來解碼和編碼,好比json格式來傳遞信息,而後用fastjson等框架來解碼和編碼。
另一點就是消費者的監聽器必需要設置ack-mode,由於上面設置的自動提交的選項設置爲了false,因此須要手動設置提交offset的模式。
@Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; public void send(Object o){ String objStr = JSONObject.toJSONString((o)); log.info("sending info:"+objStr); ListenableFuture<SendResult<String,Object>> future= kafkaTemplate.send("test-topic-1",o); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { log.info("test-topic-1發送失敗,"+throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { log.info("test-topic-1發送成功,"+stringObjectSendResult.toString()); } }); } }
而後簡單寫一個Controller來觸發消息的發送。
@RestController public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @GetMapping("/message/send") public boolean send(){ kafkaProducer.send("this is a test message"); return true; } }
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = "test-topic-1",groupId = "test-group-1") public void topic_test(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic){ Optional message = Optional.ofNullable(record.value()); if(message.isPresent()){ Object msg = message.get(); log.info("消費了: topic:"+topic+",message:"+msg); ack.acknowledge(); } } @KafkaListener(topics = "test-topic-1",groupId = "test-group-2") public void topic_test_1(ConsumerRecord<?,?>record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){ Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("消費了: topic:"+topic+",message:"+msg); ack.acknowledge(); } } }
在啓動這兩個模塊以前,須要確認kafka和Zookeeper都已經啓動。
啓動生產者,控制檯有以下信息:
2020-09-13 21:53:10.892 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1 2020-09-13 21:53:10.894 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92 2020-09-13 21:53:10.894 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1600005190890 2020-09-13 21:53:11.125 INFO 17928 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: OtDSNkOFT4eFbSso_V8qAQ 2020-09-13 21:53:11.167 INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer : test-topic-1發送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@4] 2020-09-13 21:55:34.570 INFO 17928 --- [nio-8099-exec-3] c.e.k.producer.KafkaProducer : sending info:"this is a test message" 2020-09-13 21:55:34.579 INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer : test-topic-1發送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@5]
啓動消費者,能夠看到控制檯打印了發過來的信息
2020-09-13 21:55:24.077 INFO 13296 --- [ntainer#1-4-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-2: partitions assigned: [test-topic-1-0] 2020-09-13 21:55:24.077 INFO 13296 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-1: partitions assigned: [test-topic-1-0] 2020-09-13 21:55:24.114 INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer : 消費了: topic:test-topic-1,message:this is a test message 2020-09-13 21:55:24.114 INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer : topic_test1 消費了: Topic:test-topic-1,Message:this is a test message 2020-09-13 21:55:34.579 INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer : 消費了: topic:test-topic-1,message:this is a test message 2020-09-13 21:55:34.580 INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer : topic_test1 消費了: Topic:test-topic-1,Message:this is a test message