Kafka簡明教程

概述

Kafka 是一個分佈式消息隊列(MQ, Message queue)中間件,支持點對點(Quene)、發佈訂閱(Topic)模式。Kafka 的定位主要在日誌等方面,單擊吞吐量特別大, 由於Kafka 設計的初衷就是處理日誌的,能夠看作是一個日誌(消息)系統一個重要組件,針對性很強。php

使用場景:html

  • 網站活動跟蹤:根據不一樣的業務數據類型,將消息發佈到不一樣的 Topic。
  • 日誌聚合:能夠將多臺主機或應用的日誌數據抽象成一個個日誌或事件的消息流,異步發送到 Kafka 集羣。
  • 流計算處理:構建應用系統和分析系統的橋樑,並將它們之間的關聯解耦。
  • 數據中轉樞紐:利用 Kafka 做爲數據中轉樞紐,同份數據能夠被導入到不一樣專用系統中。

官網:http://kafka.apache.org/
中文站:http://kafka.apachecn.org/python

名稱: Kafka
所屬社區/公司:Apache
開發語言: Java
協議: 自行設計的協議,仿AMQP
事務:不支持
集羣:支持,依賴ZooKeeper

快速入門

官方的 quickstart 已經很是詳細了,按照文檔能夠一步一步的達到入門的效果。地址:http://kafka.apache.org/quickstartgit

這裏我記錄一下簡單的步驟,僅做爲測試使用,真實環境請參考官方文檔部署:
一、下載解壓:github

$ cd /opt
$ wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
$ tar -xzf kafka_2.12-2.2.0.tgz
$ cd kafka_2.12-2.2.0

Kafka 依賴 ZooKeeper 。安裝包裏已經包含了 ZooKeeper。golang

二、啓動 ZooKeeper面試

$ bin/zookeeper-server-start.sh config/zookeeper.properties

# 限於篇幅,省略大部分輸出
...
[2019-05-11 13:15:44,643] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

若是須要後臺運行,請在命令後面追加&redis

三、啓動 Kafka Server端apache

$ bin/kafka-server-start.sh config/server.properties

# 限於篇幅,省略大部分輸出
...
[2019-05-11 13:18:34,578] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-11 13:18:34,578] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-11 13:18:34,579] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

若是須要後臺運行,請在命令後面追加&bootstrap

四、建立主題(Topic)
建立一個名爲 test 的主題,包含1個分區(partition),1個副本(replication-factor):

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

建立完畢後能夠查看該主題:

$  bin/kafka-topics.sh --list --bootstrap-server localhost:9092

test

也能夠在配置裏設置爲在發佈不存在的主題時自動建立主題,而不是手動建立主題。這個後面再說明。

五、發佈消息
咱們新啓動一個命令行窗口充當生產者,向 Kafka 裏發送消息,指定主題爲 test

$ cd /opt/kafka_2.12-2.2.0/
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

>

而後命令行等待咱們輸入消息。咱們輸入 hello回車:

>hello
>

消息就發出去了。接下來咱們啓動消費者。

六、消費消息

咱們新啓動一個命令行窗口充當消費者來消費消息,指定主題爲 test

$ cd /opt/kafka_2.12-2.2.0/
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

hello

就消費了1條消息。咱們能夠在生產者命令行窗口繼續發生消息,消費者端能夠實時消費。

好了,基本的安裝測試就到這。關於設置kakfa集羣請參考:http://kafka.apache.org/quickstart#quickstart_multibroker

如何在項目裏使用

上一節僅演示了在命令行裏使用,能夠方便調試。對於在項目裏使用,須要藉助 SDK。這個頁面收錄了全部的客戶端:https://cwiki.apache.org/confluence/display/KAFKA/Clients

PHP

經常使用的SDK:

這裏以 kafka-php 爲例。

kafka-php 使用純粹的PHP 編寫的 kafka 客戶端,目前支持 0.8.x 以上版本的 Kafka。最新的kafka-php 版本是 v0.2.8 (截止到2019-05-11),詳見:https://github.com/weiboad/kafka-php/releaseskafka-phpv0.2.xv0.1.x 不兼容,若是使用原有的 v0.1.x 的能夠參照文檔 Kafka PHP v0.1.x Document, 不過建議切換到 v0.2.x 上。

kafka-php (v0.2.8) 環境要求:

  • PHP 版本大於 5.5
  • Kafka Server 版本大於 0.8.0
  • 消費模塊 Kafka Server 版本須要大於 0.9.0

一、發送消息,同步方式:

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

// use Monolog\Logger;
// //use Monolog\Handler\StdoutHandler;
// Create the logger
// $logger = new Logger('my_logger');
// //Now add some handlers
// $logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192,127.0.0.1:9193');
$config->setBrokerVersion('0.10.2.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$config->setTimeout(2000);

$producer = new \Kafka\Producer();
// $producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
    $result = $producer->send(array(
        array(
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ),
    ));
    var_dump($result);
}

說明:
1) 設置 logger 不是必選的。可是若是須要調試,建議加上。若是沒有安裝Monolog,也能夠本身定一個 logger ,只要實現了 psr/log規範便可。
2) MetadataBrokerList支持集羣配置。使用英文逗號隔開便可。
3) BrokerVersion版本需與安裝的 kafka 版本一致。

二、消費消息

消費消息通常須要寫腳本常駐運行。能夠藉助 Supervisor 工具。

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

// use Monolog\Logger;
// use Monolog\Handler\StdoutHandler;
// // Create the logger
// $logger = new Logger('my_logger');
// // Now add some handlers
// $logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test'); //消費者組
$config->setBrokerVersion('0.10.2.1');
$config->setTopics(['test']); //主題
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();

// $consumer->setLogger($logger);

$consumer->start(function($topic, $part, $message) {
    var_dump($message);
});

注意:
1) 消費者組能夠有多個,互相之間不影響。每一個消費者組均可以消費到完整的一份消息。
2) setOffsetReset的值有:earliest(從最先的開始消費)、latest(從最新的開始消費)。

Golang

Python

發送消息示例:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(100):
    producer.send('test', b'some_message_bytes')

Kakfa 原理


(上圖爲Kakfa架構圖)

一個典型的消息隊列 Kafka 集羣包含:

  • Producer:經過 push 模式向消息隊列 Kafka Broker 發送消息,能夠是網站的頁面訪問、服務器日誌等,也能夠是 CPU 和內存相關的系統資源信息;
  • Kafka Broker:消息隊列 Kafka 的服務器,用於存儲消息;支持水平擴展,通常 Broker 節點數量越多,集羣吞吐率越高;
  • Consumer Group:經過 pull 模式從消息隊列 Kafka Broker 訂閱並消費消息;
  • Zookeeper:管理集羣的配置、選舉 leader,以及在 Consumer Group 發生變化時進行負載均衡。

幾個重要概念

  • Broker:消息隊列 Kafka 集羣包含一個或多個消息處理服務器,該服務器被稱爲 Broker。
  • Topic:主題。每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。
  • Partition:分區。一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區能夠看做是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
  • Producer: 消息發佈者,也稱爲消息生產者,負責生產併發送消息到 Kafka Broker。
  • Consumer: 消息訂閱者,也稱爲消息消費者,負責向 Kafka Broker 讀取消息並進行消費。
  • Consumer Group:消費者組。這類 Consumer 一般接收並消費同一類消息,且消費邏輯一致。Consumer Group 和 Topic 的關係是 N:N,同一個 Consumer Group 能夠訂閱多個 Topic,同一個 Topic 也能夠被多個 Consumer Group 訂閱。
  • Replication:副本。爲了保證分佈式可靠性,kafka0.8開始對每一個分區的數據進行備份(不一樣的Broker上),防止其中一個Broker宕機形成分區上的數據不可用。

分區、組、消費者的關係

消息隊列 Kafka 採用 Pub/Sub(發佈/訂閱)模型,其中:

  • Consumer Group 和 Topic 的關係是 N:N。 同一個 Consumer Group 能夠訂閱多個 Topic,同一個 Topic 也能夠同時被多個 Consumer Group 訂閱。
  • 同一 Topic 的一條消息只能被同一個 Consumer Group 內的任意一個 Consumer 消費,但多個 Consumer Group 可同時消費這一消息。

說明:
一、同一個分區(partition)內的消息只能被同一個組中的一個消費者(consumer)消費,當消費者數量多於分區的數量時,多餘的消費者空閒。
二、啓動多個組,則會使同一個消息被消費屢次。

詳細請看:https://www.jianshu.com/p/6233d5341dfe

組成結構

生產者消費者關係:

對於每個topic, Kafka集羣都會維持一個分區日誌,以下所示:

kafka分區是提升kafka性能的關鍵所在,當發現集羣性能不高時,經常使用手段就是增長Topic的分區,分區裏面的消息是按照重新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。

負載均衡

Kafka 負載消費的內部原理是,把訂閱的 Topic 的分區,平均分配給各個消費實例。所以,消費實例的個數不要大於分區的數量,不然會有實例分配不到任何分區而處於空跑狀態。這個負載均衡發生的時間,除了第一次啓動上線以外,後續消費實例發生重啓、增長、減小等變動時,都會觸發一次負載均衡。

配置

Kafka支持的配置很是多,這裏僅僅列出來部分關於 broker 的配置。broker 配置文件是config/server.properties

每一個kafka broker中配置文件默認必須配置的屬性以下:

broker.id=0  
port=9092
num.network.threads=2  
num.io.threads=8  
socket.send.buffer.bytes=1048576  
socket.receive.buffer.bytes=1048576  
socket.request.max.bytes=104857600  
log.dirs=/tmp/kafka-logs  
num.partitions=1
log.retention.hours=168  
log.segment.bytes=536870912  
log.retention.check.interval.ms=60000  
log.cleaner.enable=false  
zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=1000000

配置說明:

參數 默認值 描述
broker.id -1   用於服務的broker id。若是沒設置,將生成一個惟一broker id。爲了不ZooKeeper生成的id和用戶配置的broker id相沖突,生成的id將在reserved.broker.max.id的值基礎上加1。
port 9092 broker server服務端口。僅在未設置listeners時使用。
host.name broker的主機地址,如果設置了,那麼會綁定到這個地址上,如果沒有,會綁定到全部的接口上,並將其中之一發送到ZK。僅在未設置listeners時使用。
log.dirs /tmp/kafka-logs kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分佈在不一樣磁盤上能夠提升讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2
message.max.bytes 1000012 表示消息體的最大大小,單位是字節
num.network.threads 3 broker處理消息的最大線程數,通常狀況下數量爲cpu核數
num.io.threads 8 處理IO的線程數
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 168 控制一個log保留多長個小時
log.retention.bytes -1 控制log文件最大尺寸
log.cleaner.enable false 是否log cleaning
log.cleanup.policy delete   delete仍是compat.
log.segment.bytes 1073741824 單一的log segment文件大小
log.roll.hours 168 開始一個新的log文件片斷的最大時間
background.threads 10 後臺線程序
num.partitions 1 默認分區數
socket.send.buffer.bytes 102400 socket SO_SNDBUFF參數
socket.receive.buffer.bytes 102400 socket SO_RCVBUFF參數
zookeeper.connect 指定zookeeper鏈接字符串, 格式如hostname:port/chroot。chroot是一個namespace
zookeeper.connection.timeout.ms 6000 指定客戶端鏈接zookeeper的最大超時時間
zookeeper.session.timeout.ms    6000 鏈接zk的session超時時間
zookeeper.sync.time.ms 2000 zk follower落後於zk leader的最長時間
auto.create.topics.enable true 是否容許在服務器上自動建立topic

更多配置查看官方文檔:http://kafka.apache.org/documentation.html#configuration

經常使用命令

  • 啓動zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 關閉zookeeper
$ bin/zookeeper-server-stop.sh
  • 啓動kafka
$ bin/kafka-server-start.sh config/server.properties &
  • 關閉kafka
$ bin/kafka-server-stop.sh
  • 建立topic
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  • 查看全部topic
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 查看某個topic具體信息
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
  • 刪除topic (可直接刪除的前提:delete.topic.enable=true)
$ bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
  • 生產消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 消費消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Kafka監控工具

https://github.com/Morningstar/kafka-offset-monitor

消息隊列比較

消息隊列主要是解決了應用解耦、異常處理、流量削鋒等問題。常見的消息隊列還有:ActiveMQRabbitMQRocketMQZeroMQMetaMQ 等等。固然,咱們也可使用Redis做爲簡單的消息隊列使用。

消息隊列對比參考:

(圖片來源於互聯網)

參考

一、Apache Kafka
http://kafka.apache.org/documentation/
二、消息隊列Kafka、RocketMQ、RabbitMQ的優劣勢比較 - 知乎
https://zhuanlan.zhihu.com/p/60288391
三、weiboad/kafka-php: kafka php client
https://github.com/weiboad/kafka-php
四、kafka中partition和消費者對應關係 - 簡書
https://www.jianshu.com/p/6233d5341dfe
五、kafka經常使用的命令 - 隨筆 - SegmentFault 思否
http://www.javashuo.com/article/p-afrncoki-bk.html
六、消息中間件部署及比較:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis - 掘金
http://www.javashuo.com/article/p-azrmvmrq-gv.html
七、面試官問分佈式技術面試題,一臉懵逼怎麼辦?_ITPUB博客
http://blog.itpub.net/69917606/viewspace-2642545/
八、產品架構_產品簡介_消息隊列 Kafka-阿里雲
https://help.aliyun.com/document_detail/68152.html?spm=a2c4g.11186623.6.543.3ba272e4cAMqaH

相關文章
相關標籤/搜索