RabbitMQ使用教程

 

  • 簡單消息隊列
    • 模型

官方圖例中,P表明消息生產者,中間表明消息隊列,C表明消息的消費者。java

  • 獲取RabbitMQ連接
/**

 * 獲取MQ鏈接

 */

public static Connection getConnection() throws IOException, TimeoutException {

    //定義一個鏈接工廠

    ConnectionFactory factory = new ConnectionFactory();

    //設置服務地址

    factory.setHost("*.*.*.*");

    // 協議:端口號 AMQP:5672

    factory.setPort(5672);

    //設置數據庫

    factory.setVirtualHost("/vhost_haoyuehong");

    //用戶名

    factory.setUsername("haoyuehong");

    //密碼

    factory.setPassword("123456");

    factory.setConnectionTimeout(30*1000);//默認60秒

    return factory.newConnection();

}

 

  • 發送一個消息
public class Send {

    private static final String QUEUE_NAME = "test_simple_queue";



    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取一個鏈接

        Connection connection = ConnectionUtils.getConnection();

        //從鏈接中獲取吧通道

        Channel channel = connection.createChannel();

        //建立隊列聲明(註冊)

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //要發送的消息

        String msg = "hello rabiitmq";

        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

        System.out.println("-->send:"+msg);

        channel.close();

        connection.close();

    }

}

運行以上程序,在RabbitMQ web控制面板QUEUE選項卡中能夠看到以下:python

說明消息發送成功。web

  • 接受(消費)消息
/**
 * 消息的消費者
 */
public class Recv {
    private static final String QUEUE_NAME = "test_simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //獲取一個鏈接
        Connection connection = ConnectionUtils.getConnection();
        //從鏈接中獲取通道
        Channel channel = connection.createChannel();
        //定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監聽隊列
        channel.basicConsume(QUEUE_NAME,true,consumer);
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String s = new String(delivery.getBody());
            System.out.println("<----recv:"+s);
        }
    }
}

消息數量變爲0,說明已被消費。數據庫

以上方法已過世,修改以下:分佈式

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //獲取一個鏈接

        Connection connection = ConnectionUtils.getConnection();

        //從鏈接中獲取通道

        Channel channel = connection.createChannel();

        //聲明隊列

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //事件機制  有消息時觸發

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("<------new API recv:" + msg);

            }

        };

        //監聽隊列

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
  • 總結:簡單消息隊列耦合性高,生產者與消費者一一對應,不能讓多個消費者同時消費,不能知足分佈式場景,且當消費者的對列名變動時,生產者的對列名也要相應變動。
  • Work  Queues工做隊列
  • 模型
    1. 輪訓分發(Round-Robin dispatch

咱們假設有一個生產者和兩個消費者(生產者和消費者的代碼和Simple queue同樣,只是多了一個消費者),生產者每0.5秒發送一條消息(共發送50條),消費者12秒處理一條消息,消費者21秒處理一條消息,測試結果發現:消費者1處理的數據數量和消費者2消費的消息數量相同,且消費者1處理奇數條(下角標爲奇數)的數據,消費者2處理偶數條(下角標爲偶數)的消息,這種方式叫作輪訓分發(Round-Robin測試

        2.公平分發(fair dispatchfetch

消費者每次處理完消息後手動反饋給消息隊列,消息隊列分發下一條消息給消費者。ui

代碼以下:spa

  • 生產者:
public class Send {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        //聲明隊列

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //每一個消費者在消息確認以前,消息隊列不發送消息到消費者,一次只處理一個消息

        int prefetchCount = 1;

        channel.basicQos(prefetchCount);

        //發送50條消息

        for(int i=0;i<50;i++){

            String msg = "hello work queue :"+i;

            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

            Thread.sleep(i*20);

        }

        channel.close();

        connection.close();

    }

}
  • 消費者1
public class Recv1 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取鏈接

        Connection connection = ConnectionUtils.getConnection();

        //獲取通道

        final Channel channel = connection.createChannel();

        //聲明隊列

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);

        //定義消費者

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");

                System.out.println("Recv1 <------收到消息:"+s);

                try {

                    Thread.sleep(1000*2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }finally {

                    System.out.println("[1] done");

                    channel.basicAck(envelope.getDeliveryTag(),false);

                }

            }

        };

        //監聽隊列

        boolean autoAck = false; //自動應答

        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }

}
  • 消費者2
public class Recv2 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取鏈接

        Connection connection = ConnectionUtils.getConnection();

        //獲取通道

        final Channel channel = connection.createChannel();

        //聲明隊列

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);

        //定義消費者

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");

                System.out.println("Recv1 <------收到消息:"+s);

                try {

                    Thread.sleep(1000*2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }finally {

                    System.out.println("[1] done");

                    channel.basicAck(envelope.getDeliveryTag(),false);

                }

            }

        };

        //監聽隊列

        boolean autoAck = false; //自動應答

        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }

}

 

  • 輪訓分發與公平分發的區別:
  • 每一個消費者發送確認消息以前,消息隊列不發送下一個消息到消費者,一次只處理一個消息。限制發送給同一個消費者不得超過1條消息;
int prefetchCount = 1;

channel.basicQos(prefetchCount);

 

  • 消費者增長了channel.basicQos(1)控制同一消息消費次數
channel.basicQos(1);

 

  • 將輪詢分發時自動應答修改成手動應答
channel.basicAck(envelope.getDeliveryTag(),false);

………………

boolean autoAck = false; //自動應答

channel.basicConsume(QUEUE_NAME,autoAck,consumer);

 

  • 消息應答
boolean autoAck = false; //自動應答

channel.basicConsume(QUEUE_NAME,autoAck,consumer);

autoAck爲自動應答,是一個布爾值,3d

  1. boolean autoAck = true; 模式

一旦RabbitMQ將消息分發給消費者,就會將消息從內存中刪除,這種狀況下,若是消息處理過程當中發生異常,消息將會丟失,

    2. boolean autoAck = false; 手動確認模式

若是有一個消費者掛掉,消息隊列將會將該消息交付給其餘消費者,消費者發送消息應答後RabbitMQ將內存中的消息刪除

消息應答默認爲false,假設一種極端狀況,RabbitMQ發生故障宕機了,由於消息存在內存中,RabbitMQ掛掉後,消息也會丟失,這就須要將數據持久化

  • 消息持久化
boolean durable = false; //持久化

channel.queueDeclare(QUEUE_NAME, durable,false,false,null);

注意:聲明好的隊列不能修改durable,由於已經將隊列聲明好,RabbitMQ不許許從新定義一個已存在的對列

  • 訂閱模式(Publish/Subscribe
  • 模型

https://www.rabbitmq.com/img/tutorials/python-three.png

                                                                                            X---交換機

    • 一個生產者多個消費者
    • 每一個消費者都有本身的對列
    • 生產者沒有直接把消息發送到對列,而是發送到交換機(轉發器 exchange
    • 每一個隊列都要綁定到交換機上
    • 生產者發送的消息通過交換機到達對列,就能實現一個消息被多個消費者消費

生產者:

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        //聲明交換機

        channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //1 交換機名字  2 交換機類型

        //發送消息

        String msg = "hello ps";

        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());

        System.out.println("send msg ----->"+msg);

        channel.close();

        connection.close();

    }

}

運行以上程序,查看控制檯:

此時,消息已經丟失,由於交換機沒有存儲能力,RabbitMQ裏面只有對列有存儲能力,因爲此時沒有對列綁定,因此消息丟失了

消費者1

public class Recv1 {

    private static final String QUEUE_NAME = "test_queue_fanout_email";

    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();

        //對列聲明

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //綁定對列到交換機

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        channel.basicQos(1);

        //定義消費者

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");

                System.out.println("Recv1 <------收到消息:"+s);

                try {

                    Thread.sleep(1000*2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }finally {

                    System.out.println("[1] done");

                    channel.basicAck(envelope.getDeliveryTag(),false);

                }

            }

        };

        //監聽隊列

        boolean autoAck = false; //自動應答

        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }

}

消費者2

public class Recv1 {

    private static final String QUEUE_NAME = "test_queue_fanout_email";

    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();

        //對列聲明

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //綁定對列到交換機

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        channel.basicQos(1);

        //定義消費者

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");

                System.out.println("Recv2 <------收到消息:"+s);

                try {

                    Thread.sleep(1000*2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }finally {

                    System.out.println("[2] done");

                    channel.basicAck(envelope.getDeliveryTag(),false);

                }

            }

        };

        //監聽隊列

        boolean autoAck = false; //自動應答

        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }

}
  • Exchange(交換機/轉發器)
  • 接受生產者消息
  • 向對列推送消息
  • Exchange類型

        交換器的類型,常見的有fanout、directtopicheaders這四種

  1.     Fanout Exchange : 不處理任何的路由鍵,它會把全部發送到該交換器的消息路由到全部與該交換器綁定的隊列中,如上代碼即是Fanout Exchange

    

    1.1    模式

https://www.rabbitmq.com/img/tutorials/python-three-overall.png

 

 

        2.    Direct Exchange : 直連的方式 把消息路由到那些 BindingKey RoutingKey 徹底匹配的隊列中

        2.1    模式

https://www.rabbitmq.com/img/tutorials/python-four.png

生產者:

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_rounting";

    private static final String ROUNTINGKEY = "key";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        //聲明交換機

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType());

        String msg = "hello rounting";

        channel.basicPublish(EXCHANGE_NAME,ROUNTINGKEY,null,msg.getBytes());//ROUNTINGKEY聲明瞭路由鍵 相同的路由鍵能收到消息

        System.out.println("send msg ----->"+msg);

        channel.close();

        connection.close();

    }

}

 

消費者:

public class Recv1 {

    private static final String QUEUE_NAME = "test_queue_rounting_email";

    private static final String EXCHANGE_NAME = "test_exchange_rounting";

    private static final String ROUNTINGKEY = "key";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUNTINGKEY); //與生產者聲明的路由鍵相同

        channel.basicQos(1);

        //定義消費者

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");

                System.out.println("Recv1 <------收到消息:"+s);

                try {

                    Thread.sleep(1000*2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }finally {

                    System.out.println("[1] done");

                    channel.basicAck(envelope.getDeliveryTag(),false);

                }

            }

        };

        //監聽隊列

        boolean autoAck = false; //自動應答

        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }

}

 

  1. Topic Exchange : 交換器在匹配規則上進行了擴展,它有必定的約束條件:
    1. ​​​​​​​RoutingKey 爲一個點號" . 「分隔的字符串(被點號」.「分隔開的每段獨立的字符串稱爲一個單詞) 如:com.toher.user 和 org.toher.product
    2. BindingKey和RoutingKey一樣也是點號」."分隔的字符串;
    3. BindingKey 中能夠存在兩種特殊字符串 * 和 #,用於作模糊匹配,其中 * 表明一個單詞,# 能夠用於匹配多規格單詞(能夠是零個)

  1. 模式

 

https://www.rabbitmq.com/img/tutorials/python-five.png

相關文章
相關標籤/搜索