Kafka 是一種高吞吐的分佈式發佈訂閱消息系統,可以替代傳統的消息隊列用於解耦合數據處理,緩存未處理消息等,同時具備更高的吞吐率,支持分區、多副本、冗餘,所以被普遍用於大規模消息數據處理應用。Kafka 支持Java 及多種其它語言客戶端,可與Hadoop、Storm、Spark等其它大數據工具結合使用。node
Zookeeper集羣: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181git
kafka 集羣: 192.168.252.124 , 192.168.252.125 , 192.168.252.126 github
kafka-manager: 192.168.252.127apache
CentOs7.3 修改主機名bootstrap
CentOs7.3 ssh 免密登陸segmentfault
CentOs7.3 搭建 ZooKeeper-3.4.9 Cluster 集羣服務服務器
Zookeeper集羣: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181app
主機名依次被我修改爲: node1,node2,node3框架
kafka 集羣: 192.168.252.124 , 192.168.252.125 , 192.168.252.126
主機名依次被我修改爲: node4,node5,node6
kafka 官網下載 http://kafka.apache.org/downloads
下載最新版本的kafka ,我在北京我就選擇,清華鏡像比較快
清華鏡像:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
阿里鏡像:https://mirrors.aliyun.com/apache/kafka/
$ cd /opt $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz $ tar -zxvf kafka_2.12-0.11.0.0.tgz $ cd kafka_2.12-0.11.0.0
在 node4 操做
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties
broker.id=0 每臺服務器不能重複 #設置zookeeper的集羣地址 zookeeper.connect=192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
把配置複製到 node5,node6 集羣
$ for a in {5..6} ; do scp -r /opt/kafka_2.12-0.11.0.0/ node$a:/opt/kafka_2.12-0.11.0.0 ; done
修改 node5,node6 集羣的broker.id
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties
在 node1,啓動 Kafka使用的 ZooKeeper,因此先啓動ZooKeeper服務器
$ for a in {1..3} ; do ssh node$a "source /etc/profile; /opt/zookeeper-3.4.9/bin/zkServer.sh start" ; done
如今 node4 啓動Kafka服務器
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties" ; done
或者後臺啓動運行,日誌查看去Kafka解壓目錄有個log
文件夾查看
$ for a in {4..6} ; do ssh node$a "source /etc/profile; nohup /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties > /dev/null 2>&1 &" ; done
查看進程,Kafka 是否啓動成功
$ jps 3825 Kafka 6360 Jps
若是報錯刪除
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory. $ rm -rf /tmp/kafka-logs
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --create --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --replication-factor 2 --partitions 1 --topic ymq
--replication-factor 2 #複製兩份
--partitions 1 #建立1個分區
--topic #主題爲ymq
運行list topic命令,能夠看到該主題:
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --list --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
Kafka附帶一個命令行客戶端,它將從文件或標準輸入中輸入,並將其做爲消息發送到Kafka羣集。默認狀況下,每行將做爲單獨的消息發送。
在 node5 運行生產者,而後在控制檯中輸入一些消息以發送到服務器。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ymq >www.ymq.io
在node6 運行消費者,將把消息轉儲到標準輸出。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. www.ymq.io
用describe 查看集羣中topic每一個節點狀況
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --describe --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq Topic:ymq PartitionCount:1 ReplicationFactor:2 Configs: Topic: ymq Partition: 0 Leader: 1 Replicas: 1,3 Isr: 3,1
如下是輸出的說明。第一行給出了全部分區的摘要,每一個附加行提供有關一個分區的信息。因爲咱們這個主題只有一個分區,只有一行。
leader
負責給定分區的讀取和寫入分配節點編號,每一個分區的部分數據會隨機指定不一樣的節點replicas
是複製此分區的日誌的節點列表isr
一組正在同步的副本列表
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --delete --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq Topic ymq is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-stop.sh /opt/kafka_2.12-0.11.0.0/config/server.properties " ; done
Yahoo開源Kafka集羣管理器Kafka Manager
做爲一個分佈式的消息發佈-訂閱系統,Apache Kafka在Yahoo內部已經被不少團隊所使用,例如媒體分析團隊就將其應用到了實時分析流水線中,同時,Yahoo整個Kafka集羣處理的峯值帶寬超過了20Gbps(壓縮數據)。爲了讓開發者和服務工程師可以更加簡單地維護Kafka集羣,Yahoo構建了一個基於Web的管理工具,稱爲Kafka Manager,日前該項目已經在GitHub上開源。
經過Kafka Manager用戶可以更容易地發現集羣中哪些主題或者分區分佈不均勻,同時可以管理多個集羣,可以更容易地檢查集羣的狀態,可以建立主題,執行首選的副本選擇,可以基於集羣當前的狀態生成分區分配,並基於生成的分配執行分區的重分配,此外,Kafka Manager仍是一個很是好的能夠快速查看集羣狀態的工具。
Kafka Manager使用Scala語言編寫,其Web控制檯基於Play Framework實現,除此以外,Yahoo還遷移了一些Apache Kafka的幫助程序以便可以與Apache Curator框架一塊兒工做。
1、它支持如下內容:
在 kafka-manager: 192.168.252.127 node7 部署
編譯超級慢
$ yum install git $ cd /opt/ $ git clone https://github.com/yahoo/kafka-manager $ cd kafka-manager/ $ ./sbt clean dist
反正我是沒編譯成功,從網上找了一個編譯好的
連接: 百度網盤下載 密碼: kha2
$ yum install unzip $ unzip kafka-manager-1.3.2.1.zip $ vi /opt/kafka-manager-1.3.2.1/conf/application.conf
修改這個 zk 地址
kafka-manager.zkhosts="192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181"
默認端口 NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
$ /opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=conf/application.conf
或者後臺運行 而且配置端口
$ nohup bin/kafka-manager -Dconfig.file=/home/hadoop/app/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=9000 &
訪問: http://ip:9000
圖片描述
圖片描述