這邊就先不介紹消息隊列的優劣,主要列了一下它的三種核心的場景。node
解耦react
異步git
削峯github
點對點: Work Queueweb
發佈-訂閱:Publish/Subscribe算法
目前咱們項目應用到的場景:typescript
目前咱們使用RabbitMq, 主要使用點對點的消費模式。session
削峯 , 異步:架構
咱們這些場景若是用 Kafka
該如何實現?app
官網的描述是這幾句:
Apache Kafka® is a distributed streaming platform**. What exactly does that mean?**
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Kafka 是一個流處理平臺
一個流處理平臺有三個關鍵的特色:
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
Kafka 主要應用在兩個類應用中
Producer: 生產者,發送信息的服務端
Consumer:消費者,訂閱消息的客戶端
Broker:消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker能夠組成一個Kafka集羣
Topic: 主題,能夠理解成隊列
ConsumerGroup:消費者組,一個 ConsumerGoup 裏面包括多個 Consumer,每一個 ConsumerGoup 裏面只有一個 Consumer 能夠消費一個 Topic。基於這個特性,每一個 ConsumerGoup 裏面只存一個 Consumer 能夠實現廣播;全部 Consumer 都存在於同一個 ConsumerGoup 內則能夠實現單播。
Partition:基於 Kafka 的拓展性,有可能一個很大的 Topic 會存在於不一樣的 Broker 裏面。這時一個 Topic 裏面就會存在多個 Partition,Partition 是一個有序的隊列,Partition 上每一個消息會有一個順序的 id —— Offset。可是,值得注意的是,Kafka 會保證 Partition 的順序性,而沒有保證 Topic 的順序性。
Offset:Kafka 的存儲文件都是offset順序存儲的,以 offset.kafka 來命名。例如第一個就是 0000.kafka, 第 n 個文件就是 n-1.kafka
Zookeerper:管理多個 Kafka 節點,具備管理集羣配置的功能
單個消費者的實現,應用場景是隻有一個消費者節點 須要消費該消息。
圖例:
Producer:
// Producer.ts
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const producer = new kafka.HighLevelProducer(client)
producer.on('ready', function () {
console.log('Kafka Producer is connected and ready.')
})
// For this demo we just log producer errors to the console.
producer.on('error', function (error) {
console.error(error)
})
const sendRecord = (objData, cb) => {
const buffer = Buffer.from(JSON.stringify(objData))
// Create a new payload
const record = [
{
topic: 'webevents.dev',
messages: buffer,
attributes: 1 /* Use GZip compression for the payload */
}
]
// Send record to Kafka and log result/error
producer.send(record, cb)
}
let times = 0
setInterval(() => {
sendRecord({
msg: `this is message ${++times}!`
}, (err, data) => {
if (err) {
console.log(`err: ${err}`)
}
console.log(`data: ${JSON.stringify(data)}`)
})
}, 1000)
複製代碼
Consumer代碼:
// Consumer.ts
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const topics = [
{
topic: 'webevents.dev'
}
]
const options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024
// encoding: 'buffer'
}
// { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
const consumer = new kafka.Consumer(client, topics, options)
consumer.on('message', function (message) {
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = JSON.parse(buf.toString())
console.log('decodedMessage: ', decodedMessage)
})
consumer.on('error', function (err) {
console.log('error', err)
})
process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit()
})
})
複製代碼
當個人服務是多節點,如何保證同一個消息只被其中一個節點消費呢。 這個時候就須要把每一個節點當作同一個 ConsumerGroup裏的不一樣 Consumer。
圖例:
Producer 同上
Consumer:
// Consumer1
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const offset = new kafka.Offset(client)
import * as bluebird from 'bluebird'
const consumerGoupOptions = {
kafkaHost: 'localhost:9092',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
} as any
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions), ['test'])
export default consumer
// 處理消息
consumer.on('message', async function (message) {
console.info('i am consumer1!')
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
// const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = message // JSON.parse(buf.toString())
await bluebird.delay(1000)
console.log('decodedMessage: ', decodedMessage)
})
// 消息處理錯誤
consumer.on('error', function (err) {
console.log('error', err)
})
consumer.on('offsetOutOfRange', function (topic) {
console.info(`[offsetOutOfRange]:==:>${topic}`)
topic.maxNum = 2
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err)
}
let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
consumer.setOffset(topic.topic, topic.partition, min)
})
})
process.on('SIGINT', function () {
consumer.close(true, function () {
console.log('consumer colse!')
process.exit()
})
})
複製代碼
// Consumer2
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const offset = new kafka.Offset(client)
import * as bluebird from 'bluebird'
const consumerGoupOptions = {
kafkaHost: 'localhost:9092',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
} as any
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test'])
export default consumer
// 處理消息
consumer.on('message', async function (message) {
console.info('i am consumer2!')
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
// const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = message // JSON.parse(buf.toString())
await bluebird.delay(1000)
console.log('decodedMessage: ', decodedMessage)
})
// 消息處理錯誤
consumer.on('error', function (err) {
console.log('error', err)
})
consumer.on('offsetOutOfRange', function (topic) {
console.info(`[offsetOutOfRange]:==:>${topic}`)
topic.maxNum = 2
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err)
}
let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
consumer.setOffset(topic.topic, topic.partition, min)
})
})
process.on('SIGINT', function () {
consumer.close(true, function () {
console.log('consumer colse!')
process.exit()
})
})
複製代碼
執行以後,發現了一個問題:同一個 ConsumerGroup 的不一樣 Consumer 沒有均勻消費數據, 會出現一段時間,只有一個 Consumer 消費, 而另外一個 Conumser 不消費的狀況。
爲何呢?
這裏就須要知道消費端的均衡算法
算法以下:
1.A=(partition數量/同分組消費者總個數) 2.M=對上面所獲得的A值小數點第一位向上取整 3.計算出該消費者拉取數據的patition合集:
Ci = [P(M*i ),P((i + 1) * M -1)]
Partition 數量爲 1 , 由於只有一個 broker
同分組消費者總個數: 2
A = 1 / 2
M = roundUp (A) = 1
C0 = [P(0), P(0
]`
C1 = [P(1), P(1)]
因此,若是不是 C0 消費者不可用, C1 一直都不會去消費 Partition0 裏面的消息
結論是,若是非多 Kafka 節點的話, 單純增長同一消費組裏的消費者, 並不能作到均衡消費數據的狀況。
有其餘方法能夠實現嗎?
有的, 咱們能夠從 Producer 裏面入手,分發消息時固定 Topic 對應 固定的消費者節點。
Producer:
// Producer
// ...
const sendRecord = (objData, cb) => {
const partition = Date.now() % 2 === 0 ? 0 : 1
const buffer = Buffer.from(JSON.stringify(objData) + '_' + partition)
// Create a new payload
const record = [
{
topic: `test${partition}`, // 這裏用了隨機方法分配 topic
messages: buffer,
attributes: 1, /* Use GZip compression for the payload */
key: `key_${partition}`
}
]
// Send record to Kafka and log result/error
console.info(`[record]:==:>${JSON.stringify(record)}`)
producer.send(record, cb)
}
// ...
複製代碼
Consumer:
// Consumer1
// ...
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions), ['test0', 'test1']) // 這裏須要優先輸入 須要消費的 topic, 次要消費的 topic 也要寫上,以防另外一節點重啓時, 消息沒及時消費
// ...
複製代碼
// Consumer2
// ...
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test1', 'test2']) // 這裏須要優先輸入 須要消費的 topic, 次要消費的 topic 也要寫上,以防另外一節點重啓時, 消息沒及時消費
// ...
複製代碼
Kafka
設計上:隊列消息不刪除,不一樣 ConsumerGroup均可以publish-subscribe,同一 ConsumerGroup 裏面只有一個 Consumer 能消費同一個 Topic
延遲消費:不支持: Consumer 開啓後, 會自動獲取 Producer 生產對應 Topic 的消息, 若想 Consumer 暫時不消費消息, 須要中斷 Consumer 的服務
負載均衡:從集羣上看, 即便其中一個 Broker 掛了,其餘 Broker上的 partition 都會存在副本集,kafka 仍然能夠正常運行。從 ConsumerGroup 上看,即便其中的Consumer 掛了, 同一 ConsumerGroup 的其餘 Consumer 仍然能夠消費其Topic 的消息,而不須要擔憂服務中斷。
實際上:Kafka 作點對點隊列,有點浪費。只用一個 ConsumerGroup,並無發揮 Kafka 的優點。可是 Kafka 這種很方便就能拓展成發佈-訂閱模式,消費端創建另一個 ConsumerGroup,就能夠爲另外一個服務啓用。