Kafka
是一個分佈式消息隊列(MQ, Message queue)中間件,支持點對點(Quene)、發佈訂閱(Topic)模式。Kafka 的定位主要在日誌等方面,單擊吞吐量特別大, 由於Kafka 設計的初衷就是處理日誌的,能夠看作是一個日誌(消息)系統一個重要組件,針對性很強。php
使用場景:html
官網:http://kafka.apache.org/
中文站:http://kafka.apachecn.org/python
名稱: Kafka 所屬社區/公司:Apache 開發語言: Java 協議: 自行設計的協議,仿AMQP 事務:不支持 集羣:支持,依賴ZooKeeper
官方的 quickstart 已經很是詳細了,按照文檔能夠一步一步的達到入門的效果。地址:http://kafka.apache.org/quickstartgit
這裏我記錄一下簡單的步驟,僅做爲測試使用,真實環境請參考官方文檔部署:
一、下載解壓:github
$ cd /opt $ wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz $ tar -xzf kafka_2.12-2.2.0.tgz $ cd kafka_2.12-2.2.0
Kafka 依賴 ZooKeeper 。安裝包裏已經包含了 ZooKeeper。golang
二、啓動 ZooKeeper面試
$ bin/zookeeper-server-start.sh config/zookeeper.properties # 限於篇幅,省略大部分輸出 ... [2019-05-11 13:15:44,643] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
若是須要後臺運行,請在命令後面追加
&
。redis
三、啓動 Kafka Server端apache
$ bin/kafka-server-start.sh config/server.properties # 限於篇幅,省略大部分輸出 ... [2019-05-11 13:18:34,578] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-11 13:18:34,578] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-11 13:18:34,579] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
若是須要後臺運行,請在命令後面追加
&
。bootstrap
四、建立主題(Topic)
建立一個名爲 test 的主題,包含1個分區(partition),1個副本(replication-factor):
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
建立完畢後能夠查看該主題:
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 test
也能夠在配置裏設置爲在發佈不存在的主題時自動建立主題,而不是手動建立主題。這個後面再說明。
五、發佈消息
咱們新啓動一個命令行窗口充當生產者,向 Kafka 裏發送消息,指定主題爲 test
:
$ cd /opt/kafka_2.12-2.2.0/ $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >
而後命令行等待咱們輸入消息。咱們輸入 hello
回車:
>hello >
消息就發出去了。接下來咱們啓動消費者。
六、消費消息
咱們新啓動一個命令行窗口充當消費者來消費消息,指定主題爲 test
:
$ cd /opt/kafka_2.12-2.2.0/ $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning hello
就消費了1條消息。咱們能夠在生產者命令行窗口繼續發生消息,消費者端能夠實時消費。
好了,基本的安裝測試就到這。關於設置kakfa集羣請參考:http://kafka.apache.org/quickstart#quickstart_multibroker
上一節僅演示了在命令行裏使用,能夠方便調試。對於在項目裏使用,須要藉助 SDK。這個頁面收錄了全部的客戶端:https://cwiki.apache.org/confluence/display/KAFKA/Clients
經常使用的SDK:
這裏以 kafka-php
爲例。
kafka-php
使用純粹的PHP 編寫的 kafka 客戶端,目前支持 0.8.x 以上版本的 Kafka。最新的kafka-php
版本是v0.2.8
(截止到2019-05-11),詳見:https://github.com/weiboad/kafka-php/releases 。kafka-php
的v0.2.x
和v0.1.x
不兼容,若是使用原有的v0.1.x
的能夠參照文檔 Kafka PHP v0.1.x Document, 不過建議切換到v0.2.x
上。
kafka-php
(v0.2.8
) 環境要求:
一、發送消息,同步方式:
require '../vendor/autoload.php'; date_default_timezone_set('PRC'); // use Monolog\Logger; // //use Monolog\Handler\StdoutHandler; // Create the logger // $logger = new Logger('my_logger'); // //Now add some handlers // $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192,127.0.0.1:9193'); $config->setBrokerVersion('0.10.2.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $config->setTimeout(2000); $producer = new \Kafka\Producer(); // $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ), )); var_dump($result); }
說明:
1) 設置 logger
不是必選的。可是若是須要調試,建議加上。若是沒有安裝Monolog
,也能夠本身定一個 logger ,只要實現了 psr/log
規範便可。
2) MetadataBrokerList
支持集羣配置。使用英文逗號隔開便可。
3) BrokerVersion
版本需與安裝的 kafka 版本一致。
二、消費消息
消費消息通常須要寫腳本常駐運行。能夠藉助 Supervisor 工具。
require '../vendor/autoload.php'; date_default_timezone_set('PRC'); // use Monolog\Logger; // use Monolog\Handler\StdoutHandler; // // Create the logger // $logger = new Logger('my_logger'); // // Now add some handlers // $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); //消費者組 $config->setBrokerVersion('0.10.2.1'); $config->setTopics(['test']); //主題 //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); // $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
注意:
1) 消費者組能夠有多個,互相之間不影響。每一個消費者組均可以消費到完整的一份消息。
2) setOffsetReset
的值有:earliest
(從最先的開始消費)、latest
(從最新的開始消費)。
發送消息示例:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') for _ in range(100): producer.send('test', b'some_message_bytes')
(上圖爲Kakfa架構圖)
一個典型的消息隊列 Kafka 集羣包含:
Producer
:經過 push 模式向消息隊列 Kafka Broker 發送消息,能夠是網站的頁面訪問、服務器日誌等,也能夠是 CPU 和內存相關的系統資源信息;Kafka Broker
:消息隊列 Kafka 的服務器,用於存儲消息;支持水平擴展,通常 Broker 節點數量越多,集羣吞吐率越高;Consumer Group
:經過 pull 模式從消息隊列 Kafka Broker 訂閱並消費消息;Zookeeper
:管理集羣的配置、選舉 leader,以及在 Consumer Group 發生變化時進行負載均衡。Broker
:消息隊列 Kafka 集羣包含一個或多個消息處理服務器,該服務器被稱爲 Broker。Topic
:主題。每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。Partition
:分區。一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區能夠看做是一個FIFO
( First Input First Output的縮寫,先入先出隊列)的隊列。Producer
: 消息發佈者,也稱爲消息生產者,負責生產併發送消息到 Kafka Broker。Consumer
: 消息訂閱者,也稱爲消息消費者,負責向 Kafka Broker 讀取消息並進行消費。Consumer Group
:消費者組。這類 Consumer 一般接收並消費同一類消息,且消費邏輯一致。Consumer Group 和 Topic 的關係是 N:N,同一個 Consumer Group 能夠訂閱多個 Topic,同一個 Topic 也能夠被多個 Consumer Group 訂閱。Replication
:副本。爲了保證分佈式可靠性,kafka0.8開始對每一個分區的數據進行備份(不一樣的Broker上),防止其中一個Broker宕機形成分區上的數據不可用。消息隊列 Kafka 採用 Pub/Sub
(發佈/訂閱)模型,其中:
說明:
一、同一個分區(partition)內的消息只能被同一個組中的一個消費者(consumer)消費,當消費者數量多於分區的數量時,多餘的消費者空閒。
二、啓動多個組,則會使同一個消息被消費屢次。
詳細請看:https://www.jianshu.com/p/6233d5341dfe
生產者消費者關係:
對於每個topic, Kafka集羣都會維持一個分區日誌,以下所示:
kafka分區是提升kafka性能的關鍵所在,當發現集羣性能不高時,經常使用手段就是增長Topic的分區,分區裏面的消息是按照重新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
Kafka 負載消費的內部原理是,把訂閱的 Topic 的分區,平均分配給各個消費實例。所以,消費實例的個數不要大於分區的數量,不然會有實例分配不到任何分區而處於空跑狀態。這個負載均衡發生的時間,除了第一次啓動上線以外,後續消費實例發生重啓、增長、減小等變動時,都會觸發一次負載均衡。
Kafka支持的配置很是多,這裏僅僅列出來部分關於 broker 的配置。broker 配置文件是config/server.properties
。
每一個kafka broker中配置文件默認必須配置的屬性以下:
broker.id=0 port=9092 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=false zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=1000000
配置說明:
參數 | 默認值 | 描述 |
---|---|---|
broker.id | -1 | 用於服務的broker id。若是沒設置,將生成一個惟一broker id。爲了不ZooKeeper生成的id和用戶配置的broker id相沖突,生成的id將在reserved.broker.max.id的值基礎上加1。 |
port | 9092 | broker server服務端口。僅在未設置listeners 時使用。 |
host.name | broker的主機地址,如果設置了,那麼會綁定到這個地址上,如果沒有,會綁定到全部的接口上,並將其中之一發送到ZK。僅在未設置listeners 時使用。 |
|
log.dirs | /tmp/kafka-logs | kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分佈在不一樣磁盤上能夠提升讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2 |
message.max.bytes | 1000012 | 表示消息體的最大大小,單位是字節 |
num.network.threads | 3 | broker處理消息的最大線程數,通常狀況下數量爲cpu核數 |
num.io.threads | 8 | 處理IO的線程數 |
log.flush.interval.messages | Long.MaxValue | 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量 |
log.flush.interval.ms | Long.MaxValue | 在數據被寫入到硬盤前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數據是否要寫入到硬盤的時間間隔。 |
log.retention.hours | 168 | 控制一個log保留多長個小時 |
log.retention.bytes | -1 | 控制log文件最大尺寸 |
log.cleaner.enable | false | 是否log cleaning |
log.cleanup.policy | delete | delete仍是compat. |
log.segment.bytes | 1073741824 | 單一的log segment文件大小 |
log.roll.hours | 168 | 開始一個新的log文件片斷的最大時間 |
background.threads | 10 | 後臺線程序 |
num.partitions | 1 | 默認分區數 |
socket.send.buffer.bytes | 102400 | socket SO_SNDBUFF參數 |
socket.receive.buffer.bytes | 102400 | socket SO_RCVBUFF參數 |
zookeeper.connect | 指定zookeeper鏈接字符串, 格式如hostname:port/chroot。chroot是一個namespace | |
zookeeper.connection.timeout.ms | 6000 | 指定客戶端鏈接zookeeper的最大超時時間 |
zookeeper.session.timeout.ms | 6000 | 鏈接zk的session超時時間 |
zookeeper.sync.time.ms | 2000 | zk follower落後於zk leader的最長時間 |
auto.create.topics.enable | true | 是否容許在服務器上自動建立topic |
更多配置查看官方文檔:http://kafka.apache.org/documentation.html#configuration
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/zookeeper-server-stop.sh
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-server-stop.sh
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
$ bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
https://github.com/Morningstar/kafka-offset-monitor
消息隊列主要是解決了應用解耦、異常處理、流量削鋒等問題。常見的消息隊列還有:
ActiveMQ
、RabbitMQ
、RocketMQ
、ZeroMQ
、MetaMQ
等等。固然,咱們也可使用Redis
做爲簡單的消息隊列使用。
消息隊列對比參考:
(圖片來源於互聯網)
一、Apache Kafka
http://kafka.apache.org/documentation/
二、消息隊列Kafka、RocketMQ、RabbitMQ的優劣勢比較 - 知乎
https://zhuanlan.zhihu.com/p/60288391
三、weiboad/kafka-php: kafka php client
https://github.com/weiboad/kafka-php
四、kafka中partition和消費者對應關係 - 簡書
https://www.jianshu.com/p/6233d5341dfe
五、kafka經常使用的命令 - 隨筆 - SegmentFault 思否
http://www.javashuo.com/article/p-afrncoki-bk.html
六、消息中間件部署及比較:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis - 掘金
http://www.javashuo.com/article/p-azrmvmrq-gv.html
七、面試官問分佈式技術面試題,一臉懵逼怎麼辦?_ITPUB博客
http://blog.itpub.net/69917606/viewspace-2642545/
八、產品架構_產品簡介_消息隊列 Kafka-阿里雲
https://help.aliyun.com/document_detail/68152.html?spm=a2c4g.11186623.6.543.3ba272e4cAMqaH