kafka 集羣環境搭建與管理

這一節主要講kafka 集羣環境部署,apache

kafka 基礎概念介紹與強化

1)Producer:消息生產者,就是向kafkabroker發消息的客戶端;
2)Consumer:消息消費者,向kafkabroker取消息的客戶端;
3)Topic:能夠理解爲一個隊列;
4)Consumer Group(CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)
和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製
(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一
個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現
單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不需
要屢次發送消息到不一樣的topic;bootstrap

5)Broker:一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker
能夠容納多個topic;服務器

6)Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,
一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息
都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給
consumer,不保證一個topic的總體(多個partition間)的順序;curl

分區數越多,在必定程度上會提高消息處理的吞吐量,由於kafka是基於文件進行讀寫,所以也須要打開更多的文件句柄,也會增長必定的性能開銷。
若是分區過多,那麼日誌分段也會不少,寫的時候因爲是批量寫,其實就會變成隨機寫了,隨機 I/O 這個時候對性能影響很大。因此通常來講 Kafka 不能有太多的 Partition。socket

7)replication-factor
  用來設置主題的副本數。每一個主題能夠有多個副本,副本位於集羣中不一樣的broker上,也就是說副本的數量不能超過broker的數量,不然建立主題時會失敗。ide

在建立Topic的時候,有兩個參數是須要填寫的,那就是partions和replication-factor。

8)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查

找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然thefirstoffset就
是00000000000.kafka。性能

集羣環境搭建

  • zookeeper 集羣環境搭建
  1. 下載軟件包

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gzfetch

  1. 解壓縮url

  2. 建立配置目錄,修改文件

[root@es7-1-80 conf]# cat zoo.cfg
initLimit=10
syncLimit=5
dataDir=/data/zk/data
clientPort=2181
maxClientCnxns=0
server.1=172.16.0.80:2888:3888
server.2=172.16.0.91:2888:3888
server.3=172.16.0.92:2888:3888線程

echo 1 >> =/data/zk/data/myid

  1. 啓動,製做標準啓動服務

    cat /etc/systemd/system/zookeeper.service
    [Unit]
    Description=zookeeper.service
    After=network.target

[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target

其餘節點按照上述方式配置,並依次啓動

集羣狀態查看

/usr/local/zookeeper/bin/zkServer.sh status

  • kafka 集羣搭建
  1. 下載軟件包

curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz

  1. 解壓縮到所需位置在

  2. 建立配置目錄,修改文件

broker.id=0 #
num.inetwork.threads=3 #broker 處理消息的最大線程數,通常狀況下不須要去修改
num.io.threads=8 # # broker處理磁盤IO 的線程數 ,數值應該大於你的硬盤數
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka_logs

num.partitions=1 # 每一個topic的分區個數,如果在topic建立時候沒有指定的話 會被topic建立時的指定參數覆蓋
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.0.91:2181,172.16.0.80:2181,172.16.0.92:2181
zookeeper.connection.timeout.ms=6000

  1. 啓動,製做標準啓動服務

[Unit]
Description=Apache Kafka server (broker)
After=network.target
After=syslog.target
After=zookeeper.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

ExecReload=/bin/kill -HUP $MAINPID
KillMode=none

Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target

五、將kafka 命令加入到環境變量中

export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

其餘節點按照上述方式部署,修改broker.id

配置Kafka外網IP地址
kafka0.10如下
修改server.properties 配置文件
advertised.host.name=xxxx
advertised.port=9092
kafka0.10以上
listeners=PLAINTEXT://0.0.0.0:9093 //綁定全部ip
advertised.listeners=PLAINTEXT://42.159.7.75:9093

  • kafka 經常使用操做

    建立主題

    ./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test1 --replication-factor 1 --partitions 3

    在建立Topic的時候,有兩個參數是須要填寫的,那就是partions和replication-factor。

    刪除主題

./kafka-topics.sh --zookeeper 172.16.0.80:2181 --delete --topic test1

查看主題列表

./kafka-topics.sh --zookeeper 172.16.0.80:2181:2181 --list

查看主題狀態

kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1

修改主題分區

./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test3 --replication-factor 1 --partitions 1

kafka-topics.sh --zookeeper 172.16.0.80:2181  --alter  --topic test3 --partitions 3

  kafka-topics.sh --zookeeper 172.16.0.80:2181  --describe  --topic test3

Topic:test3 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3 Partition: 2 Leader: 0 Replicas: 0 Isr: 0

生產數據

kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4

hi go
[2019-11-26 15:02:32,608] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,713] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,820] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,927] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien

解決方法
port=9092
listeners=PLAINTEXT://172.16.0.92:9092 # 修改成對應的ip

從新生產數據

kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4

hello kafaka

kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning

消費數據

kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning

列出消費者主組

消費組 即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。那麼何謂 Consumer Group 呢?用一句話歸納就是:Consumer Group 是 Kafka 提供的可擴展且具備容錯性的消費者機制。

/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --list

獲取新版本消費者羣組testgroup的詳細信息

/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --group testgroup --describe

相關文章
相關標籤/搜索