前兩天閱讀公司代碼看到了用JMX監控定時任務信息和狀態,JMX這個單詞感受很熟因而便去查閱了一下,並寫了監控線程池的Demojava
JMX(Java Management Extensions)
,監控管理框架,經過使用JMX
能夠監控和管理應用程序。JMX
最多見的場景是監控Java
程序的基本信息和運行狀況,任何Java
程序均可以開啓JMX
,而後使用JConsole
或Visual VM
進行預覽git
HTTP
鏈接、
RMI
鏈接、
SNMP
鏈接
MBean
,經過將
MBean
註冊到代理層實現
MBean
的管理,除了註冊
MBean
,還能夠註冊
Adapter
,代理層在應用中通常都是
MBeanService
應用中通常使用Standard MBean
比較多,因此這裏只介紹Standard MBean
,使用Standard MBean
須要知足必定的規則,規則以下:github
XXXXMBean
的格式,必須以MBean
結尾XXXXMBean
,則接口實現類必須爲MBean
,不然程序將報錯get
和set
方法表示監控指標是否可讀、可寫。好比getXXX()
抽象方法,則XXX
就是監控的指標,getXXX()
表示XXX
性能指標可讀,setXXX()
方法表示該監控指標可寫String
)和基本數據類型,不能夠是自定義類型,若是返回值爲自定義類型能夠選擇MXBean
線程池是線程的管理工具,經過使用線程池能夠複用線程下降資源消耗、提升響應速度、提升線程的可管理性。若是在系統中大量使用線程池,就必須對線程池進行監控方便出錯時定位問題。能夠經過線程池提供的參數進行監控,線程池提供的參數以下:bash
方法 | 含義 |
---|---|
getActiveCount | 線程池中正在執行任務的線程數量 |
getCompletedTaskCount | 線程池已完成的任務數量 |
getCorePoolSize | 線程池的核心線程數量 |
getLargestPoolSize | 線程池曾經建立過的最大線程數量 |
getMaximumPoolSize | 線程池的最大線程數量 |
getPoolSize | 線程池當前的線程數量 |
getTaskCount | 線程池須要執行的任務數量 |
介紹完JMX
及線程池之後,寫一個JMX
監控線程池的Demo
,總不能紙上談兵吧架構
定義線程池監控類:ThreadPoolMonitor.java
框架
public class ThreadPoolMonitor extends ThreadPoolExecutor {
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* ActiveCount
* */
int ac = 0;
/**
* 當前全部線程消耗的時間
* */
private AtomicLong totalCostTime = new AtomicLong();
/**
* 當前執行的線程總數
* */
private AtomicLong totalTasks = new AtomicLong();
/**
* 線程池名稱
*/
private String poolName;
/**
* 最短 執行時間
* */
private long minCostTime;
/**
* 最長執行時間
* */
private long maxCostTime;
/**
* 保存任務開始執行的時間
*/
private ThreadLocal<Long> startTime = new ThreadLocal<>();
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.poolName = poolName;
}
public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}
public static ExecutorService newCachedThreadPool(String poolName) {
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
}
public static ExecutorService newSingleThreadExecutor(String poolName) {
return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}
/**
* 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池狀況
*/
@Override
public void shutdown() {
// 統計已執行任務、正在執行任務、未執行任務數量
logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
// 統計已執行任務、正在執行任務、未執行任務數量
logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* 任務執行以前,記錄任務開始時間
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTime.set(System.currentTimeMillis());
}
/**
* 任務執行以後,計算任務結束時間
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
long costTime = System.currentTimeMillis() - startTime.get();
startTime.remove(); //刪除,避免佔用太多內存
//設置最大最小執行時間
maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
if (totalTasks.get() == 0) {
minCostTime = costTime;
}
minCostTime = minCostTime < costTime ? minCostTime : costTime;
totalCostTime.addAndGet(costTime);
totalTasks.incrementAndGet();
logger.info("{}-pool-monitor: " +
"Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
this.poolName,
costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
public int getAc() {
return ac;
}
/**
* 線程平均耗時
*
* @return
* */
public float getAverageCostTime() {
return totalCostTime.get() / totalTasks.get();
}
/**
* 線程最大耗時
* */
public long getMaxCostTime() {
return maxCostTime;
}
/**
* 線程最小耗時
* */
public long getMinCostTime() {
return minCostTime;
}
/**
* 生成線程池所用的線程,改寫了線程池默認的線程工廠
*/
static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 初始化線程工廠
*
* @param poolName 線程池名稱
*/
EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
複製代碼
經過繼承線程池來自定義線程池,並在構造函數中加入了poolName
標明是哪個線程池,同時重寫了beforeExecute
、afterExecute
、terminated
等方法,在beforeExecute
方法中記錄線程池執行的時間,在afterExecute
方法中計算線程執行的耗時、最大耗時、最小耗時、平均耗時。重寫線程池生成線程的方法,指定了生成的線程名dom
定義一個MBean
:ThreadPoolParamMBean.java
MBean
其實並非一個實體類而是一個接口,裏面定義了監控的指標ide
public interface ThreadPoolParamMBean {
/**
* 線程池中正在執行任務的線程數量
*
* @return
*/
int getActiveCount();
/**
* 線程池已完成的任務數量
*
* @return
*/
long getCompletedTaskCount();
/**
* 線程池的核心線程數量
*
* @return
*/
int getCorePoolSize();
/**
* 線程池曾經建立過的最大線程數量
*
* @return
*/
int getLargestPoolSize();
/**
* 線程池的最大線程數量
*
* @return
*/
int getMaximumPoolSize();
/**
* 線程池當前的線程數量
*
* @return
*/
int getPoolSize();
/**
* 線程池須要執行的任務數量
*
* @return
*/
long getTaskCount();
/**
* 線程最大耗時
*
* @return
* */
long getMaxCostTime();
/**
* 線程最小耗時
*
* @return
* */
long getMinCostTime();
/**
* 線程平均耗時
*
* @return
* */
float getAverageCostTime();
}
複製代碼
定義一個MBean
實現類:ThreadPoolParam.java
定義的是靜態MBean,因此接口實現類必須知足規定,即xxxMBean
,實現類爲xxx
函數
public class ThreadPoolParam implements ThreadPoolParamMBean {
private ThreadPoolMonitor threadPoolMonitor;
public ThreadPoolParam(ExecutorService es) {
this.threadPoolMonitor = (ThreadPoolMonitor) es;
}
/**
* 線程池中正在執行任務的線程數量
*
* @return
*/
@Override
public int getActiveCount() {
return threadPoolMonitor.getAc();
}
/**
* 線程池已完成的任務數量
*
* @return
*/
@Override
public long getCompletedTaskCount() {
return threadPoolMonitor.getCompletedTaskCount();
}
/**
* 線程池的核心線程數量
*
* @return
*/
@Override
public int getCorePoolSize() {
return threadPoolMonitor.getCorePoolSize();
}
/**
* 線程池曾經建立過的最大線程數量
*
* @return
*/
@Override
public int getLargestPoolSize() {
return threadPoolMonitor.getLargestPoolSize();
}
/**
* 線程池的最大線程數量
*
* @return
*/
@Override
public int getMaximumPoolSize() {
return threadPoolMonitor.getMaximumPoolSize();
}
/**
* 線程池當前的線程數量
*
* @return
*/
@Override
public int getPoolSize() {
return threadPoolMonitor.getPoolSize();
}
/**
* 線程池須要執行的任務數量
*
* @return
*/
@Override
public long getTaskCount() {
return threadPoolMonitor.getTaskCount();
}
/**
* 線程最大耗時
*
* @return
* */
@Override
public long getMaxCostTime() {
return threadPoolMonitor.getMaxCostTime();
}
/**
* 線程最小耗時
*
* @return
* */
@Override
public long getMinCostTime() {
return threadPoolMonitor.getMinCostTime();
}
/**
* 線程平均耗時
*
* @return
* */
@Override
public float getAverageCostTime() {
return threadPoolMonitor.getAverageCostTime();
}
}
複製代碼
監控的參數指標經過線程池獲得工具
測試類:Test.java
public class Test {
private static Random random = new Random();
public static void main(String[] args) throws MalformedObjectNameException, InterruptedException {
ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
//http鏈接的方式查看監控任務
HtmlAdaptor.start();
executeTask(es1);
executeTask(es2);
Thread.sleep(1000 * 60 * 60);
}
private static void executeTask(ExecutorService es) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
int temp = i;
es.submit(() -> {
//隨機睡眠時間
try {
Thread.sleep(random.nextInt(60) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(temp);
});
}
}).start();
}
}
複製代碼
說明:
MBeanServerUtil.registerMBean()
註冊監控的類HtmlAdaptor.start()
開啓HTTP
鏈接的方式查看監控任務啓動程序後打開http://localhost:8082/
以下圖:
點擊test-pool-1
下的type=threadPoolParam
經過刷新獲取線程池最新的監控指標 test-pool-1
和type=threadPoolParam
這些屬性是在ObjectName
中定義的屬性值
使用JMX
監控線程池只是JMX
一個功能,本篇文章只是學以至用,更多有關JMX以及線程池的內容能夠查閱其餘資料。文章如有錯誤歡迎指正
最後附:項目代碼,歡迎fork與star,【我都劃重點了就star一下】