1、kafka簡介html
kafka是基於發佈/訂閱模式的一個分佈式消息隊列系統,用java語言研發,是ASF旗下的一個開源項目;相似的消息隊列服務還有rabbitmq、activemq、zeromq;kafka最主要的優點具有分佈式功能,而且結合zookeeper能夠實現動態擴容;kafka對消息保存是經過Topic進行分類,發送消息一方稱爲producer(生產者),接收消息一方稱爲consumer(消費者);一個kafka集羣有多個kafka server組成,咱們把每一個kafka server稱爲broker(消息掮客);java
ActiveMQ、RabbitMQ、kafka對比node
2、kafka集羣部署linux
環境說明apache
主機名 | ip地址 |
node04 | 192.168.0.44 |
node05 | 192.168.0.45 |
node06 | 192.168.0.46 |
提示:在部署kafka集羣以前,咱們要先把zk集羣部署起來,由於kafka是強依賴zk集羣;zk集羣部署請參考上一篇博客http://www.javashuo.com/article/p-poznrkgu-nu.html;上面3臺server只是kafka集羣的三臺server;bash
一、安裝jdk併發
[root@node04 ~]# yum install -y java-1.8.0-openjdk-devel
驗證java環境app
[root@node04 ~]# java -version openjdk version "1.8.0_262" OpenJDK Runtime Environment (build 1.8.0_262-b10) OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode) [root@node04 ~]#
提示:以上安裝Java環境,在kafka集羣的每一個server都要作一遍;除了上面的java環境,還有基礎環境像時間同步,主機名解析,關閉selinux,關閉防火牆,主機免密這些都要提早作好;ssh
二、下載kafka二進制壓縮包socket
[root@node04 ~]# ll total 0 [root@node04 ~]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz --2020-10-21 20:06:28-- https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1 Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 65671917 (63M) [application/octet-stream] Saving to: ‘kafka_2.12-2.6.0.tgz’ 100%[================================================================================>] 65,671,917 6.38MB/s in 13s 2020-10-21 20:06:41 (4.96 MB/s) - ‘kafka_2.12-2.6.0.tgz’ saved [65671917/65671917] [root@node04 ~]# ll total 64136 -rw-r--r-- 1 root root 65671917 Aug 5 06:01 kafka_2.12-2.6.0.tgz [root@node04 ~]#
三、解壓二進制包,並作軟鏈接
[root@node04 ~]# tar xf kafka_2.12-2.6.0.tgz -C /usr/local/ [root@node04 ~]# ln -sv /usr/local/kafka_2.12-2.6.0 /usr/local/kafka ‘/usr/local/kafka’ -> ‘/usr/local/kafka_2.12-2.6.0’ [root@node04 ~]#
提示:其餘server也是相同的操做;
四、配置node04上的kafka
提示:broker.id是配置broker的id,這個id在kafka集羣中必須惟一;listeners是用來指定當前節點監聽的socket;log.dirs用來指定kafka的日誌文件路徑;log.retention.hours用來指定保存多少小時的日誌;zookeeper.conect用來指定zk集羣各節點信息,一般是把zk全部節點都寫上,用逗號隔開;其餘的參數均可以不用變;我這裏用到主機名,是由於我在hosts文件對全部節點都作了主機名解析;
建立日誌目錄
[root@node04 config]# mkdir -pv /data/kafka mkdir: created directory ‘/data’ mkdir: created directory ‘/data/kafka’ [root@node04 config]#
提示:後面的kafka-logs目錄在kafka啓動時會自動建立;到此node04就配置好了;
把node04上的配置文件拷貝到node05
[root@node04 config]# scp server.properties node05:/usr/local/kafka/config/ server.properties 100% 6882 2.0MB/s 00:00 [root@node04 config]#
修改broker.id和listeners配置
建立日誌目錄
[root@node05 ~]# mkdir -pv /data/kafka mkdir: created directory ‘/data’ mkdir: created directory ‘/data/kafka’ [root@node05 ~]#
把node05的配置文件,複製到node06的kafka配置文件目錄
[root@node05 ~]# scp /usr/local/kafka/config/server.properties node06:/usr/local/kafka/config/server.properties The authenticity of host 'node06 (192.168.0.46)' can't be established. ECDSA key fingerprint is SHA256:lE8/Vyni4z8hsXaa8OMMlDpu3yOIRh6dLcIr+oE57oE. ECDSA key fingerprint is MD5:14:59:02:30:c0:16:b8:6c:1a:84:c3:0f:a7:ac:67:b3. Are you sure you want to continue connecting (yes/no)? yes Warning: Permanently added 'node06,192.168.0.46' (ECDSA) to the list of known hosts. server.properties 100% 6882 1.9MB/s 00:00 [root@node05 ~]#
修改broker.id和listeners配置,並建立日誌目錄
到此,三個節點的kafka就配置好了;
啓動各節點上的kafka
[root@node04 config]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties [root@node04 config]# ss -tnl State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 128 *:22 *:* LISTEN 0 100 127.0.0.1:25 *:* LISTEN 0 128 :::22 :::* LISTEN 0 100 ::1:25 :::* LISTEN 0 50 :::39779 :::* LISTEN 0 50 ::ffff:192.168.0.44:9092 :::* [root@node04 config]#
提示:能夠看到node04上的9092處於監聽狀態;用一樣的命令把node05,node06上的kafka都啓動起來;
查看日誌
提示:kafka的啓動日誌放在安裝目錄下的logs目錄,有個server.log;咱們剛纔建立的日誌目錄,主要用來保存集羣事務的日誌;
測試kafka
一、在各節點驗證kafka進程是否啓動
[root@node04 config]# jps 1797 Kafka 2485 Jps [root@node04 config]# ssh node05 'jps' 1840 Jps 1772 Kafka [root@node04 config]# ssh node06 'jps' 2321 Kafka 2388 Jps [root@node04 config]#
二、在zk集羣上查看,是否有kafka節點註冊到上面
zk: localhost:2181(CONNECTED) 0] ls / [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] [zk: localhost:2181(CONNECTED) 1] ls -R / / /admin /brokers /cluster /config /consumers /controller /controller_epoch /isr_change_notification /latest_producer_id_block /log_dir_event_notification /zookeeper /admin/delete_topics /brokers/ids /brokers/seqid /brokers/topics /brokers/ids/0 /brokers/ids/1 /brokers/ids/2 /cluster/id /config/brokers /config/changes /config/clients /config/topics /config/users /zookeeper/config /zookeeper/quota [zk: localhost:2181(CONNECTED) 2]
提示:能夠看到在zk集羣上多了不少節點;
三、建立名爲test,partitions爲3,replication爲3的topic
[root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic test Created topic test. [root@node04 config]#
在kafka集羣的任意節獲取topic
[root@node06 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181,node01:2181,node03:2181 --topic test Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: test Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: test Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 [root@node06 ~]#
提示:從上面的返回的狀態信息能夠看到test topic有三個分區分別爲0、一、2,分區0的leader是2(broker.id),分區0有三個副本,而且狀態都爲lsr(ln-sync,表示能夠參加選舉成爲leader)。
四、刪除topic
六、建立topic,併發送消息
[root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic msgtest Created topic msgtest. [root@node04 config]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list node04:9092,node05:9092,node06:9092 --topic msgtest >hello >hi >
在其餘節點獲取消息
使用圖形工具kafka-tool工具獲取消息
ok,到此kafka這個消息系統就搭建好了;