Apache Kafka是一種頗受歡迎的分佈式消息代理系統,旨在有效地處理大量的實時數據。Kafka集羣不只具備高度可擴展性和容錯性,並且與其餘消息代理(如ActiveMQ和RabbitMQ)相比,還具備更高的吞吐量。雖然它一般用做pub/sub消息傳遞系統,但許多組織也將其用於日誌聚合,由於它爲發佈的消息提供持久存儲。html
您能夠在一臺服務器上部署Kafka,也能夠構建一個分佈式的Kafka集羣來提升性能。本文介紹如何在多節點CentOS 7服務器實例上安裝Apache Kafka。java
先決條件:node
欲安裝kafka集羣服務器,首先要安裝如下組件:apache
《Linux JAVA JDK JRE 環境變量安裝與配置》
《在 Linux 多節點安裝配置 Apache Zookeeper 分佈式集羣》編程服務器列表:bootstrap
10.10.204.63
10.10.204.64
10.10.204.65vim
1.安裝api
建立用戶和組:服務器
# groupadd kafka # useradd -g kafka -s /sbin/nologin kafka
下載Kafka包:session
# cd /usr/local # wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz
解壓建立軟鏈接:
# tar zxvf kafka_2.10-0.10.2.1.tgz # ln -s kafka_2.10-0.10.2.1 kafka
設置權限及建立Kafka日誌存放目錄:
# chown -R kafka:kafka kafka_2.10-0.10.2.1 kafka # mkdir -p /usr/local/kafka/logs
添加系統變量:
編輯:/etc/profile 文件,在最下面添加如下內容:
export KAFKA_HOME=/usr/local/kafka_2.10-0.10.2.1 export PATH=$KAFKA_HOME/bin:$PATH
使變量生效:
# source /etc/profile
2.配置
修改添加Kafka服務器的配置文件:
# cd /usr/local/kafka/config # vim server.properties
#惟一值,每一個server填寫不同。
broker.id=63
#容許刪除主題。
delete.topic.enable=true
#修改;協議、當前broker機器ip、端口,此值能夠配置多個,跟SSL等有關係。
listeners=PLAINTEXT://10.10.204.63:9092
#修改;kafka數據的存放地址,多個地址的話用逗號分割,例如 /data/kafka-logs-1,/data/kafka-logs-2。
log.dirs=/usr/local/kafka/logs/kafka-logs
#每一個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋。
num.partitions=3
#新增;表示消息體的最大大小,單位是字節。
message.max.bytes=5242880
#新增;是否容許自動建立topic,如果false,就須要經過命令建立topic。
default.replication.factor=2
#新增;replicas每次獲取數據的最大大小。
replica.fetch.max.bytes=5242880
#新增;配置文件中必須使用如下配置,不然只會標記爲刪除,而不是真正刪除。
delete.topic.enable=true
#新增;是否容許 leader 進行自動平衡,boolean 值,默認爲 true。
auto.leader.rebalance.enable=true
#kafka鏈接的zk地址,各個broker配置一致。
zookeeper.connect=10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181#可選配置
#是否容許自動建立 topic,boolean 值,默認爲 true。
auto.create.topics.enable=true
#指定 topic 的壓縮方式,string 值,可選有。
#compression.type=high
#會把全部的日誌同步到磁盤上,避免重啓以後的日誌恢復,減小重啓時間。
controlled.shutdown.enable=true注:broker的配置文件中有zookeeper的地址,也有本身的broker ID, 當broker啓動後,會在zookeeper中新建一個znode。
修改其餘配置文件:
# vim zookeeper.properties
修改成:
dataDir=/usr/local/zookeeper/data
新增: server.1=10.10.204.63:2888:3888 server.2=10.10.204.64:2888:3888 server.3=10.10.204.65:2888:3888
修改如下配置文件:
# vim producer.properties bootstrap.servers=10.10.204.63:9092,10.10.204.64:9092,10.10.204.65:9092 # vim consumer.properties zookeeper.connect=10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181
3.啓動
啓動全部節點kafka服務(能夠經過查看日誌,或者檢查進程狀態,保證Kafka集羣啓動成功):
# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
執行上述命令後,會出來滾動的啓動信息,直至窗口靜止,此時需重開終端檢查是否啓動成功;
# jps 9939 Jps 2201 QuorumPeerMain 2303 Kafka
4.使用測試
下面操做能夠在任意節點,從新打開一個終端操做:
執行如下命令,創建一個名爲 renwole 的topic。
# cd /usr/local/kafka/bin # ./kafka-topics.sh --create --zookeeper 10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181 --replication-factor 1 --partitions 1 --topic renwole Created topic "renwole".
解釋:
--replication-factor 1 複製1份 --partitions 1 建立1個分區 --topic 主題爲renwole查看已建立的topic:
# ./kafka-topics.sh --list --zookeeper 10.10.204.63:2181 _consumer_offsets renwole
注:能夠配置 broker 自動建立 topic。
發送消息(Kafka 使用一個簡單的命令行producer(而後能夠隨意輸入內容,回車能夠發送,ctrl+c 退出)默認的每條命令將發送一條消息。):
# ./kafka-console-producer.sh --broker-list 10.10.204.64:9092 --topic renwole
在消息接收端,執行如下命令查看收到的消息:
# ./kafka-console-consumer.sh --bootstrap-server 10.10.204.63:9092 --topic renwole --from-beginning
執行如下命令刪除topic:
# ./kafka-topics.sh --delete --zookeeper 10.10.204.63:2181,10.10.204.64:2181,10.10.204.65:2181 --topic renwole
5.查看集羣狀態
kafka已經成功完成安裝,查看kafka集羣節點ID狀態:
注:可在任意節點鏈接zookeeper客戶端。
# cd /usr/local/zookeeper/bin # ./zkCli.sh Connecting to localhost:2181 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT [myid:] - INFO [main:Environment@100] - Client environment:host.name=10-10-204-63.10.10.204.63 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_144 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/java/jdk1.8.0_144/jre [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.10.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:.:/usr/java/jdk1.8.0_144/jre/lib/rt.jar:/usr/java/jdk1.8.0_144/lib/dt.jar:/usr/java/jdk1.8.0_144/lib/tools.jar:/usr/java/jdk1.8.0_144/jre/lib [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp [myid:] - INFO [main:Environment@100] - Client environment:java.compiler= [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-514.21.2.el7.x86_64 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper-3.4.10/bin [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@69d0a921 Welcome to ZooKeeper! [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) JLine support is enabled [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x35ddf80430b0008, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids #查看ID [63, 64, 65]能夠看到三臺kafka實例ID都在線,若是模擬任意一臺節點掉線,查看結果就會不一樣(這個ID號是前面設置的 broker.id )。
開放端口加入防火牆:
# firewall-cmd --permanent --add-port=9092/tcp # firewall-cmd --reload
6.開機啓動
建立system單元文件:
在 /usr/lib/systemd/system 目錄下建立 kafka.service 填寫如下內容:
[Unit] Description=Apache Kafka server (broker) Documentation=http://kafka.apache.org/documentation/ Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple Environment="LOG_DIR=/usr/local/kafka/logs" User=kafka Group=kafka #Environment=JAVA_HOME=/usr/java/jdk1.8.0_144 ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure SyslogIdentifier=kafka [Install] WantedBy=multi-user.target
啓動kafka實例服務器:
# systemctl daemon-reload # systemctl enable kafka.service # systemctl start kafka.service # systemctl status kafka.service
7.此外Kafka也能夠經過 Web UI 界面進行集羣的管理,你能夠參閱:
滴滴,好了,到目前爲止,kafka分佈式消息隊列已經部署完成,其中包括必要的優化信息。目前,您能夠用於大多數編程語言的Kafka客戶端去建立Kafka生產者和消費者,輕鬆地將其用於您的項目中。
參考資料:
http://kafka.apache.org/documentation.html#quickstart
https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/
https://tech.meituan.com/kafka-fs-design-theory.html
http://blog.jobbole.com/99195/
版權聲明:本站原創文章,歡迎任何形式的轉載。
轉載請註明:在Centos 7上安裝配置 Apche Kafka 分佈式消息系統集羣 | 任我樂