RabbitMQ 04 訂閱模式/路由模式

rabbitmq六種工做模式

3.發佈訂閱模式

即向多個消費者傳遞同一條信息
image算法

1).Exchanges 交換機

RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列。安全

相反,生產者只能向交換機(Exchange)發送消息。交換機是一個很是簡單的東西。一邊接收來自生產者的消息,另外一邊將消息推送到隊列。交換器必須確切地知道如何處理它接收到的消息。它應該被添加到一個特定的隊列中嗎?它應該添加到多個隊列中嗎?或者它應該被丟棄。這些規則由exchange的類型定義。dom

有幾種可用的交換類型:direct、topic、header和fanout
建立fanout交換機logs: c.exchangeDeclare("logs", "fanout");
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);ide

fanout交換機很是簡單。它只是將接收到的全部消息廣播給它所知道的全部隊列。ui

2).綁定 Bindings

image

建立了一個fanout交換機和一個隊列。如今咱們須要告訴exchange向指定隊列發送消息。exchange和隊列之間的關係稱爲綁定。spa

//指定的隊列,與指定的交換機關聯起來
//稱爲綁定 -- binding
//第三個參數時 routingKey, 因爲是fanout交換機, 這裏忽略 routingKey
ch.queueBind(queueName, "logs", "");
3).總體代碼

1.生產者
最重要的更改是,咱們如今但願將消息發佈到logs交換機,而不是無名的日誌交換機。咱們須要在發送時提供一個routingKey,可是對於fanout交換機類型,該值會被忽略。日誌

public class Producer {
    public static void main(String[] args) throws Exception {
        //創建鏈接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定義fanout類型交換機:logs
 //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
 //向交換機發送信息
 while (true){
            System.out.println("輸入消息:");
 String msg = new Scanner(System.in).nextLine();
 c.basicPublish("logs",
 "",
 null, msg.getBytes());
 }
    }
}

2.消費者
若是尚未隊列綁定到交換器,消息就會丟失,但這對咱們來講沒有問題;若是尚未消費者在聽,咱們能夠安全地丟棄這些信息。code

public class Consumer {
    public static void main(String[] args) throws Exception {
        //創建鏈接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //1.定義隨機隊列 2.定義交換機 3.綁定
 //隨機命名,非持久,獨佔,自動刪除
 String queue = UUID.randomUUID().toString();
 c.queueDeclare(queue,
 false, true, true, null);
 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
 //第三個參數對發佈訂閱模式fanout交換機無效
 c.queueBind(queue, "logs", "");
 DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
 public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody());
 System.out.println("收到:"+msg);
 }
        };
 CancelCallback cancelCallback = new CancelCallback() {
            @Override
 public void handle(String consumerTag) throws IOException {
            }
        };
 //正常的消費數據
 c.basicConsume(queue,
 true, deliverCallback,
 cancelCallback);
 }
}

4.路由模式

路由模式與訂閱模式不一樣之處在於,咱們將向其添加一個特性—咱們將只訂閱全部消息中的一部分.本文中已添加err/info/warning等報錯提示來示範.
imageblog

1).綁定 Bindings

綁定是交換機和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。rabbitmq

綁定可使用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲bindingKey。這是咱們如何建立一個鍵綁定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含義取決於交換機類型。咱們前面使用的fanout交換機徹底忽略它。

2).直連交換機 Direct exchange

上一節中的日誌系統向全部消費者廣播全部消息。咱們但願擴展它,容許根據消息的嚴重性過濾消息。

前面咱們使用的是fanout交換機,這並無給咱們太多的靈活性——它只能進行簡單的廣播。

咱們將用直連交換機(Direct exchange)代替。它背後的路由算法很簡單——消息傳遞到bindingKey與routingKey徹底匹配的隊列。

3).多重綁定 Multiple bindings

使用相同的bindingKey綁定多個隊列是徹底容許的。可使用binding key "black"將X與Q1和Q2綁定。在這種狀況下,直連交換機的行爲相似於fanout,並將消息廣播給全部匹配的隊列。一條路由鍵爲black的消息將同時發送到Q1和Q2。

4).更改

1.發送消息
咱們將提供日誌級別做爲routingKey,這樣,接收程序將可以選擇它但願接收的級別

//參數1: 交換機名
//參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning"
//參數3: 其餘配置屬性
//參數4: 發佈的消息數據 
ch.basicPublish("direct_logs", "error", null, message.getBytes());

2.接收消息
咱們將爲感興趣的每一個日誌級別建立一個新的綁定

ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
5).完整代碼

1.生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        //創建鏈接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定義fanout類型交換機:logs
 //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
 //向交換機發送信息
 while (true){
            System.out.println("輸入消息:");
 String msg = new Scanner(System.in).nextLine();
 System.out.println("輸入路由鍵:");
 String key = new Scanner(System.in).nextLine();
 c.basicPublish("direct_logs",
 key, //路由鍵關鍵詞
 null,
 msg.getBytes());
 }
    }
}

2.消費者

public class Consumer {
    public static void main(String[] args) throws Exception {
        //創建鏈接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //1.定義隨機隊列 2.定義交換機 3.綁定
 //隨機命名,非持久,獨佔,自動刪除
 String queue = UUID.randomUUID().toString();
 c.queueDeclare(queue,
 false, true, true, null);
 c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
 //用輸入綁定鍵進行綁定
 System.out.println("輸入綁定鍵,用空格隔開:");
 String s = new Scanner(System.in).nextLine();
 String[] a = s.split(" "); //["aaa","bbb","ccc"]
 for (String key:a){
            c.queueBind(queue, "direct_logs", key);
 }
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
 public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody());
 String key = message.getEnvelope().getRoutingKey();
 System.out.println("收到:"+msg+" - "+key);
 }
        };
 CancelCallback cancelCallback = new CancelCallback() {
            @Override
 public void handle(String consumerTag) throws IOException {
            }
        };
 //正常的消費數據
 c.basicConsume(queue,
 true, deliverCallback,
 cancelCallback);
 }
}
相關文章
相關標籤/搜索