前面有一篇文章中,博主爲你們介紹了**xxchat系統線程監控方案。今天博主爲你們分享的是**微xxxx系統三大隊列任務執行時所用到的線程池的不停服務器監控、暫停、啓用、更改參數等等操做。java
固然,在大多數狀況下,小夥伴們可能都遇到的是容許線上停機改配置重啓的操做。可是,若是大家的業務有不少人在用,老闆要求不宕機的狀況下對隊列線程池進行動態的修改、重啓、監控,那麼此時咱們就須要使用一些技術(Jmx、http等)來遠程操控。博主今天主要分享的使用jmx來進行動態操做線程池的方案,在分享以前咱們首先得了解jmx這項技術是什麼?有什麼用途?在何時用?下面博主就開始帶你們一塊兒去領略jmx高級監控技術的世界。spring
Jmx(Java Managerment Extensions,即"Java管理擴展"),是一個爲應用程序、設備、系統等植入管理功能的框架。Jmx能夠跨越一系列異構操做系統平臺、系統體系結構和網絡傳輸協議,靈活的開發無縫隙集成的系統、網絡和服務管理應用。 Jmx在Java編程語言中定義了應用程序以及網絡管理和監控的體系結構、設計模式、應用程序接口以及服務。一般使用Jmx來監控系統的運行狀態或管理系統的某些方面,好比清空緩存、從新加載配置文件等。apache
此處,**系統主要使用Jmx來註冊自定義線程池組件,線程池組件上暴露了一系列的方法給Jmx;它們包括:線程池的中止,修改線程池的核心線程數、最大線程數、超時時間、隊列大小等等參數,重啓線程池等等方法。而後,客戶可使用Jmx管理工具(如:jconsole/ jvisualvm/程序Agent等等)鏈接上組件暴露的Jmx端口,而後調用其暴露的方法以實現具體的業務功能。編程
如下是**項目的自定義線程池組件代碼:windows
1.Jmx組件接口CommonTaskManagerMBean設計模式
package com.xxxxxx.job.queue; /** * * 類CommonTaskManagerMBean.java的實現描述:jmx組件接口 * @author arron 2018年x月2x日 下午x:x2:x9 */ public interface CommonTaskManagerMBean { int getPageSize(); void setPageSize(int pageSize); int getCorePoolSize(); int getQueueSize(); void setQueueSize(int queueSize); void setCorePoolSize(int corePoolSize); int getMaximumPoolSize() ; void setMaximumPoolSize(int maximumPoolSize); boolean isKill() ; public boolean isStop(); /** * 馬上停掉 * @return */ String stopTaskNow(); /** * 優雅停掉 * @return */ String stopTaskDely(); /** * 開啓任務線程池 * @return */ String startTask(); }
2.Jmx組件接口CommonTaskManagerMBean的具體實現類,也是線程池組件實現類; CommonTaskManager緩存
package com.xxxxxx.job.queue; import java.lang.management.ManagementFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; /** * * 類CommonTaskManager.java的實現描述:通用線程池管理組件 * * @author arron 2018年x月2x日 下午x:x2:x9 */ public class CommonTaskManager implements InitializingBean,CommonTaskManagerMBean { private static final Logger logger = LoggerFactory.getLogger(CommonTaskManager.class); private int corePoolSize=10; private int maximumPoolSize=20; private int pageSize=1000; private int queueSize=1000; //queue size 是否生效 private boolean iseffect=true; //第幾回設置queuesize private int opcount=0; //以前生效的queuesize大小 private int oldQueueSize=0; //任務管理器名稱 private String taskManagerName="未命名"; private ThreadPoolExecutor executor=null; private volatile boolean kill=false; private volatile boolean stop=false; public boolean isKill() { return kill; } public void setKill(boolean kill) { this.kill = kill; } public boolean isStop() { return stop; } public String getTaskManagerName() { return taskManagerName; } public void setTaskManagerName(String taskManagerName) { if(StringUtils.isNotBlank(taskManagerName)){ this.taskManagerName = taskManagerName; }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setTaskManagerName()->設置線程池設置任務管理器名稱執行失敗,任務管理器名稱不能爲空"); } } public int getPageSize() { return pageSize; } public void setPageSize(int pageSize) { if(pageSize>0){ this.pageSize = pageSize; }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setPageSize()->設置線程池分頁處理size數執行失敗,分頁處理size數必須大於0"); } } public int getCorePoolSize() { return corePoolSize; } public int getQueueSize() { return queueSize; } public void setQueueSize(int queueSize) { if(queueSize>0){ this.queueSize = queueSize; }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setQueueSize()->設置線程池隊列size數執行失敗,隊列size數必須大於0"); } opcount=opcount+1; if(opcount>1){ iseffect=false; } } public void setCorePoolSize(int corePoolSize) { if(corePoolSize>=0){ this.corePoolSize = corePoolSize; if(executor!=null&&!executor.isShutdown()){ try{ executor.setCorePoolSize(corePoolSize); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->馬上設置線程池核心線程數執行失敗",e); } } }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->馬上設置線程池核心線程數執行失敗,最大線程數必須大於或者等於0"); } } public int getMaximumPoolSize() { return maximumPoolSize; } public void setMaximumPoolSize(int maximumPoolSize) { if(maximumPoolSize>0){ this.maximumPoolSize = maximumPoolSize; if(executor!=null&&!executor.isShutdown()){ if(maximumPoolSize>0){ try{ executor.setMaximumPoolSize(maximumPoolSize); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->馬上設置線程池最大線程數執行失敗",e); } } } }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->馬上設置線程池最大線程數執行失敗,最大線程數必須大於0"); } } protected ThreadPoolExecutor getExecutor() { return executor; } /** * 當spring xml中定義的屬性初始化完成後,執行該方法 */ @Override public void afterPropertiesSet() throws Exception { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行初始化消息管理組件CommonTaskManager: init-afterPropertiesSet"); //初始化線程池 executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); oldQueueSize=queueSize; } private ExecutorService pool; /** * 初始化加載上次未完成發送的消息繼續發送; */ public void initMethod() { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行初始化消息管理組件CommonTaskManager[{}]: init-method",taskManagerName); //註冊到jmx管理 MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); try { ObjectName objectName = new ObjectName("CommonTaskManager:type=CommonTaskManager-"+taskManagerName); beanServer.registerMBean(this, objectName); pool = Executors.newSingleThreadExecutor(); //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口 pool.submit(new Runnable() { @Override public void run() { while (true) { try { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中參數詳情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{}",corePoolSize ,maximumPoolSize,pageSize,queueSize); if(executor!=null&&!executor.isShutdown()){ if(!iseffect){ logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池的queueSize設置後還未重啓,當前處於未生效狀態,"); } logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池中參數詳情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{},queue隊列中任務數:{}",executor.getCorePoolSize() ,executor.getMaximumPoolSize(),pageSize,oldQueueSize,executor.getQueue().size()); }else{ logger.error("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池處於非工做狀態!"); } //休眠5秒鐘 Thread.sleep(30000l); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } catch (Exception e) { logger.error("任務線程池管理器[{"+taskManagerName+"}]--initMethod()->馬上中止線程池執行失敗",e); } } /** * 執行銷燬線程池管理組件代碼; */ public void destroyMethod() { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件destroy-method"); shutdownAndAwaitTermination(executor); pool.shutdownNow(); logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件destroy-method...任務管理器完成銷燬動做..."); } /** * 線程池暴力銷燬 * @param pool */ private void shutdownNow() { try{ executor.shutdownNow(); logger.warn("任務線程池管理器[{"+taskManagerName+"}]--shutdownNow()->馬上中止線程池執行成功"); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]--shutdownNow()->馬上中止線程池執行失敗",e); } executor=null; } /** * 執行銷燬線程池管理組件代碼; */ private void shuntdownDely() { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件shuntdownDely-method"); shutdownAndAwaitTermination(executor); logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件shuntdownDely-method...任務管理器完成銷燬動做..."); } /** * 線程池優雅銷燬 * * @param pool */ private void shutdownAndAwaitTermination(ExecutorService pool) { try { if(pool.isShutdown()){ logger.warn("任務線程池管理器[{"+taskManagerName+"}]--pool has been shutdown"); }else{ //讓線程池沒法提交新的任務 pool.shutdown(); // Disable new tasks from being submitted // Wait a while for existing tasks to terminate //等待以前的存在的任務執行60s,而後中止 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { //當即中止正在執行的任務 pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { logger.warn("任務線程池管理器[{"+taskManagerName+"}]--Pool did not terminate"); } } } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted logger.error("任務線程池管理器[{"+taskManagerName+"}]-中止時發生異常",ie); try{ pool.shutdownNow(); // Preserve interrupt status //Thread.currentThread().interrupt(); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]-中止時發生異常",e); } } pool=null; } /** * 發起任務請求 submit Callable task request * @param <V> */ public <V> Future<V> submitCallableTaskRequest(Callable<V> callTask) { Future<V> result=null; try{ result = submitTask(callTask); logger.warn("任務線程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任務提交成功"); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任務提交失敗",e); } return result; } /** * 發起任務請求 submit Runnable task request * @param <V> */ public void submitRunableTaskRequest(Runnable runTask) { try{ excuteTask(runTask); logger.warn("任務線程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任務提交成功"); }catch(Exception e){ logger.error("任務線程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任務提交失敗",e); } } protected <V> Future<V> submitTask(Callable<V> callTask) { Future<V> rs= executor.submit(callTask); return rs; } protected void excuteTask(Runnable runTask) { executor.execute(runTask); } @Override public String stopTaskNow() { if(!this.kill){ this.kill=true; } stop=true; shutdownNow(); return "success"; } @Override public String stopTaskDely() { if(!this.kill){ this.kill=true; } stop=true; shuntdownDely(); return "success"; } @Override public String startTask() { String rs="success"; if(executor!=null&&!executor.isShutdown()){ rs ="fail"; logger.warn("任務線程池管理器[{"+taskManagerName+"}]--startTask()->線程池開啓失敗"); }else{ //初始化線程池 executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); stop=false; iseffect=true; oldQueueSize=queueSize; logger.warn("任務線程池管理器[{"+taskManagerName+"}]--startTask()->線程池開啓成功"); } return rs; } }
3.spring Jmx線程池組件配置文件(可定義多個實例,進行不一樣的操做)tomcat
<bean id="synMemberTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="member-taskManager" /> </bean> <bean id="qrcodeTaskManager" class="com.xxxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="qrcode-taskManager" /> </bean> <bean id="templateMsgTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="templateMsg-taskManager" /> </bean>
4.定時任務隊列中使用該線程池組件的案例SynMember 服務器
package com.xxxxxx.jobImpl; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.xxxxxx.job.queue.CommonTaskManager; import com.xxxxxx.model.ThreadLastRuntime; import com.xxxxxx.model.WccMemberCommon; import com.xxxxxx.utils.HessianHelper; /** * 定時同步中間表的信息到會員表 * */ @Lazy(false) @Component public class SynMember { private static final Logger logger = LoggerFactory.getLogger(SynMember.class); @Resource private CommonTaskManager synMemberTaskManager; private static ArrayBlockingQueue queue= new ArrayBlockingQueue(1); //初始化線程池 //每1分鐘執行一次 @Async @Scheduled(cron = "0 0/1 * * * ? ") public void synMember(){ //獲取中間表未處理的數據 if(queue.size()>0){ logger.warn("前一次任務未執行完成跳過本次任務"); return; } synMemberTaskManager.setKill(false); if(synMemberTaskManager.isStop()){ logger.warn("會員同步線程池爲啓動,跳過本次任務"); return; } try{ queue.add(new Object()); updateLastThreaRunTime(); Integer size=synMemberTaskManager.getPageSize(); List<WccMemberCommon> memberCommons = HessianHelper.getAppService().findMemberCommon(size); boolean rssiz=memberCommons != null && memberCommons.size() > 0; logger.warn("會員同步中間表未處理的大小==="+memberCommons.size()); while(rssiz){ updateLastThreaRunTime(); if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ logger.warn("synMember()-會員同步處理任務退出本次任務,"); queue.clear(); return ; } List<Future<Boolean>> futureList=new ArrayList<Future<Boolean>>(); logger.warn("synMember()-會員同步處理開始提交任務"); for(WccMemberCommon memberCommon:memberCommons){ if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ break ; } Future<Boolean> future=synMemberTaskManager.submitCallableTaskRequest(new SynMemberCallableThread(memberCommon)); if(future!=null){ futureList.add( future); } } updateLastThreaRunTime(); logger.warn("synMember()-會員同步處理提交任務-結束-開始獲取執行結果"); if(futureList.size()>0){ for(Future<Boolean> item:futureList){ try { if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ logger.warn("synMember()-會員同步處理任務退出本次任務,"); queue.clear(); return ; } if(item!=null){ if(item.isCancelled()){ queue.clear(); return; } if(item.isDone()){ //若是已經完成,就不作任何事情 }else{ boolean r=item.get(30, TimeUnit.SECONDS); } } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); logger.error("synMember--會員同步出現錯誤",e); }catch (Exception e) { logger.error("synMember--會員同步任務出現錯誤",e); } } }else{ logger.warn("synMember()-會員同步處理提交任務成功數爲0,線程池拒絕了任務"); break; } updateLastThreaRunTime(); memberCommons = HessianHelper.getAppService().findMemberCommon(size); logger.warn("synMember()-會員同步處理獲取執行結果結束"); logger.warn("synMember()-會員同步處理完成一輪,剩餘size大小"+memberCommons.size()); rssiz=memberCommons != null && memberCommons.size() > 0; } logger.warn("會員同步中間表未處理的大小==="+memberCommons.size()+",任務結束"); }catch(Exception e){ logger.error("會員同步中間表任務報錯",e); } queue.clear(); } public void updateLastThreaRunTime(){ int rscount= HessianHelper.getAppService().updateByThreadType("memberSync"); ThreadLastRuntime threadLastRuntime=new ThreadLastRuntime(); threadLastRuntime.setThreadType("memberSync"); threadLastRuntime.setLastThreadRuntime(new Date()); if(rscount==0){ HessianHelper.getAppService().save(threadLastRuntime); }else if (rscount>1){ HessianHelper.getAppService().deleteByThreadType("memberSync"); HessianHelper.getAppService().save(threadLastRuntime); } } }
5.Jmx配置(此處服務器是windows server 2012,因此配置是在tomcat註冊文件中修改)網絡
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=333111 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
經過註冊表修改
win+r --> regedit 打開註冊表【有道深刻理解jvm虛擬機】
打開 註冊表HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\Tomcat6\Parameters\Java(路徑可能有一點點差異)中的Options。
6.項目組件啓動效果圖
7.Jmx鏈接效果圖(此處我就用本地截圖了)
最後,以上就是**項目系統監控操做線程池的全過程了,若是小夥伴們對Jmx這塊的知識感興趣,能夠聯繫博主或者留言。後期博主將推出Jmx的整個技術系列的教程,敬請期待。