golang rabbitmq 的學習

https://www.rabbitmq.com/tutorials/tutorial-one-go.htmlhtml

 

Rabbitmq的任務分發機制

producer_task.go

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "math/rand"
    "os"
    "strings"
    "time"
)

const (
    //AMQP URI
    uri = "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-idoall-queues-task"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    bodyMsg := bodyFrom(os.Args)
    //調用發佈消息函數
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello idoall.org"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//發佈者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
//@body, 主體內容
func publish(amqpURI string, exchange string, queue string, body string) {
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能發送到exchange,它是不能直接發送到queue的。
    // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。
    // routing_key就是指定的queue名字。
    tick := time.NewTicker(time.Millisecond * time.Duration(rand.Intn(1000)))
    for {
        <-tick.C
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
    }
    failOnError(err, "Failed to publish a message")
}
consumer_task.go

package main

import (
    "bytes"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

const (
    //AMQP URI
    uri = "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange nam
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-idoall-queues-task"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //調用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
func consumer(amqpURI string, exchange string, queue string) {
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //訂閱消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //建立一個channel
    forever := make(chan bool)

    //調用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            //*
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            //*/
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出
    <-forever
}

開了三個任務窗口接收,發現是並行接收的git

 

Message acknowledgment 消息確認

 

producer_acknowledgments.go

package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
)

/**
 * use
 * go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message.....
 */
const (
    //AMQP URI
    uri          =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName =  ""
    //Durable AMQP queue name
    queueName    = "test-idoall-queues-acknowledgments"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    bodyMsg := bodyFrom(os.Args)
    //調用發佈消息函數
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello idoall.org"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//發佈者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
//@body, 主體內容
func publish(amqpURI string, exchange string, queue string, body string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能發送到exchange,它是不能直接發送到queue的。
    // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。
    // routing_key就是指定的queue名字。
    err = channel.Publish(
        exchange,     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            Headers:         amqp.Table{},
            ContentType: "text/plain",
            ContentEncoding: "",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
}
consumer_acknowledgments.go

package main

import (
    "fmt"
    "log"
    "bytes"
    "time"
    "github.com/streadway/amqp"
)

const (
    //AMQP URI
    uri           =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange nam
    exchangeName  = ""
    //Durable AMQP queue name
    queueName     = "test-idoall-queues-acknowledgments"
)
/**
 *
 */
//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    //調用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
func consumer(amqpURI string, exchange string, queue string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //訂閱消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //建立一個channel
    forever := make(chan bool)

    //調用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出
    <-forever
}

Message durability消息持久化

producer_durability.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
)


const (
    //AMQP URI
    uri          =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName =  ""
    //Durable AMQP queue name
    queueName    = "test-idoall-queues-durability"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    bodyMsg := bodyFrom(os.Args)
    //調用發佈消息函數
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello idoall.org"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//發佈者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
//@body, 主體內容
func publish(amqpURI string, exchange string, queue string, body string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        true,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能發送到exchange,它是不能直接發送到queue的。
    // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。
    // routing_key就是指定的queue名字。
    err = channel.Publish(
        exchange,     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            Headers:         amqp.Table{},
            DeliveryMode: amqp.Persistent,
            ContentType: "text/plain",
            ContentEncoding: "",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
}
consumer_durability.go
package main

import (
    "fmt"
    "log"
    "bytes"
    "time"
    "github.com/streadway/amqp"
)

const (
    //AMQP URI
    uri           =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange nam
    exchangeName  = ""
    //Durable AMQP queue name
    queueName     = "test-idoall-queues-durability"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    //調用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
func consumer(amqpURI string, exchange string, queue string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        true,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //訂閱消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //建立一個channel
    forever := make(chan bool)

    //調用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出
    <-forever
}

Fair dispatch 公平分發

producer_fair_dispatch.go
package main

import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)

const (
//AMQP URI
uri = "amqp://guest:guest@localhost:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-fair_dispatch"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

func main(){
bodyMsg := bodyFrom(os.Args)
//調用發佈消息函數
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

//發佈者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
//@body, 主體內容
func publish(amqpURI string, exchange string, queue string, body string){
//創建鏈接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()

//建立一個Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()

log.Printf("got queue, declaring %q", queue)

//建立一個queue
q, err := channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

// Producer只能發送到exchange,它是不能直接發送到queue的。
// 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
 
consumer_fair_dispatch.go
package main

import (
    "fmt"
    "log"
    "bytes"
    "time"
    "github.com/streadway/amqp"
)

const (
    //AMQP URI
    uri           =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange nam
    exchangeName  = ""
    //Durable AMQP queue name
    queueName     = "test-idoall-queues-fair_dispatch"
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    //調用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@queue, queue的名稱
func consumer(amqpURI string, exchange string, queue string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        true,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //每次只取一條消息
    err = channel.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    log.Printf("Queue bound to Exchange, starting Consume")
    //訂閱消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //建立一個channel
    forever := make(chan bool)

    //調用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出
    <-forever
}

Exchanges & Bindings

producer_exchange_logs.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
)


const (
    //AMQP URI
    uri          =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName =  "test-idoall-exchange-logs"
    //Exchange type - direct|fanout|topic|x-custom
    exchangeType = "fanout"
    //AMQP routing key
    routingKey   = ""
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    bodyMsg := bodyFrom(os.Args)
    //調用發佈消息函數
    publish(uri, exchangeName, exchangeType, routingKey, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}


func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello idoall.org"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//發佈者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@exchangeType, exchangeType的類型direct|fanout|topic
//@routingKey, routingKey的名稱
//@body, 主體內容
func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()


    //建立一個queue
    log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
    err = channel.ExchangeDeclare(
        exchange,     // name
        exchangeType, // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // noWait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 發佈消息
    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
    err = channel.Publish(
        exchange,     // exchange
        routingKey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            Headers:         amqp.Table{},
            ContentType: "text/plain",
            ContentEncoding: "",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
}

consumer_exchange_logs.gogithub

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

const (
    //AMQP URI
    uri           =  "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName =  "test-idoall-exchange-logs"
    //Exchange type - direct|fanout|topic|x-custom
    exchangeType = "fanout"
    //AMQP binding key
    bindingKey   = ""
    //Durable AMQP queue name
    queueName     = ""
)

//若是存在錯誤,則輸出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main(){
    //調用消息接收者
    consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名稱
//@exchangeType, exchangeType的類型direct|fanout|topic
//@queue, queue的名稱
//@key , 綁定的key名稱
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
    //創建鏈接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //建立一個Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    //建立一個exchange
    log.Printf("got Channel, declaring Exchange (%q)", exchange)
    err = channel.ExchangeDeclare(
        exchange,     // name of the exchange
        exchangeType, // type
        true,         // durable
        false,        // delete when complete
        false,        // internal
        false,        // noWait
        nil,          // arguments
    );
    failOnError(err, "Exchange Declare:")

    //建立一個queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,   // durable
        false,   // delete when unused
        true,   // exclusive 當Consumer關閉鏈接時,這個queue要被deleted
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //綁定到exchange
    err = channel.QueueBind(
        q.Name, // name of the queue
        key,        // bindingKey
        exchange,   // sourceExchange
        false,      // noWait
        nil,        // arguments
    );
    failOnError(err, "Failed to bind a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //訂閱消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //建立一個channel
    forever := make(chan bool)

    //調用gorountine
    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出
    <-forever
}

 

遠程調用RPC

 rpc_server.go 
package main

import (
        "fmt"
        "log"
        "strconv"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "rpc_queue", // name
                false,       // durable
                false,       // delete when usused
                false,       // exclusive
                false,       // no-wait
                nil,         // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        n, err := strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")

                        log.Printf(" [.] fib(%d)", n)
                        response := fib(n)

                        err = ch.Publish(
                                "",        // exchange
                                d.ReplyTo, // routing key
                                false,     // mandatory
                                false,     // immediate
                                amqp.Publishing{
                                        ContentType:   "text/plain",
                                        CorrelationId: d.CorrelationId,
                                        Body:          []byte(strconv.Itoa(response)),
                                })
                        failOnError(err, "Failed to publish a message")

                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Awaiting RPC requests")
        <-forever
}
 rpc_client.go
package main

import (
        "fmt"
        "log"
        "math/rand"
        "os"
        "strconv"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func randomString(l int) string {
        bytes := make([]byte, l)
        for i := 0; i < l; i++ {
                bytes[i] = byte(randInt(65, 90))
        }
        return string(bytes)
}

func randInt(min int, max int) int {
        return min + rand.Intn(max-min)
}

func fibonacciRPC(n int) (res int, err error) {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when usused
                true,  // exclusive
                false, // noWait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        corrId := randomString(32)

        err = ch.Publish(
                "",          // exchange
                "rpc_queue", // routing key
                false,       // mandatory
                false,       // immediate
                amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: corrId,
                        ReplyTo:       q.Name,
                        Body:          []byte(strconv.Itoa(n)),
                })
        failOnError(err, "Failed to publish a message")

        for d := range msgs {
                if corrId == d.CorrelationId {
                        res, err = strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")
                        break
                }
        }

        return
}

func main() {
        rand.Seed(time.Now().UTC().UnixNano())

        n := bodyFrom(os.Args)

        log.Printf(" [x] Requesting fib(%d)", n)
        res, err := fibonacciRPC(n)
        failOnError(err, "Failed to handle RPC request")

        log.Printf(" [.] Got %d", res)
}

func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "30"
        } else {
                s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
}
相關文章
相關標籤/搜索