https://www.rabbitmq.com/tutorials/tutorial-one-go.html
producer_task.go package main import ( "fmt" "github.com/streadway/amqp" "log" "math/rand" "os" "strings" "time" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-task" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { bodyMsg := bodyFrom(os.Args) //調用發佈消息函數 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //發佈者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 //@body, 主體內容 func publish(amqpURI string, exchange string, queue string, body string) { //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能發送到exchange,它是不能直接發送到queue的。 // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。 // routing_key就是指定的queue名字。 tick := time.NewTicker(time.Millisecond * time.Duration(rand.Intn(1000))) for { <-tick.C err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), 
}) } failOnError(err, "Failed to publish a message") }
consumer_task.go package main import ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-task" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //調用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 func consumer(amqpURI string, exchange string, queue string) { //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //訂閱消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //建立一個channel forever := make(chan bool) //調用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) //* dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) //*/ log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出 <-forever }
開了三個任務窗口接收,發現是並行接收的
producer_acknowledgments.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) /** * use * go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message..... */ const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-acknowledgments" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ bodyMsg := bodyFrom(os.Args) //調用發佈消息函數 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //發佈者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 //@body, 主體內容 func publish(amqpURI string, exchange string, queue string, body string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能發送到exchange,它是不能直接發送到queue的。 // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。 // routing_key就是指定的queue名字。 err = 
channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
consumer_acknowledgments.go package main import ( "fmt" "log" "bytes" "time" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-acknowledgments" ) /** * */ //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //調用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 func consumer(amqpURI string, exchange string, queue string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //訂閱消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //建立一個channel forever := make(chan bool) //調用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出 <-forever }
producer_durability.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-durability" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ bodyMsg := bodyFrom(os.Args) //調用發佈消息函數 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //發佈者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 //@body, 主體內容 func publish(amqpURI string, exchange string, queue string, body string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能發送到exchange,它是不能直接發送到queue的。 // 如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。 // routing_key就是指定的queue名字。 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, DeliveryMode: amqp.Persistent, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
consumer_durability.go package main import ( "fmt" "log" "bytes" "time" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-durability" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //調用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 func consumer(amqpURI string, exchange string, queue string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //訂閱消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //建立一個channel forever := make(chan bool) //調用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出 <-forever }
producer_fair_dispatch.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
const (
// uri is the AMQP URI passed to publish for dialing the broker.
uri = "amqp://guest:guest@localhost:5672/"
// exchangeName is the exchange passed to publish; it is the empty string here.
exchangeName = ""
// queueName is the queue name passed to publish.
queueName = "test-idoall-queues-fair_dispatch"
)
// failOnError logs "msg: err" and panics when err is non-nil; it does nothing
// for a nil err.
//
// It panics instead of calling log.Fatalf so that deferred Close calls in the
// caller still run while the stack unwinds. The panic value is the same
// "msg: err" string that is logged.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panic(fmt.Sprintf("%s: %s", msg, err))
}
// main derives the message body from the command line, hands it to publish
// together with the package-level connection settings, and logs the payload
// size afterwards.
func main() {
	payload := bodyFrom(os.Args)
	publish(uri, exchangeName, queueName, payload)
	log.Printf("published %dB OK", len(payload))
}
// bodyFrom builds the message body from command-line style arguments.
//
// args[0] is treated as the program name. When there is no further argument,
// or the first one is empty, the default body "hello idoall.org" is returned;
// otherwise the remaining arguments are joined with single spaces.
//
// Only the args parameter is inspected, never os.Args, so the result depends
// solely on what the caller passes in.
func bodyFrom(args []string) string {
	if len(args) < 2 || args[1] == "" {
		return "hello idoall.org"
	}
	return strings.Join(args[1:], " ")
}
// publish connects to the broker, declares a durable queue and publishes body
// once with persistent delivery mode.
//
//   - amqpURI:  address of the AMQP broker
//   - exchange: name of the exchange to publish to
//   - queue:    name of the queue to declare; its name is the routing key
//   - body:     message payload
//
// Every failure is reported through failOnError.
func publish(amqpURI string, exchange string, queue string, body string) {
	// Open the connection.
	log.Printf("dialing %q", amqpURI)
	connection, err := amqp.Dial(amqpURI)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	// Open a channel on the connection.
	log.Printf("got Connection, getting Channel")
	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	// Declare the queue the caller asked for (the queue parameter, not the
	// package-level constant), so the log line and the declaration agree.
	log.Printf("got queue, declaring %q", queue)
	q, err := channel.QueueDeclare(
		queue, // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

	// A producer can only send to an exchange, never directly to a queue.
	// With the default exchange (empty name) the routing key is the name of
	// the target queue.
	err = channel.Publish(
		exchange, // exchange
		q.Name,   // routing key
		false,    // mandatory
		false,    // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			DeliveryMode:    amqp.Persistent,
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
		})
	failOnError(err, "Failed to publish a message")
}
consumer_fair_dispatch.go package main import ( "fmt" "log" "bytes" "time" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-fair_dispatch" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //調用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@queue, queue的名稱 func consumer(amqpURI string, exchange string, queue string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //建立一個queue q, err := channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //每次只取一條消息 err = channel.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") log.Printf("Queue bound to Exchange, starting Consume") //訂閱消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //建立一個channel forever := make(chan bool) //調用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出 <-forever }
producer_exchange_logs.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange name exchangeName = "test-idoall-exchange-logs" //Exchange type - direct|fanout|topic|x-custom exchangeType = "fanout" //AMQP routing key routingKey = "" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ bodyMsg := bodyFrom(os.Args) //調用發佈消息函數 publish(uri, exchangeName, exchangeType, routingKey, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //發佈者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@exchangeType, exchangeType的類型direct|fanout|topic //@routingKey, routingKey的名稱 //@body, 主體內容 func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() //建立一個queue log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange) err = channel.ExchangeDeclare( exchange, // name exchangeType, // type true, // durable false, // auto-deleted false, // internal false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 發佈消息 log.Printf("declared queue, publishing %dB body (%q)", len(body), body) err = channel.Publish( exchange, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) 
failOnError(err, "Failed to publish a message") }
consumer_exchange_logs.go
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@localhost:5672/" //Durable AMQP exchange name exchangeName = "test-idoall-exchange-logs" //Exchange type - direct|fanout|topic|x-custom exchangeType = "fanout" //AMQP binding key bindingKey = "" //Durable AMQP queue name queueName = "" ) //若是存在錯誤,則輸出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //調用消息接收者 consumer(uri, exchangeName, exchangeType, queueName, bindingKey) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名稱 //@exchangeType, exchangeType的類型direct|fanout|topic //@queue, queue的名稱 //@key , 綁定的key名稱 func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){ //創建鏈接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //建立一個Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() //建立一個exchange log.Printf("got Channel, declaring Exchange (%q)", exchange) err = channel.ExchangeDeclare( exchange, // name of the exchange exchangeType, // type true, // durable false, // delete when complete false, // internal false, // noWait nil, // arguments ); failOnError(err, "Exchange Declare:") //建立一個queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused true, // exclusive 當Consumer關閉鏈接時,這個queue要被deleted false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //綁定到exchange err = channel.QueueBind( q.Name, // name of the queue key, // bindingKey exchange, // sourceExchange false, // noWait nil, // arguments ); failOnError(err, "Failed to bind a queue") log.Printf("Queue bound to Exchange, starting Consume") //訂閱消息 msgs, err := channel.Consume( q.Name, // queue 
"", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //建立一個channel forever := make(chan bool) //調用gorountine go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //沒有寫入數據,一直等待讀,阻塞當前線程,目的是讓線程不退出 <-forever }
rpc_server.go package main import ( "fmt" "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) response := fib(n) err = ch.Publish( "", // exchange d.ReplyTo, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <-forever }
rpc_client.go package main import ( "fmt" "log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) err = ch.Publish( "", // exchange "rpc_queue", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }