JMX可視化監控線程池

前兩天閱讀公司代碼看到了用JMX監控定時任務信息和狀態,JMX這個單詞感受很熟因而便去查閱了一下,並寫了監控線程池的Demojava

經過閱讀本篇文章你將瞭解到:

  • JMX介紹
  • 線程池介紹
  • JMX監控線程池應用

什麼是JMX

JMX簡介

JMX(Java Management Extensions),監控管理框架,經過使用JMX能夠監控和管理應用程序。JMX最多見的場景是監控Java程序的基本信息和運行狀況,任何Java程序均可以開啓JMX,而後使用JConsoleVisual VM進行預覽git

JMX架構

總共分爲三層,分發層、代理層、設備層
分發層:根據不一樣的協議定義了對代理層進行各類操做的管理接口,簡單的來講是監控指標的查看方式,能夠是 HTTP鏈接、 RMI鏈接、 SNMP鏈接
代理層:管理 MBean,經過將 MBean註冊到代理層實現 MBean的管理,除了註冊 MBean,還能夠註冊 Adapter,代理層在應用中通常都是 MBeanService
設備層:監控指標抽象出的類,能夠分爲如下幾種:

  • Standard MBean
  • Dynamic MBean
  • Open MBean
  • Model MBean
  • MXBean

應用中通常使用Standard MBean比較多,因此這裏只介紹Standard MBean,使用Standard MBean須要知足必定的規則,規則以下:github

  • 定義一個接口,接口名必須爲XXXXMBean的格式,必須MBean結尾
  • 若是接口爲XXXXMBean,則接口實現類必須爲MBean,不然程序將報錯
  • 接口中經過getset方法表示監控指標是否可讀、可寫。好比getXXX()抽象方法,則XXX就是監控的指標,getXXX()表示XXX性能指標可讀,setXXX()方法表示該監控指標可寫
  • 參數和返回類型只能是簡單的引用類型(如String)和基本數據類型,不能夠是自定義類型,若是返回值爲自定義類型能夠選擇MXBean

線程池簡單介紹

線程池是線程的管理工具,經過使用線程池能夠複用線程下降資源消耗、提升響應速度、提升線程的可管理性。若是在系統中大量使用線程池,就必須對線程池進行監控方便出錯時定位問題。能夠經過線程池提供的參數進行監控,線程池提供的參數以下:bash

方法 含義
getActiveCount 線程池中正在執行任務的線程數量
getCompletedTaskCount 線程池已完成的任務數量
getCorePoolSize 線程池的核心線程數量
getLargestPoolSize 線程池曾經建立過的最大線程數量
getMaximumPoolSize 線程池的最大線程數量
getPoolSize 線程池當前的線程數量
getTaskCount 線程池須要執行的任務數量

應用

介紹完JMX及線程池之後,寫一個JMX監控線程池的Demo,總不能紙上談兵吧架構

  • 定義線程池監控類:ThreadPoolMonitor.java框架

    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * ActiveCount
         * */
        int ac = 0;
    
        /**
         * 當前全部線程消耗的時間
         * */
        private AtomicLong totalCostTime = new AtomicLong();
    
        /**
         * 當前執行的線程總數
         * */
        private AtomicLong totalTasks = new AtomicLong();
    
        /**
         * 線程池名稱
         */
        private String poolName;
    
        /**
         * 最短 執行時間
         * */
        private long minCostTime;
    
        /**
         * 最長執行時間
         * */
        private long maxCostTime;
    
    
        /**
         * 保存任務開始執行的時間
         */
        private ThreadLocal<Long> startTime = new ThreadLocal<>();
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              Executors.defaultThreadFactory(), poolName);
        }
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, String poolName) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.poolName = poolName;
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
            return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        public static ExecutorService newCachedThreadPool(String poolName) {
            return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
        }
    
        public static ExecutorService newSingleThreadExecutor(String poolName) {
            return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        /**
         * 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池狀況
         */
        @Override
        public void shutdown() {
            // 統計已執行任務、正在執行任務、未執行任務數量
            logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            super.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            // 統計已執行任務、正在執行任務、未執行任務數量
            logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            return super.shutdownNow();
        }
    
        /**
         * 任務執行以前,記錄任務開始時間
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            startTime.set(System.currentTimeMillis());
        }
    
        /**
         * 任務執行以後,計算任務結束時間
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long costTime = System.currentTimeMillis() - startTime.get();
            startTime.remove();  //刪除,避免佔用太多內存
            //設置最大最小執行時間
            maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
            if (totalTasks.get() == 0) {
                minCostTime = costTime;
            }
            minCostTime = minCostTime < costTime ? minCostTime : costTime;
            totalCostTime.addAndGet(costTime);
            totalTasks.incrementAndGet();
    
            logger.info("{}-pool-monitor: " +
                          "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
                          "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                          "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                    this.poolName,
                    costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
                    this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
        }
    
        public int getAc() {
            return ac;
        }
    
        /**
         * 線程平均耗時
         *
         * @return
         * */
        public float getAverageCostTime() {
            return totalCostTime.get() / totalTasks.get();
        }
    
        /**
         * 線程最大耗時
         * */
        public long getMaxCostTime() {
            return maxCostTime;
        }
    
        /**
         * 線程最小耗時
         * */
        public long getMinCostTime() {
            return minCostTime;
        }
    
        /**
         * 生成線程池所用的線程,改寫了線程池默認的線程工廠
         */
        static class EventThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            /**
             * 初始化線程工廠
             *
             * @param poolName 線程池名稱
             */
            EventThreadFactory(String poolName) {
                SecurityManager s = System.getSecurityManager();
                group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    }
    複製代碼

    經過繼承線程池來自定義線程池,並在構造函數中加入了poolName標明是哪個線程池,同時重寫了beforeExecuteafterExecuteterminated等方法,在beforeExecute方法中記錄線程池執行的時間,在afterExecute方法中計算線程執行的耗時、最大耗時、最小耗時、平均耗時。重寫線程池生成線程的方法,指定了生成的線程名dom

  • 定義一個MBeanThreadPoolParamMBean.java
    MBean其實並非一個實體類而是一個接口,裏面定義了監控的指標ide

    public interface ThreadPoolParamMBean {
        /**
         * 線程池中正在執行任務的線程數量
         *
         * @return
         */
        int getActiveCount();
    
        /**
         * 線程池已完成的任務數量
         *
         * @return
         */
        long getCompletedTaskCount();
    
        /**
         * 線程池的核心線程數量
         *
         * @return
         */
        int getCorePoolSize();
    
        /**
         * 線程池曾經建立過的最大線程數量
         *
         * @return
         */
        int getLargestPoolSize();
    
        /**
         * 線程池的最大線程數量
         *
         * @return
         */
        int getMaximumPoolSize();
    
        /**
         * 線程池當前的線程數量
         *
         * @return
         */
        int getPoolSize();
    
        /**
         * 線程池須要執行的任務數量
         *
         * @return
         */
        long getTaskCount();
    
        /**
         * 線程最大耗時
         *
         * @return
         * */
        long getMaxCostTime();
    
        /**
         * 線程最小耗時
         *
         * @return
         * */
        long getMinCostTime();
    
        /**
         * 線程平均耗時
         *
         * @return
         * */
        float getAverageCostTime();
    }
    複製代碼
  • 定義一個MBean實現類:ThreadPoolParam.java
    定義的是靜態MBean,因此接口實現類必須知足規定,即xxxMBean,實現類爲xxx函數

    public class ThreadPoolParam implements ThreadPoolParamMBean  {
        private ThreadPoolMonitor threadPoolMonitor;
    
        public ThreadPoolParam(ExecutorService es) {
            this.threadPoolMonitor = (ThreadPoolMonitor) es;
        }
    
        /**
         * 線程池中正在執行任務的線程數量
         *
         * @return
         */
        @Override
        public int getActiveCount() {
            return threadPoolMonitor.getAc();
        }
    
        /**
         * 線程池已完成的任務數量
         *
         * @return
         */
        @Override
        public long getCompletedTaskCount() {
            return threadPoolMonitor.getCompletedTaskCount();
        }
    
        /**
         * 線程池的核心線程數量
         *
         * @return
         */
        @Override
        public int getCorePoolSize() {
            return threadPoolMonitor.getCorePoolSize();
        }
    
        /**
         * 線程池曾經建立過的最大線程數量
         *
         * @return
         */
        @Override
        public int getLargestPoolSize() {
            return threadPoolMonitor.getLargestPoolSize();
        }
    
        /**
         * 線程池的最大線程數量
         *
         * @return
         */
        @Override
        public int getMaximumPoolSize() {
            return threadPoolMonitor.getMaximumPoolSize();
        }
    
        /**
         * 線程池當前的線程數量
         *
         * @return
         */
        @Override
        public int getPoolSize() {
            return threadPoolMonitor.getPoolSize();
        }
    
        /**
         * 線程池須要執行的任務數量
         *
         * @return
         */
        @Override
        public long getTaskCount() {
            return threadPoolMonitor.getTaskCount();
        }
    
        /**
         * 線程最大耗時
         *
         * @return
         * */
        @Override
        public long getMaxCostTime() {
            return threadPoolMonitor.getMaxCostTime();
        }
    
        /**
         * 線程最小耗時
         *
         * @return
         * */
        @Override
        public long getMinCostTime() {
            return threadPoolMonitor.getMinCostTime();
        }
    
        /**
         * 線程平均耗時
         *
         * @return
         * */
        @Override
        public float getAverageCostTime() {
            return threadPoolMonitor.getAverageCostTime();
        }
    }
    複製代碼

    監控的參數指標經過線程池獲得工具

  • 測試類:Test.java

    public class Test {
        private static Random random = new Random();
        public static void main(String[] args) throws MalformedObjectNameException,  InterruptedException {
            ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
            ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
    
            ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
            ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
    
            MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
            MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
    
            //http鏈接的方式查看監控任務
            HtmlAdaptor.start();
    
            executeTask(es1);
            executeTask(es2);
            Thread.sleep(1000 * 60 * 60);
        }
    
        private static void executeTask(ExecutorService es) {
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    int temp = i;
                    es.submit(() -> {
                        //隨機睡眠時間
                        try {
                            Thread.sleep(random.nextInt(60) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(temp);
                    });
                }
            }).start();
        }
    }
    複製代碼

    說明:

    • MBeanServerUtil.registerMBean()註冊監控的類
    • HtmlAdaptor.start()開啓HTTP鏈接的方式查看監控任務

    啓動程序後打開http://localhost:8082/以下圖:

點擊test-pool-1下的type=threadPoolParam

經過刷新獲取線程池最新的監控指標 test-pool-1type=threadPoolParam這些屬性是在ObjectName中定義的屬性值

總結

使用JMX監控線程池只是JMX一個功能,本篇文章只是學以至用,更多有關JMX以及線程池的內容能夠查閱其餘資料。文章如有錯誤歡迎指正

最後附:項目代碼,歡迎forkstar,【我都劃重點了就star一下】

相關文章
相關標籤/搜索