從阿里雲申請三臺雲服務器,這裏我使用了兩個不一樣的阿里雲帳號去申請雲服務器。咱們配置三臺主機名分別爲zy1,zy2,zy3。html
咱們經過阿里雲能夠獲取主機的公網ip地址,以下:java
經過secureRCT鏈接主機106.15.74.155,運行ifconfig,能夠查看其內網ip地址:node
主機zy1的公網ip爲:106.15.74.155,內網ip爲172.19.182.67。apache
主機zy2的公網ip爲:47.103.134.70,內網ip爲172.19.14.178。json
主機zy3的公網ip爲:47.97.10.51,內網ip爲172.16.229.255。bootstrap
因爲主機位於不一樣的局域網下,所以須要進行一個公網端口到內網端口的映射。在搭建zookeeper和kafka須要使用到2181,2888 ,3888,9092端口。須要在阿里雲中配置入規則,具體能夠參考阿里雲官方收藏:同一個地域、不一樣帳號下的實例實現內網互通 。vim
注意:若是47.103.134.70配置一個入端口3888,那麼對該47.103.134.70:3888的訪問會實際映射到172.19.14.178:3888下。若是是同一局域網下的兩個主機,是不須要配置這個的,能夠直接互通。服務器
若是想了解更多,能夠參考如下博客:網絡
經過SSH訪問阿里雲服務器的原理能夠參考-用SSH訪問內網主機的方法app
以主機zy1爲例:配置以下:
注意zy1對應的ip須要配置爲內網ip,也就是本機ip:172.19.182.67。而zy二、zy3配置的都是公網ip。
在每一個主機下執行如下操做:
rpm -qa |grep java rpm -qa |grep jdk rpm -qa |grep gcj
若是有就使用批量卸載命令
rpm -qa | grep java | xargs rpm -e --nodeps
yum install java-1.8.0-openjdk* -y
默認jre jdk 安裝路徑是/usr/lib/jvm 下面:
vim /etc/profile #set java environment , append export JAVA_HOME=/usr/lib/jvm/java export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar export PATH=$PATH:$JAVA_HOME/bin
使得配置生效
. /etc/profile
echo $JAVA_HOME echo $CLASSPATH java -version
在主機zy1下面執行如下操做:
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
建立目錄/opt/bigdata:
mkdir /opt/bigdata
解壓文件到/opt/bigdata:
tar -zxvf zookeeper-3.4.13.tar.gz -C /opt/bigdata
跳轉目錄:
cd /opt/bigdata/zookeeper-3.4.13/
cp conf/zoo_sample.cfg conf/zoo.cfg
修改配置文件以下:
vim conf/zoo.cfg
其中部分參數意義以下:
建立/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata
建立myid文件:
echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid
scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/ scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/
zyx主機:
建立/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata
建立myid文件:
echo x > /opt/bigdata/data/zookeeper/zkdata/myid
注意:x表示主機的編號。
vim /etc/profile #set java environment , append export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13 export PATH=$ZOOKEEPER_HOME/bin:$PATH
使得配置生效
. /etc/profile
進入到zookeeper目錄下,在每一個主機下分別執行
cd /opt/bigdata/zookeeper-3.4.13 bin/zkServer.sh start
檢查服務狀態
bin/zkServer.sh status
能夠用「jps」查看zk的進程,這個是zk的整個工程的main
jps
注意:zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。
zookeeper服務開啓後,進入客戶端的命令:
zkCli.sh
更多經常使用命令參考博客:Kafka在zookeeper中存儲結構和查看方式。
一、防火牆
防火牆沒有關閉問題。解決方式參考:https://blog.csdn.net/weiyongle1996/article/details/73733228
二、端口沒有開啓
若是/etc/hosts所有配置爲公網:在zy1運行zkServer.sh start,查看端口開啓狀態:
netstat -an | grep 3888
則會發現沒法開啓公網3888端口,咱們應該打開的是內網機器對應的端口。
若是端口已經開啓,能夠經過telnet ip port判斷該端口是否能夠從外部訪問。
在主機zy1下執行:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz tar -zxvf kafka_2.12-2.1.1.tgz -C /opt/bigdata
重命名:
cd /opt/bigdata/ mv kafka_2.12-2.1.1 kafka
在/opt/bigdata/kafka下:
vim config/server.properties
各個參數意義:
注意,這裏若是但願在java中建立topic也是多個備份,須要添加一下屬性
#default replication factors for automatically created topics,默認值1;
default.replication.factor=3
#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
#min.insync.replicas and acks allow you to enforce greater durability guarantees,默認值1;
min.insync.replicas=3
上面是參數的解釋,實際的修改項爲:
broker.id=1
listeners=PLAINTEXT://zy1:9092 #內網地址
advertised.listeners=PLAINTEXT://106.15.74.155:9092 #公網地址(否則遠程客戶端沒法訪問)
log.dirs=/opt/bigdata/kafka/kafka-logs
#此外,能夠在log.retention.hours=168 下面新增下面三項:
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#設置zookeeper的鏈接端口
zookeeper.connect=zy1:2181,zy2:2181,zy3:2181
若是咱們須要刪除topic,還須要配置一下內容:
delete.topic.enable=true
具體參考博客:kafka安裝及刪除Topic,Kafka0.8.2.1刪除topic邏輯。
scp -r /opt/bigdata/kafka zy2:/opt/bigdata/
scp -r /opt/bigdata/kafka zy3:/opt/bigdata/
拷貝文件過去的其餘兩個節點須要更改broker.id和listeners,以zy2爲例:
咱們能夠根據Kafka內帶的zk集羣來啓動,可是建議使用獨立的zk集羣:
zkServer.sh start
在/opt/bigdata/kafka下 ,三個節點分別執行以下命令,啓動kafka集羣:
bin/kafka-server-start.sh config/server.properties &
運行命令後服務確實後臺啓動了,但日誌會打印在控制檯,並且關掉命令行窗口,服務就會隨之中止,這個讓我挺困惑的。後來,參考了其餘的啓動腳本,經過測試和調試最終找到了徹底知足要求的命令。
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
其中1>/dev/null 2>&1 是將命令產生的輸入和錯誤都輸入到空設備,也就是不輸出的意思。/dev/null表明空設備。
注意:若是內存不足:打開kafka安裝位置,在bin目錄下找到kafka-server-start.sh文件,將export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改成export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。
思路:如下給出幾條kafka指令。建立一個topic,一個節點做爲生產者,兩個節點做爲消費者分別看看可否接收數據,進行驗證:
建立及查看topic:
cd /opt/big/data/kafka bin/kafka-topics.sh -list -zookeeper zy1:2181 bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test
開啓生產者:
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test
開啓消費者:
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning
節點zy1產生消息,若是消息沒有清理,在節點zy二、zy3均可以接收到消息。
如下是kafka經常使用命令行總結:
查看topic的詳細信息
bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test
能夠看到topic包含3個複本,每一個副本又分爲三個partition。以zy-test:partition0爲例,其leader保存在broker.id=1的主機上,副本保存在二、3節點上。其消息保存在配置參數log.dirs所指定的路徑下:
爲topic增長副本
bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file json/partitions-to-move.json -execute
建立topic
bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test
爲topic增長partition
bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic zy-test
kafka生產者客戶端命令
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test
kafka消費者客戶端命令
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning
kafka服務啓動
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
刪除topic
bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic zy-test
bin/kafka-server-stop.sh
因爲Zookeeper並不適合大批量的頻繁寫入操做,新版Kafka已推薦將consumer的位移信息保存在kafka內部的topic中,即__consumer_offsets topic,而且默認提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test
輸出結果每一個字段分別表示topic、partition、untilOffset(當前partition的最大偏移);
上面的輸出結果代表kafka隊列總有產生過4條消息(這並不表明kafka隊列如今必定有4條消息,由於kafka有兩種策略能夠刪除舊數據:基於時間、基於大小)。
因爲我使用kafka-console-producer.sh生成了四條消息:zy、1994110八、 liuyan、1。所以kafka消息隊列中存在4條消息。
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test --from-beginning
再次創建消費者:
會發現獲取不到數據。這是由於咱們指定了消費組,第一次消費時從offset爲0開始消費,把4條消息所有讀出,此時offset移動到最後,當再次使用同一消費組讀取數據,則會從上次的offset開始獲取數據。
而使用:
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning
每次都會獲取四條數據,這是由於每次都會建立一個新的消費者,這些消費者會被隨機分配到一個不一樣的組,所以每次都是從offset爲0開始消費。
參數解釋:
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list
能夠看到有三個消費組,前兩個消費者沒有指定消費組,隨機產生一個console-consumer-***的group.ig。
第三個是咱們剛剛在config/consumer.properties 中指定的消費組。
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group
若是此時再使用生產者客戶端生成兩條消息:
再次查看消費組test-consumer-group的消費狀況:
因爲咱們尚未介紹KAFKA的API,這塊內容就不先介紹,具體參考博客:Kafka消費者 之 指定位移消費。
所以上面介紹的kafka命令,與topic相關的使用--zookeeper zy1:2181,與生產者、消費者相關的使用 --bootstrap-server zy1:9092。
參考博客: