這段時間一直在學習大數據相關的知識,從Spark,Spark Streaming,Scala到Kafka等等,涉及到的知識面不少,整體看下來,以爲大數據仍是很好玩的,在如今及之後的方方面面都很適用。下面說下Kafka消息的發佈-訂閱。html
(一)基本環境準備java
本人在虛擬機下安裝了CentOs7,這裏自行略過,必備的環境jdk,zookeeper,kafkalinux
(二)環境搭建(若不是root登陸,則如下操做需加上sudo)bootstrap
一、JDK安裝及環境配置(最好jdk8以上,此處略過)vim
二、Zookeeper安裝及環境配置ide
(1)解壓及移動至其餘目錄oop
#解壓Zookeeper並重命名 sudo tar -zxvf zookeeper-3.3.6.tar.gz sudo mv zookeeper-3.3.6 zookeeper #將zookeeper移動到/usr/local/目錄下,按本身喜愛 sudo mv zookeeper /usr/local
(2)編輯Zookeeper的配置文件學習
# 複製一份zoo_sample.cfg文件並更名爲zoo.cfg sudo cp /opt/zookeeper/zoo_sample.cfg zoo.cfg # 編輯zoo.cfg 文件 sudo vim /opt/zookeeper/zoo.cfg #主要修改dataDir和server.1=127.0.0.1:2888:3888這2處 # the directory where the snapshot is stored. dataDir=/usr/local/zookeeper/data # the port at which the clients will connect clientPort=2181 server.1=127.0.0.1:2888:3888
上面配置中的參數可參考:https://www.linuxidc.com/Linux/2017-06/144950.htm大數據
(3)配置Zookeeper環境變量spa
sudo vim /etc/profile #修改以下 JAVA_HOME=/usr/java/jdk1.8.0_161 JRE_HOME=/usr/java/jdk1.8.0_161/jre SCALA_HOME=/usr/local/scala ZOOKEEPER_HOME=/usr/local/zookeeper KAFKA_HOME=/usr/local/kafka PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib export JAVA_HOME JRE_HOME SCALA_HOME ZOOKEEPER_HOME KAFKA_HOME PATH CLASSPATH
注意,配置完成後必須執行:source /etc/profile,不然不生效。
(4)啓動Zookeeper
#cd 到Zookeeper/bin目錄下 ./zkServer.sh start
啓動成功以下:
ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
關閉Zookeeper同上,只需修改部分:
#cd 到Zookeeper/bin目錄下 ./zkServer.sh stop
三、Kafka安裝及環境配置
(1)解壓及移動至其餘目錄
# 解壓及重命名爲kafka sudo tar -zxvf kafka_2.12-1.0.0.tgz sudo mv kafka_2.12-1 kafka # 移動至/usr/local/目錄下 sudo mv kafka /usr/local
(2)編輯Kafka的配置文件
#建立日誌存放目錄 cd /usr/local/kafka mkdir logs #修改配置文件/usr/local/kafka/config/server.properties sudo vim /usr/local/kafka/config/server.properties #主要修改下面幾項內容以下: broker.id=0 delete.topic.enable=true listeners = PLAINTEXT://127.0.0.1:9092 log.dirs=/usr/local/kafka/logs/ zookeeper.connect=127.0.0.1:2181
上面配置中的參數可參考:https://www.cnblogs.com/wangb0402/p/6187503.html
(3)配置Kafka環境變量:詳見上面的Zookeeper配置
(4)啓動Kafka
# cd到kafka/bin目錄下 ./kafka-server-start.sh /usr/local/kafka/config/server.properties
(三)Kafka的消息發佈-訂閱:打開四個終端
(1)建立一個Topic
# kafka/bin目錄下 ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
顯示以下:
[hadoop@bogon bin]$ ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".
若已存在該test的Topic,能夠經過delete刪除:
#利用命令刪除須要刪除的topic(配置中已經設置爲true) ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(2)Producer向Topic發送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
顯示以下:
[hadoop@bogon bin]$ ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test >producer send message >hello kafka >hello world >spark >heihei >send everything for people >
(3)Consumer讀取Topic的消息
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
顯示以下,注意開始的警告信息不影響使用,需稍等片刻:
[hadoop@bogon bin]$ ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 -topic test --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. producer send message hello kafka hello world spark heihei send everything for people