Kafka Deployment

Installation

Download and Install

Kafka download page: https://kafka.apache.org/downloads
Note that Kafka depends on ZooKeeper; for deploying ZooKeeper, see the separate article "Zookeeper Introduction and Basic Deployment". Kafka also ships a ZooKeeper startup script, zookeeper-server-start.sh, in the bin directory of the Kafka installation path; if you do not want to install ZooKeeper separately, you can use that script instead.

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
cd /usr/local
ln -s kafka_2.12-2.2.0 kafka
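If you would rather use the ZooKeeper bundled with Kafka instead of a standalone installation, a minimal sketch that relies on the sample zookeeper.properties shipped in the config directory:

# Start the bundled ZooKeeper as a daemon using its sample config
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties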

Configuration

The main Kafka configuration file is /usr/local/kafka/config/server.properties. An example configuration:

broker.id=0
listeners=PLAINTEXT://10.1.60.29:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.1.60.29:2181,10.1.61.195:2181,10.1.61.27:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true

Configuration notes:

  • broker.id: the unique identifier of each broker in the cluster; a non-negative integer. As long as broker.id stays the same, a change of the server's IP address does not affect consumers.
  • listeners: the address and port Kafka listens on. In testing, setting this to 0.0.0.0 caused an error; when binding to 0.0.0.0 you generally also need advertised.listeners set to an address clients can actually reach.
  • num.network.threads: number of threads Kafka uses to handle network requests
  • num.io.threads: number of threads Kafka uses for disk I/O
  • socket.send.buffer.bytes: socket send buffer size
  • socket.receive.buffer.bytes: socket receive buffer size
  • socket.request.max.bytes: maximum size of a request the broker will accept (protects against OOM from oversized requests)
  • log.dirs: the directories where Kafka stores its data; all messages end up here. Multiple paths may be given, separated by commas, and Kafka assigns new partitions to the least-used directory. Note that "least used" is decided by the number of partitions already in each directory, not by free disk space.
  • num.partitions: default number of partitions for newly created topics
  • num.recovery.threads.per.data.dir: number of threads per data directory used for log recovery
  • log.retention.hours: how long messages are retained; log.retention.minutes and log.retention.ms are also supported. If more than one is set, the finer-grained setting takes precedence (ms over minutes over hours). The default is 7 days.
  • log.retention.check.interval.ms: how often to check for expired data
  • log.segment.bytes: size of each segment file within a partition, 1GB by default. Once a segment exceeds this size, a new segment file is created.
  • zookeeper.connect: the ZooKeeper connection string; ZooKeeper stores the brokers' metadata. Multiple values may be given, separated by commas, in the form hostname:port/path, where hostname is the ZooKeeper host name or IP, port is the ZooKeeper client port, and /path is the chroot under which Kafka stores its metadata; if omitted, the root is used.
  • zookeeper.connection.timeout.ms: timeout for connecting to ZooKeeper
  • group.initial.rebalance.delay.ms: in practice, when several consumers join an empty consumer group, every new member triggers another rebalance of the partition assignment; adding 100 consumers would mean 100 rebalances, which becomes very time consuming. This parameter delays the initial rebalance: if, say, 100 consumers all join the group within 10 seconds, set it to 10s (10000 ms) and a single rebalance happens once the delay expires. 0, as in this template, disables the delay.
  • auto.create.topics.enable: whether to create a topic automatically when a producer writes to one that does not exist
  • delete.topic.enable: Kafka can delete topics, but by default a delete does not physically remove the data. To really delete a topic (so that its data files are removed along with it), set this to true.

Note that multiple Kafka brokers rely on ZooKeeper to form a cluster, so the individual nodes need no special configuration: each broker just needs a unique broker.id and must connect to the same ZooKeeper ensemble, as in the sketch below.
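A minimal sketch of the settings that differ on a second broker; the IP 10.1.61.195 is taken from the zookeeper.connect list above and is only an assumption about where the second node runs:

# /usr/local/kafka/config/server.properties on the second node (sketch)
broker.id=1
listeners=PLAINTEXT://10.1.61.195:9092
log.dirs=/data/kafka/logs
# same ZooKeeper ensemble as on the first broker
zookeeper.connect=10.1.60.29:2181,10.1.61.195:2181,10.1.61.27:2181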

啓停操做

# Start
/usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties 

# Check the Java processes

# jps
1394 QuorumPeerMain
13586 Logstash
27591 Kafka
27693 Jps

# Stop
/usr/local/kafka/bin/kafka-server-stop.sh
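To confirm that a freshly started broker is actually serving requests, you can ask it for the API versions it supports; kafka-broker-api-versions.sh ships in the same bin directory. A sketch:

# Ask the broker for its supported API versions; errors here usually mean it is not reachable
/usr/local/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.1.60.29:9092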

Verification

Kafka's metadata can be inspected through ZooKeeper:

# Connect to ZooKeeper with the zk client

../zookeeper/bin/zkCli.sh 

# Several new znodes now exist under the root
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

# Check /brokers/ids; three brokers have joined
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids
[0, 1, 2]

# Check /brokers/topics; it is empty, so no topic has been created yet
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[]
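Inside the same zkCli session you can also read the registration data of a single broker; the value is a JSON blob containing its endpoints. A sketch:

# Inspect broker 0's registration (listeners, host, port)
get /brokers/ids/0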

基本操做

建立topic

上面完成了kafka的部署,經過驗證部署咱們發現當前沒有topic,因此建立一個topic以下:網絡

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 2 --partitions 3 --topic myfirsttopic 
Created topic myfirsttopic.

Parameter notes:

  • --create: create a topic
  • --zookeeper: the ZooKeeper host list; multiple servers can be separated by commas. Here Kafka and ZooKeeper run on the same server, so we simply connect to port 2181 on localhost.
  • --replication-factor: replication factor for the new topic
  • --partitions: number of partitions for the topic
  • --topic: name of the topic
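Per-topic settings can also be passed at creation time with --config; a sketch, where the topic name mythirdtopic and the retention value are only examples:

# Create a topic with a per-topic retention override
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --config retention.ms=86400000 --topic mythirdtopic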

List existing topics

Topic information can be read through ZooKeeper as shown above; from here on we use the Kafka command-line tools directly:

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list                                                               
myfirsttopic

View topic details

# Show details of myfirsttopic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1

Parameter notes:

  • --describe: show detailed information about a topic

Output notes:

  • Leader: the broker currently handling reads and writes for the partition
  • Replicas: all brokers that hold a replica of the partition
  • Isr: the in-sync replicas, i.e. the replicas that are alive and caught up with the leader
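The same tool can also filter for problem partitions, which is useful when an Isr list is shorter than its Replicas list; a sketch:

# List partitions whose ISR is smaller than the full replica set
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions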

Increase the number of partitions of a topic

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic            
Topic:myfirsttopic      PartitionCount:6        ReplicationFactor:2     Configs:
        Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: myfirsttopic     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: myfirsttopic     Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1

Change the replication factor of a topic

# Create a topic named mysecondtopic with 2 partitions and a replication factor of 1
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
Created topic mysecondtopic.

# View details of the newly created topic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1     Isr: 1

# Expand the replicas of the partition on broker 0 from [0] to [0,2], and of the partition on broker 1 from [1] to [1,2].
# First create a JSON file describing the new assignment:
# cat partitions-to-move.json
{
    "partitions":
    [
        {
            "topic":"mysecondtopic",
            "partition": 0,
            "replicas": [0,2]
        },
        {
            "topic": "mysecondtopic",
            "partition": 1,
            "replicas": [1,2]
        }
    ],
    "version": 1
}

# Execute the reassignment
# ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

# Describe the topic again; the replication factor has changed as expected
# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1
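kafka-reassign-partitions.sh also has a --verify mode that checks, against the same JSON file, whether the reassignment has completed; a sketch:

# Verify that the reassignment described in the JSON file has finished
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --verify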

Delete a topic

# Delete the topic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
Topic myfirsttopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# List topics; myfirsttopic has been deleted
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
mysecondtopic

Produce messages with the console producer

# ./bin/kafka-console-producer.sh  --broker-list 10.1.60.29:9092 --topic mysecondtopic                                                
>hello kafka!
>hello world!
>just a test!
>
>hi world!
>hahahaha!
>
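Messages that share a key always land in the same partition, which is the usual way to keep related messages in order. The console producer can also send keyed messages; a sketch, where the ':' separator is an arbitrary choice:

# Produce keyed messages; type lines such as  user1:hello
./bin/kafka-console-producer.sh --broker-list 10.1.60.29:9092 --topic mysecondtopic --property parse.key=true --property key.separator=: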

Consume messages with the console consumer

# ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning       
hello kafka!
just a test!
hi world!
hello world!

hahahaha!
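Note that the messages do not come back in the order they were produced: the topic has several partitions and Kafka only guarantees ordering within a single partition. To consume as part of a named consumer group and then inspect its offsets and lag, a sketch (the group name mygroup is arbitrary):

# Consume as a member of a consumer group
./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --group mygroup

# Show the group's partitions, current offsets and lag
./bin/kafka-consumer-groups.sh --bootstrap-server 10.1.60.29:9092 --describe --group mygroup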