golang可以使用庫github.com/streadway/amqp操做rabbitmqgit
安裝github
go get github.com/streadway/amqp
鏈接golang
conn, err := amqp.Dial(amqp://guest:guest@172.17.84.205:5672/)
創建通道code
ch, err := conn.Channel()
聲明Queuerabbitmq
q, err := ch.QueueDeclare( "testqueue", //Queue name true, //durable false, false, false, nil, )
其中durable設爲true則queue持久化,不然不會作持久化。get
發佈消息string
err = ch.Publish( "", //exchange q.Name, //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), })
其中amqp.Publishing的DeliveryMode若是設爲amqp.Persistent則消息會持久化。
須要注意的是若是須要消息持久化Queue也是須要設定爲持久化纔有效it
接收消息test
msgs, err := ch.Consume( q.Name, "MsgWorkConsumer", false, //Auto Ack false, false, false, nil, )
其中Auto ack能夠設置爲true。若是設爲true則消費者一接收到就從queue中去除了,若是消費者處理消息中發生意外該消息就丟失了。
若是Auto ack設爲false。consumer在處理完消息後,調用msg.Ack(false)後消息才從queue中去除。即使當前消費者處理該消息發生意外,只要沒有執行msg.Ack(false)那該消息就仍然在queue中,不會丟失。import
生成的Queue在生成是設定的參數,下次使用時不能更改設定參數,不然會報錯
例子代碼以下
conf.go
package config const ( RMQADDR = "amqp://guest:guest@172.17.84.205:5672/" QUEUENAME = "msgQueueWithPersist" PRODUCERCNT = 5 CONSUMERCNT = 20 )
producer.go
package main import ( config "xxx/conf" "fmt" "log" "sync" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() var wg sync.WaitGroup wg.Add(config.PRODUCERCNT) for routine := 0; routine < config.PRODUCERCNT; routine++ { go func(routineNum int) { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( config.QUEUENAME, //Queue name true, //durable false, false, false, nil, ) failOnError(err, "Failed to declare a queue") for i := 0; i < 500; i++ { msgBody := fmt.Sprintf("Message_%d_%d", routineNum, i) err = ch.Publish( "", //exchange q.Name, //routing key false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) log.Printf(" [x] Sent %s", msgBody) failOnError(err, "Failed to publish a message") } wg.Done() }(routine) } wg.Wait() log.Println("All messages sent!!!!") } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }
consumer.go
package main import ( config "xxx/conf" "fmt" "log" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() forever := make(chan bool) for routine := 0; routine < config.CONSUMERCNT; routine++ { go func(routineNum int) { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( config.QUEUENAME, true, //durable false, false, false, nil, ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, "MsgWorkConsumer", false, //Auto Ack false, false, false, nil, ) if err != nil { log.Fatal(err) } for msg := range msgs { log.Printf("In %d consume a message: %s\n", 0, msg.Body) log.Printf("Done") msg.Ack(false) //Ack } }(routine) } <-forever } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }