Standalone Kafka Message Publish-Subscribe on CentOS 7

    I have been studying big-data topics for a while now, from Spark, Spark Streaming, and Scala to Kafka, which covers quite a lot of ground. Taken as a whole, big data is a lot of fun and applicable to many areas both now and in the future. Below is a walkthrough of Kafka message publish-subscribe.

    (I) Basic environment preparation

    I installed CentOS 7 in a virtual machine (setup omitted here). The required components are the JDK, ZooKeeper, and Kafka.

    (II) Environment setup (if you are not logged in as root, prefix the commands below with sudo)

    1. JDK installation and environment configuration (preferably JDK 8 or later; omitted here)

    2. ZooKeeper installation and environment configuration

    (1) Extract and move to another directory

#Extract ZooKeeper and rename the directory
sudo tar -zxvf zookeeper-3.3.6.tar.gz
sudo mv zookeeper-3.3.6 zookeeper
#Move zookeeper to /usr/local/, or wherever you prefer
sudo mv zookeeper /usr/local

    (2) Edit the ZooKeeper configuration file

# Copy zoo_sample.cfg and rename the copy to zoo.cfg
sudo cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
# Edit the zoo.cfg file
sudo vim /usr/local/zookeeper/conf/zoo.cfg
# Mainly change dataDir and add server.1=127.0.0.1:2888:3888, as below
# the directory where the snapshot is stored.
dataDir=/usr/local/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
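Because zoo.cfg declares a server.1 entry, ZooKeeper will look for a myid file under dataDir whose content matches that server number. A minimal sketch, assuming the /usr/local/zookeeper layout used above:

```shell
# Create the dataDir declared in zoo.cfg
sudo mkdir -p /usr/local/zookeeper/data
# Write the myid file; its content must match the N in server.N (1 here)
echo 1 | sudo tee /usr/local/zookeeper/data/myid
```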

     The parameters in the configuration above are explained at: https://www.linuxidc.com/Linux/2017-06/144950.htm

    (3) Configure the ZooKeeper environment variables

sudo vim /etc/profile
#Add/modify as follows
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
ZOOKEEPER_HOME=/usr/local/zookeeper
KAFKA_HOME=/usr/local/kafka
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME SCALA_HOME ZOOKEEPER_HOME KAFKA_HOME PATH CLASSPATH

    Note: after editing, you must run source /etc/profile, otherwise the changes will not take effect.
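To confirm the variables took effect, a quick check after sourcing (the expected values assume the /usr/local layout used above):

```shell
# Reload the profile in the current shell
source /etc/profile
# These should print the paths configured above
echo $ZOOKEEPER_HOME
echo $KAFKA_HOME
```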

    (4) Start ZooKeeper

#cd into the zookeeper/bin directory
./zkServer.sh start

    A successful start looks like this:

ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

    Stopping ZooKeeper is the same, with only the subcommand changed:

#cd into the zookeeper/bin directory
./zkServer.sh stop
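Besides start and stop, zkServer.sh can report whether the node is actually up; ZooKeeper's four-letter ruok command is another quick liveness check (assuming nc is installed):

```shell
# cd into zookeeper/bin, then query the server state;
# a healthy single node reports "Mode: standalone"
./zkServer.sh status
# A running server answers "imok" to the ruok four-letter word
echo ruok | nc 127.0.0.1 2181
```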

    3. Kafka installation and environment configuration

    (1) Extract and move to another directory

# Extract and rename the directory to kafka
sudo tar -zxvf kafka_2.12-1.0.0.tgz
sudo mv kafka_2.12-1.0.0 kafka
# Move to the /usr/local/ directory
sudo mv kafka /usr/local

    (2) Edit the Kafka configuration file

#Create the log directory
cd /usr/local/kafka
mkdir logs
#Edit the config file /usr/local/kafka/config/server.properties
sudo vim /usr/local/kafka/config/server.properties
#Mainly change the following entries:
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/logs/
zookeeper.connect=127.0.0.1:2181
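Before starting the broker, it is worth confirming that the edited keys actually landed in server.properties; a quick sanity check:

```shell
# Print the entries that were just edited in server.properties
grep -E '^(broker\.id|delete\.topic\.enable|listeners|log\.dirs|zookeeper\.connect)' \
  /usr/local/kafka/config/server.properties
```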

    The parameters in the configuration above are explained at: https://www.cnblogs.com/wangb0402/p/6187503.html

    (3) Configure the Kafka environment variables: see the ZooKeeper configuration above

    (4) Start Kafka

# cd into the kafka/bin directory
./kafka-server-start.sh /usr/local/kafka/config/server.properties
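The command above keeps the broker in the foreground and occupies the terminal. The start script also accepts a -daemon flag to run it in the background; a sketch (the log path assumes the /usr/local/kafka layout above):

```shell
# Start the broker in the background instead of occupying a terminal
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Follow the broker log to confirm it came up
tail -f /usr/local/kafka/logs/server.log
```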

    (III) Kafka message publish-subscribe: open four terminals (ZooKeeper, the Kafka broker, a producer, and a consumer)

    (1) Create a Topic

# in the kafka/bin directory
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

     The output is as follows:

[hadoop@bogon bin]$ ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

    If a topic named test already exists, it can be removed with delete:

#Delete the unwanted topic (delete.topic.enable=true was already set in the config above)
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
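Either way, listing the topics confirms whether the create or delete took effect, for example:

```shell
# List existing topics registered in ZooKeeper
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
```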

    (2) The Producer sends messages to the Topic

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

     The output is as follows:

[hadoop@bogon bin]$ ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>producer send message
>hello kafka
>hello world
>spark
>heihei
>send everything for people
>

    (3) The Consumer reads messages from the Topic

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

    The output is as follows; note that the deprecation warning at the start is harmless, and the messages may take a moment to appear:

[hadoop@bogon bin]$ ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
producer send message
hello kafka
hello world
spark
heihei
send everything for people
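As the deprecation warning suggests, the console consumer can also use the new consumer API by pointing at the broker instead of ZooKeeper; with the single-broker setup above that would be:

```shell
# Same consumption via the new consumer API: connect to the broker, not ZooKeeper
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
```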