這一節主要講kafka 集羣環境部署,apache
1)Producer:消息生產者,就是向kafkabroker發消息的客戶端;
2)Consumer:消息消費者,向kafkabroker取消息的客戶端;
3)Topic:能夠理解爲一個隊列;
4)Consumer Group(CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)
和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製
(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一
個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現
單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不需
要屢次發送消息到不一樣的topic;bootstrap
5)Broker:一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker
能夠容納多個topic;服務器
6)Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,
一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息
都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給
consumer,不保證一個topic的總體(多個partition間)的順序;curl
分區數越多,在必定程度上會提高消息處理的吞吐量,由於kafka是基於文件進行讀寫,所以也須要打開更多的文件句柄,也會增長必定的性能開銷。
若是分區過多,那麼日誌分段也會不少,寫的時候因爲是批量寫,其實就會變成隨機寫了,隨機 I/O 這個時候對性能影響很大。因此通常來講 Kafka 不能有太多的 Partition。socket
7)replication-factor
用來設置主題的副本數。每一個主題能夠有多個副本,副本位於集羣中不一樣的broker上,也就是說副本的數量不能超過broker的數量,不然建立主題時會失敗。ide
在建立Topic的時候,有兩個參數是須要填寫的,那就是partions和replication-factor。 8)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查
找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然thefirstoffset就
是00000000000.kafka。性能
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gzfetch
解壓縮url
[root@es7-1-80 conf]# cat zoo.cfg
initLimit=10
syncLimit=5
dataDir=/data/zk/data
clientPort=2181
maxClientCnxns=0
server.1=172.16.0.80:2888:3888
server.2=172.16.0.91:2888:3888
server.3=172.16.0.92:2888:3888線程
echo 1 >> =/data/zk/data/myid
啓動,製做標準啓動服務
cat /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
其餘節點按照上述方式配置,並依次啓動
集羣狀態查看
/usr/local/zookeeper/bin/zkServer.sh status
curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
解壓縮到所需位置在
broker.id=0 #
num.inetwork.threads=3 #broker 處理消息的最大線程數,通常狀況下不須要去修改
num.io.threads=8 # # broker處理磁盤IO 的線程數 ,數值應該大於你的硬盤數
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka_logs
num.partitions=1 # 每一個topic的分區個數,如果在topic建立時候沒有指定的話 會被topic建立時的指定參數覆蓋
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.0.91:2181,172.16.0.80:2181,172.16.0.92:2181
zookeeper.connection.timeout.ms=6000
[Unit]
Description=Apache Kafka server (broker)
After=network.target
After=syslog.target
After=zookeeper.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
KillMode=none
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
五、將kafka 命令加入到環境變量中
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
其餘節點按照上述方式部署,修改broker.id
配置Kafka外網IP地址
kafka0.10如下
修改server.properties 配置文件
advertised.host.name=xxxx
advertised.port=9092
kafka0.10以上
listeners=PLAINTEXT://0.0.0.0:9093 //綁定全部ip
advertised.listeners=PLAINTEXT://42.159.7.75:9093
kafka 經常使用操做
建立主題
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test1 --replication-factor 1 --partitions 3
在建立Topic的時候,有兩個參數是須要填寫的,那就是partions和replication-factor。
刪除主題
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --delete --topic test1
查看主題列表
./kafka-topics.sh --zookeeper 172.16.0.80:2181:2181 --list
查看主題狀態
kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
修改主題分區
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test3 --replication-factor 1 --partitions 1
kafka-topics.sh --zookeeper 172.16.0.80:2181 --alter --topic test3 --partitions 3 kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test3
Topic:test3 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
生產數據
kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4
hi go
[2019-11-26 15:02:32,608] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,713] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,820] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,927] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien
解決方法
port=9092
listeners=PLAINTEXT://172.16.0.92:9092 # 修改成對應的ip
從新生產數據
kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4
hello kafaka
kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning
消費數據
kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning
列出消費者主組
消費組 即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。那麼何謂 Consumer Group 呢?用一句話歸納就是:Consumer Group 是 Kafka 提供的可擴展且具備容錯性的消費者機制。
/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --list
獲取新版本消費者羣組testgroup的詳細信息
/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --group testgroup --describe