衆所周知RabbitMQ使用的是AMQP協議。咱們知道AMQP是一種網絡協議,可以支持符合要求的客戶端應用和消息中間件代理之間進行通訊。 數組
其中消息代理扮演的角色就是從生產者那兒接受消息,並根據既定的路由規則把接受到的消息發送給消息的處理者又稱消費者。由此能夠看出RabbitMQ在整個消息發送,處理的過程當中有三個比較重要的角色: 服務器
生產者:producer,消息生產者,就是投遞消息的程序 網絡
消息代理:broker,簡單來講就是消息隊列服務器實體,這裏簡單理解爲咱們安裝的RabbitMQ服務 spa
消費者:consumer,消息消費者,就是接受消息的程序 線程
接下來咱們將以一個簡單的控制檯程序來實現消息隊列的發送及接收(使用.NET版RabbitMQ客戶端): 代理
主要功能爲: 一個producer發送消息,一個consumer接收消息,並在控制檯打印出來。 中間件
使用Nuget添加RabbitMQ.Client程序包至項目中
Install-Package RabbitMQ.Client 隊列
建立消息的生產者 Producer.cs ,發送一條消息給消費者
-
using RabbitMQ.Client;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQProducer
-
{
-
public class Producer
-
{
-
public static void Send()
-
{
-
//建立鏈接鏈接到RabbitMQ服務器,就是一個位於客戶端和Broker之間的TCP鏈接,建議共用此TCP鏈接,每次使用時建立一個新的channel便可,
-
var factory = new ConnectionFactory();
-
IConnection connection = null;
-
//方式1:使用AMQP協議URL amqp://username:password@hostname:port/virtual host 可經過http://127.0.0.1:15672/ RabbitMQWeb管理頁面查看每一個參數的具體內容
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
connection = factory.CreateConnection();
-
-
////方式2:使用ConnectionFactory屬性賦值
-
//factory.UserName = ConnectionFactory.DefaultUser;
-
//factory.Password = ConnectionFactory.DefaultPass;
-
//factory.VirtualHost = ConnectionFactory.DefaultVHost;
-
//factory.HostName = "127.0.0.1"; //設置RabbitMQ服務器所在的IP或主機名
-
//factory.Port = AmqpTcpEndpoint.UseDefaultPort;
-
//connection = factory.CreateConnection();
-
-
////方式3:使用CreateConnection方法建立鏈接,默認使用第一個地址鏈接服務端,若是第一個不可用會依次使用後面的鏈接
-
//List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint>() {
-
// new AmqpTcpEndpoint() { HostName="localhost1",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost2",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost3",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost4",Port=5672}
-
//};
-
//connection = factory.CreateConnection(endpoints);
-
-
using (connection)
-
{
-
//建立一個消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。相似與Hibernate中的Session
-
//AMQP協議規定只有經過channel才能指定AMQP命令,因此僅僅在建立了connection後客戶端仍是不能發送消息的,必需要建立一個channel才行
-
//RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection
-
using (IModel channel = connection.CreateModel())
-
{
-
//建立一個queue(消息隊列)
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
string message = "你好消費者,我是生產者發送的消息";
-
-
//往隊列中發出一條消息 使用了默認交換機而且綁定路由鍵(route key)與隊列名稱相同
-
channel.BasicPublish(
-
exchange: "",
-
routingKey: "hello",
-
basicProperties: null,
-
body: Encoding.UTF8.GetBytes(message));
-
-
Console.WriteLine($"我是生產者,我發送了一條消息{message}");
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
注意:1.隊列只會在它不存在的時候建立,屢次聲明並不會重複建立。 事件
2.信息的內容是字節數組,也就意味着能夠傳遞任何數據。 路由
3.建立消息的消費者Consumer.cs ,從隊列中獲取消息並打印到屏幕
-
using RabbitMQ.Client;
-
using RabbitMQ.Client.Events;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQConsumer
-
{
-
public class Consumer
-
{
-
public static void Receive()
-
{
-
var factory = new ConnectionFactory();
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
using (var connection = factory.CreateConnection())
-
{
-
using (var channel = connection.CreateModel())
-
{
-
//聲明隊列,主要爲了防止消息接收者先運行此程序,隊列還不存在時建立隊列
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
//建立事件驅動的消費者類型,儘可能不要使用while(ture)循環來獲取消息
-
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
-
consumer.Received += (model, ea) =>
-
{
-
var body = ea.Body;
-
var message = Encoding.UTF8.GetString(body);
-
Console.WriteLine(" 我是消費者我接收到消息: {0}", message);
-
};
-
-
//指定消費隊列
-
channel.BasicConsume(queue: "hello",
-
noAck: true,
-
consumer: consumer);
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
消息隊列的使用過程大體以下:
-
CreateConnection 建立一個鏈接鏈接到broker
-
CreateModel 建立一個channel 使用它來發送AMQP指令
-
ExchangeDeclare 建立一個exchange 對消息進行路由
-
QueueDeclare 建立一個queue 消息隊列 這是一個裝載消息的容器
-
QueueBind 把exchange和queue按照路由規則綁定起來
-
BasicPublish 往隊列中發送一條消息
-
BasicConsume 從隊列中獲取一條消息
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
本文中因爲使用了默認交換機因此並無用到 ExchangeDeclare和 QueueBind兩個方法
默認交換機其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同