A Quick Introduction to RabbitMQ

RabbitMQ provides six messaging models.

  1. Simple mode

  • Connecting to RabbitMQ
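import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        // Connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Broker host
        connectionFactory.setHost("localhost");
        // Broker port (5672 is the default AMQP port)
        connectionFactory.setPort(5672);
        // RabbitMQ user name
        connectionFactory.setUsername("guest");
        // RabbitMQ password
        connectionFactory.setPassword("guest");
        return connectionFactory.newConnection();
    }
}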
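  • Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // Open a connection to the RabbitMQ server
        Connection connection = RabbitmqConnectionUtil.getConnection();

        // Create a channel and declare the queue
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Publish the message to the queue through the default exchange ("")
        String message = "hello, world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}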
  • Consumer
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive {
    private static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Define the consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        };
        // Subscribe the consumer to the queue. autoAck is true because this handler
        // never calls basicAck; with false the messages would stay unacknowledged.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    // Older API: QueueingConsumer is deprecated in the 4.x Java client and removed in 5.x.
    private static void oldApi() throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        // Create a channel from the connection
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Define the consumer
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // Listen on the queue (autoAck is true because no manual ack is sent)
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[Rec]:"+ message);
        }
    }
}

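Simple model: producer and consumer are paired one to one. A producer can have only one consumer; to serve several consumers you have to create several queues, so the two sides are tightly coupled.
  2. Work Queues
In the simple model the producer and consumer correspond one to one. In practice, sending a message costs the producer almost nothing, while the consumer usually has business logic to run and takes much longer per message, so with the simple model messages pile up in the queue. A work queue lets several consumers share one queue.

  • Producer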
public class Send {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        
        // Create the channel
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Publish the messages
        for (int i = 0; i < 50; i++) {
            String message = "hello "+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            try {
                Thread.sleep(i*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        channel.close();
        connection.close();
    }
}
  • Consumer 1
public class Rece1 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
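  • Consumer 2 uses the same code as consumer 1. Both consumers end up handling the same number of messages, because the broker hands out the queued messages in turn: round-robin dispatch.
    Consumer 1: all the odd-numbered messages
    Consumer 2: all the even-numbered messages
    No matter which consumer is busy and which is idle, neither ever gets an extra message; the tasks are simply dealt out one to you, one to me.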
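The round-robin behaviour described above can be sketched without a broker. The class below is a hypothetical stand-in, not part of the RabbitMQ client: it simply deals message i to consumer i % n, ignoring how busy each consumer is. Which real consumer ends up with the odd or the even messages depends on the order in which the consumers subscribed.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinDemo {
    // Deals message i to consumer (i % consumers), regardless of consumer load.
    static List<List<Integer>> dispatch(int messages, int consumers) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages; i++) {
            received.get(i % consumers).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(50, 2);
        System.out.println("first subscriber:  " + received.get(0));
        System.out.println("second subscriber: " + received.get(1));
    }
}
```

With 50 messages and two consumers, each list ends up with 25 entries, one holding the even indices and the other the odd ones.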
  3. Fair dispatch: the more you work, the more you get
  • Producer
public class Send {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        
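        // Create the channel
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
         Until a consumer sends its acknowledgement, the queue does not send it the next message,
         so each consumer handles one message at a time.
         Note: the prefetch limit applies to consumers on a channel, so on this publish-only
         channel the call has no effect; the setting that matters is basicQos(1) in the consumers.
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        // Publish the messages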
        for (int i = 0; i < 50; i++) {
            String message = "hello "+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }
        channel.close();
        connection.close(); 
    }
}
  • Consumer 1
public class Rece1 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // Acknowledge manually once the work is done
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  • Consumer 2
public class Rece2 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] reced " + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // Acknowledge manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
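To see why a prefetch of 1 with manual acknowledgement gives "the more you work, the more you get", here is a small simulation. It is only a sketch under simplifying assumptions: all messages are already queued, the broker gives the next message to whichever consumer becomes idle first, and ties go to the lower index. The class and method names are hypothetical.

```java
class FairDispatchDemo {
    // Returns how many messages each consumer handles when the next message
    // always goes to the consumer that becomes idle first (prefetch = 1).
    static int[] simulate(int messages, long[] millisPerMessage) {
        int n = millisPerMessage.length;
        long[] freeAt = new long[n];   // simulated time at which each consumer is idle again
        int[] handled = new int[n];
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += millisPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 1 sleeps 2000 ms per message, consumer 2 sleeps 1000 ms, as in the code above.
        int[] handled = simulate(50, new long[] {2000, 1000});
        System.out.println("consumer 1 handled " + handled[0]);
        System.out.println("consumer 2 handled " + handled[1]);
    }
}
```

In this model the faster consumer ends up with roughly twice as many messages as the slower one, instead of the even split that round-robin produces.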
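  4. Message acknowledgement and message persistence
  • Message acknowledgement
boolean ack = false;
channel.basicConsume(QUEUE_NAME, ack, consumer);

boolean ack = true: automatic acknowledgement. As soon as RabbitMQ has dispatched a message to a consumer, it deletes the message from memory. If the consumer is killed while it is still working, the message it was processing is lost.

boolean ack = false: manual acknowledgement. The consumer sends an acknowledgement to tell RabbitMQ that the message has been processed and can be deleted, and only then does RabbitMQ remove it from memory. If a consumer dies before acknowledging, the message is handed to another consumer.
Manual acknowledgement is the default, that is, ack is false.
Even so, if RabbitMQ itself goes down, all the messages are lost, because they are held only in memory.

  • Message persistence
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

Setting durable = true makes the queue survive a broker restart. If a queue named QUEUE_NAME already exists and is not durable, simply changing the flag to true fails: RabbitMQ does not allow an existing queue to be redeclared with different parameters. Delete the old queue or use a new queue name. Note also that a durable queue alone does not persist its messages; they must be published as persistent too (for example with MessageProperties.PERSISTENT_TEXT_PLAIN).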
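To make the two acknowledgement modes from this section concrete, here is a toy in-memory model of a queue with a single consumer. It is a hypothetical sketch, not the broker's real implementation: with autoAck the message is forgotten the moment it is delivered, while with manual acknowledgement it is kept until ack() is called and put back on the queue if the consumer dies first.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class AckDemo {
    private final Deque<String> ready = new ArrayDeque<>();
    private String unacked;            // delivered but not yet acknowledged
    private final boolean autoAck;

    AckDemo(boolean autoAck) {
        this.autoAck = autoAck;
    }

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message; with autoAck the broker forgets it immediately.
    String deliver() {
        String message = ready.pollFirst();
        if (!autoAck) {
            unacked = message;
        }
        return message;
    }

    void ack() {
        unacked = null;
    }

    // The consumer died: anything still unacknowledged goes back on the queue.
    void consumerDied() {
        if (unacked != null) {
            ready.addFirst(unacked);
            unacked = null;
        }
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        for (boolean autoAck : new boolean[] {true, false}) {
            AckDemo queue = new AckDemo(autoAck);
            queue.publish("job");
            queue.deliver();
            queue.consumerDied();   // killed before finishing the job
            System.out.println("autoAck=" + autoAck + " -> messages left: " + queue.readyCount());
        }
    }
}
```

With autoAck the job is gone after the crash; with manual acknowledgement it is still in the queue for another consumer.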
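  5. Publish/Subscribe mode

    How it works:
  • One producer, multiple consumers
  • Each consumer has its own queue
  • The producer does not send messages directly to a queue; it sends them to an exchange
  • Every queue is bound to the exchange
  • A message sent by the producer passes through the exchange to the queues, so a single message can be consumed by several consumers
  • Producer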
public class Send {
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  // fanout: broadcast to every bound queue

        // Send the message
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        channel.close();
        connection.close();

    }
}
  • Consumer 1
public class Receive {
    public static final String QUEUE_NAME = "test_queue_email";
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // Acknowledge manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  • Consumer 2
public class Receive2 {
    public static final String QUEUE_NAME = "test_queue_sms";
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // Acknowledge manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
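The fanout behaviour used in this section can be modelled in a few lines. The class below is a hypothetical in-memory sketch, not the broker's code: the exchange ignores the routing key and copies each message into every queue bound at that moment, and a message published while nothing is bound is simply dropped. That is why the consumers should declare and bind their queues before the producer runs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FanoutDemo {
    // queue name -> messages waiting in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every queue that is currently bound.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.publish("sent too early");      // no queue bound yet: dropped
        exchange.bind("test_queue_email");
        exchange.bind("test_queue_sms");
        exchange.publish("hello world");
        System.out.println(exchange.messagesIn("test_queue_email"));
        System.out.println(exchange.messagesIn("test_queue_sms"));
    }
}
```

Both queues end up holding only "hello world", so each consumer gets its own copy of the message.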
  6. Direct mode
  • Producer
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.basicQos(1);
        String message = "hello direct";

        // Send the message
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("send " +  message);
        channel.close();
        connection.close();
    }
}
  • Consumer 1
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] Recv "+ message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // Acknowledge manually
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
  • Consumer 2
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] Recv "+ message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // Acknowledge manually
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
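With the bindings used above, a message published with the routing key "warning" reaches only the second queue, while "error" reaches both. The following is a hypothetical in-memory sketch of that exact-match routing rule; it does not talk to a broker.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DirectDemo {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new TreeSet<>()).add(queueName);
    }

    // A direct exchange delivers to the queues whose binding key equals the routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectDemo exchange = new DirectDemo();
        exchange.bind("test_queue_direct_1", "error");
        exchange.bind("test_queue_direct_2", "error");
        exchange.bind("test_queue_direct_2", "info");
        exchange.bind("test_queue_direct_2", "warning");
        System.out.println(exchange.route("warning")); // [test_queue_direct_2]
        System.out.println(exchange.route("error"));   // [test_queue_direct_1, test_queue_direct_2]
        System.out.println(exchange.route("debug"));   // []
    }
}
```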
  7. Topic mode

  • Producer
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = "goods.update";
        String message = "goods ...";

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

        channel.close();
        connection.close();
    }
}
  • Consumer 1
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] Recv "+ message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // Acknowledge manually
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
  • Consumer 2
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] Recv "+ message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // Acknowledge manually
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Turn off automatic acknowledgement
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);

    }
}
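In a topic exchange, binding keys are dot-separated words in which * stands for exactly one word and # for zero or more words. So the message published above with "goods.update" reaches the queue bound with "goods.#" but not the one bound with "goods.add". The matcher below is a minimal sketch of that rule written for illustration; it is a hypothetical helper, not the algorithm RabbitMQ itself uses.

```java
class TopicMatcher {
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;      // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero or more words: try every possible split.
            for (int skip = w; skip <= words.length; skip++) {
                if (match(pattern, p + 1, words, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;                  // pattern still expects a word, none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.#", "goods.update"));        // true
        System.out.println(matches("goods.add", "goods.update"));      // false
        System.out.println(matches("goods.*", "goods.update.price"));  // false
        System.out.println(matches("goods.#", "goods.update.price"));  // true
    }
}
```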
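  8. Message confirmation
    Once the producer has sent a message, it does not know by default whether the message actually reached the RabbitMQ server. There are two ways to find out:
  • The transaction mechanism implemented by the AMQP protocol
  • Confirm mode
    Transaction mechanism
    txSelect, txCommit, txRollback
    txSelect: puts the current channel into transaction mode
    txCommit: commits the transaction
    txRollback: rolls the transaction back
  • Producer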
public class TxSend {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Send the data
        String message  = "send  tx message";
        try{
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            // Deliberately throw (division by zero) before the commit to demonstrate a rollback
            int tt = 1/0;
            channel.txCommit();
        }catch (Exception e){
            channel.txRollback();
            System.out.println("failed to send message");
        }
        channel.close();
        connection.close();
    }
}
  • Consumer
public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("[recv] message: " + message);
            }
        });
    }
}
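The effect of txSelect, txCommit and txRollback in the producer above can be pictured with a small in-memory model. This is a hypothetical sketch, not the client library: messages published inside a transaction are held back until commit() and discarded by rollback(), so the consumer never sees a message from a failed send.

```java
import java.util.ArrayList;
import java.util.List;

class TxDemo {
    private final List<String> queue = new ArrayList<>();    // messages visible to consumers
    private final List<String> pending = new ArrayList<>();  // published inside the open transaction

    void publish(String message) {
        pending.add(message);
    }

    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    List<String> queue() {
        return queue;
    }

    // Mirrors the try/catch in TxSend: publish, maybe fail, then commit or roll back.
    static List<String> sendInTransaction(TxDemo channel, String message, boolean fail) {
        try {
            channel.publish(message);
            if (fail) {
                throw new IllegalStateException("simulated failure before commit");
            }
            channel.commit();
        } catch (IllegalStateException e) {
            channel.rollback();
        }
        return channel.queue();
    }

    public static void main(String[] args) {
        System.out.println(sendInTransaction(new TxDemo(), "send tx message", true));  // []
        System.out.println(sendInTransaction(new TxDemo(), "send tx message", false)); // [send tx message]
    }
}
```

Transactions are reliable but make every publish wait for a round trip to the broker, which is the main reason confirm mode exists as a lighter alternative.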
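  9. Confirm mode
    How it works: the producer puts the channel into confirm mode. From then on, every message published on that channel is assigned a unique ID. Once a message has been delivered to all matching queues, the broker sends the producer a confirmation, so the producer knows the message reached its destination queues. If the message and the queue are durable, the confirmation is sent only after the message has been written to disk. The delivery-tag field of the confirmation carries the sequence number of the confirmed message; the broker can also set the multiple field of basic.ack to indicate that all messages up to and including that sequence number have been handled. There are three ways to use it:
    (1) Normal: publish one message, then wait for its confirmation
    (2) Batch: publish a batch of messages, then wait
    (3) Asynchronous confirm mode: register a callback method
    Normal mode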
public class Send {
    private static final String QUEUE_NAME = "test_queue_confirm_1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Enable confirm mode; a channel cannot be in both transaction mode and confirm mode
        channel.confirmSelect();

        String message = "send confirm...";
        channel.basicPublish("", QUEUE_NAME, null , message.getBytes());

        if(!channel.waitForConfirms()){
            System.out.println("confirm send failed");
        }else{
            System.out.println("confirm send ok ");
        }

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Batch mode

public class Send2 {
    private static final String QUEUE_NAME = "test_queue_confirm_1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Enable confirm mode; a channel cannot be in both transaction mode and confirm mode
        channel.confirmSelect();

        String message = "send confirm...";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null , message.getBytes());
        }

        if(!channel.waitForConfirms()){
            System.out.println("confirm send failed");
        }else{
            System.out.println("confirm send ok ");
        }
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
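The asynchronous variant (3) is not shown above. With the Java client it is usually built by registering a callback through channel.addConfirmListener and remembering each published message under its sequence number (channel.getNextPublishSeqNo()). The bookkeeping part can be sketched without a broker. The class below is a hypothetical helper showing one way to handle the multiple flag: a sorted map of outstanding messages from which an acknowledgement removes either one entry or every entry up to and including the delivery tag.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTracker {
    // sequence number -> message body still waiting for a confirmation
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    // Records a message as published and returns the sequence number assigned to it.
    long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    // multiple = true confirms every message up to and including deliveryTag.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (int i = 0; i < 5; i++) {
            tracker.publish("send confirm... " + i);
        }
        tracker.handleAck(3, true);    // confirms messages 1, 2 and 3 at once
        System.out.println("still unconfirmed: " + tracker.outstandingCount());
    }
}
```

Whatever is left in the map after a timeout, or is reported through a negative acknowledgement, is what the producer would need to resend.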