初識RabbitMq(三) 交換機

1. fanout(廣播)

這種模式只需要將隊列綁定到交換機上即可,不需要設置路由鍵。如圖:生產者先將消息發給交換機,然後交換機將消息全部發給每個已綁定的隊列,每個消費者接收到的消息一模一樣(不會像之前那樣將消息平均分配給各個消費者)。

 

 

生產者代碼

 

 

package item.com.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer for the fanout example: publishes the messages "0" through "9"
 * to the fanout exchange {@code logs}.
 *
 * <p>The routing key is left empty because a fanout exchange delivers every
 * message to all queues bound to it, regardless of the key.
 */
public class EmitLog {
    // Name of the exchange the messages are published to.
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Connects to the broker on 127.0.0.1, declares the exchange and publishes ten messages.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // Configure the connection factory for the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws, so nothing leaks on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // exchangeDeclare(exchange name, exchange type)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            for (int i = 0; i < 10; i++) {
                String message = i + "";
                // basicPublish(exchange name, routing key, properties, message body)
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            }
        }
    }
}

 

消費者代碼(可寫多個)

package item.com.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * Consumer for the fanout example: binds a fresh server-named queue to the
 * {@code logs} exchange and prints every message it receives.
 *
 * <p>Several copies of this class can run at once; each gets its own queue
 * and therefore its own copy of every message.
 */
public class ReceiveLogs1 {
    // Name of the exchange the queue is bound to.
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Connects to the broker on 127.0.0.1, binds a queue to the exchange and starts consuming.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        // Configure the connection factory for the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setHost("127.0.0.1");
        // Open a connection and a channel on it.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange here as well: the consumer is meant to be started
        // before the producer, and binding to an exchange that does not exist yet
        // fails. Re-declaring with the same type is harmless.
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // Ask the broker for a non-durable queue with a generated name.
        String queueName = channel.queueDeclare().getQueue();
        // queueBind(queue name, exchange name, routing key) - the key is empty for fanout.
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel) {
            // Called once for every message delivered to the queue.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息:"+message);
            }
        };

        // basicConsume(queue name, autoAck, consumer) - true means messages are acknowledged automatically.
        channel.basicConsume(queueName, true, consumer);
    }
}

注意:先運行接收者,然後再運行消息發送者,可以看到兩個消費者接收到的消息一樣。

2. direct(不同消費者接收到不同消息)

根據RoutingKey匹配,將消息路由到指定的隊列。

 

 

 

 

 

首先是生產者代碼。

package item.com.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer for the direct example: publishes the same message three times to
 * the direct exchange {@code direct_logs}, once each with the routing keys
 * "info", "warning" and "error".
 */
public class EmitLogDirect {
    // Name of the exchange the messages are published to.
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Connects to the broker on 127.0.0.1, declares the exchange and publishes one message per routing key.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // Configure the connection factory for the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws, so nothing leaks on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // exchangeDeclare(exchange name, exchange type)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String message = "Hello World!";
            // Each severity is used as the routing key of one publish, in this order.
            String[] severities = {"info", "warning", "error"};
            for (String severity : severities) {
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            }
        }
    }
}

 

消費者代碼(此處的隊列綁定了三個routingKey)

package item.com.direct;

import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * Consumer for the direct example: binds one server-named queue to the
 * {@code direct_logs} exchange under the routing keys "info", "warning" and
 * "error", then prints each delivery together with its routing key.
 */
public class ReceiveLogsDirect1 {
    // Name of the exchange the queue is bound to.
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Connects to the broker on 127.0.0.1, sets up the bindings and starts consuming.
     * The connection stays open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPassword("admin");
        factory.setUsername("admin");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // exchangeDeclare(exchange name, exchange type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Non-durable queue whose name is generated by the broker.
        String queue = channel.queueDeclare().getQueue();

        // A single queue bound to the exchange once per routing key.
        String[] severities = {"info", "warning", "error"};
        for (String severity : severities) {
            channel.queueBind(queue, EXCHANGE_NAME, severity);
        }

        // basicConsume(queue name, autoAck, consumer)
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            // Called once for every message delivered to the queue.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println("  Received '" + routingKey + "':'" + text + "'");
            }
        });
    }
}

3. topic(direct升級)

和direct相比,多了模糊匹配進行篩選,功能比direct更加強大。

* 可以代替一個單詞
# 可以代替零個或多個單詞

 

 

 

 

生產者代碼

package item.com.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer for the topic example: publishes the message "Animal World" to the
 * topic exchange {@code topic_logs} once for each of nine routing keys.
 */
public class EmitLogTopic {
    // Name of the exchange the messages are published to.
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Connects to the broker on 127.0.0.1, declares the exchange and publishes one message per routing key.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // Configure the connection factory for the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws, so nothing leaks on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // exchangeDeclare(exchange name, exchange type)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "Animal World";
            // Routing keys to publish under, in this order.
            String[] routingKeys = {
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "lazy.pink.rabbit",
                "quick.brown.fox",
                "orange",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };

            for (String routingKey : routingKeys) {
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            }
        }
    }

}

消費者代碼1

package item.com.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * First consumer for the topic example: binds one server-named queue to the
 * {@code topic_logs} exchange with the binding keys {@code *.*.rabbit} and
 * {@code lazy.#}, then prints each delivery together with its routing key.
 */
public class ReceiveLogsTopic1 {
    // Name of the exchange the queue is bound to.
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Connects to the broker on 127.0.0.1, sets up the bindings and starts consuming.
     * The connection stays open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // exchangeDeclare(exchange name, exchange type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Queue whose name is generated by the broker.
        String queue = channel.queueDeclare().getQueue();

        // Bind the queue once per binding key, in this order.
        String[] bindingKeys = {"*.*.rabbit", "lazy.#"};
        for (String bindingKey : bindingKeys) {
            channel.queueBind(queue, EXCHANGE_NAME, bindingKey);
        }

        // basicConsume(queue name, autoAck, consumer)
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            // Called once for every message delivered to the queue.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" [x] Received '" + routingKey + "':'" + text + "'");
            }
        });
    }
}

消費者2代碼

package item.com.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Second consumer for the topic example: binds one server-named queue to the
 * {@code topic_logs} exchange with the binding key {@code *.orange.*}, then
 * prints each delivery together with its routing key.
 */
public class ReceiveLogsTopic2 {
    // Name of the exchange the queue is bound to.
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Connects to the broker on 127.0.0.1, sets up the binding and starts consuming.
     * The connection stays open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // exchangeDeclare(exchange name, exchange type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Queue whose name is generated by the broker.
        String queue = channel.queueDeclare().getQueue();

        // Single binding: the key used to match routing keys for this queue.
        channel.queueBind(queue, EXCHANGE_NAME, "*.orange.*");

        // basicConsume(queue name, autoAck, consumer)
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            // Called once for every message delivered to the queue.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" [x] Received '" + routingKey + "':'" + text + "'");
            }
        });
    }
}

 

輸出結果【根據routingKey 進行相應的模糊查詢】

相關文章
相關標籤/搜索