rabbitmq是一個消息代理系統,爲應用提供一個通用得消息發佈,接受平臺,爲應用提供非阻塞的消息系統,方便進行異步處理。服務器
減小用戶對沒必要要的耗時操做的等待,處理結果以異步方式(郵件,消息推送)進行提醒。併發
當某個應用發展到必定規模的時候,須要把裏面的模塊分別拆出來進行解耦,而模塊之間的通信方式是多樣的,常見的有rpc,消息隊列,http請求。其中消息隊列在內部模塊通訊是更爲穩定。異步
若是突發遇到大量的數據請求的時候,服務器若是不作隊列處理,一會兒處理所有的請求,會很容易形成宕機,若是把請求的數據都放入隊列裏,以後再逐個逐個地進行處理,能夠平緩地渡過流量高峯期。編碼
rabbitmq的工做方式以下,生產者(publisher)發送消息到交換機,交換機(exchange)根據本身的類型以及消息的路由鍵,路由到對應的隊列裏,隊列分發消息到消費者(consumer)
debug
如今咱們假設有這個場景,客服A須要發送客戶的下單信息給庫存人員B,客服A有一個訂單信息發送器,庫存人員B擁有消息接收器。
首先庫存人員B創建鏈接並接受消息,僞代碼:設計
// 創建鏈接 conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() // 聲明隊列,不存在則建立,存在則不會進行任何操做 queue, _ := ch.QueueDeclare("order") // 從隊列裏面獲取消息 deliver, _ := ch.Consume(q.Name) for d:= range deliver { // 輸出消息主體 log.Printf("B Received a message: %s", d.Body) // 返回獲取成功標識給隊列 d.Ack(true) }
而後客服A也創建鏈接併發送消息,僞代碼:代理
// 創建鏈接 conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Chanenel() // 聲明隊列,不存在則建立,存在則不會進行任何操做 queue, _ := ch.QueueDeclare("order") // 發佈消息 ch.Publish( q.Name, // 隊列名字 amqp.Publishing{ Body:[]byte("new order" + product.String()), })
上面就是一種簡單的直接經過隊列進行鏈接的方法,可能會有人看出來,爲何沒有交換機的參與,其實上面的操做實際上是經過默認交換機進行消息傳遞,能夠不指定交換機名字直接指定隊列名字進行交互。日誌
經過上面的簡單例子,咱們能夠更進一步地瞭解到rabbitmq的工做方式,下面我會更詳細地講解各個模塊。code
消息是通訊內容的主體,消息對象有點像http的request,除了能夠攜帶消息內容,還能夠帶有各類屬性,如:對象
有些屬性只是約定規範,如ContentType,ContentEncoding,須要程序本身作處理,有些屬性rabbitmq會根據值來進行處理,如RemoteKey,交換機會根據消息的RemoteKey和自身的類型來決定投遞到哪些隊列,DeliveryMode能夠決定是否持久化消息。
#消息投遞 ch.Publish( "", # exchange名字,空爲默認交換機 key, # routingkey 路由鍵 false, false, # 消息 amqp.Publishing{ DeliveryMode:amqp.Persistent, ContentType:"text/plain", Body:[]byte("hello world"), })
隊列是存儲消息的主體,隊列自己所擁有的一些屬性:
在代碼裏面隊列聲明,若是隊列不存在則新建隊列,若是已存在相同名字的隊列且屬性不一樣的話則會報錯。能夠選擇讓系統自動生成隊列,而後返回隊列名字。
# 隊列聲明,參數依次爲name,Durable,auto-delete,exclusive,no-wait.args amqp.QueueDeclare("queuename", true, false, flase, false, nil)
消費者用以消費隊列裏的消息的自定義程序片斷,消費者獲取隊列裏的消息有兩種方式,一種是拉取(pull)的方式,經過channel.basicget方法,一種是訂閱方式,隊列推送消息到rabbitmq,這種方式用的最多。
消息處理,消費者端鏈接隊列後,能夠獲得一個相似於句柄的東西,若是沒有消息就會一直阻塞。
消費者在收到消息以後處理的狀況多是成功的,也有多是失敗的,爲了確保消息已經成功處理而後隊列刪除消息,若是失敗則進行其餘機制,以避免消息一直重複在隊列裏面,或消息因消費者宕機而丟失。
若是消息成功地被消費者處理的話,須要有一個消息確認的機制。
rabbitmq提供兩種確認機制:
通常而言咱們用的更多的是顯式確認模式,若是消費者接收到消息沒有進行確認以後就宕機了,隊列裏面的該消息仍是會存在的,而後會把消息轉發到其餘消費者。
若是消費者對消息的處理出現了一些問題,能夠調用rabbitmq的basic.reject來拒絕消息,拒絕消息以後,能夠作的是把消息放回到隊列裏面,或者直接刪除消息。
其實若是出現問題的消息,即使是交給其餘的消費者,很會很大機率繼續出現問題,這時候咱們能夠把消息放到其餘專門處理記錄問題的隊列裏面,交由另外的消費者處理。
交換機更像是消息的路由層,隊列綁定到交換機,而後發佈者能夠發送的消息都是通過交換機的,而後經由消息的remote key(路由鍵)路由到交換機所綁定的隊列裏。
交換機分爲4種類型:
直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
其實初嘗rabbitmq的例子裏面,看上去沒有綁定交換機,實際上也是綁定了直連交換機,只是是一個特殊的預先聲明好的,名字爲空字符串的交換機,叫默認交換機,每一個隊列都會自動綁定到默認交換機上。
扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
主題交換機(topic exchanges)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。
主題交換機在我看來就像添加了簡單的通配符+字符串來達到一個路由的規則。
頭交換機用的不是不少,有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。
生產者代碼
conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "hello", "fanout", true, false, false, false, nil, ) ch.Publish( "hello", "", // 因爲是廣播,因此能夠不填寫路由鍵 false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, Body:[]byte("hello"+time.Now().String()), })
消費者代碼
conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "hello", // 交換機名字 "fanout", // 交換機類型 true, // durable false, // autoDelete false, // internal false, // noWait nil, // args ) q, _ := ch.QueueDeclare( "", false, // durable false, // autoDelete true, // exclusive false, // noWait nil, // ) ch.QueueBind( q.Name, // queuename "", // remote key,因爲是廣播,能夠不填寫路由鍵 "hello", // exchange name false, // nowait nil, ) msgs, _ := ch.Consume(q.Name,"", true, false, false,false,nil) for msg := range msgs { log.Printf("%s", msg.Body) }
設有以下場景:設計一個日誌收集系統,日誌有不一樣的級別,debug,info,warn,error,日誌格式爲:
級別.模塊名字 如:info.login
有不一樣的隊列負責收集不一樣級別的日誌,其中有個隊列專門收集收集warn和error的數據,設計以下:
生產者
func main() { conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "logs", "topic", true, false, false, false, nil, ) ch.Publish( "logs", "debug.123", false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, Body:[]byte("hello"), }, ) }
消費者
func main() { conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "logs", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare( "log1", true, false, false, false, nil, ) // 隊列綁定的remote key keys := []string{"error.*", "warn.*"} for _, key := range keys{ ch.QueueBind( q.Name, key, "logs", false, nil, ) } deliver, _ := ch.Consume( q.Name, "", false, false, false, false, nil, ) for d:= range deliver { fmt.Println(string(d.Body)) d.Ack(true) } }