在程序中使用內部隊列有不少充分的理由。大多數常見模式都包含相同的原理-將處理分爲兩個獨立的部分,而後每一個部分均可以自主工做。隊列是將對象從一個線程轉移到另外一個線程並確保屬於特定轉移對象的全部字段的正確可見性的最佳方法。這種常見的模式稱爲「消費者-生產者」模式。java
可是,今天,優銳課想更多地關注潛在的故障,監視以及如何避免程序中丟失消息。ios
假設你正在5個實例上的雲環境中運行微服務應用程序,這些實例一般會在幾天內部署新版本。你的應用程序具備REST端點,該端點將新任務排隊到內部隊列中進行處理,並當即返回OK。 而後,它在另外一個線程上異步處理任務。sql
有一天,你必須發佈新版本的微服務。這很容易;若是你在雲上運行,只需按一下按鈕,全部內容將做爲一個實例部署,而無需停機。服務器
在單擊按鈕時,你可能犯了一個嚴重的錯誤。你丟失了全部排隊的任務,而且可能會收到客戶的不少抱怨。架構
實際上,有兩種方法能夠解決此問題:app
你刪除了內部隊列並實現了外部隊列(例如RabbitMQ),直處處理結束時才確認任務已處理。若是你在部署過程當中將其切斷,那麼能夠在啓動並運行新版本的應用程序時從新處理任務。框架
你能夠斷開全部調用者與應用程序的鏈接,以中止填充內部隊列,並等待全部任務處理完畢,而後觸發部署。異步
可是,如何查看全部任務已被處理?個人內部隊列中有多少個任務?幾十,幾百或幾千?你可能不知道;很難猜想你的發佈者和隊列使用者之間的處理時間比例。ide
一般,有界隊列一般趨向於已滿或絕對爲空,這取決於發佈者和使用者之間的處理時間比例是穩定的仍是時間上不穩定的。若是你的隊列在某個高峯中被任務相對佔據(例如,在晚上8-11點之間)而且你有足夠的時間在晚上處理它們,那絕對是能夠的——固然,若是你願意犧牲單個任務的延遲。微服務
更糟糕的是,你有無限的隊列來保留未處理的任務,而後,若是發佈者的速度甚至比使用者的速度稍快,那麼在運行應用程序時,最終可能會遇到很大的隊列。
當你運行本身的代碼而且能夠決定要使用哪一種隊列時,就是這種狀況。當內部隊列由應用程序中使用的任何框架處理時,你甚至可能遇到這種狀況。可是,讓咱們關注一下一切都掌握在手中的狀況,而且你有機會在最後使用的內部隊列中進行一些更改。
讓咱們贊成,咱們須要有關內部隊列的更多信息,而且咱們不能僅假設將新版本的應用程序推入生產環境時咱們的隊列應該爲空。不幸的是,沒有任何方法能夠公開屬於JDK的隊列的信息。讓咱們深刻研究一下,嘗試本身暴露一些東西。
第一步,咱們將公開一些基本信息,這些信息在JDK的Queue接口中可用。
public interface QueueMonitor {
ThreadPoolExecutor executor();
/**
* Returns {@code true} if there is any thread executing any task.
*
* @return {@code true} if there is any active task.
*/
default boolean isRunning() {
return executor().getActiveCount() > 0;
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
default int activeCount() {
return executor().getActiveCount();
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
default long completedTasksTotal() {
return executor().getCompletedTaskCount();
}
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
default long enqueuedTasksTotal() {
return executor().getTaskCount();
}
/**
* Returns the approximate number of tasks that are current enqueued
* and waiting to be scheduled for execution.
*
* @return number of enqueued tasks.
*/
default long enqueuedTasksCurrent() {
return executor().getQueue().size();
}
/**
* Returns the {@link Stream stream} of currently enqueued tasks
* in an internal queue.
*
* @return number of enqueued tasks.
*/
default Stream<Runnable> enqueuedTasks() {
return executor().getQueue().stream();
}
}
若是你將此組件的接口保留全部ThreadPoolExecutor並使用executor方法提供,則你會自動獲取一些有關隊列的基本信息,這些信息可使用自定義REST監視器API或JMX進一步公開。這徹底取決於你的服務是內部服務(不暴露給外部世界)仍是你已經具備對應用程序的HTTP訪問權限。若是沒有,那麼根據應用程序的狀況和性質,JMX多是一種更好的方法。
讓咱們更深刻地查找更多信息。當前,咱們可以列出全部排隊的任務(未處理),並看到一些數字描述如何以及多少任務經過隊列。可是,咱們缺乏有關當前執行的任務的信息。咱們能夠在其上調用某種方法以獲取一些有用信息的確切對象。
/**
* A custom trackable thread pool which can keep and provide a currently running
* task and is able to execute {@link TrackableRunnable} which keeps useful
* information about the current execution.
* <p>
* This implementation follows configuration representing
* {@link Executors#newSingleThreadExecutor()}, the tracking will stop working
* with multiple workers, some additional changes needed to be done
* to support multiple workers.
*/
public class TrackableSingleThreadPoolExecutor extends ThreadPoolExecutor {
/*
* Task must be held as a volatile variable even in SingleThreadedExecutor.
* - A thread is destroyed and new one is recreated when an exception is thrown and caught.
*/
private volatile TrackableRunnable activeTask;
private TrackableSingleThreadPoolExecutor(ThreadFactory threadFactory) {
super(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), threadFactory);
}
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
if (!(runnable instanceof TrackableRunnable)) {
throw new IllegalArgumentException("Executed task must be an instance of "
+ TrackableRunnable.class.getSimpleName());
}
this.activeTask = (TrackableRunnable) runnable;
}
@Override
protected void afterExecute(Runnable runnable, Throwable thread) {
this.activeTask = null;
}
public TrackableRunnable getActiveTask() {
return activeTask;
}
/**
* Keeps a context with an executed runnable. We can track information
* about currently executed task.
*/
public static class TrackableRunnable implements Runnable {
private final Contextual context;
public TrackableRunnable(Contextual context) {
this.context = context;
}
@Override
public void run() {
// Some interesting computation.
}
public Contextual getContext() {
return context;
}
}
}
正如在JavaDoc中提到的那樣,此實現僅支持一個工做程序。 我認爲更改實現以返回保留一些上下文信息的活動任務列表並非一件難事。
你可使用兩種簡單的方法來發布它:
你只須要實現MBean並公開你要觀察的內容
啓動MBean服務器,使其可以經過如下方式與其鏈接: JVisualVM或其餘工具
僅在運行內部應用程序時使用,不然以某種方式保護端點可能頗有用:
[
{
"executor": "food-preparation",
"active": "spaghetti",
"enqueued-tasks-current": 0,
"enqueued-tasks-total": 6,
"completed-tasks-total": 6,
"enqueued-tasks": [
"pizza",
"caesar salad",
"cheerios"
]
},
{
"executor": "drink-preparation",
"active": "cuba libre",
"enqueued-tasks-current": 0,
"enqueued-tasks-total": 6,
"completed-tasks-total": 6,
"enqueued-tasks": [
"mojito",
"beer"
]
}
]
這是能夠幫助你在雲環境中從新啓動應用程序以前耗盡隊列的另外一種方法。一般,Kubernetes可以等待終止JVM並執行關閉鉤子
你只需配置ThreadPoolExecutor#shutdown()便可在關閉掛鉤中調用
Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));
可是,你會遇到幾個問題:
終止能夠延遲更長的時間,尤爲是當你的無限制隊列中充滿任務時。
你須要確保你再也不接受任何任務,由於全部任務都將被執行者拒絕,而且當使用適當的RejectedExecutionHandler實現時,應指定執行者的行爲。
最好再保護一次任務(尤爲是重要任務)。 個人意思是實現一種機制,即不確認拒絕的消息,而後將其返回到外部隊列,例如,該消息能夠等待新的正常實例並隨後進行處理的外部隊列。當經過REST API調用咱們的應用程序並自動拒絕調用並丟失事務/任務時,可能會出現問題。
感謝閱讀!
另外近期整理了一套完整的java架構思惟導圖,分享給一樣正在認真學習的每位朋友~
更多JVM、Mysql、Tomcat、Spring Boot、Spring Cloud、Zookeeper、Kafka、RabbitMQ、RockerMQ、Redis、ELK、Git等Java學習資料和視頻課程乾貨歡迎私信我~