rabbitmq消息中間件讀後感

四.RabbitMQ安裝與使用
1.安裝(rpm方式)
下載對應版本的Erlang和RabbitMQ。css

安裝erlang:html

安裝socat密鑰:
要先安裝socat再安裝rabbitmq,否則會報錯。
安裝rabbitmq:java

2.修改配置文件
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
默認端口號5672node

修改loopback_users:(保留guest也可)mysql


3.RabbitMq基本命令
關於服務的操做:linux

服務啓動:rabbitmqctl start_app / rabbitmq-server start &
這裏有用到主機名,能夠經過vim /etc/hostname來配置。git

能夠看到rabbitmq已經啓動:
查看服務是否啓動 lsof -i:5672github

服務中止:rabbitmqctl stop_app / rabbitmq-server stop
服務重啓:service rabbitmq-server restart
節點狀態:rabbitmqctl status
啓動管理控制檯:web

管理插件:rabbitmq-plugins enable rabbitmq_management
控制檯訪問地址:http://192.168.58.129:15672
若是既修改過配置文件,又開啓了插件,可是還訪問失敗,能夠看下是否是沒有關閉防火牆。
firewalld的基本使用
啓動: systemctl start firewalld
關閉: systemctl stop firewalld
查看狀態: systemctl status firewalld
開機禁用: systemctl disable firewalld
開機啓用: systemctl enable firewalld

redis

關於用戶的操做:

添加用戶:rabbitmqctl add_user username password
列出全部用戶:rabbitmqctl list_users
刪除用戶:rabbitmqctl delete_user username
清除用戶權限:rabbitmqctl clear_permissions -p vhostpath username
列出用戶權限:rabbitmqctl list_user_permissions username
修改密碼:rabbitmqctl change_password username newpassword
設置用戶權限:rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
關於虛擬主機的操做:

建立虛擬主機:rabbitmqctl add_vhost vhostpath
列出全部虛擬主機:rabbitmqctl list_vhost
列出虛擬主機上全部權限:rabbitmqctl list_permissions -p vhostpath
刪除虛擬主機:rabbitmqctl delete_vhost vhostpath
關於消息隊列的操做:

查看全部隊列信息:rabbitmqctl list_queues
清除隊列裏的消息:rabbitmqctl -p vhostpath purge_queue blue
等等。

4.RabbitMq高級命令
rabbitmqctl reset:移除全部數據,要在rabbitmqctl stop_app以後使用。
組成集羣命令:rabbitmqctl join_cluster <clusternode> [--ram] (ram內存級別存儲,disc磁盤)
查看集羣狀態:rabbitmqctl cluster_status
修改集羣節點的存儲形式:rabbitmqctl change_cluster_node_type disc | ram
忘記(摘除)節點:rabbitmqctl forget_cluster_node [--offline] (offline服務不啓動的狀況下)
修改節點名稱:rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]

 

1:RabbitMQ是一個開源的消息代理和隊列服務器,能夠經過基本協議在徹底不一樣的應用之間共享數據,使用Erlang語言開發的,是基於AMQP(高級消息隊列協議)協議,Erlang主要用於交換機的開發,有着與原生socket同樣的延遲這也是爲何RabbitMQ高性能的緣由

2:AMQP(高級消息隊列協議)協議模型

3:核心概念

server:又稱Broker
Connection:鏈接,應用程序和server的鏈接
channel:網絡信道,幾乎全部的操做都在channel上進行的,channel是進行消息讀寫的通道,每一個客戶端能夠創建多個channel,
每一個channel表明一個會話任務

message:有Properties和body組成,Properties能夠對消息進行修飾如消息的優先級,消息的延遲,body就是消息的內容

Virtual host:虛擬地址,用於進行邏輯隔離,最上面的消息路由,一個Virtual host能夠有若干個exchange和queue,可是一個Virtual host不能有相同的exchange和queue

 exchange:交換機接受消息,根據路由鍵轉發到綁定的隊列

binding:exchange和queue之間的虛擬鏈接;binding 能夠包含routing key

routing key:路由規則

4:依賴項目

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
5:生產者代碼
public class Procuder {
public static void main(String[] args) throws Exception {
//1 建立一個ConnectionFactory, 並進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
//3 經過connection建立一個Channel
Channel channel = connection.createChannel();
//4 經過Channel發送數據
for (int i = 0; i < 5; i++) {
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
//props是消息的一些屬性設置
//當exchange爲空時候會路由到和routingkey同樣名字的消息隊列
channel.basicPublish("", "test001", null, msg.getBytes());
}
//5 記得要關閉相關的鏈接
channel.close();
connection.close();
}
}

6:消費者

public class Consumer {
public static void main(String[] args) throws Exception {
//1 建立一個ConnectionFactory, 並進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
//3 經過connection建立一個Channel
Channel channel = connection.createChannel();
//4 聲明(建立)一個隊列
String queueName = "test001";
//durable表明聲明的隊列持久化到服務器上
//exclusive是否爲當前鏈接的專用隊列,在鏈接斷開後,會自動刪除該隊列
//autoDelete 當沒有任何消費者使用時,自動刪除該隊列
//arguments 其餘隊列配置
channel.queueDeclare(queueName, true, false, false, null);
//5 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 設置Channel
//autoAck
//自動確認
//指定autoAck參數,當autoAck=true時,一旦消費者接收到了消息,就視爲自動確認了消息。若是消費者在處理消息的過程當中,中間出錯就沒法從新處理該消息,因此須要常常性的在代碼裏進行手動確認。
//
//手動確認
//需設置autoAck=false,此時RabbitMQ會等待消費者顯式發回ACK信號後才從內存(或磁盤)中移去消息,消費者就有足夠的時間處理消息(任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題,由於RabbitMQ會一直持有消息直到消費者顯式調用basicAck爲止。(消息此處會有兩份,投遞出去的和broke自留的,若是投遞未收到Ack且鏈接斷開,broker將會把消息投遞給下一個消費者)
//通常使用手動確認會將消息的處理放在try/catch語句塊中,成功處理了,就給MQ一個確認應答,若是處理異常了,就在catch中,進行消息的拒絕。
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
//7 獲取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費端: " + msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}

7:direct exchange consumer

public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//當出現網絡抖動的時候嘗試從新鏈接
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明瞭一個交換機
//internal 標識的是是不是內部使用 一般設置爲false便可
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明瞭一個隊列
channel.queueDeclare(queueName, false, false, false, null);
//創建一個綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

 

8:direct exchange product

public class Producer4DirectExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct111";
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}

9:全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。

  Direct模式,可使用rabbitMQ自帶的Exchange:default Exchange 。因此不須要將Exchange進行任何綁定(binding)操做 。消息傳遞時,RouteKey必須徹底匹配,纔會被隊列接收,不然該消息會被拋棄。

 

10:topic exchange 

一個exchange能夠有兩個不一樣的routekey 綁定同一個隊列

routingkey模糊匹配

 consumer
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";匹配一個詞
//String routingKey = "user.#";匹配多個詞
String routingKey = "user.*";
// 1 聲明交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 聲明隊列
channel.queueDeclare(queueName, false, false, false, null);
// 3 創建交換機和隊列的綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

product

public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}

11:fanout exchange

不須要要routingkey 路由 發送到全部exchange綁定的隊列上 因此性能最高

Producer
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
//5 發送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
Consumer
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設置路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

12:message 相關屬性

product

Map<String, Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)//是否持久化
.contentEncoding("UTF-8")
.expiration("10000")//消息過時時間 當過時了會自動拋棄
.headers(headers)//自定義消息屬性
.build();

//4 經過Channel發送數據
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
channel.basicPublish("", "test001", properties, msg.getBytes());
}

consumer

while(true){
//7 獲取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費端: " + msg);
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.err.println("headers get my1 value: " + headers.get("my1"));

//Envelope envelope = delivery.getEnvelope();
}

 13:生產端消息可靠性投遞

   (1)保證消息的成功發出(2)保證mq節點的成功接收(3)發送端收到mq節點的確認應答(4)完善的消息補償機制

一般有兩種方式

  (1)消息落庫,對消息狀態進行打標

(2)消息的延遲投遞,作二次確認,回調檢查

消息落庫方案以下

消息的延遲投遞方案以下

14:消息重複投遞的冪等性問題

(1)經過惟一ID利用數據庫主鍵去重(高併發狀況下經過分庫分表來提升性能)

(2)利用redis原子性去實現()

 15:confirm確認機制

生產者投遞消息後若是broker收到消息就會給生產端一個應答,生產端接收應答用來確認這個消息是否被正常發送到broker這就是消息可靠性投遞的核心須要如下兩個步驟

(1)第一步須要在channel上開啓確認模式

//4 指定咱們的消息投遞模式: 消息的確認模式 
channel.confirmSelect();

(2)在channel上添加監聽addConfirmListener監聽成功和失敗的返回結果

//6 添加一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}

@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
完整代碼以下
public class Producer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
//4 指定咱們的消息投遞模式: 消息的確認模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 發送一條消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
//多是最大隊列已滿,磁盤不足等情況下觸發
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}

 16:Return 消息機制

Return  Listener 用於處理監聽一些不可路由的消息(exchange不存在,或者routing key 路由不到)

mandatory爲true,監聽器就會接收路由不可達的消息而後作進一步處理
mandatory爲false 那麼broker會自動刪除消息
代碼以下:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//mandatory 爲true時監聽器就會接收路由不可達的消息而後作進一步處理,false 會自動刪除
      channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}

 17:消費端自定義監聽

咱們經過while循環進行獲取消息進行消費處理,咱們也能夠自定義consumer更加方便解耦性更增強

 public class Consumer {

   public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
MyConsumer代碼以下
public class MyConsumer extends DefaultConsumer {

public MyConsumer(Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//同一個會話, consumerTag 是固定的 能夠作此會話的名字, deliveryTag 每次接收消息+1,能夠作此消息處理通道的名字。

   //所以 deliveryTag 能夠用來回傳告訴 rabbitmq 這個消息處理成功 清除此消息(basicAck方法)


}

}


19:消費端限流
使用場景:巨量消息瞬間所有推送過來,單個客戶端沒法處理這麼多消息
在非自動確認消息的前提下。經過設置consumer或者channel設置的qos的值,若是必定數量的消息未被確認就不進行消息的消息
//1 限流方式  第一件事就是 autoAck設置爲 false
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
basicQos(int prefetchSize, int prefetchCount, boolean global)
//global是否將以上設置應用於channel仍是consumer級別。

prefetchSize 和global這兩個配置rabbitmq暫時還沒實現
rabbitmq向第一個消費者投遞了prefetchCount條消息後,消費者未對prefetchCount條消息進行ack,rabbitmq不會再向該消費者投遞消息


手動簽收
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));

channel.basicAck(envelope.getDeliveryTag(), false);

}

20:消費端ack和nack
手工簽收與重回隊列 (通常實際工做中不會重回隊列)
消息消費者手工簽收 必需要關閉 autoAck = false
// 手工簽收 必需要關閉 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
MyConsumer代碼以下
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);//true 重回隊列
} else {
channel.basicAck(envelope.getDeliveryTag(), false);//
}
}

21:TTL隊列與消息
能夠在消息上指定消息過時時間,也能夠在隊列上指定消息過時時間(從消息進入隊列開始到指定的超過隊列過時時間,
那麼消息會自動的清除,在聲明隊列時能夠指定x-message-ttl消息過時時間做爲Map<String, Object> arguments的參數)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(headers)
.build();
 22:死信隊列使用
x-max-length 指定隊列最大長度
x-message-ttl 指定隊列的消息過時時間
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");指定當前隊列是對應一個exchange爲
dlx.exchange死信的exchange
當消息在一個隊列中變成死信的時候,它能從新被publish另外一個exchange中
這個exchange就是x-dead-letter-exchange指定的
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 這就是一個普通的交換機 和 隊列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//這個agruments屬性,要設置到聲明隊列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要進行死信隊列的聲明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
消息變成死信的如下幾種狀況
    消息被拒絕,而且requeue= false
    消息ttl過時
    隊列達到最大的長度

23:和springboot 整合須要引入如下包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId
</dependency>
 
 24:RabbitAdmin類能夠很好的操做RabbitMQ,在Spring中直接進行注入便可

(2)autoStartup必須設置true,不然Spring容器不會加載RabbitAdmin類

(3)RabbitAdmin底層實現就是從Spring容器中獲取Exchagge,Bingding,RoutingKey以及Queue的@Bean聲明

(4)而後使用RabbitTemplate的execute方法執行對應聲明,修改,刪除等操做

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
 25:RabbitMQConfig 配置 注入rabbitAdmin
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {

@Bean
public ConnectionFactory connectionFactory() {

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.11.76:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

26:Test使用rabbitAdmin
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Test
public void contextLoads() {
}

@Autowired
private RabbitAdmin rabbitAdmin;

@Test
public void testAdmin() throws Exception {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));

rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接建立隊列
.to(new TopicExchange("test.topic", false, false)) //直接建立交換機 創建關聯關係
.with("user.#")); //指定路由Key


rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));

//清空隊列數據
rabbitAdmin.purgeQueue("test.topic.queue", false);

}
}

27:直接建立對應交換機 & 隊列
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {

@Bean
public ConnectionFactory connectionFactory() {

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.11.76:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}

/**
* 針對消費者配置
* 1. 設置交換機類型
* 2. 將隊列綁定到交換機
* FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念
* HeadersExchange :經過添加屬性key-value匹配
* DirectExchange:按照routingkey分發到指定隊列
* TopicExchange:多關鍵字匹配
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}

@Bean
public Queue queue001() {
return new Queue("queue001", true); //隊列持久
}

@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}

@Bean
public TopicExchange exchange002() {
return new TopicExchange("topic002", true, false);
}

@Bean
public Queue queue002() {
return new Queue("queue002", true); //隊列持久
}

@Bean
public Binding binding002() {
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
}

@Bean
public Queue queue003() {
return new Queue("queue003", true); //隊列持久
}

@Bean
public Binding binding003() {
return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
}

@Bean
public Queue queue_image() {
return new Queue("image_queue", true); //隊列持久
}

@Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true); //隊列持久
}


@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
28:測試
直接建立對應交換機 & 隊列經過rabbitTemplate發送消息
它提供了豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口 ConfirmCallback、
返回值確認接口 ReturnCallback 等等。一樣咱們須要進行注入到 Spring 容器中,而後直接使用。
RabbitTemplate  Spring 整合時須要實例化,可是 Springboot 整合時,配置文件裏添加配置便可
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 建立消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定義消息類型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加額外的設置---------");
message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
//1 建立消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}

29:
SimpleMessageListenerContailer

(1)簡單消息監聽容器:這個類很是的強大,咱們能夠對他進行不少設置,對於消費者的配置項,這個類均可以知足

(2)設置事務特性,事務管理器,事務屬性,事務容量,事務開啓等

(3)設置消息確認和自動確認模式,是否重回隊列,異常捕獲handler函數

(4)設置消費者標籤生成策略,是否獨佔模式,消費者屬性等

(5)simpleMessageListenerContailer能夠進行動態設置,好比在運行中的應用能夠動態的修改其消費者數量的大小,接收消息的模式等

@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});


container.setMessageListener(new ChannelAwareMessageListener() {
@Override public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消費者: " + msg);
}
});
}

30:MessageListenerAdapter 適配器

   適配器方式. 默認是有本身的方法名字的:handleMessage
// 能夠本身指定一個方法的名字: consumeMessage
// 也能夠添加一個轉換器: 從字節數組轉換爲String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);

MessageDelegate 類
public class MessageDelegate {

public void handleMessage(byte[] messageBody) {
System.err.println("默認方法, 消息內容:" + new String(messageBody));
}

public void consumeMessage(byte[] messageBody) {
System.err.println("字節數組方法, 消息內容:" + new String(messageBody));
}

public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息內容:" + messageBody);
}
}
TextMessageConverter 類
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}

31:配器方式: 咱們的隊列名稱 和 方法名稱 也能夠進行一一的匹配
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
32:支持json格式的轉換器

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);

container.setMessageListener(adapter);



public class MessageDelegate {


public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息內容:" + messageBody);
}
}
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息訂單");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);

MessageProperties messageProperties = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);

rabbitTemplate.send("topic001", "spring.order", message);
}

32:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象轉換

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();

DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);



public class MessageDelegate {
   public void consumeMessage(Order order) {
System.err.println("order對象, 消息內容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}

}

@Test
public void testSendJavaMessage() throws Exception {

Order order = new Order();
order.setId("001");
order.setName("訂單消息");
order.setContent("訂單描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);

MessageProperties messageProperties = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");
Message message = new Message(json.getBytes(), messageProperties);

rabbitTemplate.send("topic001", "spring.order", message);
}

33:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象多映射轉換

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.bfxy.spring.entity.Order.class);
idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class);

javaTypeMapper.setIdClassMapping(idClassMapping);

jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);


public class MessageDelegate {
  
public void consumeMessage(Order order) {
System.err.println("order對象, 消息內容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}

public void consumeMessage(Packaged pack) {
System.err.println("package對象, 消息內容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
}

@Test
public void testSendMappingMessage() throws Exception {

ObjectMapper mapper = new ObjectMapper();

Order order = new Order();
order.setId("001");
order.setName("訂單消息");
order.setContent("訂單描述信息");

String json1 = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json1);

MessageProperties messageProperties1 = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);

Packaged pack = new Packaged();
pack.setId("002");
pack.setName("包裹消息");
pack.setDescription("包裹描述信息");

String json2 = mapper.writeValueAsString(pack);
System.err.println("pack 4 json: " + json2);

MessageProperties messageProperties2 = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}

34:配置全局的轉換器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的轉換器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);

ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);

PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
35:圖片轉換器
public class ImageMessageConverter implements MessageConverter {

@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");

Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();

byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/010_test/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}

}


    @Test
public void testSendExtConverterMessage() throws Exception {
// byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setContentType("image/png");
// messageProperties.getHeaders().put("extName", "png");
// Message message = new Message(body, messageProperties);
// rabbitTemplate.send("", "image_queue", message);

byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "pdf_queue", message);
}


36:spring boot整合 rabbitmq
producer端代碼
@Component
public class RabbitSender {

//自動注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;

//回調函數: confirm確認
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("異常處理....");
}
}
};

//回調函數: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};

//發送消息方法調用: 構建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 時間戳 全局惟一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}

//發送消息方法調用: 構建自定義對象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 時間戳 全局惟一
CorrelationData correlationData = new CorrelationData("0987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}

}
 測試代碼
@Autowired
private RabbitSender rabbitSender;

private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
}

@Test
public void testSender2() throws Exception {
Order order = new Order("001", "第一個訂單");
rabbitSender.sendOrder(order);
}
 
36:spring boot整合 rabbitmq
consumer端代碼
@Component
public class RabbitReceiver {


@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}


/**
*
* spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-1
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
* @param order
* @param channel
* @param headers
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
37:集羣模式 
主備模式

實現RabbitMQ的高可用集羣,通常在併發和數據量不高的狀況下,這種模式很是的好且簡單。主備模式也稱爲Warren模式

主節點若是掛了,備節點提供服務而已,和activemq利用zookeeper作主/備同樣

 
  • (主節點若是掛了,從節點提供服務而已,和activemq利用zookeeper作主/備同樣)
 
 
  • HaProxy配置:
listen rabbitmq_cluster bind 0.0.0.0:5672 mode tcp #配置TCP模式 balance roundrobin #簡單的輪詢 server bhz76 192.168.11.12:5672 check inter 5000 rise 2 fall 3 #主節點 server bhz77 192.168.11.13:5672 backup check inter 5000 rise 2 fall 3 #備用節點 

備註:rabbitmq集羣節點配置 #inter 每隔5秒對mq集羣作健康檢查,2次正確證實服務器可用,3次失敗證實服務器不可用,而且配置主備機制

鏡像模式(經常使用)

  • 鏡像模式:集羣模式很是經典的就是Mirror鏡像模式,保證100%數據不丟失,在實際工做中用的最多的。而且實現集羣很是的簡單,通常互聯網大廠都會構建這種鏡像集羣模式。

  • Mirror鏡像隊列,目的是爲了保證rabbitmq數據的高可靠性解決方案,主要就是實現數據的同步,通常來說是2-3個實現數據同步(對於100%數據可靠性解決方案通常是3個節點)集羣架構以下:

 
 

多活模式

  • 多活模式:這種模式也是實現異地數據複製的主流模式,由於Shovel模式配置比較複雜,因此通常來講實現異地集羣都是使用雙活或者多活模式來實現的。這種模式須要依賴rabbitmq的federation插件,能夠實現繼續的可靠AMQP數據通訊,多活模式在實際配置與應用很是的簡單。

  • RabbitMQ部署架構採用雙中心模式(多中心),那麼在兩套(或多套)數據中心中各部署一套RabbitMQ集羣,各中心之間還須要實現部分隊列消息共享。多活集羣架構以下:

 
 
  • Federation插件是一個不須要構建Cluster,而在Brokers之間傳輸消息的高性能插件,Federation插件能夠在Brokers或者Cluster之間傳輸消息,鏈接雙方可使用不一樣的users和vistual hosts,雙方也可使用版本不一樣的RabbitMQ和Erlang。Federation插件使用AMQP協議通訊,能夠接收不連續的傳輸。
 
 
  • Federation Exchanges,能夠當作Downstream從Upstream主動拉取消息,但並非拉取全部消息,必須是在Downstream上已經明肯定義Bindings關係的Exchange,也就是有實際的物理Queue來接收消息,纔會從Upstream拉取消息到Downstream。使用AMQP協議實施代理間通訊,Downstream會將綁定關係組合在一塊兒,綁定/解綁命令將會發送到Upstream交換機。所以,FederationExchange只接收具備訂閱的消息。

RabbitMQ集羣鏡像模式從0到1

  • RabbitMQ集羣環境節點說明


     
     

詳細步驟:
RabbitMQ鏡像集羣搭建步驟

  • HAProxy是一款提供高可用性、負載均衡以及基於TCP(第四層)和HTTP(第七層)應用的代理軟件,支持虛擬主機,他是免費、快速而且可靠的一種解決方案。HAProxy特別適用於那些負載特大的web站點,這些站點一般又須要會話保持或七層處理。HAProxy運行在時下的硬件上,徹底能夠支撐數以萬計的併發鏈接。而且它的運行模式使得它能夠很簡單安全的整合進您當前的架構中,同時能夠保護你的web服務器不被暴露到網絡上。
  • HAProxy藉助於OS上幾種常見的技術來實現性能的最大化:
  1. 單進程、時間驅動模型顯著下降上下文切換的開銷及內存佔用
  2. 在任何可用的狀況下,單緩衝(single buffering)機制能以不復制任何數據的方式完成讀寫操做,這會節約大量的CPU時鐘週期及內存帶寬
  3. 藉助於Linux2.6上的splice()系統調用,HAProxy能夠實現零複製轉發(Zero-copy- forwarding),在linux3.5及以上的OS上還能夠實現零複製啓動(zero-starting)
  • KeepAlived軟件主要是經過VRRP協議實現高可用功能的。VRRP是Virtual Router RedundancyProtocol(虛擬路由器冗餘協議)的縮寫,VRRP出現的目的就是爲了解決靜態路由單點故障問題的,它能保證黨個別節點宕機時,整個網絡能夠不間斷地運行,因此,KeepAlived一方面具備配置管理LVS的功能,同時還具有對LVS下面節點進行健康檢查差的功能,另外一方面可實現系統網絡服務的高可用功能。

  • KeepAlived服務的三個重要功能:

    1. 管理LVS負載均衡軟件
    2. 實現LVS集羣節點的健康檢查
    3. 做爲系統網絡服務的高可用性(failover)
  • KeepAlived高可用原理
    KeepAlived高可用服務對之間的故障轉移,是經過VRRP(Virtual Router Redundancy Protocol,虛擬路由器冗餘協議)來實現的。在KeepAlived服務正常工做是,主Master節點會不斷地向備節點發送(多播的方式)心跳消息,用以告訴備Backup節點本身還活着,當主master節點發生故障時,就沒法發送心跳消息,備節點也就所以沒法繼續監測到來自主Master節點的心跳了,因而調用自身的接管程序,接管主Master節點 的IP資源及服務。當主Master節點恢復時,備Backup節點又會釋放主節點故障時自身接管的IP資源和服務,恢復到原來的備用角色。



 
 40: 延遲插件

聲明exchange類型爲x-delayed-message,
聲明queue的參數中增長x-dead-letter-exchange,
消息頭部增長x-delay參數,
 41:消息發送模式
迅速消息 不作可靠性投遞 不作消息落庫
批量消息發送 把消息放到一個集合裏面統一進行提交延遲消息發送順序消息發送必須保證全部消息投遞到一個隊列中且這個隊列只能有一個消費者獨佔模式
相關文章
相關標籤/搜索