rabbitmq分佈式工做隊列work queues

任務隊列解耦

工做隊列中,任務避免當即執行對資源相關的操做,由於耗時較長,須要等待任務完成才能執行下一步動做web

咱們將任務封裝成一個消息,併發送到隊列中。併發

後臺運行的執行者將發送到隊列中,若是有多個執行者,任務將會分配到不一樣的執行者執行tcp

這個用在web應用解決一個短http請求窗口處理複雜處理邏輯很是有用fetch

 

Round-robin 分發

使用任務隊列可以併發執行任務,默認地,rabbitmq將發送每一個消息到下一個消費者,每一個消費者都會平均分配消息,這種分發消息的方式爲round-robinspa

message acknowledgment 消息確認

 

完成任務須要一段時間,假設在這段時間內線程崩潰,目前一旦將消息發送給消費者,這條消息就打上了刪除的標示,一旦中止這個線程就丟失了正在處理的消息,咱們也會丟失掉全部發到這個工做者的消息,這些消息都沒有被處理。線程

咱們須要的是,當一個工做者在不健康的狀態,咱們能夠把任務從新分配給另外一個工做者。blog

rabbitmq支持消息的確認,消費者告訴rabbitmq特定的消息已經收到,處理,rabbitmq能夠刪除它rabbitmq

 

若是消費者(channel關閉,鏈接斷開,tcp鏈接斷開)沒有確認消息,rabbitmq確認消息沒有徹底處理,將它從新放入隊列,若是有其餘在線的消費者,將會把消息處理的任務分配給其餘在線的消費者,消息確認默認是開着的,沒有確認的消息會再次被處理隊列

 

消息持久化

 

咱們已經學會了如何確保消費者服務失敗後,任務不會丟失,可是若是rabbitmq服務失敗後,任務也會丟失資源

當rabbitmq退出或者崩潰後,它將會丟失隊列和消息

兩方面能夠保證不會丟失,咱們應該將隊列和消息設置爲持久的

首先將queue設置爲持久的

命令自己是對的,可是咱們已經使用了一個非持久的同名隊列

rabbitmq不容許從新定義參數不一樣的相同隊列,所以會報錯

其次咱們能夠設置消息也是持久的 MessageProperties.PERSISTENT_TEXT_PLAIN

 

 

公平分發

 

平均分配,並不能保證每個消費者都可以公平的處理消息,可能有些很忙,有些卻在等待消息

設置參數prefetchCount=1,這樣的設置保證了一個消費者在接收到

消息後,在消費完成以前不會接收新的消息,會把這個消息發送給

尚未接收到消息的那個消費者

 

若是消息隊列滿了,而且全部的消費者都有任務在處理中,那麼

可能須要添加更多的消費者,或者使用其餘的策略對消息進行分發

相關代碼以下

 

 

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

 

public static void main(String[] args) {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String[] name={"a","b","c"};

String message = String.join(" ", name);

channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

System.out.println(" [x] send '" + message + "'");

} catch (Exception e) {

throw new RuntimeException();

}

}

}

 

 

 

 

 

public class Worker {

 

 

private static final String TASK_QUEUE_NAME = "task_queue";

 

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

final Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

channel.basicQos(1);

DeliverCallback deliverCallback = ((consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("received >>" + message);

try {

doWork(message);

} finally {

System.out.println("[x] done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

});

channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {

});

 

 

}

 

private static void doWork(String task) {

for (char ch : task.toCharArray()) {

if ('.' == ch) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

}

}

相關文章
相關標籤/搜索