- 爲何寫這個庫
- 應用場景有哪些
- 如何使用
- 總結
在開始自研 go-queue
以前,針對如下咱們調研目前的開源隊列方案:html
beanstalkd
有一些特殊好用功能:支持任務priority、延時(delay)、超時重發(time-to-run)和預留(buried),可以很好的支持分佈式的後臺任務和定時任務處理。以下是 beanstalkd
基本部分:git
job
:任務單元;tube
:任務隊列,存儲統一類型 job
。producer 和 consumer 操做對象;producer
:job
生產者,經過 put 將 job 加入一個 tube;consumer
:job
消費者,經過 reserve/release/bury/delete 來獲取job或改變job的狀態;很幸運的是官方提供了 go client:https://github.com/beanstalkd...。github
可是這對不熟悉 beanstalkd
操做的 go 開發者而言,須要學習成本。redis
相似基於 kafka
消息隊列做爲存儲的方案,存儲單元是消息,若是要實現延時執行,能夠想到的方案是以延時執行的時間做爲 topic
,這樣在大型的消息系統中,充斥大量一次性的 topic
(dq_1616324404788, dq_1616324417622
),當時間分散,會容易形成磁盤隨機寫的狀況。json
並且在 go 生態中,異步
同時考慮如下因素:分佈式
因此咱們本身基於以上兩個基礎組件開發了 go-queue
:函數
beanstalkd
開發了 dq
,支持定時和延時操做。同時加入 redis
保證消費惟一性。kafka
開發了 kq
,簡化生產者和消費者的開發API,同時在寫入kafka使用批量寫,節省IO。總體設計以下:微服務
首先在消費場景來講,一個是針對任務隊列,一個是消息隊列。而二者最大的區別:性能
因此在背後的基礎設施選型上,也是基於這種消費場景。
dq
:依賴於 beanstalkd
,適合延時、定時任務執行;kq
:依賴於 kafka
,適用於異步、批量任務執行;而從其中 dq
的 API 中也能夠看出:
// 延遲任務執行 - dq.Delay(msg, delayTime); // 定時任務執行 - dq.At(msg, atTime);
而在咱們內部:
kq
:kq.Push(msg)
;dq
;分別介紹 dq
和 kq
的使用方式:
// [Producer] producer := dq.NewProducer([]dq.Beanstalk{ { Endpoint: "localhost:11300", Tube: "tube", }, { Endpoint: "localhost:11301", Tube: "tube", }, }) for i := 1000; i < 1005; i++ { _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5) if err != nil { fmt.Println(err) } }
// [Consumer] consumer := dq.NewConsumer(dq.DqConf{ Beanstalks: []dq.Beanstalk{ { Endpoint: "localhost:11300", Tube: "tube", }, { Endpoint: "localhost:11301", Tube: "tube", }, }, Redis: redis.RedisConf{ Host: "localhost:6379", Type: redis.NodeType, }, }) consumer.Consume(func(body []byte) { // your consume logic fmt.Println(string(body)) })
和普通的 生產者-消費者 模型相似,開發者也只須要關注如下:
producer.go
:
// message structure type message struct { Key string `json:"key"` Value string `json:"value"` Payload string `json:"message"` } pusher := kq.NewPusher([]string{ "127.0.0.1:19092", "127.0.0.1:19093", "127.0.0.1:19094", }, "kq") ticker := time.NewTicker(time.Millisecond) for round := 0; round < 3; round++ { select { case <-ticker.C: count := rand.Intn(100) // 準備消息 m := message{ Key: strconv.FormatInt(time.Now().UnixNano(), 10), Value: fmt.Sprintf("%d,%d", round, count), Payload: fmt.Sprintf("%d,%d", round, count), } body, err := json.Marshal(m) if err != nil { log.Fatal(err) } fmt.Println(string(body)) // push to kafka broker if err := pusher.Push(string(body)); err != nil { log.Fatal(err) } } }
config.yaml
:
Name: kq Brokers: - 127.0.0.1:19092 - 127.0.0.1:19092 - 127.0.0.1:19092 Group: adhoc Topic: kq Offset: first Consumers: 1
consumer.go
:
var c kq.KqConf conf.MustLoad("config.yaml", &c) // WithHandle: 具體的處理msg的logic // 這也是開發者須要根據本身的業務定製化 q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error { fmt.Printf("=> %s\n", v) return nil })) defer q.Stop() q.Start()
和 dq
不一樣的是:開發者不須要關注任務類型(在這裏也沒有任務的概念,傳遞的都是 message data
)。
其餘操做和 dq
相似,只是將 業務處理函數 當成配置直接傳入消費者中。
在咱們目前的場景中,kq
大量使用在咱們的異步消息服務;而延時任務,咱們除了 dq
,還可使用內存版的 TimingWheel「go-zero
生態組件」。
關於 go-queue
更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。
https://github.com/tal-tech/go-queue
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 並 star 支持咱們!
go-zero 系列文章見『微服務實踐』公衆號