RabbitMQ Work Queues(工做隊列)

  • RabbitMQ Work Queues(工做隊列)

    • 工做隊列模式爲一個生產者對應多個消費者,可是隻有一個消費者得到消息,即一個隊列被多個消費者監聽,但一條消息只能被其中的一個消費者獲取java

    • 代碼以下:git

      生產者代碼:github

public class WorkSend {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //獲取鏈接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //聲明通道
        Channel channel = connection.createChannel();
        //建立隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", "hello", null, "First.".getBytes());
        channel.basicPublish("", "hello", null, "Secode..".getBytes());
        channel.basicPublish("", "hello", null, "Third....".getBytes());
        channel.basicPublish("", "hello", null, "Fourth....".getBytes());
        channel.basicPublish("", "hello", null, "Fifth.....".getBytes());
        //六、關閉通道和鏈接
        channel.close();
        connection.close();
    }
}

​ 消費者代碼spring

public class WorkRecv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //獲取鏈接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");

        //聲明通道
        Channel channel = connection.createChannel();

        //聲明隊列隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
                //channel.basicAck();
            }
        };
        boolean autoAck = true; // acknowledgment is covered below
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });


//        DeliverCallback deliverCallback = new DeliverCallback(){
//            @Override
//            public void handle(String consumerTag, Delivery delivery) throws IOException {
//                String message = new String(delivery.getBody(), "UTF-8");
//                System.out.println(" [x] Received '" + message + "'");
//            }
//        };
//
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
//            @Override
//            public void handle(String consumerTag) throws IOException {
//
//            }
//        });

    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

1 生產者將消息交個交換機 2 交換機交給綁定的隊列 3 隊列由多個消費者同時監聽,只有其中一個可以獲取這一條消息,造成了資源的爭搶,誰的資源空閒大,爭搶到的可能越大;app

  • Round-robin dispatching(輪詢分發)ide

    使用任務隊列的優勢之一是可以輕鬆並行工做。若是咱們這裏積壓了不少的消息,咱們能夠增長work的並行度,這樣就能夠輕鬆擴展。fetch

    默認狀況下,RabbitMQ將每一個消息依次發送給下一個消費者。平均而言,每一個消費者都會收到相同數量的消息。這種分發消息的方式稱爲輪詢。ui

  • Message acknowledgment(消息確認)this

    使用咱們當前的代碼,RabbitMQ一旦向消費者發送了一條消息,便當即將其標記爲刪除。在這種狀況下,若是咱們kill掉一個worker,咱們將丟失正在處理的消息。而且還將丟失全部發送給該特定worker但還沒有處理的消息。 爲了確保消息永不丟失,RabbitMQ支持 消息確認。消費者發送回一個確認(告知),告知RabbitMQ特定的消息已被接收並處理,而且RabbitMQ能夠自由刪除它。 若是消費者死了(其通道已關閉,鏈接已關閉或TCP鏈接丟失)而沒有發送確認,RabbitMQ將瞭解消息未徹底處理,並將從新排隊。若是同時有其餘消費者在線,它將很快將其從新分發給另外一個消費者。這樣,您能夠確保即便worker偶爾死亡也不會丟失任何消息。 沒有任何消息超時;消費者死亡時,RabbitMQ將從新傳遞消息。即便處理一條消息花費很是很是長的時間也不要緊。 默認狀況下,手動消息確認處於打開狀態。咱們經過autoAck = false 標誌顯式關閉了它們。在消息完成投遞的時候,手動確認消息投遞成功。spa

    消費者代碼修改以下:

public class ManWorkRecv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //獲取鏈接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");

        //聲明通道
        Channel channel = connection.createChannel();

        //聲明隊列隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
                //手動確認消息已經成功投遞
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false; //設置消息手動確認
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });

    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

​ 上述代碼能夠確保咱們在kill掉一個消費者的狀況下,消息不會丟失。

  • Message durability(消息持久化)

    前面已經講了即便一個消費者退出,消息不會丟失,可是若是RabbitMQ服務退出時,隊列和消息仍然會丟失,這是由於 默認隊列和消息都是放在內存中的 。爲保證消息和隊列不丟失,須要把隊列和消息設置爲持久化 確保RabbitMQ永遠不會丟失咱們的隊列。聲明持久化代碼以下:

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);

    因爲RabbitMQ已經存在一個hello隊列,而且RabbitMQ不支持對已經存在的隊列進行參數修改,因此須要咱們刪除以前建立的隊列或者從新建立一個隊列。

    其次,咱們須要保證消息的持久化,消息持久化設置以下:

    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "Fifth.....".getBytes());

    這樣,咱們就能夠保證RabbitMQ服務宕機的狀況下,消息和隊列都不會丟失。

  • Fair dispatching(公平分發)

    ​ RabbitMQ在進行消息分發的時候,能夠設置一次分發給某一個消費者多少條消息,如在消費者端設置prefetchCount=1;以下代碼

    int prefetchCount = 1 ; 
    channel.basicQos(prefetchCount);

    該設置表示,RabbitMQ一次分發給消費者一條消息,在消費者處理並確認上一條消息以前,不會再給這個消費者發送一條新消息,而會將其分發給其餘的消費者

  • SpringBoot實現:

    @SpringBootApplication
    @EnableScheduling
    public class RabbitAmqpTutorialsApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
        }
    
    }
    @Configuration
      public class Tut2Config {
    
          @Bean
          public Queue hello() {
              return new Queue("hello");
          }
    
          /**
           * 這裏是啓動兩個消費者
           */
          private static class ReceiverConfig{
              @Bean
              public Tut2Receiver receiver1(){
                  return new Tut2Receiver(1);
              }
    
              @Bean
              public Tut2Receiver receiver2(){
                  return new Tut2Receiver(2);
              }
          }
    
          @Bean
          public Tut2Sender sender() {
              return new Tut2Sender();
          }
    
    
      }
    //監聽隊列
    @RabbitListener(queues = "hello")
    public class Tut2Receiver {
    
        private final int instance;
    
        public Tut2Receiver(int i) {
            this.instance = i;
        }
    
        @RabbitHandler
        public void receive(String in) throws InterruptedException {
            StopWatch watch = new StopWatch();
            watch.start();
            System.out.println("instance " + this.instance +
                    " [x] Received '" + in + "'");
            doWork(in);
            watch.stop();
            System.out.println("instance " + this.instance +
                    " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }
    
        private void doWork(String in) throws InterruptedException {
            for (char ch : in.toCharArray()) {
                if (ch == '.') {
                    Thread.sleep(1000);
                }
            }
        }
    
    }
    public class Tut2Sender {
        @Autowired
        private RabbitTemplate template;
    
        //隊列
        @Autowired
        private Queue queue;
    
        AtomicInteger dots = new AtomicInteger(0);
    
        AtomicInteger count = new AtomicInteger(0);
    
        /**
         * 定時向隊列hello發送消息
         */
        @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void send() {
            StringBuilder builder = new StringBuilder("Hello");
            if (dots.incrementAndGet() == 3) {
                dots.set(1);
            }
            for (int i = 0; i < dots.get(); i++) {
                builder.append('.');
            }
            builder.append(count.incrementAndGet());
            String message = builder.toString();
            //向隊列中發送消息
            template.convertAndSend(queue.getName(), message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    
    }

    相關代碼連接: https://github.com/albert-liu435/springmq

相關文章
相關標籤/搜索