在以前內容中咱們經過一個隊列實現了消息的發送跟接收。接下來咱們建立工做隊列(Work Queue),用於在多個工做者之間分配耗時的任務fetch
工做隊列(任務隊列)背後的核心主要是避免當即執行資源密集型的任務,必須等待其工做完成。咱們將任務封裝爲消息後將其發送到隊列,後臺的工做進程將彈出任務並最終執行,當咱們運行不少Worker時候,任務將在它們之間共享spa
爲了確保消息不會丟失,RabbitMQ支持消息確認,消費者消費了一個消息以後會發送一個ack給RabbitMQ,這樣RabbitMQ就能夠刪除掉這個消息code
若是一個消費者異常(通道關閉或連接關閉或TCP連接丟失)沒有發送ACK給rabbitMQ,rabbitMQ會將該消息從新放入隊列當中。此時若是有其餘消費者在線,rabbitMQ會從新將該消息再次投遞到另外一個消費者rabbitmq
msgs,err := ch.Consume( q.Name, "", false,//將autoAck設置爲false,則須要在消費者每次消費完成 // 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費 false, false, false, nil, ) FailError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") //multiple爲true的時候:這次交付和以前沒有確認的交付都會在經過同一個通道交付,這在批量處理的時候頗有用 //爲false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認纔會將消息刪除 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
使用以上設置後,咱們能夠保證即便worker在執行任務的時候意外退出也不會丟失消息。在worker意外退出的不久以後消息將會被從新投遞。確認ack必須使用接收到消息的通道,若是使用不一樣的通道將會致使一個通道協議異常隊列
忘記確認ack進程
Listing queues for vhost / ... name messages_ready messages_unacknowledged hello 0 1
咱們已經知道如何確保即便消費者意外退出的狀況下保證任務不會丟失。可是若是RabbitMQ服務中止的話任務仍是會丟失。當RabbitMQ退出或異常的時候,它將會丟失隊列和消息,除非你設置RabbitMQ的兩個地方:將隊列和消息進行標記爲持久的ip
首先設置隊列durable爲true內存
q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )
RabbitMQ不容許使用不一樣參數從新定義一個已經存在的隊列,因此隊列已經存在的話修改了上面的配置後運行程序是不會改變已經存在的隊列的資源
而後設置消息爲持久化存儲:路由
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), })
注意:設置消息持久化並不能保證消息不會丟失,由於仍然有一小段時間片處於RabbitMQ收到消息可是還沒保存,它可能只是保存在內存當中。可是已經知足咱們的基本使用,若是你須要強保證的話可使用publisher confirms
err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )
注意:消費者必需要設置,生產者不用設置
func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") failError(err,"send:Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() failError(err,"Failed to open a channel") defer ch.Close() q,err := ch.QueueDeclare( "task_queue", true,// 設置爲true以後RabbitMQ將永遠不會丟失隊列,不然重啓或異常退出的時候會丟失 false, false, false, nil, ) failError(err,"Failed to declare a queue") fmt.Println(q.Name) body := bodyFrom(os.Args) //生產者將消息發送到默認交換器中,不是發送到隊列中 ch.Publish( "",//默認交換器 q.Name,//使用隊列的名字來看成route-key是由於聲明的每個隊列都有一個隱式路由到默認交換器 false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, ContentType:"text/plain", Body:[]byte(body), }) failError(err,"Failed to publish a message") log.Printf(" [x] Sent %s",body) } func bodyFrom(args []string)string { var s string if len(args) < 2 || os.Args[1] == "" { s = "hello" }else { s = strings.Join(args[1:]," ") } return s } func failError(err error,msg string) { if err != nil { log.Fatal("%s : %s",msg,err) } }
func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") FailError1(err,"receive:Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() FailError1(err,"receive:Failed to open a channel") defer ch.Close() q,err := ch.QueueDeclare( "task_queue", true, false, false, false, nil, ) err = ch.Qos( 1, //// 在沒有返回ack以前,最多隻接收1個消息 0, false, ) FailError1(err,"Failed to set Qos") msgs,err := ch.Consume( q.Name, "", false,//將autoAck設置爲false,則須要在消費者每次消費完成 // 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費 false, false, false, nil, ) FailError1(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) fmt.Println() time.Sleep(t * time.Second) log.Printf("Done") //multiple爲true的時候:這次交付和以前沒有確認的交付都會在經過同一個通道交付,這在批量處理的時候頗有用 //爲false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認纔會將消息刪除 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever } func FailError1(err error,msg string) { if err != nil { log.Fatal("%s : %s",msg,err) } }