Golang RabbitMQ Demo

AMQP協議

AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞異步消息的網絡協議。服務器

RabbitMQ 就是 amqp 協議的Erlang的實現。網絡

AMQP的模型架構的主要角色,生產者、消費者、交換器、隊列。架構

生產者、消費者、服務節點

  • 生產者(Producter) 消息投遞方
  • 消費者(Consumer) 消息接收方
  • 服務節點(Broker) 消息的服務節點,基本上能夠簡單的把一個broker當作一臺消息服務器

2019-12-17 10-34-47 的屏幕截圖.png

交換器、隊列、綁定

綁定

Rabbitmq中須要路由鍵和綁定鍵聯合使用才能使生產者成功投遞到隊列中去。異步

  • RoutingKey 生產者發送給交換器綁定的Key
  • BindingKey 交換器和隊列綁定的Key

生產者將消息投遞到交換器,經過交換器綁定的隊列,最終投遞到對應的隊列中去。
2019-12-17 10-55-51 的屏幕截圖.png性能

交換器

Rabbitmq共有4種交換器測試

  • fanout 把消息投遞到全部與此交換器綁定的隊列中
  • direct 把消息投遞到 BindingKey 和 RoutingKey 徹底匹配的隊列中
  • topic 規則匹配,BindingKey中存在兩種特殊字符ui

    • *匹配零個或多個單詞
    • #匹配一個單詞
  • header 不依賴於RoutingKey而是經過消息體中的headers屬性來進行匹配綁定,經過headers中的key和BindingKey徹底匹配,因爲性能較差通常用的比較少。

基本使用

在Golang中建立rabbitmq 生產者基本步驟是:spa

  1. 鏈接Connection
  2. 建立Channel
  3. 建立或鏈接一個交換器
  4. 建立或鏈接一個隊列
  5. 交換器綁定隊列
  6. 投遞消息
  7. 關閉Channel
  8. 關閉Connection

鏈接

// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    panic(err)
}

// channel
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

建立一個交換器

if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
    panic(err)
}

參數解析:code

  • name 交換機名稱
  • kind 交換機類型
  • durable 持久化
  • autoDelete 是否自動刪除
  • internal 是不是內置交換機
  • noWait 是否等待服務器確認
  • args 其它配置

參數說明要點:blog

  • autoDelete:

自動刪除功能必需要在交換器曾經綁定過隊列或者交換器的狀況下,處於再也不使用的時候纔會自動刪除,
若是是剛剛建立的還沒有綁定隊列或者交換器的交換器或者早已建立只是未進行隊列或者交換器綁定的交換器是不會自動刪除的。

  • internal:

內置交換器是一種特殊的交換器,這種交換器不能直接接收生產者發送的消息,
只能做爲相似於隊列的方式綁定到另外一個交換器,來接收這個交換器中路由的消息,
內置交換器一樣能夠綁定隊列和路由消息,只是其接收消息的來源與普通交換器不一樣。

  • noWait

當noWait爲true時,聲明時無需等待服務器的確認。
該通道可能因爲錯誤而關閉。 添加一個NotifyClose偵聽器應對任何異常。

建立交換器還有一個差很少的方法(ExchangeDeclarePassive),他主要是假定交換已存在,並嘗試鏈接到
不存在的交換將致使RabbitMQ引起異常,可用於檢測交換的存在。

建立一個隊列

if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
    panic(err)
}

參數解析:

  • name 隊列名稱
  • durable 持久化
  • autoDelete 自動刪除
  • exclusive 排他
  • noWait 是否等待服務器確認
  • args Table

參數說明要點:

  • exclusive 排他

排他隊列只對首次建立它的鏈接可見,排他隊列是基於鏈接(Connection)可見的,而且該鏈接內的全部信道(Channel)均可以訪問這個排他隊列,在這個鏈接斷開以後,該隊列自動刪除,因而可知這個隊列能夠說是綁到鏈接上的,對同一服務器的其餘鏈接不可見。
同一鏈接中不容許創建同名的排他隊列的
這種排他優先於持久化,即便設置了隊列持久化,在鏈接斷開後,該隊列也會自動刪除。
非排他隊列不依附於鏈接而存在,同一服務器上的多個鏈接均可以訪問這個隊列。

  • autoDelete 設置是否自動刪除。

爲true則設置隊列爲自動刪除。
自動刪除的前提是:至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,纔會自動刪除。
不能把這個參數錯誤地理解爲:"當鏈接到此隊列的全部客戶端斷開時,這個隊列自動刪除",由於生產者客戶端建立這個隊列,或者沒有消費者客戶端與這個隊列鏈接時,都不會自動刪除這個隊列。

建立隊列還有一個差很少的方法(QueueDeclarePassive),他主要是假定隊列已存在,並嘗試鏈接到
不存在的隊列將致使RabbitMQ引起異常,可用於檢測隊列的存在。

綁定交換器和隊列

if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
    panic(err)
}

參數解析:

  • name 隊列名稱
  • key BindingKey 根據交換機類型來設定
  • exchange 交換機名稱
  • noWait 是否等待服務器確認
  • args Table

綁定交換器

if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil {
    panic(err)
}

參數解析:

  • destination 目的交換器
  • key RoutingKey 路由鍵
  • source 源交換器
  • noWait 是否等待服務器確認
  • args Table 其它參數

生產者發送消息至交換器source中,交換器source根據路由鍵找到與其匹配的另外一個交換器destination,井把消息轉發到destination中,進而存儲在.destination綁定的隊列queue中,某種程度上來講destination交換器能夠看做一個隊列。如圖:

2019-12-17 11-40-50 的屏幕截圖.png

投遞消息

if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
    Timestamp:   time.Now(),
    ContentType: "text/plain",
    Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
    panic(err)
}

參數解析:

  • exchange 交換器名稱
  • key RouterKey
  • mandatory 是否爲沒法路由的消息進行返回處理
  • immediate 是否對路由到無消費者隊列的消息進行返回處理 RabbitMQ 3.0 廢棄
  • msg 消息體

參數說明要點:

  • mandatory

消息發佈的時候設置消息的 mandatory 屬性用於設置消息在發送到交換器以後沒法路由到隊列的狀況對消息的處理方式,
設置爲 true 表示將消息返回到生產者,不然直接丟棄消息。

  • immediate

參數告訴服務器至少將該消息路由到一個隊列中,不然將消息返回給生產者。imrnediate參數告訴服務器,若是該消息關聯的隊列上有消費者,則馬上投遞:若是全部匹配的隊列上都沒有消費者,則直接將消息返還給生產者,不用將消息存入隊列而等待消費者了。

RabbitMQ 3.0版本開始去掉了對imrnediate參數的支持

消費信息

Rabbitmq消費方式共有2種,分別是推模式和拉模式

推模式是經過持續訂閱的方式來消費信息,
Consume將信道(Channel)直爲接收模式,直到取消隊列的訂閱爲止。在接收模式期間,RabbitMQ會不斷地推送消息給消費者。
推送消息的個數仍是會受到channel.Qos的限制

deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
if err != nil {
    panic(err)
}

若是ack設置爲false則表示須要手動進行ack消費

v, ok := <-deliveries
if ok {
    // 手動ack確認
    // 注意: 這裏只要調用了ack就是手動確認模式,
    // multiple 表示的是在此channel中先前全部未確認的deliveries都將被確認
    // 並非表示設置爲false就不進行當前ack確認
    if err := v.Ack(true); err != nil {
        fmt.Println(err.Error())
    }
} else {
    fmt.Println("Channel close")
}

參數解析:

  • queue 隊列名稱
  • consumer 消息者名稱
  • autoAck 是否確認消費
  • exclusive 排他
  • noLocal
  • noWait bool
  • args Table

參數說明要點:

  • noLocal

設置爲true則表示不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者

拉模式:
相對來講比較簡單,是由消費者主動拉取信息來消費,一樣也須要進行ack確認消費

channel.Get(queue string, autoAck bool)

簡單示例Demo

下面是一個簡單示例,只是爲了通訊測試,單條數據收發

func Connection() (*amqp.Connection) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    return conn
}

func Sample() {
    var wg sync.WaitGroup
    wg.Add(1)
    go SampleConsumption(&wg)

    // 建立鏈接
    connection := Connection()
    defer connection.Close()

    // 開啓 channel
    channel, err := connection.Channel()
    if err != nil {
        panic(err)
    }

    defer channel.Close()

    if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
        panic(err)
    }

    if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
        panic(err)
    }

    if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
        panic(err)
    }

    // mandatory true 未找到隊列返回給消費者
    returnChan := make(chan amqp.Return,0)
    channel.NotifyReturn(returnChan)

    // Publish
    if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
        Timestamp:   time.Now(),
        ContentType: "text/plain",
        Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
    }); err != nil {
        panic(err)
    }

    //for v := range returnChan{
    //    fmt.Printf("Return %#v\n",v)
    //}
    
    wg.Wait()
}

func SampleConsumption(wg *sync.WaitGroup) {
    connection := Connection()
    defer connection.Close()

    channel, err := connection.Channel()
    if err != nil {
        panic(err)
    }

    defer channel.Close()
    
    deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
    if err != nil {
        panic(err)
    }

    // 這裏只取一條,由於product只發一條
    v, ok := <-deliveries
    if ok {
        if err := v.Ack(true); err != nil {
            fmt.Println(err.Error())
        }
    } else {
        fmt.Println("Channel close")
    }
    wg.Done()
}

來源:https://blog.crcms.cn/2019/09/29/go-ioc/

相關文章
相關標籤/搜索