工做隊列中,任務避免當即執行對資源相關的操做,由於耗時較長,須要等待任務完成才能執行下一步動做web
咱們將任務封裝成一個消息,併發送到隊列中。併發
後臺運行的執行者將發送到隊列中,若是有多個執行者,任務將會分配到不一樣的執行者執行tcp
這個用在web應用解決一個短http請求窗口處理複雜處理邏輯很是有用fetch
使用任務隊列可以併發執行任務,默認地,rabbitmq將發送每一個消息到下一個消費者,每一個消費者都會平均分配消息,這種分發消息的方式爲round-robinspa
完成任務須要一段時間,假設在這段時間內線程崩潰,目前一旦將消息發送給消費者,這條消息就打上了刪除的標示,一旦中止這個線程就丟失了正在處理的消息,咱們也會丟失掉全部發到這個工做者的消息,這些消息都沒有被處理。線程
咱們須要的是,當一個工做者在不健康的狀態,咱們能夠把任務從新分配給另外一個工做者。blog
rabbitmq支持消息的確認,消費者告訴rabbitmq特定的消息已經收到,處理,rabbitmq能夠刪除它rabbitmq
若是消費者(channel關閉,鏈接斷開,tcp鏈接斷開)沒有確認消息,rabbitmq確認消息沒有徹底處理,將它從新放入隊列,若是有其餘在線的消費者,將會把消息處理的任務分配給其餘在線的消費者,消息確認默認是開着的,沒有確認的消息會再次被處理隊列
咱們已經學會了如何確保消費者服務失敗後,任務不會丟失,可是若是rabbitmq服務失敗後,任務也會丟失資源
當rabbitmq退出或者崩潰後,它將會丟失隊列和消息
兩方面能夠保證不會丟失,咱們應該將隊列和消息設置爲持久的
首先將queue設置爲持久的
命令自己是對的,可是咱們已經使用了一個非持久的同名隊列
rabbitmq不容許從新定義參數不一樣的相同隊列,所以會報錯
其次咱們能夠設置消息也是持久的 MessageProperties.PERSISTENT_TEXT_PLAIN
公平分發
平均分配,並不能保證每個消費者都可以公平的處理消息,可能有些很忙,有些卻在等待消息
設置參數prefetchCount=1,這樣的設置保證了一個消費者在接收到
消息後,在消費完成以前不會接收新的消息,會把這個消息發送給
尚未接收到消息的那個消費者
若是消息隊列滿了,而且全部的消費者都有任務在處理中,那麼
可能須要添加更多的消費者,或者使用其餘的策略對消息進行分發
相關代碼以下
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String[] name={"a","b","c"};
String message = String.join(" ", name);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] send '" + message + "'");
} catch (Exception e) {
throw new RuntimeException();
}
}
}
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("received >>" + message);
try {
doWork(message);
} finally {
System.out.println("[x] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
});
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if ('.' == ch) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}