RabbitMQ結合JavaAPI簡單使用

總結

  • 消息消費者只須要明確從哪一個消息隊列獲取消息
  • exchange建立後 不能再建立相同名字+不一樣模式的
  • 多個模式可結合使用 能夠直接發到指定隊列,也能夠發到交換機由對應策略轉發到對應隊列

RabbitMQ Java原生api使用

1.HelloWorld 簡單模式

添加Virtual Hosts

簡單模式消息生產者

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接工廠對象
        ConnectionFactory factory = new ConnectionFactory();
        // 2.設置RabbitMQ服務主機地址,默認localhost
        factory.setHost("localhost");
        // 3.設置RabbitMQ服務端口,默認5672
        factory.setPort(5672);
        // 4.設置虛擬主機名字,默認/
        factory.setVirtualHost("/demo1");
        // 5.設置用戶鏈接名,默認guest
//        factory.setUsername("guest");
        // 6.設置連接密碼,默認guest
//        factory.setPassword("guest");
        // 7.建立一個新連接
        Connection connection = factory.newConnection();
        // 8.建立消息通道
        Channel channel = connection.createChannel();
        // 9.建立隊列
        channel.queueDeclare("simple_queue",true,false,false,null);
        // 10.建立消息
        String msg="simple queue demo";
        // 11.消息發送
        channel.basicPublish("","simple_queue",null,msg.getBytes());
        // 12.關閉資源
        channel.close();
        connection.close();
    }
}

簡單模式消息消費者

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接工廠對象
        ConnectionFactory factory = new ConnectionFactory();
        // 2.設置RabbitMQ服務主機地址,默認localhost
        factory.setHost("localhost");
        // 3.設置RabbitMQ服務端口,默認5672
        factory.setPort(5672);
        // 4.設置虛擬主機名字,默認/
        factory.setVirtualHost("/demo1");
        // 5.設置用戶鏈接名,默認guest
        // 6.設置連接密碼,默認guest
        // 7.建立一個新連接
        Connection connection = factory.newConnection();
        // 8.建立消息通道
        Channel channel = connection.createChannel();
        // 9.建立隊列
        channel.queueDeclare("simple_queue",true,false,false,null);
        // 10.建立消費者,並設置消息處理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由
                String routingKey = envelope.getRoutingKey();
                // 交換機
                String exchange = envelope.getExchange();
                // 消息id
                long deliveryTag = envelope.getDeliveryTag();
                // 消息體
                String message = new String(body, "UTF-8");
                System.out.println("路由:" + routingKey + ",交換機:" + exchange + ",消息id:" + deliveryTag + ",消息體:" + message);
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        };
        // 11.消息監聽
        channel.basicConsume("simple_queue", true, consumer);
        // 12.關閉資源(不建議關閉,建議一直監聽消息)
    }
}

2.Work queues 工做隊列模式

// 與上面同樣,不過就是建立多個消息消費者去監聽同一個消息隊列,消息分配爲輪詢式

3.Publish/subscribe 發佈訂閱模式

此模式下呢多出一個概念exchange(交換機)能夠在交換機上綁定多個消息隊列,而如今消息產生者將消息發送到交換機,由交換機調度給他下面的消息隊列,此模式下交換機將會將消息發給全部與他綁定的隊列java

工做隊列模式消息生產者

// 9.建立隊列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  建立交換機:arg0,交換機名稱  arg1,交換機類型(廣播)
        channel.exchangeDeclare("demo3Exchange", BuiltinExchangeType.FANOUT);
        // 將隊列綁定到交換機
        channel.queueBind("simple_queue1","demo3Exchange","");
        channel.queueBind("simple_queue2","demo3Exchange","");
        // 10.建立消息  ....

工做隊列模式消息消費者

class Consumer1
	...
	// 11.監聽 消息隊列simple_queue1
        channel.basicConsume("simple_queue1", true, consumer);
        ...
            
    class Consumer2
    ...
    // 11.監聽 消息隊列simple_queue2
        channel.basicConsume("simple_queue2", true, consumer);
        ...

4.Routing 路由模式

此模式下多出概念路由,基於上一模式,上一模式交換機會將接收到的消息發給全部綁定了的隊列,此模式下接收到的消息會多一個參數Routing,會將消息轉發到對應Routing的隊列api

路由模式消息生產者

// 9.建立隊列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  建立交換機:arg0,交換機名稱  arg1,交換機類型(路由對應)
        channel.exchangeDeclare("demo4Exchange", BuiltinExchangeType.DIRECT);
        // 將隊列綁定到交換機
        // simple_queue1只會接收到routingKey 爲error的消息
        channel.queueBind("simple_queue1","demo4Exchange","error");
        // simple_queue2會接收到routingKey 爲info,warning,error的消息
        channel.queueBind("simple_queue2","demo4Exchange","info");
        channel.queueBind("simple_queue2","demo4Exchange","warning");
        channel.queueBind("simple_queue2","demo4Exchange","error");
        // 10.建立消息
        // 11.消息發送
        for (int i = 0; i < 100; i++) {
            // 建立消息
            String message = "routing_key:" + i;
            String routingKey = "";
            if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8
                routingKey = "error";
            }else if (i%5 == 0){    // routing_key_queue2:5
                routingKey = "info";
            }else { // 0、一、5
                routingKey = "warning";
            }
            message += "--->" + routingKey;
            // 消息發送
            channel.basicPublish("demo4Exchange", routingKey, null, message.getBytes());
        }

路由模式消息消費者

// 同上 消息消費者只須要監聽對應的消息隊列便可

5.Topic 通配符模式

此模式下在上面的基礎上將routingKey改成可使用通配符的模式
通配符規則:
多個單詞之間以」.」分割
‘#’:匹配一個或多個詞
:匹配很少很多剛好1個詞
舉例:
item.#:可以匹配item.insert.abc 或者 item.insert
item.
:只能匹配item.insertide

通配符模式消息生產者

// 9.建立隊列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  建立交換機:arg0,交換機名稱  arg1,交換機類型(主題)
        channel.exchangeDeclare("demo5Exchange", BuiltinExchangeType.TOPIC);
        // 將隊列綁定到交換機
        channel.queueBind("simple_queue1","demo5Exchange","#");
        channel.queueBind("simple_queue2","demo5Exchange","www.#");
        channel.queueBind("simple_queue2","demo5Exchange","*.com");
        // 10.建立消息
        // 11.消息發送
        for (int i = 0; i < 100; i++) {
            // 建立消息
            String message = "routing_key:" + i;
            String routingKey = "";
            if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8
                routingKey = "www.baidu.com";
            }else if (i%5 == 0){    // routing_key_queue2:5
                routingKey = "jd.com";
            }else { // 0、一、5
                routingKey = "dqdwwfwevweevwe21e13r23dr2gerfdqw.dqefw122e.23f.2f.23.f.2.f2.f.24.ff2wf2qef.2";
            }
            message += "--->" + routingKey;

            // 消息發送
            channel.basicPublish("demo5Exchange", routingKey, null, message.getBytes());
        }

通配符模式消息消費者

// 同上 消息消費者只須要監聽對應的消息隊列便可
相關文章
相關標籤/搜索