Kafka 學習筆記歸整

Kafka 的安裝和配置

0 安裝

略,到官網下載便可。注意 Kafka 還須要 Zookeeper 支持。java

Kafka 版本 : 
kafka_2.13-2.4.0

Zookeeper 版本 : 
Zookeeper-3.5.4-beta

jdk 版本 : 
openjdk 8

1 Kafka 配置

Kafka 的主要配置文件是 /config/server.properties。spring

## 實例 id,同一集羣中的全部實例的 id 不可相同
broker.id = 0

## kafka 服務監聽的地址和端口
## 設置了 advertised.listeners 以後能夠不設置客戶端的 hosts 文件,緣由未知
listener = PLAINTEXT://xxxxxxx:9092
advertised.listeners = PLAINTEXT://xxxxxxx:9092

##### 通用配置 #####

## 處理磁盤 io 的線程數
num.io.threads = 8

## 處理網絡 io 的線程數
num.network.threads = 3

## 用來處理後臺任務的線程數,例如過時消息文件的刪除等
backgroud.threads = 4

## kafka 啓動時回覆數據和關閉時保存數據到磁盤時使用的線程數
num.recovery.threads.per.data.dir = 10

## 消息發送的緩存區大小
socket.send.buffer.bytes = 102400

## 消息接收的緩存區大小
socket.receive.buffer.bytes = 102400

## socket 請求的最大緩存值
socket.request.max.bytes = 10240000

## 再平衡延遲時間
## 再平衡的意義是當 consumer 發生上下線的時候,會從新分配 partition 的消費權
## 延遲的好處是若是 consumer 在一段時間內集中上下線,能夠在延遲時間以後一次性處理
## 若是發生一次變更就處理一次,效率會較低
## 默認爲 0 ms,即爲不延遲
group.initial.rebalance.delay.ms = 10


##### 數據日誌配置 #####

## kafka 日誌數據存儲目錄,用逗號分割能夠指定多個
# log.dirs = /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs


## 刷新日誌到磁盤的閾值
# 根據消息數量作刷新,默認使用該策略
# log.flush.interval.messages = 10000
# 根據時間作刷新,單位 毫秒,默認不開啓
log.flush.interval.ms = 1000

## 檢查刷新機制的時間間隔
## 該參數的意義是每隔必定時間檢查一次是否到達 flush 的設置閾值
log.flush.scheduler.interval.ms = 3000

## 記錄上次固化數據到硬盤的時間點,主要用於數據恢復
## 默認值 60000
log.flush.offset.checkpoint.interval.ms = 60000


## 日誌的存儲時間,能夠以小時單位,也能夠設置爲分鐘或者毫秒
## 當超過這個時間,就會執行日誌清除
# log.retention.minutes = 120
# log.retention.ms = 120
# 默認使用小時,值爲 168
log.retention.hours = 2

## 每一個 partition 的存儲大小,若是超過了就會執行日誌清除
## -1 表明不限制,默認 1073741824
log.retention.bytes = 1073741824

## 日誌大小的檢查週期,若是已經到達了大小,就觸發文件刪除
log.retention.check.interval.ms = 300000


## 指定日誌每隔多久檢查一次是否能夠被刪除,默認爲 1 min
log.cleanup.internal.mins = 1

## 日誌清除的策略,默認爲 delete
## 若是要使用日誌壓縮,就須要讓策略包含 compact
## 須要注意的是,若是開啓了 compact 策略,則客戶端提交的消息的 key 不容許爲 null,不然提交報錯
# log.cleanup.policy = delete
log.cleanup.policy = delete


## 是否開啓日誌壓縮
## 默認開啓,可是隻有在日誌清除策略包含 compact 的時候日誌壓縮纔會生效
## 日誌壓縮的邏輯是對 key 進行整合,對相同 key 的不一樣 value 值只保存最後一個版本
log.cleaner.enable = true

## 開啓壓縮的狀況才生效,日誌壓縮運行的線程數
log.cleaner.threads = 8

## 日誌壓縮去重的緩存內存,內存越大效率越好
## 單位 byte,默認 524288 byte
log.cleaner.io.buffer.size = 524288

## 日誌清理的頻率,越大就越高效,可是內存消耗會更大
log.cleaner.min.cleanable.ratio = 0.7


## 單個日誌文件的大小,默認 1073741824
log.segment.bytes = 1073741824

## 日誌被真正清除的時間
## 日誌過了保存時間以後,只是被邏輯性刪除,沒法被索引到,可是沒有真的從磁盤中被刪除
## 此參數用於設置在被標註爲邏輯刪除後的日誌被真正刪除的時間
log.segment.delete.delay.ms = 60000


##### Topic 配置 #####

## 是否容許自動建立 topic
## 若爲 false,則只能經過命令建立 topic
## 默認 true
auto.create.topics.enable = true

## 每一個 topic 的默認分區 partition 個數,在 topic 建立的時候能夠指定,若是不指定就使用該參數
## partition 數量直接影響了可以容納的 cosumer 數量
num.partitions = 1

## topic partition 的副本數,副本越多,越不容易由於個別 broker 的問題而丟失數據
## 副本越多,可用性越高,可是每次數據寫入以後同步花費的時間更多
offsets.topic.replication.factor = 1
transaction.state.log.replication.factor = 1
transaction.state.log.min.isr = 1



##### Zookeeper 配置 #####

## Zookeeper 地址,用逗號分割能夠指定多個
# zookeeper.connect = localhost:2181,localhost:2182
zookeeper.connect = localhost:2101
## Zookeeper 集羣的超時時間
zookeeper.connection.timeout.ms = 6000

2 建立 topic

上述配置中設置了自動建立 topic,可是也能夠手工建立 :apache

./bin/kafka-topics.sh --create \
--zookeeper localhost:2101 \
--replication-factor 1 \
--partitions 1 \
--topic test1

查看 topic :bootstrap

./bin/kafka-topics.sh --zookeeper localhost:2101 --list

3 啓動 Kafka

./bin/kafka-server-start.sh -daemon ./config/server.properties &

SpringBoot Kafka 配置代碼

1 pom

spring boot 版本 :緩存

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
    <relativePath/>
</parent>

引入 jar 包 :bash

<!-- kafka 必須的包 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.1.RELEASE</version>
    <exclusions>
        <!-- 若是已經在別處引入 spring-boot-starter,此處能夠排除 spring 相關的包 -->
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2 yaml 配置

spring:
  kafka:

    ## 生產者配置,若是本實例只是消費者,能夠不配置該部分
    producer:
      # client id,隨意配置,不可重複
      client-id: boot-producer
      # kafka 服務地址,pi + 端口
      bootstrap-servers: aliyun-ecs:9092
      # 用於序列化的工具類
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息發送失敗狀況下的重試次數
      retries: 1
      # 批量上傳的 buffer size,能夠是消息數量,也能夠是內存量
      batch-size: 10000
      buffer-memory: 300000
      # 等待副本同步以後才確認消息發送成功,可選的值有 0,1,-1,all 等
      # 設置爲 0 的意思是不等待任何副本同步完成就直接返回
      # 設置爲 1 的意思是隻等待 leader 同步完成
      # all 的意思是所有同步完才確認,可是速度會比較慢
      acks: 1

    ## 消費者配置,若是本實例只是生產者,能夠不配置該部分
    consumer:
      # client id,隨意配置,不可重複
      client-id: boot-consumer
      # 消費者分組 id,同一組別的不一樣消費者共同消費一份數據
      group-id: consumer-group-1
      # kafka 服務地址,pi + 端口
      bootstrap-servers: aliyun-ecs:9092
      # 用於反序列化的工具類
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 自動更新 offset
      enable-auto-commit: true
      # 若是 enable-auto-commit 設置爲 true,則每隔一段時間提交一次 offset
      # 時間單位爲毫秒,默認值 5000 (5s)
      auto-commit-interval: 1000
      # offset 消費指針
      # earliest 表明從頭開始消費,lastest 表明重新產生的部分開始消費
      auto-offset-reset: earliest

3 代碼

生產者配套代碼:網絡

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * kafka mq 生產者包裝類
 */
@Component
public class MQProductor {

    @Resource
    KafkaTemplate<String,String> kt;

    /**
     * 發送消息的方法
     * @param topic  建立的 topic
     * @param partition  topic 分片編號,從 0 開始
     * @param key  消息 key,主要用來分片和做爲壓縮憑據,能夠重複,能夠爲空
     * @param message  消息主體
     */
    public void send(String topic,Integer partition,String key,String message) {
        kt.send(topic,partition,key,message);
    }

    public void send(String message) {
        send("test-topic",0,"",message);
    }
}

消費者配套代碼:socket

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka mq 消費者包裝類
 */
@Component
public class MQListener {

    /**
     * 監聽方法,能夠一次性監聽多個 topic
     * @param cr  kafka 返回的消息包裝類
     */
    @KafkaListener(topics = {"test-topic" /*,"test-topic-2"*/ })
    public void consume(ConsumerRecord<String,String> cr) {
        // value
        String value = cr.value();
        System.out.println(value);
        // key
        String key = cr.key();
        System.out.println(key);
        // 讀取指針
        long offset = cr.offset();
        System.out.println(offset);
        // 讀取的分區編號
        int partition = cr.partition();
        System.out.println(partition);
        // topic
        String topic = cr.topic();
        System.out.println(topic);
    }
}
相關文章
相關標籤/搜索