Go RabbitMQ 工做隊列 (二)

rabbitMQ工做隊列

在以前內容中咱們經過一個隊列實現了消息的發送跟接收。接下來咱們建立工做隊列(Work Queue),用於在多個工做者之間分配耗時的任務fetch

工做隊列(任務隊列)背後的核心主要是避免當即執行資源密集型的任務,必須等待其工做完成。咱們將任務封裝爲消息後將其發送到隊列,後臺的工做進程將彈出任務並最終執行,當咱們運行不少Worker時候,任務將在它們之間共享spa

round-robin 調度

  • 使用任務隊列的優勢之一就是可以輕鬆的並行化工做
  • 默認狀況下,RabbitMQ會將每一條信息按照消費者順序發送給一個消費者,這樣平均每一個消費者會接收到相同數量的消息,這種消息分發的模式叫作round-robin(啓動多個接收端,而後發送多個消息試試)

message acknowledgment(消息確認)

爲了確保消息不會丟失,RabbitMQ支持消息確認,消費者消費了一個消息以後會發送一個ack給RabbitMQ,這樣RabbitMQ就能夠刪除掉這個消息code

若是一個消費者異常(通道關閉或連接關閉或TCP連接丟失)沒有發送ACK給rabbitMQ,rabbitMQ會將該消息從新放入隊列當中。此時若是有其餘消費者在線,rabbitMQ會從新將該消息再次投遞到另外一個消費者rabbitmq

  • 手動確認ACK
    • 手動確認ACK咱們能夠在建立消費者的時候將auto-ack設置爲false,一旦咱們消費消息任務完畢的時候使用d.Ack(false)來確認ack,告訴RabbitMQ該消息能夠刪除
    msgs,err := ch.Consume(
        q.Name,
        "",
        false,//將autoAck設置爲false,則須要在消費者每次消費完成
                        // 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費
        false,
        false,
        false,
        nil,
        )
    FailError(err,"Failed to register a consumer")
    forever := make(chan bool)
    go func() {
        for d := range msgs{
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            //multiple爲true的時候:這次交付和以前沒有確認的交付都會在經過同一個通道交付,這在批量處理的時候頗有用
            //爲false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認纔會將消息刪除
            d.Ack(false)
        }
    }()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

    使用以上設置後,咱們能夠保證即便worker在執行任務的時候意外退出也不會丟失消息。在worker意外退出的不久以後消息將會被從新投遞。確認ack必須使用接收到消息的通道,若是使用不一樣的通道將會致使一個通道協議異常隊列

  • 忘記確認ack進程

    • 在開發的時候常常會忘記對消費過的消息進行ack確認,這是一個很嚴重的錯誤,可使用如下命令查看RabbitMQ中有多少消息在準備中或是未確認的: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues for vhost / ...
    name    messages_ready  messages_unacknowledged
    hello   0   1
    • messages_ready:未投遞的消息
    • messages_unacknowledged:投遞未收到回覆的消息

消息持久

咱們已經知道如何確保即便消費者意外退出的狀況下保證任務不會丟失。可是若是RabbitMQ服務中止的話任務仍是會丟失。當RabbitMQ退出或異常的時候,它將會丟失隊列和消息,除非你設置RabbitMQ的兩個地方:將隊列和消息進行標記爲持久的ip

  1. 首先設置隊列durable爲true內存

    q, err := ch.QueueDeclare(
     "hello",      // name
     true,         // durable
     false,        // delete when unused
     false,        // exclusive
     false,        // no-wait
     nil,          // arguments
    )

    RabbitMQ不容許使用不一樣參數從新定義一個已經存在的隊列,因此隊列已經存在的話修改了上面的配置後運行程序是不會改變已經存在的隊列的資源

  2. 而後設置消息爲持久化存儲:路由

    err = ch.Publish(
     "",           // exchange
     q.Name,       // routing key
     false,        // mandatory
     false,
     amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
    })

    注意:設置消息持久化並不能保證消息不會丟失,由於仍然有一小段時間片處於RabbitMQ收到消息可是還沒保存,它可能只是保存在內存當中。可是已經知足咱們的基本使用,若是你須要強保證的話可使用publisher confirms

公平調度(Fair dispatch)

  • RabbitMQ的默認消息分配不可以知足咱們的須要,好比有兩個消費者,其中一個消費者常常忙碌的狀態,另一個消費者幾乎不作任何工做,可是RabbitMQ仍然均勻的在二者之間調度消息。這是由於RabbitMQ只作隊列當中的消息調度而沒有查看某個消費者中未確認的消息,它只是盲目的將第n條消息發送給第n個消費者
  • 解決以上問題咱們能夠設置prefetch count數值爲1,這樣只有當消費者消費完消息並返回ack確認後RabbitMQ纔會給其分發消息,不然只會將消息分發給其餘空閒狀態的消費者
err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)

注意:消費者必需要設置,生產者不用設置

完整代碼

  • new_task.go
func main() {
    conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failError(err,"send:Failed to connect to RabbitMQ")
    defer conn.Close()
    ch,err := conn.Channel()
    failError(err,"Failed to open a channel")
    defer ch.Close()
    q,err := ch.QueueDeclare(
        "task_queue",
        true,// 設置爲true以後RabbitMQ將永遠不會丟失隊列,不然重啓或異常退出的時候會丟失
        false,
        false,
        false,
        nil,
    )
    failError(err,"Failed to declare a queue")
    fmt.Println(q.Name)
    body := bodyFrom(os.Args)
    //生產者將消息發送到默認交換器中,不是發送到隊列中
    ch.Publish(
        "",//默認交換器
        q.Name,//使用隊列的名字來看成route-key是由於聲明的每個隊列都有一個隱式路由到默認交換器
        false,
        false,
        amqp.Publishing{
            DeliveryMode:amqp.Persistent,
            ContentType:"text/plain",
            Body:[]byte(body),
        })
    failError(err,"Failed to publish a message")
    log.Printf(" [x] Sent %s",body)
}
func bodyFrom(args []string)string  {
    var s string
    if len(args) < 2 || os.Args[1] == "" {
        s = "hello"
    }else {
        s = strings.Join(args[1:]," ")
    }
    return s
}
func failError(err error,msg string)  {
    if err != nil {
        log.Fatal("%s : %s",msg,err)
    }
}
  • Worker.go
func main() {
    conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    FailError1(err,"receive:Failed to connect to RabbitMQ")
    defer conn.Close()
    ch,err := conn.Channel()
    FailError1(err,"receive:Failed to open a channel")
    defer ch.Close()
    q,err := ch.QueueDeclare(
        "task_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    err = ch.Qos(
        1, //// 在沒有返回ack以前,最多隻接收1個消息
        0,
        false,
    )
    FailError1(err,"Failed to set Qos")
    msgs,err := ch.Consume(
        q.Name,
        "",
        false,//將autoAck設置爲false,則須要在消費者每次消費完成
                        // 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費
        false,
        false,
        false,
        nil,
        )
    FailError1(err,"Failed to register a consumer")
    forever := make(chan bool)
    go func() {
        for d := range msgs{
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            fmt.Println()
            time.Sleep(t * time.Second)
            log.Printf("Done")
            //multiple爲true的時候:這次交付和以前沒有確認的交付都會在經過同一個通道交付,這在批量處理的時候頗有用
            //爲false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認纔會將消息刪除
            d.Ack(false)
        }
    }()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
func FailError1(err error,msg string)  {
    if err != nil {
        log.Fatal("%s : %s",msg,err)
    }
}
相關文章
相關標籤/搜索