**xxchat系統之線程池實時監控方案(Jmx遠程後臺監控、暫停、啓用)

    前面有一篇文章中,博主爲你們介紹了**xxchat系統線程監控方案。今天博主爲你們分享的是**微xxxx系統三大隊列任務執行時所用到的線程池的不停服務器監控、暫停、啓用、更改參數等等操做。java

    固然,在大多數狀況下,小夥伴們可能都遇到的是容許線上停機改配置重啓的操做。可是,若是大家的業務有不少人在用,老闆要求不宕機的狀況下對隊列線程池進行動態的修改、重啓、監控,那麼此時咱們就須要使用一些技術(Jmx、http等)來遠程操控。博主今天主要分享的使用jmx來進行動態操做線程池的方案,在分享以前咱們首先得了解jmx這項技術是什麼?有什麼用途?在何時用?下面博主就開始帶你們一塊兒去領略jmx高級監控技術的世界。spring

    Jmx(Java Managerment Extensions,即"Java管理擴展"),是一個爲應用程序、設備、系統等植入管理功能的框架。Jmx能夠跨越一系列異構操做系統平臺、系統體系結構和網絡傳輸協議,靈活的開發無縫隙集成的系統、網絡和服務管理應用。 Jmx在Java編程語言中定義了應用程序以及網絡管理和監控的體系結構、設計模式、應用程序接口以及服務。一般使用Jmx來監控系統的運行狀態或管理系統的某些方面,好比清空緩存、從新加載配置文件等。apache

    此處,**系統主要使用Jmx來註冊自定義線程池組件,線程池組件上暴露了一系列的方法給Jmx;它們包括:線程池的中止,修改線程池的核心線程數、最大線程數、超時時間、隊列大小等等參數,重啓線程池等等方法。而後,客戶可使用Jmx管理工具(如:jconsole/ jvisualvm/程序Agent等等)鏈接上組件暴露的Jmx端口,而後調用其暴露的方法以實現具體的業務功能。編程

    如下是**項目的自定義線程池組件代碼:windows

1.Jmx組件接口CommonTaskManagerMBean設計模式

package com.xxxxxx.job.queue;
/**
 * 
 * 類CommonTaskManagerMBean.java的實現描述:jmx組件接口
 * @author arron 2018年x月2x日 下午x:x2:x9
 */
public interface CommonTaskManagerMBean {
    int getPageSize();

    void setPageSize(int pageSize);

    int getCorePoolSize();
    
    int getQueueSize();

    void setQueueSize(int queueSize);

    void setCorePoolSize(int corePoolSize);
    
    int getMaximumPoolSize() ;

    void setMaximumPoolSize(int maximumPoolSize);
    
    boolean isKill() ;
    
    public boolean isStop();
    /**
     * 馬上停掉
     * @return
     */
    String stopTaskNow();
    /**
     * 優雅停掉
     * @return
     */
    String stopTaskDely();
    /**
     * 開啓任務線程池
     * @return
     */
    String startTask();
}

2.Jmx組件接口CommonTaskManagerMBean的具體實現類,也是線程池組件實現類;       CommonTaskManager緩存

package com.xxxxxx.job.queue;

import java.lang.management.ManagementFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;


/**
 * 
 * 類CommonTaskManager.java的實現描述:通用線程池管理組件
 * 
 * @author arron 2018年x月2x日 下午x:x2:x9
 */
public class CommonTaskManager implements InitializingBean,CommonTaskManagerMBean {
    private static final Logger      logger        = LoggerFactory.getLogger(CommonTaskManager.class);
    private int                      corePoolSize=10;

    private int                      maximumPoolSize=20;
    
    private int                      pageSize=1000;
    
    private int                      queueSize=1000;
    //queue size 是否生效
    private boolean              iseffect=true;
    //第幾回設置queuesize
    private int                      opcount=0;
    //以前生效的queuesize大小
    private int                      oldQueueSize=0;
    //任務管理器名稱
    private String                 taskManagerName="未命名"; 
    private ThreadPoolExecutor executor=null;
    private volatile boolean kill=false;
   
    private volatile boolean stop=false;
    public boolean isKill() {
        return kill;
    }
    public void setKill(boolean kill) {
        this.kill = kill;
    }
    
    public boolean isStop() {
        return stop;
    }
    
    public String getTaskManagerName() {
        return taskManagerName;
    }

    public void setTaskManagerName(String taskManagerName) {
        if(StringUtils.isNotBlank(taskManagerName)){
            this.taskManagerName = taskManagerName;
        }else{
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--setTaskManagerName()->設置線程池設置任務管理器名稱執行失敗,任務管理器名稱不能爲空");
        }
    }

    public int getPageSize() {
        return pageSize;
    }

    public void setPageSize(int pageSize) {
        if(pageSize>0){
            this.pageSize = pageSize;
        }else{
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--setPageSize()->設置線程池分頁處理size數執行失敗,分頁處理size數必須大於0");
        }
    }
    
    public int getCorePoolSize() {
        return corePoolSize;
    }
    
    public int getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(int queueSize) {
        if(queueSize>0){
            this.queueSize = queueSize;
        }else{
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--setQueueSize()->設置線程池隊列size數執行失敗,隊列size數必須大於0");
        }
        opcount=opcount+1;
        if(opcount>1){
            iseffect=false;
        }
    }

    public void setCorePoolSize(int corePoolSize) {
        if(corePoolSize>=0){
            this.corePoolSize = corePoolSize;
            if(executor!=null&&!executor.isShutdown()){
                    try{
                        executor.setCorePoolSize(corePoolSize);
                    }catch(Exception e){
                        logger.error("任務線程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->馬上設置線程池核心線程數執行失敗",e);
                    }               
            }
        }else{
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->馬上設置線程池核心線程數執行失敗,最大線程數必須大於或者等於0");
        }
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        if(maximumPoolSize>0){
            this.maximumPoolSize = maximumPoolSize;
            if(executor!=null&&!executor.isShutdown()){
                if(maximumPoolSize>0){
                    try{
                        executor.setMaximumPoolSize(maximumPoolSize);
                    }catch(Exception e){
                        logger.error("任務線程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->馬上設置線程池最大線程數執行失敗",e);
                    }     
                }
            }
        }else{
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->馬上設置線程池最大線程數執行失敗,最大線程數必須大於0");
        }
    }

   

    protected ThreadPoolExecutor getExecutor() {
        return executor;
    }


    /**
     * 當spring xml中定義的屬性初始化完成後,執行該方法
     */
    @Override
    public void afterPropertiesSet() throws Exception {
       logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行初始化消息管理組件CommonTaskManager: init-afterPropertiesSet");
        //初始化線程池
       executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
       oldQueueSize=queueSize;
    }
    private ExecutorService pool;
    /**
     * 初始化加載上次未完成發送的消息繼續發送;
     */
    public void initMethod() {
        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行初始化消息管理組件CommonTaskManager[{}]: init-method",taskManagerName);
        //註冊到jmx管理
        MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();  
        try {
            ObjectName objectName = new ObjectName("CommonTaskManager:type=CommonTaskManager-"+taskManagerName);  
            beanServer.registerMBean(this, objectName);  
            pool = Executors.newSingleThreadExecutor();
            //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中參數詳情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{}",corePoolSize ,maximumPoolSize,pageSize,queueSize);
                            if(executor!=null&&!executor.isShutdown()){
                                if(!iseffect){
                                    logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池的queueSize設置後還未重啓,當前處於未生效狀態,");
                                }
                                logger.warn("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池中參數詳情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{},queue隊列中任務數:{}",executor.getCorePoolSize() ,executor.getMaximumPoolSize(),pageSize,oldQueueSize,executor.getQueue().size());
                            }else{
                                logger.error("任務線程池管理器[{"+taskManagerName+"}]--任務管理器中線程池處於非工做狀態!");
                            }
                            //休眠5秒鐘
                            Thread.sleep(30000l);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                       
                    }
                }
            });
        } catch (Exception e) {
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--initMethod()->馬上中止線程池執行失敗",e);
        }
    }

    /**
     * 執行銷燬線程池管理組件代碼;
     */
    public void destroyMethod() {
        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件destroy-method");
        shutdownAndAwaitTermination(executor);
        pool.shutdownNow();
        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件destroy-method...任務管理器完成銷燬動做...");
    }
    /**
     * 線程池暴力銷燬
     * @param pool
     */
    private void shutdownNow() {
        try{
            executor.shutdownNow();
            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--shutdownNow()->馬上中止線程池執行成功");
        }catch(Exception e){
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--shutdownNow()->馬上中止線程池執行失敗",e);
        }
        executor=null;
    }
    /**
     * 執行銷燬線程池管理組件代碼;
     */
    private void shuntdownDely() {
        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件shuntdownDely-method");
        shutdownAndAwaitTermination(executor);
        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--執行銷燬消息管理組件shuntdownDely-method...任務管理器完成銷燬動做...");
    }
    /**
     * 線程池優雅銷燬
     * 
     * @param pool
     */
    private void shutdownAndAwaitTermination(ExecutorService pool) {
        try {
            if(pool.isShutdown()){
                logger.warn("任務線程池管理器[{"+taskManagerName+"}]--pool has been shutdown");
            }else{
                //讓線程池沒法提交新的任務
                pool.shutdown(); // Disable new tasks from being submitted
                // Wait a while for existing tasks to terminate
                //等待以前的存在的任務執行60s,而後中止
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    //當即中止正在執行的任務
                    pool.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                        logger.warn("任務線程池管理器[{"+taskManagerName+"}]--Pool did not terminate");
                    }
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            logger.error("任務線程池管理器[{"+taskManagerName+"}]-中止時發生異常",ie);
            try{
                pool.shutdownNow();
                // Preserve interrupt status
                //Thread.currentThread().interrupt();
            }catch(Exception e){
                logger.error("任務線程池管理器[{"+taskManagerName+"}]-中止時發生異常",e);
            }
           
        }
        pool=null;
    }
    /**
     * 發起任務請求 submit Callable task request
     * @param <V>
     */
    public <V> Future<V> submitCallableTaskRequest(Callable<V> callTask) {
        Future<V> result=null;
        try{
            result = submitTask(callTask);
            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任務提交成功");
        }catch(Exception e){
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任務提交失敗",e);
        }
        return result;
    }
    /**
     * 發起任務請求 submit Runnable task request
     * @param <V>
     */
    public void submitRunableTaskRequest(Runnable runTask) {
        try{
            excuteTask(runTask);
            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任務提交成功");
        }catch(Exception e){
            logger.error("任務線程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任務提交失敗",e);
        }
         
    }
    protected <V> Future<V> submitTask(Callable<V> callTask) {
         Future<V> rs= executor.submit(callTask);
         return rs;
    }
    protected void excuteTask(Runnable runTask) {
         executor.execute(runTask);
    }

    @Override
    public String stopTaskNow() {
        if(!this.kill){
            this.kill=true;
        }
        stop=true;
        shutdownNow();
        return  "success";
    }

    @Override
    public String stopTaskDely() {
        if(!this.kill){
            this.kill=true;
        }
        stop=true;
        shuntdownDely();
        return "success";
    }

    @Override
    public String startTask() {
        String rs="success";
        if(executor!=null&&!executor.isShutdown()){
            rs ="fail";
            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--startTask()->線程池開啓失敗");
        }else{
            //初始化線程池
            executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(queueSize));
            stop=false;
            iseffect=true;
            oldQueueSize=queueSize;
            logger.warn("任務線程池管理器[{"+taskManagerName+"}]--startTask()->線程池開啓成功");
        }
        return  rs;
    }
    
}

3.spring Jmx線程池組件配置文件(可定義多個實例,進行不一樣的操做)tomcat

<bean id="synMemberTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="member-taskManager"  />
</bean> 
<bean id="qrcodeTaskManager" class="com.xxxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="qrcode-taskManager"  />
</bean> 
<bean id="templateMsgTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="templateMsg-taskManager"  />
</bean>

4.定時任務隊列中使用該線程池組件的案例SynMember 服務器

package com.xxxxxx.jobImpl;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.xxxxxx.job.queue.CommonTaskManager;
import com.xxxxxx.model.ThreadLastRuntime;
import com.xxxxxx.model.WccMemberCommon;
import com.xxxxxx.utils.HessianHelper;

/**
 * 定時同步中間表的信息到會員表
 * 
*/
@Lazy(false)
@Component
public class SynMember {
    private static final Logger logger = LoggerFactory.getLogger(SynMember.class);
    @Resource
    private CommonTaskManager synMemberTaskManager;
    private  static ArrayBlockingQueue queue=  new ArrayBlockingQueue(1);
    //初始化線程池
    //每1分鐘執行一次
    @Async
    @Scheduled(cron = "0 0/1 * * * ? ")
    public  void synMember(){
        //獲取中間表未處理的數據
        if(queue.size()>0){
            logger.warn("前一次任務未執行完成跳過本次任務");
            return;
        }
        synMemberTaskManager.setKill(false);
        if(synMemberTaskManager.isStop()){
            logger.warn("會員同步線程池爲啓動,跳過本次任務");
            return;
        }
        try{
            queue.add(new Object());
            updateLastThreaRunTime();
            Integer size=synMemberTaskManager.getPageSize();
            List<WccMemberCommon> memberCommons = HessianHelper.getAppService().findMemberCommon(size);
            boolean rssiz=memberCommons != null && memberCommons.size() > 0;
            logger.warn("會員同步中間表未處理的大小==="+memberCommons.size());
            while(rssiz){
                updateLastThreaRunTime();
                if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                    logger.warn("synMember()-會員同步處理任務退出本次任務,");
                    queue.clear();
                    return ;
                }
                 List<Future<Boolean>>  futureList=new ArrayList<Future<Boolean>>();
                 logger.warn("synMember()-會員同步處理開始提交任務");
                 for(WccMemberCommon memberCommon:memberCommons){
                     if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                         break ;
                     }
                     Future<Boolean>  future=synMemberTaskManager.submitCallableTaskRequest(new SynMemberCallableThread(memberCommon));
                     if(future!=null){
                         futureList.add( future);
                     }
                 }
                 updateLastThreaRunTime();
                 logger.warn("synMember()-會員同步處理提交任務-結束-開始獲取執行結果");
                 if(futureList.size()>0){
                     for(Future<Boolean>  item:futureList){
                         try {
                             if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                                 logger.warn("synMember()-會員同步處理任務退出本次任務,");
                                 queue.clear();
                                 return ;
                             }
                             if(item!=null){
                                 if(item.isCancelled()){
                                     queue.clear();
                                     return;
                                 }
                                 if(item.isDone()){
                                     //若是已經完成,就不作任何事情
                                 }else{
                                     boolean r=item.get(30, TimeUnit.SECONDS);
                                 }
                             }
                          } catch (InterruptedException | ExecutionException e) {
                              e.printStackTrace();
                              logger.error("synMember--會員同步出現錯誤",e);
                          }catch (Exception e) {
                              logger.error("synMember--會員同步任務出現錯誤",e);
                          }
                     }
                 }else{
                     logger.warn("synMember()-會員同步處理提交任務成功數爲0,線程池拒絕了任務");
                     break;
                 }
                 updateLastThreaRunTime();
                 memberCommons = HessianHelper.getAppService().findMemberCommon(size);
                 logger.warn("synMember()-會員同步處理獲取執行結果結束");
                 logger.warn("synMember()-會員同步處理完成一輪,剩餘size大小"+memberCommons.size());
                 rssiz=memberCommons != null && memberCommons.size() > 0;
            }
            logger.warn("會員同步中間表未處理的大小==="+memberCommons.size()+",任務結束");
        }catch(Exception e){
            logger.error("會員同步中間表任務報錯",e);
        }
        queue.clear();
    }
    public  void updateLastThreaRunTime(){
        int rscount= HessianHelper.getAppService().updateByThreadType("memberSync");
        ThreadLastRuntime threadLastRuntime=new ThreadLastRuntime();
        threadLastRuntime.setThreadType("memberSync");
        threadLastRuntime.setLastThreadRuntime(new Date());
        if(rscount==0){
            HessianHelper.getAppService().save(threadLastRuntime);
        }else if (rscount>1){
            HessianHelper.getAppService().deleteByThreadType("memberSync");
            HessianHelper.getAppService().save(threadLastRuntime);
        }
    }
}

5.Jmx配置(此處服務器是windows server 2012,因此配置是在tomcat註冊文件中修改)網絡

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=333111
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false

經過註冊表修改
win+r   -->  regedit 打開註冊表【有道深刻理解jvm虛擬機】
打開 註冊表HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\Tomcat6\Parameters\Java(路徑可能有一點點差異)中的Options。

6.項目組件啓動效果圖

 

7.Jmx鏈接效果圖(此處我就用本地截圖了)

 

最後,以上就是**項目系統監控操做線程池的全過程了,若是小夥伴們對Jmx這塊的知識感興趣,能夠聯繫博主或者留言。後期博主將推出Jmx的整個技術系列的教程,敬請期待。

相關文章
相關標籤/搜索