消息隊列Kafka Nodejs 的使用

一. 消息隊列

(一) 使用場景:

這邊就先不介紹消息隊列的優劣,主要列了一下它的三種核心的場景。node

  1. 解耦react

  2. 異步git

  3. 削峯github

(二) 消費方式:

  1. 點對點: Work Queueweb

  2. 發佈-訂閱:Publish/Subscribe算法

目前咱們項目應用到的場景:typescript

目前咱們使用RabbitMq, 主要使用點對點的消費模式。session

削峯 , 異步:架構

咱們這些場景若是用 Kafka 該如何實現?app

二. Kafka

(一) 簡介

官網的描述是這幾句:

Apache Kafka® is a distributed streaming platform**. What exactly does that mean?**

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Kafka 是一個流處理平臺

一個流處理平臺有三個關鍵的特色:

  1. 發佈&訂閱流式數據,相似於消息隊列或企業消息傳遞系統。
  2. 在高容錯方式下保存流式數據
  3. 當數據流產生時實時進行處理

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

Kafka 主要應用在兩個類應用中

  1. 構建可在系統或應用程序以前構建可靠獲取數據的實時數據流管道
  2. 構建一個轉換或響應數據流的實時數據流應用程序

(二) kafka 架構圖

轉載自知乎<https://zhuanlan.zhihu.com/p/58836260>

(三)名詞

Producer: 生產者,發送信息的服務端

Consumer:消費者,訂閱消息的客戶端

Broker:消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker能夠組成一個Kafka集羣

Topic: 主題,能夠理解成隊列

ConsumerGroup:消費者組,一個 ConsumerGoup 裏面包括多個 Consumer,每一個 ConsumerGoup 裏面只有一個 Consumer 能夠消費一個 Topic。基於這個特性,每一個 ConsumerGoup 裏面只存一個 Consumer 能夠實現廣播;全部 Consumer 都存在於同一個 ConsumerGoup 內則能夠實現單播。

Partition:基於 Kafka 的拓展性,有可能一個很大的 Topic 會存在於不一樣的 Broker 裏面。這時一個 Topic 裏面就會存在多個 Partition,Partition 是一個有序的隊列,Partition 上每一個消息會有一個順序的 id —— Offset。可是,值得注意的是,Kafka 會保證 Partition 的順序性,而沒有保證 Topic 的順序性。

Offset:Kafka 的存儲文件都是offset順序存儲的,以 offset.kafka 來命名。例如第一個就是 0000.kafka, 第 n 個文件就是 n-1.kafka

Zookeerper:管理多個 Kafka 節點,具備管理集羣配置的功能

三. Kafka Nodejs 實現

(一)消費方式:點對點

  1. 單個消費者的實現,應用場景是隻有一個消費者節點 須要消費該消息。

    圖例:

    Producer:

// Producer.ts

import * as kafka from 'kafka-node'

const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})

const producer = new kafka.HighLevelProducer(client)
producer.on('ready', function () {
  console.log('Kafka Producer is connected and ready.')
})

// For this demo we just log producer errors to the console.
producer.on('error', function (error) {
  console.error(error)
})

const sendRecord = (objData, cb) => {
  const buffer = Buffer.from(JSON.stringify(objData))

  // Create a new payload
  const record = [
    {
      topic: 'webevents.dev',
      messages: buffer,
      attributes: 1 /* Use GZip compression for the payload */
    }
  ]

  // Send record to Kafka and log result/error
  producer.send(record, cb)
}

let times = 0

setInterval(() => {
  sendRecord({
    msg: `this is message ${++times}!`
  }, (err, data) => {
    if (err) {
      console.log(`err: ${err}`)
    }
    console.log(`data: ${JSON.stringify(data)}`)
  })
}, 1000)

複製代碼

​ Consumer代碼:

// Consumer.ts
import * as kafka from 'kafka-node'

const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})

const topics = [
  {
    topic: 'webevents.dev'
  }
]
const options = {
  autoCommit: true,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 1024 * 1024
  // encoding: 'buffer'
}
// { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };

const consumer = new kafka.Consumer(client, topics, options)

consumer.on('message', function (message) {

  // Read string into a buffer.
  console.info(`[message]:==:>${JSON.stringify(message)}`)
  const buf = new Buffer(String(message.value), 'binary')
  const decodedMessage = JSON.parse(buf.toString())

  console.log('decodedMessage: ', decodedMessage)
})

consumer.on('error', function (err) {
  console.log('error', err)
})

process.on('SIGINT', function () {
  consumer.close(true, function () {
    process.exit()
  })
})

複製代碼
  1. 當個人服務是多節點,如何保證同一個消息只被其中一個節點消費呢。 這個時候就須要把每一個節點當作同一個 ConsumerGroup裏的不一樣 Consumer。

    圖例:

    Producer 同上

    Consumer:

    // Consumer1
    import * as kafka from 'kafka-node'
    
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const offset = new kafka.Offset(client)
    import * as bluebird from 'bluebird'
    
    const consumerGoupOptions = {
      kafkaHost: 'localhost:9092',
      groupId: 'ExampleTestGroup',
      sessionTimeout: 15000,
      protocol: ['roundrobin'],
      fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
    } as any
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions),  ['test'])
    export default consumer
    
    // 處理消息
    consumer.on('message', async function (message) {
    	console.info('i am consumer1!')
      // Read string into a buffer.
      console.info(`[message]:==:>${JSON.stringify(message)}`)
      // const buf = new Buffer(String(message.value), 'binary')
      const decodedMessage = message // JSON.parse(buf.toString())
    
      await bluebird.delay(1000)
      console.log('decodedMessage: ', decodedMessage)
    })
    
    // 消息處理錯誤
    consumer.on('error', function (err) {
      console.log('error', err)
    })
    
    consumer.on('offsetOutOfRange', function (topic) {
      console.info(`[offsetOutOfRange]:==:>${topic}`)
      topic.maxNum = 2
      offset.fetch([topic], function (err, offsets) {
        if (err) {
          return console.error(err)
        }
        let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
        consumer.setOffset(topic.topic, topic.partition, min)
      })
    })
    
    process.on('SIGINT', function () {
      consumer.close(true, function () {
        console.log('consumer colse!')
        process.exit()
      })
    })
    
    複製代碼
    // Consumer2
    import * as kafka from 'kafka-node'
    
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const offset = new kafka.Offset(client)
    import * as bluebird from 'bluebird'
    
    const consumerGoupOptions = {
      kafkaHost: 'localhost:9092',
      groupId: 'ExampleTestGroup',
      sessionTimeout: 15000,
      protocol: ['roundrobin'],
      fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
    } as any
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions),  ['test'])
    export default consumer
    
    // 處理消息
    consumer.on('message', async function (message) {
    	console.info('i am consumer2!')
      // Read string into a buffer.
      console.info(`[message]:==:>${JSON.stringify(message)}`)
      // const buf = new Buffer(String(message.value), 'binary')
      const decodedMessage = message // JSON.parse(buf.toString())
    
      await bluebird.delay(1000)
      console.log('decodedMessage: ', decodedMessage)
    })
    
    // 消息處理錯誤
    consumer.on('error', function (err) {
      console.log('error', err)
    })
    
    consumer.on('offsetOutOfRange', function (topic) {
      console.info(`[offsetOutOfRange]:==:>${topic}`)
      topic.maxNum = 2
      offset.fetch([topic], function (err, offsets) {
        if (err) {
          return console.error(err)
        }
        let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
        consumer.setOffset(topic.topic, topic.partition, min)
      })
    })
    
    process.on('SIGINT', function () {
      consumer.close(true, function () {
        console.log('consumer colse!')
        process.exit()
      })
    })
    
    
    複製代碼

    執行以後,發現了一個問題:同一個 ConsumerGroup 的不一樣 Consumer 沒有均勻消費數據, 會出現一段時間,只有一個 Consumer 消費, 而另外一個 Conumser 不消費的狀況。

​ 爲何呢?

​ 這裏就須要知道消費端的均衡算法

算法以下:

1.A=(partition數量/同分組消費者總個數) 2.M=對上面所獲得的A值小數點第一位向上取整 3.計算出該消費者拉取數據的patition合集:Ci = [P(M*i ),P((i + 1) * M -1)]

Partition 數量爲 1 , 由於只有一個 broker

同分組消費者總個數: 2

A = 1 / 2

M = roundUp (A) = 1

C0 = [P(0), P(0]`

C1 = [P(1), P(1)]

因此,若是不是 C0 消費者不可用, C1 一直都不會去消費 Partition0 裏面的消息

結論是,若是非多 Kafka 節點的話, 單純增長同一消費組裏的消費者, 並不能作到均衡消費數據的狀況。

有其餘方法能夠實現嗎?

有的, 咱們能夠從 Producer 裏面入手,分發消息時固定 Topic 對應 固定的消費者節點。

Producer:

// Producer
// ...

const sendRecord = (objData, cb) => {
  const partition = Date.now() % 2 === 0 ? 0 : 1
  const buffer = Buffer.from(JSON.stringify(objData) + '_' + partition)

  // Create a new payload
  const record = [
    {
      topic: `test${partition}`,       // 這裏用了隨機方法分配 topic
      messages: buffer,
      attributes: 1, /* Use GZip compression for the payload */
      key: `key_${partition}`
    }
  ]

  // Send record to Kafka and log result/error
  console.info(`[record]:==:>${JSON.stringify(record)}`)
  producer.send(record, cb)
}

// ...
複製代碼

Consumer:

// Consumer1
// ...

const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions),  ['test0', 'test1'])		// 這裏須要優先輸入 須要消費的 topic, 次要消費的 topic 也要寫上,以防另外一節點重啓時, 消息沒及時消費

// ...
複製代碼
// Consumer2
// ...

const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions),  ['test1', 'test2'])		// 這裏須要優先輸入 須要消費的 topic, 次要消費的 topic 也要寫上,以防另外一節點重啓時, 消息沒及時消費

// ...
複製代碼

四. 總結:

Kafka

設計上:隊列消息不刪除,不一樣 ConsumerGroup均可以publish-subscribe,同一 ConsumerGroup 裏面只有一個 Consumer 能消費同一個 Topic

延遲消費:不支持: Consumer 開啓後, 會自動獲取 Producer 生產對應 Topic 的消息, 若想 Consumer 暫時不消費消息, 須要中斷 Consumer 的服務

負載均衡:從集羣上看, 即便其中一個 Broker 掛了,其餘 Broker上的 partition 都會存在副本集,kafka 仍然能夠正常運行。從 ConsumerGroup 上看,即便其中的Consumer 掛了, 同一 ConsumerGroup 的其餘 Consumer 仍然能夠消費其Topic 的消息,而不須要擔憂服務中斷。

實際上:Kafka 作點對點隊列,有點浪費。只用一個 ConsumerGroup,並無發揮 Kafka 的優點。可是 Kafka 這種很方便就能拓展成發佈-訂閱模式,消費端創建另一個 ConsumerGroup,就能夠爲另外一個服務啓用。

End

參考資料

代碼:
github.com/yuchenzhen/…

blog.csdn.net/tototuzuoqu…

zhuanlan.zhihu.com/p/58836260

juejin.im/post/5b59c6…

lotabout.me/2018/kafka-…

相關文章
相關標籤/搜索