介紹:RabbitMQ是消息代理:它接受並轉發消息。您能夠將其視爲郵局:將要發佈的郵件放在郵箱中時,能夠確保Mailperson先生或女士最終將郵件傳遞給收件人。以此類推,RabbitMQ是一個郵箱,一個郵局和一個郵遞員。git
import "github.com/streadway/amqp"
// 定義mqurl 格式:amqp:賬號:密碼@服務器地址:RabbitMQ端口號/虛擬host
const MQURL = "amqp:root:123456@127.0.0.1:5672/imooc"
// 定義結構體
type RabbitMQ struct {
// 鏈接
conn *amqp.Connection
// 頻道
channel *amqp.Channel
// 隊列名稱
QueueName string
// 交換機
Exchange string
// key
Key string
// 鏈接信息
MqUrl string
}
// 實例化
func newRabbitMQ(queueName string, exchange string, key string) {
rabbitmq := RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}
// 定義錯誤
var err error
// 建立鏈接
rabbitmq.Conn, err = amqp.Dial(rabbitMQ.MqUrl)
rabbitmq.FailOnError(err, "建立鏈接錯誤")
// 獲取channel
rabbitmq.Channel, err = rabbitmq.Conn.Channel
rabbitmq.FailOnError(err, "獲取channel失敗")
}
// 定義錯誤處理
func (r *RabbitMQ) FailOnError(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
複製代碼
publish ->隊列 -> consumegithub
// 建立簡單模式的函數
func NewRabbitSimple(queueName string) {
// 實例化簡單模式的RabbitMQ
rabbitmq := newRabbitMQ(queueName, "", "")
return rabbitmq
}
// 建立簡單模式生產者
func (r *RabbitMQ) PublishSimple() {
//1.申請隊列,若是隊列不存在自動建立,若是存在則跳過建立
// 保證隊列存在,消息能發送到隊列中
_, err := r.channel.QueueDeclare(
r.QueueName,
// 是否持久化
false,
// 是否自動刪除
false,
// 是否具備排他性
false,
// 是否阻塞
false,
// 額外屬性
nil
)
if err != nil {
fmt.Println(err)
}
// 2.發送消息到隊列
r.channel.Publish(
r.Exchange,
r.QueueName,
false,
false,
amqp.Publishing{ContentType: "text/plain", Body: []byte(message)}
)
}
// 建立簡單模式消費者
func (r *RabbitMQ) ConsumeSimple() {
// 申明隊列 和上面同樣
_, err := r.Channel.QueueDeclear(r.QueueName, false, false, false, false, nil)
if err != nil {
fmt.Println(err)
}
// 接收消息
msg, err := r.Channel.Consume(
r.QueueName,
// 用來區分消費者
"",
// 是否自動應答
false,
// 是否具備排他性
false,
// 表示不能將同一個connection中生產者的消息發送給同一個connection中的消費者
false,
// 隊列消費是否阻塞 false就是不阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan int)
go func() {
for d:= range msg{
// 對消息處理
fmt.Println(string(d.Body))
}
}()
// 阻塞主程序結束
<-forever
}
複製代碼