kafka-2-集羣部署

kakfa 集羣部署

準備工做

  1. jre 安裝

zookeeper 單機集羣搭建

假設當前目錄爲 /home/zk.配置3節點僞集羣.node

1.下載shell

wget http://mirror.bit.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.5-bin.tar.gz

tar zxf apache-zookeeper-3.5.5-bin.tar.gz

mv apache-zookeeper-3.5.5-bin zookeeper

2.建立zookeeper data目錄apache

mkdir /home/zk/tmp/node-0/data -p
mkdir /home/zk/tmp/node-0/datalog -p
mkdir /home/zk/tmp/node-1/data -p
mkdir /home/zk/tmp/node-1/datalog -p
mkdir /home/zk/tmp/node-2/data -p
mkdir /home/zk/tmp/node-2/datalog -p

3.修改配置文件bootstrap

cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo-node-0.cfg

# 修改節點0配置
vi zookeeper/conf/zoo-node-0.cfg
dataDir=/home/zk/tmp/node-0/data
dataLogDir=/home/zk/tmp/node-0/datalog

# server.index = ip:port-原子廣播:port-選舉
server.0=127.0.0.1:2887:3887
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889

# 拷貝節點0配置
cp zookeeper/conf/zoo-node-0.cfg zookeeper/conf/zoo-node-1.cfg
cp zookeeper/conf/zoo-node-0.cfg zookeeper/conf/zoo-node-2.cfg

# 修改節點1配置
vi zookeeper/conf/zoo-node-1.cfg
dataDir=/home/zk/tmp/node-1/data
dataLogDir=/home/zk/tmp/node-1/datalog
clientPort=2182

# 修改節點2配置
vi zookeeper/conf/zoo-node-2.cfg
dataDir=/home/zk/tmp/node-2/data
dataLogDir=/home/zk/tmp/node-2/datalog
clientPort=2183

# 指定節點編號
echo 0 >> tmp/node-0/data/myid
echo 1 >> tmp/node-1/data/myid
echo 2 >> tmp/node-2/data/myid

4.啓動服務器

cd zookeeper
bin/zkServer.sh start conf/zoo-node-0.cfg
bin/zkServer.sh start conf/zoo-node-1.cfg
bin/zkServer.sh start conf/zoo-node-2.cfg

# 查看狀態
bin/zkServer.sh status conf/zoo-node-0.cfg
bin/zkServer.sh status conf/zoo-node-1.cfg
bin/zkServer.sh status conf/zoo-node-2.cfg

kafka 單機集羣搭建

假設當前目錄爲 /home/kfkapp

1.下載socket

wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar zxf kafka_2.12-2.3.0.tgz
# 或者設置軟連接
mv kafka_2.12-2.3.0 kafka

2.建立日誌目錄ide

cd kafka

mkdir tmp/kafka-logs-0 -p
mkdir tmp/kafka-logs-1 -p
mkdir tmp/kafka-logs-2 -p

3.修改配置文件ui

cp config/server.properties config/server-0.properties

vi config/server-0.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/home/kfk/kafka/tmp/kafka-logs-0
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# end vi

cp config/server-0.properties config/server-1.properties
cp config/server-0.properties config/server-2.properties

vi config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/home/kfk/kafka/tmp/kafka-logs-1
# end vi

vi config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/home/kfk/kafka/tmp/kafka-logs-2
# end vi

4.啓動this

nohup bin/kafka-server-start.sh config/server-0.properties 1>/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-1.properties 1>/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-2.properties 1>/dev/null 2>&1 &

經常使用配置說明

  1. broker 配置

    https://kafka.apache.org/documentation/#configuration

    https://blog.csdn.net/memoordit/article/details/78850086

    必要的配置

    • broker.id
    • log.dirs
    • zookeeper.connect
    ########################## Server Basics ###########################
      ## 服務節點id,整數 >=-1,集羣內惟一
      broker.id=0
    
    #################### Socket Server Settings #########################
      ## socket 服務器監聽地址
      listeners=PLAINTEXT://:9092
    
    ########################## Log Basics ################################
      ## 存儲日誌文件的目錄列表,逗號分隔
      log.dirs=/home/yukl/2_opensys/kafka/tmp/kafka-logs-0
    
      ## 每一個topic的默認分區數
      num.partitions=1
    
    ##################### Internal Topic Settings  #######################
      ## offset topic 的複製因子
      offsets.topic.replication.factor=1
      ## 事務 topic 的複製因子
      transaction.state.log.replication.factor=1
      ## 事務 topic 的min.insync.replicas 配置
      transaction.state.log.min.isr=1
    
    ####################### Log Flush Policy ############################
      ## 消息數達到目標刷新到磁盤
      #log.flush.interval.messages=10000
    
      ## 時間間隔達到目標刷新到磁盤
      #log.flush.interval.ms=1000
    
    ####################### Log Retention Policy #########################
      ## 日誌文件被刪除前的保存時間(單位小時)
      log.retention.hours=168
    
      ## 優先使用minutes配置,不然使用ms,若是這兩項都沒設置,則使用hours
      #log.retention.minutes
      #log.retention.ms
    
      ## 日誌達到刪除大小的閾值。每一個topic下每一個分區保存數據的最大文件大小;注意,這是每一個分區的上限,所以這個數值乘以分區的個數就是每一個topic保存的數據總量。同時注意:若是log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會形成刪除一個段文件。注意,這項設置能夠由每一個topic設置時進行覆蓋。-1爲不限制。
      #log.retention.bytes=1073741824
    
      ## topic 分區的日誌存放在某個目錄下諸多文件中,這些文件將partition的日誌切分紅一段一段的,這就是段文件(segment file);一個topic的一個分區對應的全部segment文件稱爲log。這個設置控制着一個segment文件的最大的大小,若是超過了此大小,就會生成一個新的segment文件。此配置能夠被覆蓋。 int
      log.segment.bytes=1073741824
    
      ## 檢查日誌段文件的間隔時間,以肯定是否文件屬性是否到達刪除要求。
      log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
      ## zookeeper 主機地址
      zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    
      ## 與zookeeper創建鏈接的超時時間(ms)
      zookeeper.connection.timeout.ms=6000
    
    ################### Group Coordinator Settings########################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    
    # The default value for this is 3 seconds.
    
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    
     group.initial.rebalance.delay.ms=0
    
    ## 消息中的時間戳取值爲建立時間或者日誌追加時間
    
    ## 取值 CreateTime 或者 LogAppendTime
    
    ## 默認 CreateTime
    
     log.message.timestamp.type=CreateTime
  2. producer 配置

    https://kafka.apache.org/docu...

    name desc type default valid values
    acks 生產者的請求反饋方式:0-不等待,僅僅將消息發送至緩衝區即返回,此模式下retries配置無效;1-等待leader寫成功即返回;all或者-1 表示等待全部的follower寫成功才返回。 string 1 [all, -1, 0, 1]
    bootstrap.servers 初始鏈接服務列表(host:port,host:port),是kafka集羣的一個子集,用於發現全部的kafak集羣 list "" non-null string
    compression.type 消息壓縮模式,支持none, gzip, snappy, lz4, or zstd. string none
    retries 設置一個比零大的值,客戶端若是發送失敗則會從新發送。注意,這個重試功能和客戶端在接到錯誤以後從新發送沒什麼不一樣。若是max.in.flight.requests.per.connection沒有設置爲1,有可能改變消息發送的順序,由於若是2個批次發送到一個分區中,並第一個失敗了並重試,可是第二個成功了,那麼第二個批次將超過第一個 int 2147483647 [0,...,2147483647]
    client.id 一個字符串,在請求時傳遞給服務器。用於追蹤請求源 string ""
    connections.max.idle.ms 鏈接閒置時間 long 540000
    delivery.timeout.ms 發送消息上報成功或失敗的最大時間.注意,若是此時間在retries耗盡以前達到,則斷定請求失敗 int ""
  3. consumer 配置

    https://kafka.apache.org/docu...

    name desc type default valid values
    bootstrap.servers 同producer
    group.id consumer group惟一標識 string
    auto.offset.reset earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 <br/>latest :當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 <br/>none :topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常 string latest [earliest,<br/>latest,<br/>none]
    connections.max.idle.ms 同上
    client.id 同上
    enable.auto.commit 若是爲true,消費者的offset後臺會按期自動提交 boolean true
    auto.commit.interval.ms 消費者自動提交offset的週期(ms) int 5000
相關文章
相關標籤/搜索