略,到官網下載便可。注意 Kafka 還須要 Zookeeper 支持。java
Kafka 版本 : kafka_2.13-2.4.0 Zookeeper 版本 : Zookeeper-3.5.4-beta jdk 版本 : openjdk 8
Kafka 的主要配置文件是 /config/server.properties。spring
## 實例 id,同一集羣中的全部實例的 id 不可相同 broker.id = 0 ## kafka 服務監聽的地址和端口 ## 設置了 advertised.listeners 以後能夠不設置客戶端的 hosts 文件,緣由未知 listener = PLAINTEXT://xxxxxxx:9092 advertised.listeners = PLAINTEXT://xxxxxxx:9092 ##### 通用配置 ##### ## 處理磁盤 io 的線程數 num.io.threads = 8 ## 處理網絡 io 的線程數 num.network.threads = 3 ## 用來處理後臺任務的線程數,例如過時消息文件的刪除等 backgroud.threads = 4 ## kafka 啓動時回覆數據和關閉時保存數據到磁盤時使用的線程數 num.recovery.threads.per.data.dir = 10 ## 消息發送的緩存區大小 socket.send.buffer.bytes = 102400 ## 消息接收的緩存區大小 socket.receive.buffer.bytes = 102400 ## socket 請求的最大緩存值 socket.request.max.bytes = 10240000 ## 再平衡延遲時間 ## 再平衡的意義是當 consumer 發生上下線的時候,會從新分配 partition 的消費權 ## 延遲的好處是若是 consumer 在一段時間內集中上下線,能夠在延遲時間以後一次性處理 ## 若是發生一次變更就處理一次,效率會較低 ## 默認爲 0 ms,即爲不延遲 group.initial.rebalance.delay.ms = 10 ##### 數據日誌配置 ##### ## kafka 日誌數據存儲目錄,用逗號分割能夠指定多個 # log.dirs = /tmp/kafka-logs-1,/tmp/kafka-logs-2 log.dirs = /tmp/kafka-logs ## 刷新日誌到磁盤的閾值 # 根據消息數量作刷新,默認使用該策略 # log.flush.interval.messages = 10000 # 根據時間作刷新,單位 毫秒,默認不開啓 log.flush.interval.ms = 1000 ## 檢查刷新機制的時間間隔 ## 該參數的意義是每隔必定時間檢查一次是否到達 flush 的設置閾值 log.flush.scheduler.interval.ms = 3000 ## 記錄上次固化數據到硬盤的時間點,主要用於數據恢復 ## 默認值 60000 log.flush.offset.checkpoint.interval.ms = 60000 ## 日誌的存儲時間,能夠以小時單位,也能夠設置爲分鐘或者毫秒 ## 當超過這個時間,就會執行日誌清除 # log.retention.minutes = 120 # log.retention.ms = 120 # 默認使用小時,值爲 168 log.retention.hours = 2 ## 每一個 partition 的存儲大小,若是超過了就會執行日誌清除 ## -1 表明不限制,默認 1073741824 log.retention.bytes = 1073741824 ## 日誌大小的檢查週期,若是已經到達了大小,就觸發文件刪除 log.retention.check.interval.ms = 300000 ## 指定日誌每隔多久檢查一次是否能夠被刪除,默認爲 1 min log.cleanup.internal.mins = 1 ## 日誌清除的策略,默認爲 delete ## 若是要使用日誌壓縮,就須要讓策略包含 compact ## 須要注意的是,若是開啓了 compact 策略,則客戶端提交的消息的 key 不容許爲 null,不然提交報錯 # log.cleanup.policy = delete log.cleanup.policy = delete ## 是否開啓日誌壓縮 ## 默認開啓,可是隻有在日誌清除策略包含 compact 的時候日誌壓縮纔會生效 ## 日誌壓縮的邏輯是對 key 進行整合,對相同 key 的不一樣 value 值只保存最後一個版本 log.cleaner.enable = true ## 開啓壓縮的狀況才生效,日誌壓縮運行的線程數 log.cleaner.threads = 8 ## 日誌壓縮去重的緩存內存,內存越大效率越好 ## 單位 byte,默認 524288 byte log.cleaner.io.buffer.size = 524288 ## 日誌清理的頻率,越大就越高效,可是內存消耗會更大 log.cleaner.min.cleanable.ratio = 0.7 ## 單個日誌文件的大小,默認 1073741824 log.segment.bytes = 1073741824 ## 日誌被真正清除的時間 ## 日誌過了保存時間以後,只是被邏輯性刪除,沒法被索引到,可是沒有真的從磁盤中被刪除 ## 此參數用於設置在被標註爲邏輯刪除後的日誌被真正刪除的時間 log.segment.delete.delay.ms = 60000 ##### Topic 配置 ##### ## 是否容許自動建立 topic ## 若爲 false,則只能經過命令建立 topic ## 默認 true auto.create.topics.enable = true ## 每一個 topic 的默認分區 partition 個數,在 topic 建立的時候能夠指定,若是不指定就使用該參數 ## partition 數量直接影響了可以容納的 cosumer 數量 num.partitions = 1 ## topic partition 的副本數,副本越多,越不容易由於個別 broker 的問題而丟失數據 ## 副本越多,可用性越高,可是每次數據寫入以後同步花費的時間更多 offsets.topic.replication.factor = 1 transaction.state.log.replication.factor = 1 transaction.state.log.min.isr = 1 ##### Zookeeper 配置 ##### ## Zookeeper 地址,用逗號分割能夠指定多個 # zookeeper.connect = localhost:2181,localhost:2182 zookeeper.connect = localhost:2101 ## Zookeeper 集羣的超時時間 zookeeper.connection.timeout.ms = 6000
上述配置中設置了自動建立 topic,可是也能夠手工建立 :apache
./bin/kafka-topics.sh --create \ --zookeeper localhost:2101 \ --replication-factor 1 \ --partitions 1 \ --topic test1
查看 topic :bootstrap
./bin/kafka-topics.sh --zookeeper localhost:2101 --list
./bin/kafka-server-start.sh -daemon ./config/server.properties &
spring boot 版本 :緩存
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> </parent>
引入 jar 包 :bash
<!-- kafka 必須的包 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.4.1.RELEASE</version> <exclusions> <!-- 若是已經在別處引入 spring-boot-starter,此處能夠排除 spring 相關的包 --> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-*</artifactId> </exclusion> </exclusions> </dependency>
spring: kafka: ## 生產者配置,若是本實例只是消費者,能夠不配置該部分 producer: # client id,隨意配置,不可重複 client-id: boot-producer # kafka 服務地址,pi + 端口 bootstrap-servers: aliyun-ecs:9092 # 用於序列化的工具類 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息發送失敗狀況下的重試次數 retries: 1 # 批量上傳的 buffer size,能夠是消息數量,也能夠是內存量 batch-size: 10000 buffer-memory: 300000 # 等待副本同步以後才確認消息發送成功,可選的值有 0,1,-1,all 等 # 設置爲 0 的意思是不等待任何副本同步完成就直接返回 # 設置爲 1 的意思是隻等待 leader 同步完成 # all 的意思是所有同步完才確認,可是速度會比較慢 acks: 1 ## 消費者配置,若是本實例只是生產者,能夠不配置該部分 consumer: # client id,隨意配置,不可重複 client-id: boot-consumer # 消費者分組 id,同一組別的不一樣消費者共同消費一份數據 group-id: consumer-group-1 # kafka 服務地址,pi + 端口 bootstrap-servers: aliyun-ecs:9092 # 用於反序列化的工具類 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 自動更新 offset enable-auto-commit: true # 若是 enable-auto-commit 設置爲 true,則每隔一段時間提交一次 offset # 時間單位爲毫秒,默認值 5000 (5s) auto-commit-interval: 1000 # offset 消費指針 # earliest 表明從頭開始消費,lastest 表明重新產生的部分開始消費 auto-offset-reset: earliest
生產者配套代碼:網絡
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * kafka mq 生產者包裝類 */ @Component public class MQProductor { @Resource KafkaTemplate<String,String> kt; /** * 發送消息的方法 * @param topic 建立的 topic * @param partition topic 分片編號,從 0 開始 * @param key 消息 key,主要用來分片和做爲壓縮憑據,能夠重複,能夠爲空 * @param message 消息主體 */ public void send(String topic,Integer partition,String key,String message) { kt.send(topic,partition,key,message); } public void send(String message) { send("test-topic",0,"",message); } }
消費者配套代碼:socket
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka mq 消費者包裝類 */ @Component public class MQListener { /** * 監聽方法,能夠一次性監聽多個 topic * @param cr kafka 返回的消息包裝類 */ @KafkaListener(topics = {"test-topic" /*,"test-topic-2"*/ }) public void consume(ConsumerRecord<String,String> cr) { // value String value = cr.value(); System.out.println(value); // key String key = cr.key(); System.out.println(key); // 讀取指針 long offset = cr.offset(); System.out.println(offset); // 讀取的分區編號 int partition = cr.partition(); System.out.println(partition); // topic String topic = cr.topic(); System.out.println(topic); } }