rabbitMQ第二篇:java簡單的實現RabbitMQ

 前言:在這裏我將用java來簡單的實現rabbitMQ。下面咱們帶着下面問題來一步步的瞭解和學習rabbitMQ。java

1:若是消費者鏈接中斷,這期間咱們應該怎麼辦服務器

2:如何作到負載均衡負載均衡

3:如何有效的將數據發送到相關的接收者?就是怎麼樣過濾ide

4:如何保證消費者收到完整正確的數據函數

5:如何讓優先級高的接收者先收到數據學習

一:"Hello RabbitMQ"

下面有一幅圖,其中P表示生產者,C表示消費者,紅色部分爲消息隊列spa

 二:項目開始

2.1:首先引入rabbitMQ jar包code

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
 </dependency>

2.2:建立消費者Producerblog

/**
 * 消息生成者
 */
public class Producer {
    public final static String QUEUE_NAME="rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ相關信息
        factory.setHost("localhost");
      //factory.setUsername("lp");
      //factory.setPassword("");
     // factory.setPort(2088);
        //建立一個新的鏈接
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //  聲明一個隊列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello RabbitMQ";
        //發送消息到隊列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Send +'" + message + "'");
        //關閉通道和鏈接
        channel.close();
        connection.close();
    }
}

注1:queueDeclare第一個參數表示隊列名稱、第二個參數爲是否持久化(true表示是,隊列將在服務器重啓時生存)、第三個參數爲是不是獨佔隊列(建立者可使用的私有隊列,斷開後自動刪除)、第四個參數爲當全部消費者客戶端鏈接斷開時是否自動刪除隊列、第五個參數爲隊列的其餘參數接口

注2:basicPublish第一個參數爲交換機名稱、第二個參數爲隊列映射的路由key、第三個參數爲消息的其餘屬性、第四個參數爲發送信息的主體

2.3:建立消費者

 

public class Customer {
    private final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ地址
        factory.setHost("localhost");
        //建立一個新的鏈接
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //聲明要關注的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        System.out.println("Customer Waiting Received messages");
        //DefaultConsumer類實現了Consumer接口,經過傳入一個頻道,
        // 告訴服務器咱們須要那個頻道的消息,若是頻道中有消息,就會執行回調函數handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
            }
        };
        //自動回覆隊列應答 -- RabbitMQ中的消息確認機制
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

前面代碼咱們能夠看出和生成者同樣的,後面的是獲取生產者發送的信息,其中envelope主要存放生產者相關信息(好比交換機、路由key等)body是消息實體。

2.4:運行結果

生產者:

 

消費者:

 三:實現任務分發

工做隊列

一個隊列的優勢就是很容易處理並行化的工做能力,可是若是咱們積累了大量的工做,咱們就須要更多的工做者來處理,這裏就要採用分佈機制了。

咱們新建立一個生產者NewTask

public class NewTask {
    private static final String TASK_QUEUE_NAME="task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
   channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分發信息
        for (int i=0;i<10;i++){
            String message="Hello RabbitMQ"+i;
            channel.basicPublish("",TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send '"+message+"'");
        }
        channel.close();
        connection.close();
    }
}

而後建立2個工做者Work1和Work2代碼同樣

public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Worker1  Waiting for messages");

        //每次從隊列獲取的數量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1  Received '" + message + "'");
                try {
                    throw  new Exception();
                    //doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("Worker1 Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
        //消息消費完成確認
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暫停1秒鐘
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

注:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回覆,若是爲true的話,每次生產者只要發送信息就會從內存中刪除,那麼若是消費者程序異常退出,那麼就沒法獲取數據,咱們固然是不但願出現這樣的狀況,因此纔去手動回覆,每當消費者收到並處理信息而後在通知生成者。最後從隊列中刪除這條信息。若是消費者異常退出,若是還有其餘消費者,那麼就會把隊列中的消息發送給其餘消費者,若是沒有,等消費者啓動時候再次發送。

 

關於上面咱們遺留問題在下一篇繼續講解

相關文章
相關標籤/搜索