RabbitMQ學習筆記(4)----RabbitMQ Exchange(交換機)的使用

1. fanout模式

1.1 Publish/Subscribe(發佈/訂閱)結構圖

  

 

  上圖表示一個消費者消費消息以後,不講消息直接存儲到隊列,而是使用兩個消費者各自聲明一個隊列,將各自的對應的隊列與交換機綁定。這樣每一個消費者都讀取的是自身所對應的隊列的全部消息,大達到了一個生產者生產消息,全部消費者都能消費的目的。java

  將交換機類型設置爲fanout便可實現Publish/Subscribe服務器

1.2 生產者代碼

package com.wangx.rabbitmq.sp;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "exchange";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String message = "Hello World!";
            //發送消息
            for (int i = 0; i < 10; i++) {
                //發送消息
                channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes());
                System.out.println(" [x] Sent '" + message + i + "'");
            }
        }catch (Exception e) {

        }finally {
            channel.close();
            connection.close();
        }
    }
}

  與普通消息生產者不一樣的地方在於,發佈訂閱時必需要顯示的聲明一個交換機,而且在發送消息的時候,沒有隊列,必須設置交換機名稱。ide

1.3 消費者實現代碼

package com.wangx.rabbitmq.sp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    /**
     * 隊列名字
     */
    private static String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange";
    public static void main(String[] args){

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立鏈接
            connection = factory.newConnection();
            //建立消息通道
            final Channel  channel = connection.createChannel();
            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //綁定隊列與交換機
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //消息服務器每次只向消費者發送一條消息
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 消息消費完成....");
                    }

                }
            };
            //監聽消息
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  這裏仍然使用的是兩個不一樣的消費者,而且將兩個不一樣的消費者分別聲明不一樣的消息隊列,而後將聲明的隊列與交換機進行綁定,使用channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "fanout");方法。啓動兩個消費者,將兩個不一樣的消息隊列註冊並綁定上去。啓動消息生產者發送消息,將會看到,兩個不一樣的消費者均能接收到生產者發送的全部消息。學習

2. Routing 模式

  結構圖:ui

  

  發送消息時指定不一樣的key,交換機分發消息是根據key分發消息到不一樣的消費者隊列中。spa

  將交換機模式改成DIRECT,在消費端設置了不一樣的key,至關於爲消息分個類,以便於在交換機分發消息時,將消息分發給持有該key的消費者,生產者代碼以下:code

package com.wangx.rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "exchange-routing";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "Hello World!";
            //發送消息
            for (int i = 0; i < 10; i++) {
               if ( i % 2 == 0) {
                   //發送消息,指定key
                   channel.basicPublish(EXCHANGE_NAME, "key2", null, (message + i).getBytes());
                   System.out.println(" 偶數消息 '" + message + i + "'");
               } else {
                   //發送消息
                   channel.basicPublish(EXCHANGE_NAME, "key1", null, (message + i).getBytes());
                   System.out.println(" 奇數消息 '" + message + i + "'");
               }
            }
        }catch (Exception e) {

        }finally {
            channel.close();
            connection.close();
        }
    }
}

 消費者代碼以下:blog

package com.wangx.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    /**
     * 隊列名字
     */
    private static String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange-routing";
    public static void main(String[] args){

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立鏈接
            connection = factory.newConnection();
            //建立消息通道
            final Channel  channel = connection.createChannel();
            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //綁定隊列與交換機,指定接收的消息的key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
            //消息服務器每次只向消費者發送一條消息
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 消息消費完成....");
                    }

                }
            };
            //監聽消息
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  兩個消費者一個使用接收奇數,一個接收偶數消息(根據發送消息時判斷奇偶設置不一樣的key),接收時接收相應的key便可。rabbitmq

  這樣就能夠指定接收想要接收的類型的消息了,至關於前面學習的mq的消息的過濾。隊列

3. Topic模式

  將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。

   生產者實現:

package com.wangx.rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "exchange-topic";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String message = "Hello World!";
            //發送消息,綁定模式
            channel.basicPublish(EXCHANGE_NAME, "key.one", null, ("one -" + message).getBytes());
            channel.basicPublish(EXCHANGE_NAME, "key.two.msg", null, ("two -" + message).getBytes());
        }catch (Exception e) {

        }finally {
            channel.close();
            connection.close();
        }
    }
}

 

  這裏發送消息時,綁定了key.one和key.two.msg兩種模式。接下來使用*號和#號兩種通配符得消費者,以下:

Consumer1

package com.wangx.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    /**
     * 隊列名字
     */
    private static String QUEUE_NAME = "queue-topic";
    private static final String EXCHANGE_NAME = "exchange-topic";
    public static void main(String[] args){

        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立鏈接
            connection = factory.newConnection();
            //建立消息通道
            final Channel  channel = connection.createChannel();
            //聲明交換機
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //綁定隊列與交換機,使用通配符key.* 表示只能匹配key下的一個路徑,
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
            //消息服務器每次只向消費者發送一條消息
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 消息消費完成....");
                    }

                }
            };
            //監聽消息
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  因此在本例中只能匹配到key.one的消息,控制檯打印以下:

consumer1 收到消息 'one -Hello World!'
consumer1 消息消費完成....

  Consumer2實現:

package com.wangx.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    /**
     * 隊列名字
     */
    private static String QUEUE_NAME = "queue-topic-2";
    private static final String EXCHANGE_NAME = "exchange-topic";
    public static void main(String[] args){
        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器主機
        factory.setHost("127.0.0.1");
        //設置用戶名
        factory.setUsername("wangx");
        //設置密碼
        factory.setPassword("wangx");
        //設置VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //建立鏈接
            connection = factory.newConnection();
            //建立消息通道
            final Channel  channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //這裏使用#號通配符,表示可以匹配到key下的任意路徑
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //消息沉睡100ms
                        Thread.sleep(100);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer2 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer2 消息消費完成....");
                    }

                }
            };
            //監聽消息,第二個參數爲true時表示自動確認
           channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  這裏使用了key.#表示匹配全部key.下的全部路徑,因此可以接收到全部以key開頭的消息,控制檯打印以下:

consumer2 收到消息 'one -Hello World!'
consumer2 消息消費完成....
consumer2 收到消息 'two -Hello World!'
consumer2 消息消費完成....

使用topic模式既能夠輕易的實現fanout模式,也能夠實現routing模式,同時提供了通配符的狀況下,使得匹配更加靈活,使用方式更加簡潔。

相關文章
相關標籤/搜索