官網英文版學習——RabbitMQ學習筆記(七)Topic

    在上一篇中使用直接交換器改進了咱們的系統,使得它可以有選擇的進行接收消息,但它仍然有侷限性——它不能基於多個條件進行路由。本節咱們就進行可以基於多個條件進行路由的topics exchange學習。java

 

    發送給主題交換器的消息不能是任意的routing_key—它必須是一個單詞列表,由點分隔。這些詞能夠是任意的,但一般它們指定與消息相關的一些特性。幾個有效的路由示例:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".。路由鍵中能夠有任意多的字,最多能夠有255個字節。
ide

    路由鍵也須要是相同的形式,topic交換器背後的邏輯相似於direct交換器——發送帶有特定路由鍵的消息將被傳送到綁定匹配路由鍵的全部隊列中,學習

然而,有兩個重要的特殊狀況須要綁定鍵:
ui

 

  • * (star) can substitute for exactly one word.                     星號能夠代替一個詞
  • # (hash) can substitute for zero or more words.               哈希能夠代替零個或多個詞
  • 經過下面的例子進行解釋:

  •     在這個例子中,咱們將發送全部描述動物的信息。消息將經過一個包含三個單詞(兩個點)的路由鍵發送。路徑鍵中的第一個詞將描述速度,第二個是顏色,第三個是物種:「<速度>.<顏色>.<物種>」。
  •         We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".spa

    These bindings can be summarised as:code


  • Q1對全部的橙色動物都感興趣。
    blog

  • Q2但願聽到關於兔子的一切,以及關於懶惰動物的一切。
    three

  •     設置了路由鍵爲 "quick.orange.rabbit"的消息將被投遞到兩個隊列,消息 "lazy.orange.elephant" 也被投遞到他們兩個,而"quick.orange.fox"將被投遞到第一個隊列,"lazy.brown.fox"僅被投遞到第二個隊列,"quick.brown.fox"沒有匹配將被捨棄。rabbitmq

  •     經過上面的學習,咱們知道,topic主題的交換器投遞消息與redict交換器的不一樣在於,交換器類型和路由鍵的模糊匹配,如今咱們就去把以前的代碼進行改變,只須要將代碼中的交換器類型改成topic,並將綁定的路由鍵更改一下,投遞消息用的是肯定的路由鍵,接收消息經過設置匹配的模糊綁定鍵,能夠訂閱到多條件的消息,接下來上代碼,並將改動的代碼如下劃線形式標記出來。隊列

  •     發送方代碼:

  • package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Publish {
    	
    	private static final String EXCHANGE_NAME = "exchangeC";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		建立工廠
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		建立鏈接
    		Connection connetion = factory.newConnection();
    //		得到信道
    		Channel channel = connetion.createChannel();
    //		聲明交換器(聲明瞭一個名字位exchangeA,類型修改fanout爲direct類型的交換器)
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		String message = "555,2,2,33,66";
    //		發送消息,將第二項參數routingkey
    		channel.basicPublish(EXCHANGE_NAME, "my.hello.haha", null, message.getBytes());
    		System.out.println(" [x] Sent '" + message + "'");
    		channel.close();
    		connetion.close();
    	}
    
    }
    

  •     接收方一:

  • package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Subscribe {
    	
     private static final String EXCHANGE_NAME = "exchangeC";
    	private static final String QUEUE_NAME = "queueA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		建立工廠
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		建立鏈接
    		Connection connetion = factory.newConnection();
    //		得到信道
    		Channel channel = connetion.createChannel();
    //		聲明交換器(聲明瞭一個名字位exchangeA,類型修改fanout爲direct的交換器)
     channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    //		聲明一個隊列,在此採用臨時隊列
    		String queueName = channel.queueDeclare().getQueue();
    //		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    //		隊列和交換器進行綁定,並設定路由鍵爲error
     channel.queueBind(queueName, EXCHANGE_NAME, "my.#");
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    				// TODO Auto-generated method stub
    				String message = new String(body,"utf-8");
    				System.out.println("[x] received'"+message+"'");
    			}
    		};
    		channel.basicConsume(queueName, consumer);
    	}
    
    }
  •     接收方二:

  • import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Subscribe {
    	
    	private static final String EXCHANGE_NAME = "exchangeC";
    	private static final String QUEUE_NAME = "queueA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		建立工廠
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		建立鏈接
    		Connection connetion = factory.newConnection();
    //		得到信道
    		Channel channel = connetion.createChannel();
    //		聲明交換器(聲明瞭一個名字位exchangeA,修改fanout類型爲direct類型的交換器�?
    	 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    //		聲明�?個隊列,在此採用臨時隊列
    		String queueName = channel.queueDeclare().getQueue();
    //		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    //		隊列和交換器進行綁定,未設定路由鍵
    		channel.queueBind(queueName, EXCHANGE_NAME, "my.hello.*");
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    				// TODO Auto-generated method stub
    				String message = new String(body,"utf-8");
    				System.out.println("[x] received'"+message+"'");
    			}
    		};
    		channel.basicConsume(queueName, consumer);
    	}
    
    }
    
           運行後,兩個接收方均接收道理髮送方發送的消息,儘管咱們在兩個接收方配置的綁定鍵並不相同,可是其模糊匹配規則都可以匹配到發送方發送消息的路由鍵,若是有大量接收方,咱們就能夠經過設置不一樣的綁定鍵來有選擇的接收較多的消息或者是不接受消息。
相關文章
相關標籤/搜索