centos安裝kafka----(7)

消息系統負責將數據從一個應用程序傳輸到另外一個應用程序。
有兩種類型的消息模式:
1.點對點,相似於微信中的好友與好友聊天。該模式下,當消費者消費數據以後,會將消息系統中的數據進行刪除。
2.一對多,即發佈-訂閱(pub-sub),相似於微信中的朋友圈,即你發佈一條消息,多個朋友看獲得。
Kafka是一個分佈式發佈-訂閱消息系統,它的做用主要是將消息從一個端點傳遞到另外一個端點。kafka的消息保留在磁盤上,並在集羣內複製消息以防止數據丟失。最後,kafka須要構建在zookeeper同步服務之上。java

1.kafka專業術語介紹

  • Broker kafk實例,kafka集羣包含一個或多個實例,原則上來講一臺主機上只配一個實例。
  • Topic kafka消息主題,kafka的消息必須屬於某個主題,一個Broker存在多個Topic。
  • Partition
    它是物理層的概念,被稱之爲分區,每一個Topic包含一個或者多個分區。
  • Producer
    消息發佈者,即負責發佈消息到kafka的Broker
  • Consumer
    消息消費者,即負責向kafka Broker消費消息的客戶端
  • Consumer Group
    每一個消費者必須屬於一個消費者組

1.下載kafka

$ cd /usr/java
[root@43-c59438365-0048-0727982 java]# cd kafka
[root@43-c59438365-0048-0727982 kafka]# wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
因而可知,我下載的kafka版本是2.3.0apache

2.解壓kafka

[root@43-c59438365-0048-0727982 kafka]# ll
total 55936
-rw-r--r-- 1 root root 57215197 Jun 25 08:38 kafka_2.12-2.3.0.tgz
[root@43-c59438365-0048-0727982 kafka]# tar zxvf kafka_2.12-2.3.0.tgz
...
[root@43-c59438365-0048-0727982 kafka]# ll
total 55940
drwxr-xr-x 6 root root 4096 Jun 20 04:44 kafka_2.12-2.3.0
-rw-r--r-- 1 root root 57215197 Jun 25 08:38 kafka_2.12-2.3.0.tgz
[root@43-c59438365-0048-0727982 kafka]#bootstrap

3.配置kafka環境變量

$ vi /etc/profile
添加以下兩行
export KAFKA_HOME=/usr/java/kafka/kafka_2.12-2.3.0
export PATH=$PATH:$KAFKA_HOME/bin
退出後,執行刷新命令
source profile微信

4.啓動kafka

因爲kafka依賴zookeeper,因此啓動kafka以前須要先啓動zookeeper。因此咱們須要找到zookeeper的配置文件,以下
[root@43-c59438365-0048-0727982 config]# ll
total 68
-rw-r--r-- 1 root root 906 Jun 20 04:43 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Jun 20 04:43 connect-console-source.properties
-rw-r--r-- 1 root root 5321 Jun 20 04:43 connect-distributed.properties
-rw-r--r-- 1 root root 883 Jun 20 04:43 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Jun 20 04:43 connect-file-source.properties
-rw-r--r-- 1 root root 1552 Jun 20 04:43 connect-log4j.properties
-rw-r--r-- 1 root root 2262 Jun 20 04:43 connect-standalone.properties
-rw-r--r-- 1 root root 1221 Jun 20 04:43 consumer.properties
-rw-r--r-- 1 root root 4727 Jun 20 04:43 log4j.properties
-rw-r--r-- 1 root root 1925 Jun 20 04:43 producer.properties
-rw-r--r-- 1 root root 6851 Jun 20 04:43 server.properties
-rw-r--r-- 1 root root 1032 Jun 20 04:43 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jun 20 04:43 trogdor.conf
-rw-r--r-- 1 root root 1023 Jun 20 04:43 zookeeper.properties
[root@43-c59438365-0048-0727982 config]# pwd
/usr/java/kafka/kafka_2.12-2.3.0/config
[root@43-c59438365-0048-0727982 config]#
如上就是zookeeper配置文件的絕對路徑,接下來啓動zookeeper,進入kafka安裝目錄bin下,執行以下啓動命令
[root@43-c59438365-0048-0727982 bin]# ./zookeeper-server-start.sh /usr/java/kafka/kafka_2.12-2.3.0/config/zookeeper.properties &
[1] 25053
[root@43-c59438365-0048-0727982 bin]# [2019-08-24 15:31:29,375] INFO Reading configuration from: /usr/java/kafka/kafka_2.12-2.3.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
[2019-08-24 15:31:29,407] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-24 15:31:29,407] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-24 15:31:29,407] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-24 15:31:29,413] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2019-08-24 15:31:29,416] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
^C
[root@43-c59438365-0048-0727982 bin]#
如上說明zookeeper啓動成功。
接着啓動kafka服務
kafka服務啓動須要kafka配置文件,配置文件同zookeeper配置文件是一個位置,如/usr/java/kafka/kafka_2.12-2.3.0/config
[root@43-c59438365-0048-0727982 bin]# ./kafka-server-start.sh /usr/java/kafka/kafka_2.12-2.3.0/config/server.properties &
[2] 25406
[root@43-c59438365-0048-0727982 bin]# [2019-08-24 15:37:12,829] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-08-24 15:37:13,489] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-08-24 15:37:13,490] INFO starting (kafka.server.KafkaServer)
[2019-08-24 15:37:13,491] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
...
[2019-08-24 15:37:14,823] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-08-24 15:37:14,830] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-08-24 15:37:14,830] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-08-24 15:37:14,830] INFO Kafka startTimeMs: 1566632234826 (org.apache.kafka.common.utils.AppInfoParser)
[2019-08-24 15:37:14,831] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
因而可知,kafka啓動成功了。session

5.測試kafka

  • 建立Topic
    [root@43-c59438365-0048-0727982 bin]# kafka-topics.sh --create --zookeeper localhost:2181 --topic myTopic --partitions 1 --replication-factor 1
    [2019-08-24 16:15:21,854] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:44731 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    [2019-08-24 16:15:21,857] INFO Client attempting to establish new session at /0:0:0:0:0:0:0:1:44731 (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-08-24 16:15:21,860] INFO Established session 0x10085c3259c0002 with negotiated timeout 30000 for client /0:0:0:0:0:0:0:1:44731 (org.apache.zookeeper.server.ZooKeeperServer)
    [2019-08-24 16:15:22,126] INFO Got user-level KeeperException when processing sessionid:0x10085c3259c0002 type:setData cxid:0x4 zxid:0x44 txntype:-1 reqpath:n/a Error Path:/config/topics/myTopic Error:KeeperErrorCode = NoNode for /config/topics/myTopic (org.apache.zookeeper.server.PrepRequestProcessor)
    Created topic myTopic.
    由上可知,咱們建立了一個叫myTopic的主題。
  • 建立生產者 [root@43-c59438365-0048-0727982 bin]# kafka-console-producer.sh --topic myTopic --broker-list localhost:9092
    $>
    由上可知,生產者建立好了。
  • 建立消費者
    這裏很明顯,須要新開一個窗口,
    [root@43-c59438365-0048-0727982 bin]# kafka-console-consumer.sh --topic myTopic --bootstrap-server localhost:9092
    以下就能夠在生產者窗口發送消息,消費者窗口及時收到消息。

自此結束。socket

相關文章
相關標籤/搜索