RabbitMQ入門教程

簡介

rabbitmq是一個消息代理系統,爲應用提供一個通用得消息發佈,接受平臺,爲應用提供非阻塞的消息系統,方便進行異步處理。服務器

優勢

  1. 消息的可靠性。持久化消息,消息接受確認,消息重傳等可靠機制。
  2. 靈活的路由。交換機能夠根據廣播,或者根據路由鍵或匹配符匹配到不一樣的隊列。
  3. 高可用的集羣。

應用場景

1.異步處理

減小用戶對沒必要要的耗時操做的等待,處理結果以異步方式(郵件,消息推送)進行提醒。併發

2.應用解耦

當某個應用發展到必定規模的時候,須要把裏面的模塊分別拆出來進行解耦,而模塊之間的通信方式是多樣的,常見的有rpc,消息隊列,http請求。其中消息隊列在內部模塊通訊是更爲穩定。異步

3.流量削峯

若是突發遇到大量的數據請求的時候,服務器若是不作隊列處理,一會兒處理所有的請求,會很容易形成宕機,若是把請求的數據都放入隊列裏,以後再逐個逐個地進行處理,能夠平緩地渡過流量高峯期。編碼

工做方式

rabbitmq的工做方式以下,生產者(publisher)發送消息到交換機,交換機(exchange)根據本身的類型以及消息的路由鍵,路由到對應的隊列裏,隊列分發消息到消費者(consumer)
debug

初嘗rabbitmq

如今咱們假設有這個場景,客服A須要發送客戶的下單信息給庫存人員B,客服A有一個訂單信息發送器,庫存人員B擁有消息接收器。
首先庫存人員B創建鏈接並接受消息,僞代碼:設計

// 創建鏈接
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
// 聲明隊列,不存在則建立,存在則不會進行任何操做
queue, _ := ch.QueueDeclare("order")
// 從隊列裏面獲取消息
deliver, _ := ch.Consume(q.Name)
for d:= range deliver {
    // 輸出消息主體
    log.Printf("B Received a message: %s", d.Body)
    // 返回獲取成功標識給隊列
    d.Ack(true)
}

而後客服A也創建鏈接併發送消息,僞代碼:代理

// 創建鏈接
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Chanenel()
// 聲明隊列,不存在則建立,存在則不會進行任何操做
queue, _ := ch.QueueDeclare("order")
// 發佈消息
ch.Publish(
    q.Name, // 隊列名字
    amqp.Publishing{
        Body:[]byte("new order" + product.String()),
    })

上面就是一種簡單的直接經過隊列進行鏈接的方法,可能會有人看出來,爲何沒有交換機的參與,其實上面的操做實際上是經過默認交換機進行消息傳遞,能夠不指定交換機名字直接指定隊列名字進行交互。日誌

模塊介紹

經過上面的簡單例子,咱們能夠更進一步地瞭解到rabbitmq的工做方式,下面我會更詳細地講解各個模塊。code

消息

消息是通訊內容的主體,消息對象有點像http的request,除了能夠攜帶消息內容,還能夠帶有各類屬性,如:對象

  1. ContentType(內容類型)
  2. ContentEncoding(內容編碼)
  3. RemoteKey(路由鍵)
  4. DeliveryMode( 投遞模式,消息是否持久化)
  5. 等等...

有些屬性只是約定規範,如ContentType,ContentEncoding,須要程序本身作處理,有些屬性rabbitmq會根據值來進行處理,如RemoteKey,交換機會根據消息的RemoteKey和自身的類型來決定投遞到哪些隊列,DeliveryMode能夠決定是否持久化消息。

#消息投遞
ch.Publish(
    "",     # exchange名字,空爲默認交換機
    key,    # routingkey 路由鍵
    false,  
    false,
    # 消息
    amqp.Publishing{
        DeliveryMode:amqp.Persistent,
        ContentType:"text/plain",
        Body:[]byte("hello world"),
    })

隊列

隊列是存儲消息的主體,隊列自己所擁有的一些屬性:

  1. Name 隊列名字,不一樣的隊列名字應該保持惟一性
  2. Durable rabbitmq重啓後,隊列是否依舊存在,須要注意消息要持久化的要另外設置消息
  3. exclusive 當前隊列只能被一個消費者鏈接使用,關閉鏈接後刪除隊列。
  4. auto-delete 最後一個消費者退訂後刪除隊列。

在代碼裏面隊列聲明,若是隊列不存在則新建隊列,若是已存在相同名字的隊列且屬性不一樣的話則會報錯。能夠選擇讓系統自動生成隊列,而後返回隊列名字。

# 隊列聲明,參數依次爲name,Durable,auto-delete,exclusive,no-wait.args
amqp.QueueDeclare("queuename", true, false, flase, false, nil)

消費者(consumer)

消費者用以消費隊列裏的消息的自定義程序片斷,消費者獲取隊列裏的消息有兩種方式,一種是拉取(pull)的方式,經過channel.basicget方法,一種是訂閱方式,隊列推送消息到rabbitmq,這種方式用的最多。

消息處理

消息處理,消費者端鏈接隊列後,能夠獲得一個相似於句柄的東西,若是沒有消息就會一直阻塞。
消費者在收到消息以後處理的狀況多是成功的,也有多是失敗的,爲了確保消息已經成功處理而後隊列刪除消息,若是失敗則進行其餘機制,以避免消息一直重複在隊列裏面,或消息因消費者宕機而丟失。

消息確認(ack)

若是消息成功地被消費者處理的話,須要有一個消息確認的機制。
rabbitmq提供兩種確認機制:

  1. 自動確認模式,隊列將消息發送給消費者以後當即刪除消息(basic.deliver或basic.get-ok)
  2. 顯式確認模式,待消費者發送接受成功以後刪除(basic.ack)

通常而言咱們用的更多的是顯式確認模式,若是消費者接收到消息沒有進行確認以後就宕機了,隊列裏面的該消息仍是會存在的,而後會把消息轉發到其餘消費者。

消息拒絕(basic.reject)

若是消費者對消息的處理出現了一些問題,能夠調用rabbitmq的basic.reject來拒絕消息,拒絕消息以後,能夠作的是把消息放回到隊列裏面,或者直接刪除消息。

其實若是出現問題的消息,即使是交給其餘的消費者,很會很大機率繼續出現問題,這時候咱們能夠把消息放到其餘專門處理記錄問題的隊列裏面,交由另外的消費者處理。

交換機

交換機更像是消息的路由層,隊列綁定到交換機,而後發佈者能夠發送的消息都是通過交換機的,而後經由消息的remote key(路由鍵)路由到交換機所綁定的隊列裏。
交換機分爲4種類型:

1.直連交換機(direct)

直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。

  1. 將一個隊列綁定到某個交換機上,同時賦予該綁定一個路由鍵(routing key)
  2. 當一個攜帶着路由鍵爲R的消息被髮送給直連交換機時,交換機會把它路由給綁定值一樣爲R的隊列。

其實初嘗rabbitmq的例子裏面,看上去沒有綁定交換機,實際上也是綁定了直連交換機,只是是一個特殊的預先聲明好的,名字爲空字符串的交換機,叫默認交換機,每一個隊列都會自動綁定到默認交換機上。

2.扇形交換機(funout)

扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。

3.主題交換機(topic)

主題交換機(topic exchanges)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。
主題交換機在我看來就像添加了簡單的通配符+字符串來達到一個路由的規則。

4.頭交換機(headers)

頭交換機用的不是不少,有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。

扇形路由器實現廣播

生產者代碼

conn, _ := amqp.Dial("amqp://localhost")

ch, _ := conn.Channel()

ch.ExchangeDeclare(
    "hello",
    "fanout",
    true,
    false,
    false,
    false,
    nil,
    )


ch.Publish(
    "hello",
    "",     // 因爲是廣播,因此能夠不填寫路由鍵
    false,
    false,
    amqp.Publishing{
        DeliveryMode:amqp.Persistent,
        Body:[]byte("hello"+time.Now().String()),
    })

消費者代碼

conn, _ := amqp.Dial("amqp://localhost")

ch, _ := conn.Channel()
    
ch.ExchangeDeclare(
        "hello",  // 交換機名字
        "fanout", // 交換機類型
        true,     // durable
        false,    // autoDelete
        false,    // internal
        false,    // noWait
        nil,      // args
    )

q, _ := ch.QueueDeclare(
        "", 
        false,  // durable
        false,  // autoDelete
        true,   // exclusive
        false,  // noWait
        nil,    //
        )

ch.QueueBind(
    q.Name,     // queuename 
    "",         // remote key,因爲是廣播,能夠不填寫路由鍵
    "hello",    // exchange name
    false,      // nowait
    nil,
    )

msgs, _ := ch.Consume(q.Name,"", true, false, false,false,nil)

for msg := range msgs {
    log.Printf("%s", msg.Body)
}

主題交換機實現路由匹配

設有以下場景:設計一個日誌收集系統,日誌有不一樣的級別,debug,info,warn,error,日誌格式爲:

級別.模塊名字 如:info.login

有不一樣的隊列負責收集不一樣級別的日誌,其中有個隊列專門收集收集warn和error的數據,設計以下:
生產者

func main() {
    conn, _ := amqp.Dial("amqp://localhost")

    ch, _ := conn.Channel()


    ch.ExchangeDeclare(
        "logs",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )

    ch.Publish(
        "logs",
        "debug.123",
        false,
        false,
        amqp.Publishing{
            DeliveryMode:amqp.Persistent,
            Body:[]byte("hello"),
        },
    )
}

消費者

func main() {
    conn, _ := amqp.Dial("amqp://localhost")
    ch, _ := conn.Channel()

    ch.ExchangeDeclare(
        "logs",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )

    q, _ := ch.QueueDeclare(
        "log1",
        true,
        false,
        false,
        false,
        nil,
    )
    
    // 隊列綁定的remote key
    keys := []string{"error.*", "warn.*"}

    for _, key := range keys{
        ch.QueueBind(
            q.Name,
            key,
            "logs",
            false,
            nil,
        )
    }

    deliver, _ := ch.Consume(
        q.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    for d:= range deliver {
        fmt.Println(string(d.Body))
        d.Ack(true)
    }
}
相關文章
相關標籤/搜索