AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞異步消息的網絡協議。服務器
RabbitMQ 就是 amqp 協議的Erlang的實現。網絡
AMQP的模型架構的主要角色,生產者、消費者、交換器、隊列。架構
Rabbitmq中須要路由鍵和綁定鍵聯合使用才能使生產者成功投遞到隊列中去。異步
生產者將消息投遞到交換器,經過交換器綁定的隊列,最終投遞到對應的隊列中去。
性能
Rabbitmq共有4種交換器測試
topic 規則匹配,BindingKey中存在兩種特殊字符ui
*
匹配零個或多個單詞#
匹配一個單詞在Golang中建立rabbitmq 生產者基本步驟是:spa
// connection connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { panic(err) } // channel channel, err := connection.Channel() if err != nil { panic(err) }
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil { panic(err) }
參數解析:code
參數說明要點:blog
自動刪除功能必需要在交換器曾經綁定過隊列或者交換器的狀況下,處於再也不使用的時候纔會自動刪除,
若是是剛剛建立的還沒有綁定隊列或者交換器的交換器或者早已建立只是未進行隊列或者交換器綁定的交換器是不會自動刪除的。
內置交換器是一種特殊的交換器,這種交換器不能直接接收生產者發送的消息,
只能做爲相似於隊列的方式綁定到另外一個交換器,來接收這個交換器中路由的消息,
內置交換器一樣能夠綁定隊列和路由消息,只是其接收消息的來源與普通交換器不一樣。
當noWait爲true時,聲明時無需等待服務器的確認。
該通道可能因爲錯誤而關閉。 添加一個NotifyClose偵聽器應對任何異常。
建立交換器還有一個差很少的方法(ExchangeDeclarePassive),他主要是假定交換已存在,並嘗試鏈接到
不存在的交換將致使RabbitMQ引起異常,可用於檢測交換的存在。
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil { panic(err) }
參數解析:
參數說明要點:
排他隊列只對首次建立它的鏈接可見,排他隊列是基於鏈接(Connection)可見的,而且該鏈接內的全部信道(Channel)均可以訪問這個排他隊列,在這個鏈接斷開以後,該隊列自動刪除,因而可知這個隊列能夠說是綁到鏈接上的,對同一服務器的其餘鏈接不可見。
同一鏈接中不容許創建同名的排他隊列的
這種排他優先於持久化,即便設置了隊列持久化,在鏈接斷開後,該隊列也會自動刪除。
非排他隊列不依附於鏈接而存在,同一服務器上的多個鏈接均可以訪問這個隊列。
爲true則設置隊列爲自動刪除。
自動刪除的前提是:至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,纔會自動刪除。
不能把這個參數錯誤地理解爲:"當鏈接到此隊列的全部客戶端斷開時,這個隊列自動刪除",由於生產者客戶端建立這個隊列,或者沒有消費者客戶端與這個隊列鏈接時,都不會自動刪除這個隊列。
建立隊列還有一個差很少的方法(QueueDeclarePassive),他主要是假定隊列已存在,並嘗試鏈接到
不存在的隊列將致使RabbitMQ引起異常,可用於檢測隊列的存在。
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil { panic(err) }
參數解析:
if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil { panic(err) }
參數解析:
生產者發送消息至交換器source中,交換器source根據路由鍵找到與其匹配的另外一個交換器destination,井把消息轉發到destination中,進而存儲在.destination綁定的隊列queue中,某種程度上來講destination交換器能夠看做一個隊列。如圖:
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{ Timestamp: time.Now(), ContentType: "text/plain", Body: []byte("Hello Golang and AMQP(Rabbitmq)!"), }); err != nil { panic(err) }
參數解析:
參數說明要點:
消息發佈的時候設置消息的 mandatory 屬性用於設置消息在發送到交換器以後沒法路由到隊列的狀況對消息的處理方式,
設置爲 true 表示將消息返回到生產者,不然直接丟棄消息。
參數告訴服務器至少將該消息路由到一個隊列中,不然將消息返回給生產者。imrnediate參數告訴服務器,若是該消息關聯的隊列上有消費者,則馬上投遞:若是全部匹配的隊列上都沒有消費者,則直接將消息返還給生產者,不用將消息存入隊列而等待消費者了。
RabbitMQ 3.0版本開始去掉了對imrnediate參數的支持
Rabbitmq消費方式共有2種,分別是推模式和拉模式
推模式是經過持續訂閱的方式來消費信息,
Consume將信道(Channel)直爲接收模式,直到取消隊列的訂閱爲止。在接收模式期間,RabbitMQ會不斷地推送消息給消費者。
推送消息的個數仍是會受到channel.Qos的限制
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil) if err != nil { panic(err) }
若是ack設置爲false則表示須要手動進行ack消費
v, ok := <-deliveries if ok { // 手動ack確認 // 注意: 這裏只要調用了ack就是手動確認模式, // multiple 表示的是在此channel中先前全部未確認的deliveries都將被確認 // 並非表示設置爲false就不進行當前ack確認 if err := v.Ack(true); err != nil { fmt.Println(err.Error()) } } else { fmt.Println("Channel close") }
參數解析:
參數說明要點:
設置爲true則表示不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者
拉模式:
相對來講比較簡單,是由消費者主動拉取信息來消費,一樣也須要進行ack確認消費
channel.Get(queue string, autoAck bool)
下面是一個簡單示例,只是爲了通訊測試,單條數據收發
func Connection() (*amqp.Connection) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { panic(err) } return conn } func Sample() { var wg sync.WaitGroup wg.Add(1) go SampleConsumption(&wg) // 建立鏈接 connection := Connection() defer connection.Close() // 開啓 channel channel, err := connection.Channel() if err != nil { panic(err) } defer channel.Close() if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil { panic(err) } if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil { panic(err) } if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil { panic(err) } // mandatory true 未找到隊列返回給消費者 returnChan := make(chan amqp.Return,0) channel.NotifyReturn(returnChan) // Publish if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{ Timestamp: time.Now(), ContentType: "text/plain", Body: []byte("Hello Golang and AMQP(Rabbitmq)!"), }); err != nil { panic(err) } //for v := range returnChan{ // fmt.Printf("Return %#v\n",v) //} wg.Wait() } func SampleConsumption(wg *sync.WaitGroup) { connection := Connection() defer connection.Close() channel, err := connection.Channel() if err != nil { panic(err) } defer channel.Close() deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil) if err != nil { panic(err) } // 這裏只取一條,由於product只發一條 v, ok := <-deliveries if ok { if err := v.Ack(true); err != nil { fmt.Println(err.Error()) } } else { fmt.Println("Channel close") } wg.Done() }