以前公司由於使用線程池的習慣很差,導致線程池負載過高,觸發了拒絕策略,造成大量任務丟失。而當時並沒有對這種情況進行監控,直到業務出現故障之後才發現拋出了拒絕異常。因此有必要對大量使用線程池的項目進行監控,並且最好能在不停機的情況下修改線程池的參數。為此,我們可以利用線程池的hook方法對線程池的狀態進行埋點,通過Actuator做可視化監控,並自定義Endpoint去修改線程池內部參數,從而實現線程池參數的動態修改。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 繼承ThreadPoolExecutor類,覆蓋了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute() * 方法來統計線程池的執行狀況 */ public class ThreadPoolMonitor extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class); /** * 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間 */ private final ConcurrentHashMap<String, Date> startTimes; /** * 線程池名稱,通常以業務名稱命名,方便區分 */ private final String poolName; private long totalDiff; /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閒時間 * @param unit 空閒時間的單位 * @param workQueue 保存被提交任務的隊列 * @param poolName 線程池名稱 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new EventThreadFactory(poolName), poolName); } /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閒時間 * @param unit 空閒時間的單位 * @param workQueue 保存被提交任務的隊列 * @param threadFactory 線程工廠 * @param poolName 線程池名稱 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.startTimes = new ConcurrentHashMap<>(); this.poolName = poolName; } /** * 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池狀況 */ @Override public void shutdown() { // 統計已執行任務、正在執行任務、未執行任務數量 LOGGER.info("{} Going to shutdown. 
Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 線程池當即關閉時,統計線程池狀況 */ @Override public List<Runnable> shutdownNow() { // 統計已執行任務、正在執行任務、未執行任務數量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任務執行以前,記錄任務開始時間 */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); } /** * 任務執行以後,計算任務結束時間 */ @Override protected void afterExecute(Runnable r, Throwable t) { Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); totalDiff += diff; // 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、 // 已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、 // 最大容許的線程數、線程空閒時間、線程池是否關閉、線程池是否終止 LOGGER.info("{}-pool-monitor: " + "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } /** * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; /** * 初始化線程工廠 * * @param poolName 線程池名稱 */ EventThreadFactory(String poolName) { SecurityManager s = 
System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } } public long getTotalDiff() { return totalDiff; } }
這裏我們直接修改LinkedBlockingQueue的代碼,把capacity的final去掉,使其變成一個可變參數,再新增get和set方法。
/**
 * A linked-node {@link BlockingQueue} whose capacity bound can be changed at runtime.
 *
 * <p>The implementation follows the two-lock design of {@code LinkedBlockingQueue}
 * (one lock for producers, one for consumers, an atomic element count). Because the bound
 * is mutable here, {@code capacity} is {@code volatile} and every "queue is full" test is
 * written as {@code count >= capacity}, so the queue stays closed for producers while it
 * holds more elements than a freshly lowered bound allows.
 *
 * @param <E> the element type
 */
public class ResizeableBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -1232131234709194L;

    /**
     * Linked list node.
     *
     * @param <E> the element type
     */
    static class Node<E> {
        E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /** Current capacity bound; volatile because it is read outside the locks. */
    private volatile int capacity;

    /** Current number of elements. */
    private final AtomicInteger count = new AtomicInteger();

    /** Head of the list; {@code head.item} is always null. */
    transient Node<E> head;

    /** Tail of the list; {@code last.next} is always null. */
    private transient Node<E> last;

    /** Lock held by take, poll, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for consumers. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for producers. */
    private final Condition notFull = putLock.newCondition();

    /**
     * Returns the current capacity bound.
     *
     * @return the capacity
     */
    public int getCapacity() {
        return capacity;
    }

    /**
     * Changes the capacity bound.
     *
     * <p>Elements already queued are never dropped: when the new bound is lower than the
     * current size, the queue simply rejects or blocks producers until consumers have drained
     * it below the bound. When the bound grows, producers blocked in {@code put} or the timed
     * {@code offer} are woken up.
     *
     * @param capacity the new capacity, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            this.capacity = capacity;
            // Room may have opened up; waiters re-check the guard themselves.
            if (count.get() < capacity) {
                notFull.signalAll();
            }
        } finally {
            putLock.unlock();
        }
    }

    /** Signals a waiting take. Called only from put/offer. */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /** Signals a waiting put. Called only from take/poll/drainTo. */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /** Links {@code node} at the end of the list; caller holds {@code putLock}. */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /** Removes the first node and returns its item; caller holds {@code takeLock}. */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // self-link the old head to help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /** Locks to prevent both puts and takes. */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /** Unlocks to allow both puts and takes. */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a queue with a capacity of {@link Integer#MAX_VALUE}.
     */
    public ResizeableBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a queue with the given initial capacity.
     *
     * @param capacity the initial capacity
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public ResizeableBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a queue with a capacity of {@link Integer#MAX_VALUE} that initially contains the
     * elements of {@code c} in iteration order.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any of its elements is null
     */
    public ResizeableBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null) {
                    throw new NullPointerException();
                }
                if (n == capacity) {
                    throw new IllegalStateException("Queue full");
                }
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     */
    @Override
    public int size() {
        return count.get();
    }

    /**
     * Returns how many more elements can be added without blocking; never negative, even
     * while the queue holds more elements than a lowered capacity allows.
     */
    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - count.get());
    }

    /**
     * Inserts the element at the tail, waiting for space if the queue is full.
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        // Convention in put/take/etc.: the local count stays negative unless the
        // operation succeeded.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // count can only decrease here (other producers are locked out); we are
            // signalled when it drops below capacity or when the capacity is raised.
            while (count.get() >= capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Inserts the element at the tail, waiting up to the given time for space.
     *
     * @return {@code true} if inserted, {@code false} if the wait timed out
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() >= capacity) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the element at the tail if that is possible without exceeding the capacity.
     *
     * @return {@code true} if inserted, {@code false} if the queue is full
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final AtomicInteger count = this.count;
        if (count.get() >= capacity) {
            return false;
        }
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Removes and returns the head, waiting until an element is available.
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full (or over a lowered bound) before this removal.
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head, waiting up to the given time for an element.
     *
     * @return the head, or {@code null} if the wait timed out
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head, or returns {@code null} if the queue is empty.
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Returns the head without removing it, or {@code null} if the queue is empty.
     */
    @Override
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null) {
                return null;
            } else {
                return first.item;
            }
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior node {@code p} whose predecessor is {@code trail}; caller holds both
     * locks.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // p.next is left untouched so that iterators traversing p keep their
        // weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p) {
            last = trail;
        }
        if (count.getAndDecrement() >= capacity) {
            notFull.signal();
        }
    }

    /**
     * Removes a single instance of the given element, if present.
     *
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if the queue contains the given element.
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all elements in queue order.
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = p.item;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all elements in queue order, reusing {@code a} if it is
     * large enough.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    @Override
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null) {
                return "[]";
            }
            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null) {
                    return sb.append(']').toString();
                }
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all elements from the queue.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            if (count.getAndSet(0) >= capacity) {
                notFull.signal();
            }
        } finally {
            fullyUnlock();
        }
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements into {@code c}.
     *
     * @return the number of elements transferred
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // count.get provides visibility to the first n nodes
            int n = Math.min(maxElements, count.get());
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) >= capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull) {
                signalNotFull();
            }
        }
    }

    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    /**
     * Weakly consistent iterator. It always holds the next item to hand out, so that when
     * {@code hasNext()} reports true the element can still be returned even if a concurrent
     * take removed it in the meantime.
     */
    private class Itr implements Iterator<E> {

        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /** Returns the next live successor of {@code p}, or null if there is none. */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p) {
                    // p was dequeued (self-linked): restart from the current head
                    return head.next;
                }
                if (s == null || s.item != null) {
                    return s;
                }
                p = s;
            }
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Spliterator over the queue's elements.
     *
     * @param <E> the element type
     */
    static final class LBQSpliterator<E> implements Spliterator<E> {

        static final int MAX_BATCH = 1 << 25; // max batch array size

        final ResizeableBlockingQueue<E> queue;
        Node<E> current;   // current node; null until initialized
        int batch;         // batch size for splits
        boolean exhausted; // true when no more nodes
        long est;          // size estimate

        LBQSpliterator(ResizeableBlockingQueue<E> queue) {
            this.queue = queue;
            this.est = queue.size();
        }

        @Override
        public long estimateSize() {
            return est;
        }

        @Override
        public Spliterator<E> trySplit() {
            Node<E> h;
            final ResizeableBlockingQueue<E> q = this.queue;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted
                    && ((h = current) != null || (h = q.head.next) != null)
                    && h.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                Node<E> p = current;
                q.fullyLock();
                try {
                    if (p != null || (p = q.head.next) != null) {
                        do {
                            if ((a[i] = p.item) != null) {
                                ++i;
                            }
                        } while ((p = p.next) != null && i < n);
                    }
                } finally {
                    q.fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                } else if ((est -= i) < 0L) {
                    est = 0L;
                }
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator(a, 0, i,
                            Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT);
                }
            }
            return null;
        }

        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            if (action == null) {
                throw new NullPointerException();
            }
            final ResizeableBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                exhausted = true;
                Node<E> p = current;
                do {
                    E e = null;
                    q.fullyLock();
                    try {
                        if (p == null) {
                            p = q.head.next;
                        }
                        while (p != null) {
                            e = p.item;
                            p = p.next;
                            if (e != null) {
                                break;
                            }
                        }
                    } finally {
                        q.fullyUnlock();
                    }
                    if (e != null) {
                        action.accept(e);
                    }
                } while (p != null);
            }
        }

        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) {
                throw new NullPointerException();
            }
            final ResizeableBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                E e = null;
                q.fullyLock();
                try {
                    if (current == null) {
                        current = q.head.next;
                    }
                    while (current != null) {
                        e = current.item;
                        current = current.next;
                        if (e != null) {
                            break;
                        }
                    }
                } finally {
                    q.fullyUnlock();
                }
                if (current == null) {
                    exhausted = true;
                }
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        @Override
        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT;
        }
    }

    @Override
    public Spliterator<E> spliterator() {
        return new LBQSpliterator<E>(this);
    }

    /**
     * Writes the non-transient fields followed by all elements in order and a trailing
     * {@code null} sentinel.
     */
    private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }
            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Restores the non-transient fields, then re-adds the elements up to the {@code null}
     * sentinel.
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();
        count.set(0);
        last = head = new Node<E>(null);
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
編寫線程池工具類,通過Util去創建線程池,並且用HashMap保存創建的線程池,之後可以通過這個HashMap獲取線程池。
/**
 * Thread pool utility: creates {@link ThreadPoolMonitor} instances and keeps them in a
 * registry keyed by pool name so they can be looked up later.
 *
 * <p>All access to the registry is synchronized, so pools may be created from any thread
 * while another thread reads the registry.
 */
@Component
public class ThreadPoolUtil {

    /**
     * Registry of created pools, keyed by pool name. Guarded by its own monitor.
     */
    private final HashMap<String, ThreadPoolMonitor> threadPoolExecutorHashMap = new HashMap<>();

    /**
     * Creates and registers a pool that uses a caller-supplied work queue.
     *
     * <p>Registering a second pool under an existing name replaces the registry entry; the
     * previous pool is not shut down.
     *
     * @param corePoolSize    the core pool size
     * @param maximumPoolSize the maximum pool size
     * @param keepAliveTime   the keep alive time
     * @param unit            the unit of {@code keepAliveTime}
     * @param workQueue       the work queue
     * @param poolName        the pool name
     * @return the created pool
     */
    public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize,
                                             long keepAliveTime, TimeUnit unit,
                                             BlockingQueue<Runnable> workQueue, String poolName) {
        ThreadPoolMonitor pool = new ThreadPoolMonitor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
        return register(poolName, pool);
    }

    /**
     * Creates and registers a pool backed by a {@link ResizeableBlockingQueue}, whose
     * capacity can later be changed through its setter.
     *
     * @param corePoolSize    the core pool size
     * @param maximumPoolSize the maximum pool size
     * @param keepAliveTime   the keep alive time
     * @param unit            the unit of {@code keepAliveTime}
     * @param queueSize       the initial queue capacity
     * @param poolName        the pool name
     * @return the created pool
     */
    public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize,
                                             long keepAliveTime, TimeUnit unit,
                                             int queueSize, String poolName) {
        ThreadPoolMonitor pool = new ThreadPoolMonitor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit,
                new ResizeableBlockingQueue<Runnable>(queueSize), poolName);
        return register(poolName, pool);
    }

    /**
     * Returns a snapshot of the registry.
     *
     * <p>A copy is returned so callers can iterate it safely while pools are being created;
     * changes made to the returned map do not affect the registry.
     *
     * @return a copy of the pool-name to pool mapping
     */
    public HashMap<String, ThreadPoolMonitor> getThreadPoolExecutorHashMap() {
        synchronized (threadPoolExecutorHashMap) {
            return new HashMap<>(threadPoolExecutorHashMap);
        }
    }

    /** Stores {@code pool} under {@code poolName} and returns it. */
    private ThreadPoolMonitor register(String poolName, ThreadPoolMonitor pool) {
        synchronized (threadPoolExecutorHashMap) {
            threadPoolExecutorHashMap.put(poolName, pool);
        }
        return pool;
    }
}
實現線程池信息的實體類,用於EndPoint返回數據。
ThreadPoolDetailInfo.java
/**
 * Snapshot of the runtime metrics of one thread pool, used as the monitoring response payload.
 */
public class ThreadPoolDetailInfo {

    // --- identity ---
    private String threadPoolName;

    // --- thread counts ---
    private Integer poolSize;
    private Integer corePoolSize;
    private Integer largestPoolSize;
    private Integer maximumPoolSize;

    // --- task counters ---
    private long completedTaskCount;
    private Integer active;
    private long task;

    // --- timing and load ---
    private long keepAliveTime;
    private String activePercent;

    // --- work queue ---
    private Integer queueCapacity;
    private Integer queueSize;

    // --- average task duration ---
    private long avgDiff;

    /**
     * Creates a fully populated snapshot.
     *
     * @param threadPoolName     name of the pool
     * @param poolSize           current number of threads
     * @param corePoolSize       configured core size
     * @param largestPoolSize    largest number of threads ever reached
     * @param maximumPoolSize    configured maximum size
     * @param completedTaskCount number of completed tasks
     * @param active             number of active tasks
     * @param task               total number of tasks
     * @param keepAliveTime      keep-alive time
     * @param activePercent      load of the pool, formatted as a percentage
     * @param queueCapacity      capacity of the work queue
     * @param queueSize          number of queued tasks
     * @param avgDiff            average task duration
     */
    public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize,
                                Integer largestPoolSize, Integer maximumPoolSize,
                                long completedTaskCount, Integer active, long task,
                                long keepAliveTime, String activePercent,
                                Integer queueCapacity, Integer queueSize, long avgDiff) {
        setThreadPoolNameInternal(threadPoolName);
        this.poolSize = poolSize;
        this.corePoolSize = corePoolSize;
        this.largestPoolSize = largestPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.completedTaskCount = completedTaskCount;
        this.active = active;
        this.task = task;
        this.keepAliveTime = keepAliveTime;
        this.activePercent = activePercent;
        this.queueCapacity = queueCapacity;
        this.queueSize = queueSize;
        this.avgDiff = avgDiff;
    }

    /** Assigns the name; kept private so the constructor never calls an overridable method. */
    private void setThreadPoolNameInternal(String name) {
        this.threadPoolName = name;
    }

    /** @return the thread pool name */
    public String getThreadPoolName() {
        return this.threadPoolName;
    }

    /** @param threadPoolName the thread pool name */
    public void setThreadPoolName(String threadPoolName) {
        setThreadPoolNameInternal(threadPoolName);
    }

    /** @return the current pool size */
    public Integer getPoolSize() {
        return this.poolSize;
    }

    /** @param poolSize the current pool size */
    public void setPoolSize(Integer poolSize) {
        this.poolSize = poolSize;
    }

    /** @return the core pool size */
    public Integer getCorePoolSize() {
        return this.corePoolSize;
    }

    /** @param corePoolSize the core pool size */
    public void setCorePoolSize(Integer corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /** @return the largest pool size reached */
    public Integer getLargestPoolSize() {
        return this.largestPoolSize;
    }

    /** @param largestPoolSize the largest pool size reached */
    public void setLargestPoolSize(Integer largestPoolSize) {
        this.largestPoolSize = largestPoolSize;
    }

    /** @return the maximum pool size */
    public Integer getMaximumPoolSize() {
        return this.maximumPoolSize;
    }

    /** @param maximumPoolSize the maximum pool size */
    public void setMaximumPoolSize(Integer maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    /** @return the completed task count */
    public long getCompletedTaskCount() {
        return this.completedTaskCount;
    }

    /** @param completedTaskCount the completed task count */
    public void setCompletedTaskCount(long completedTaskCount) {
        this.completedTaskCount = completedTaskCount;
    }

    /** @return the number of active tasks */
    public Integer getActive() {
        return this.active;
    }

    /** @param active the number of active tasks */
    public void setActive(Integer active) {
        this.active = active;
    }

    /** @return the total task count */
    public long getTask() {
        return this.task;
    }

    /** @param task the total task count */
    public void setTask(long task) {
        this.task = task;
    }

    /** @return the keep-alive time */
    public long getKeepAliveTime() {
        return this.keepAliveTime;
    }

    /** @param keepAliveTime the keep-alive time */
    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /** @return the pool load as a formatted percentage */
    public String getActivePercent() {
        return this.activePercent;
    }

    /** @param activePercent the pool load as a formatted percentage */
    public void setActivePercent(String activePercent) {
        this.activePercent = activePercent;
    }

    /** @return the queue capacity */
    public Integer getQueueCapacity() {
        return this.queueCapacity;
    }

    /** @param queueCapacity the queue capacity */
    public void setQueueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /** @return the number of queued tasks */
    public Integer getQueueSize() {
        return this.queueSize;
    }

    /** @param queueSize the number of queued tasks */
    public void setQueueSize(Integer queueSize) {
        this.queueSize = queueSize;
    }

    /** @return the average task duration */
    public long getAvgDiff() {
        return this.avgDiff;
    }

    /** @param avgDiff the average task duration */
    public void setAvgDiff(long avgDiff) {
        this.avgDiff = avgDiff;
    }
}
ThreadPoolInfo.java
/**
 * The adjustable settings of one thread pool: pool sizes plus the type and capacity of its
 * work queue. Used both as a response payload and as the request body of the update call.
 */
public class ThreadPoolInfo {

    private String threadPoolName;
    private int corePoolSize;
    private int maximumPoolSize;
    private String queueType;
    private int queueCapacity;

    /**
     * Creates an empty instance to be filled through the setters.
     *
     * <p>Provided so that bean-style binders (for example a JSON mapper populating a request
     * body) can instantiate the class without any constructor-argument discovery.
     */
    public ThreadPoolInfo() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param threadPoolName  the thread pool name
     * @param corePoolSize    the core pool size
     * @param maximumPoolSize the maximum pool size
     * @param queueType       the simple class name of the work queue
     * @param queueCapacity   the queue capacity
     */
    public ThreadPoolInfo(String threadPoolName, int corePoolSize, int maximumPoolSize,
                          String queueType, int queueCapacity) {
        this.threadPoolName = threadPoolName;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.queueType = queueType;
        this.queueCapacity = queueCapacity;
    }

    /**
     * Gets the thread pool name.
     *
     * @return the thread pool name
     */
    public String getThreadPoolName() {
        return threadPoolName;
    }

    /**
     * Sets the thread pool name.
     *
     * @param threadPoolName the thread pool name
     */
    public void setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
    }

    /**
     * Gets the core pool size.
     *
     * @return the core pool size
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Sets the core pool size.
     *
     * @param corePoolSize the core pool size
     */
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /**
     * Gets the maximum pool size.
     *
     * @return the maximum pool size
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the maximum pool size.
     *
     * @param maximumPoolSize the maximum pool size
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    /**
     * Gets the queue type.
     *
     * @return the simple class name of the work queue
     */
    public String getQueueType() {
        return queueType;
    }

    /**
     * Sets the queue type.
     *
     * @param queueType the simple class name of the work queue
     */
    public void setQueueType(String queueType) {
        this.queueType = queueType;
    }

    /**
     * Gets the queue capacity.
     *
     * <p>The lower-case {@code q} in the method name is kept because existing callers use it.
     *
     * @return the queue capacity
     */
    public int getqueueCapacity() {
        return queueCapacity;
    }

    /**
     * Sets the queue capacity.
     *
     * <p>The lower-case {@code q} in the method name is kept because existing callers use it.
     *
     * @param queueCapacity the queue capacity
     */
    public void setqueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}
通過actuator裏的@RestControllerEndpoint註解可以添加Endpoints接口。本質上它和@Endpoint、@WebEndpoint的作用是一樣的,都是為服務增加actuator接口,方便管理運行中的服務。但是有一個明顯的不同:@RestControllerEndpoint只支持Http方式的訪問,不支持JMX的訪問。而且,端點的方法上只支持@GetMapping、@PostMapping、@DeleteMapping、@RequestMapping等,而不支持@ReadOperation、@WriteOperation、@DeleteOperation。它返回的格式是application/json。
因為我司的監控系統只支持json格式,所以採用了這種方式;實際上使用Metrics和Grafana去監控會更好。
/**
 * Actuator endpoint (id {@code threadpool}) that lists the registered thread pools, reports
 * their metrics and allows their sizes and queue capacity to be changed at runtime.
 *
 * @author newrank
 */
@RestControllerEndpoint(id = "threadpool")
@Component
public class ThreadPoolEndpoint {

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    /** Serializes concurrent reconfiguration requests. */
    private static final ReentrantLock LOCK = new ReentrantLock();

    /**
     * Lists the names of all registered thread pools.
     *
     * @return the pool names; empty when no pool is registered
     */
    @GetMapping("getThreadPools")
    public List<String> getThreadPools() {
        Map<String, ThreadPoolMonitor> pools = threadPoolUtil.getThreadPoolExecutorHashMap();
        return new ArrayList<>(pools.keySet());
    }

    /**
     * Returns the adjustable settings of one pool.
     *
     * @param threadPoolName name of the pool
     * @return the settings, or {@code null} when no pool with that name is registered
     */
    @GetMapping("getThreadPoolFixInfo")
    public ThreadPoolInfo getThreadPoolInfo(@RequestParam String threadPoolName) {
        ThreadPoolMonitor pool = threadPoolUtil.getThreadPoolExecutorHashMap().get(threadPoolName);
        if (pool == null) {
            return null;
        }
        return new ThreadPoolInfo(threadPoolName, pool.getCorePoolSize(),
                pool.getMaximumPoolSize(), pool.getQueue().getClass().getSimpleName(),
                queueCapacityOf(pool));
    }

    /**
     * Applies new core size, maximum size and (for a {@link ResizeableBlockingQueue}) queue
     * capacity to a pool.
     *
     * <p>The request is validated before anything is changed, so a rejected request leaves
     * the pool untouched.
     *
     * @param threadPoolInfo the requested settings; {@code queueType} is ignored
     * @return {@code true} when the settings were applied, {@code false} when the pool is
     *         unknown or the requested values are invalid
     */
    @PostMapping("setThreadPoolFixInfo")
    public Boolean setThreadPoolInfo(@RequestBody ThreadPoolInfo threadPoolInfo) {
        if (threadPoolInfo == null || threadPoolInfo.getThreadPoolName() == null) {
            return false;
        }
        ThreadPoolMonitor pool = threadPoolUtil.getThreadPoolExecutorHashMap()
                .get(threadPoolInfo.getThreadPoolName());
        if (pool == null) {
            return false;
        }

        int newCore = threadPoolInfo.getCorePoolSize();
        int newMax = threadPoolInfo.getMaximumPoolSize();
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            return false;
        }
        ResizeableBlockingQueue<?> queue = resizeableQueueOf(pool);
        int newCapacity = threadPoolInfo.getqueueCapacity();
        if (queue != null && newCapacity <= 0) {
            return false;
        }

        LOCK.lock();
        try {
            // ThreadPoolExecutor rejects any intermediate state with core > max, so the
            // order of the two setters depends on whether the maximum grows or shrinks.
            if (newMax >= pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(newMax);
                pool.setCorePoolSize(newCore);
            } else {
                pool.setCorePoolSize(newCore);
                pool.setMaximumPoolSize(newMax);
            }
            if (queue != null) {
                queue.setCapacity(newCapacity);
            }
            return true;
        } catch (IllegalArgumentException e) {
            // Defensive: the values were validated above, but another caller may have changed
            // the pool concurrently outside this endpoint. Report failure to the client.
            return false;
        } finally {
            LOCK.unlock();
        }
    }

    /**
     * Returns the metrics of every registered pool.
     *
     * @return one entry per pool; empty when no pool is registered
     */
    @GetMapping("getThreadPoolListInfo")
    public List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
        List<ThreadPoolDetailInfo> detailInfoList = new ArrayList<>();
        for (Map.Entry<String, ThreadPoolMonitor> entry
                : threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) {
            detailInfoList.add(threadPoolInfo(entry.getValue(), entry.getKey()));
        }
        return detailInfoList;
    }

    /**
     * Assembles the metrics of one pool.
     *
     * @param threadPool     the pool
     * @param threadPoolName name of the pool
     * @return the metrics snapshot
     */
    private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool, String threadPoolName) {
        // Read each counter once so the reported values belong together.
        int active = threadPool.getActiveCount();
        int maximumPoolSize = threadPool.getMaximumPoolSize();
        long completed = threadPool.getCompletedTaskCount();

        BigDecimal load = BigDecimal.valueOf(active)
                .divide(BigDecimal.valueOf(maximumPoolSize), 2, java.math.RoundingMode.HALF_UP);
        NumberFormat numberFormat = NumberFormat.getPercentInstance();
        numberFormat.setMaximumFractionDigits(2);

        // Durations are accumulated per finished task, so average over completed tasks and
        // report 0 while nothing has finished (avoids a division by zero on an idle pool).
        long avgDiff = completed > 0 ? threadPool.getTotalDiff() / completed : 0L;

        return new ThreadPoolDetailInfo(threadPoolName, threadPool.getPoolSize(),
                threadPool.getCorePoolSize(), threadPool.getLargestPoolSize(), maximumPoolSize,
                completed, active, threadPool.getTaskCount(),
                threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS),
                numberFormat.format(load.doubleValue()), queueCapacityOf(threadPool),
                threadPool.getQueue().size(), avgDiff);
    }

    /** Returns the pool's queue if it is resizeable, otherwise {@code null}. */
    private static ResizeableBlockingQueue<?> resizeableQueueOf(ThreadPoolMonitor pool) {
        Object queue = pool.getQueue();
        return (queue instanceof ResizeableBlockingQueue) ? (ResizeableBlockingQueue<?>) queue : null;
    }

    /** Returns the capacity of a resizeable queue, or 0 for any other queue type. */
    private static int queueCapacityOf(ThreadPoolMonitor pool) {
        ResizeableBlockingQueue<?> queue = resizeableQueueOf(pool);
        return queue == null ? 0 : queue.getCapacity();
    }
}
註解
@Async("asyncExecutor") public void getTrendQuery(){ //do something }
直接使用
public void test() { asyncExecutor.execute(()->{ //do something } );
http://localhost/actuator/threadpool/getThreadPoolListInfo //GET請求
返回:
[ { "active": 0, //正在進行的任務數 "activePercent": "0%",//線程池負載 "completedTaskCount": 17, //完成的任務數 "corePoolSize": 16, //核心線程數 "keepAliveTime": 60000,//線程存活時間 "largestPoolSize": 16,//到達的最大線程數 "maximumPoolSize": 32, //最大線程數 "poolSize": 16,//當前線程數 "queueCapacity": 500,//隊列容量 ps:如果不是ResizeableBlockingQueue隊列則默認為0 "task": 0, //任務總數 "queueSize":0,//隊列中緩存的任務數量 "threadPoolName": "asyncExecutor" //線程池名稱 } ]
http://localhost/actuator/threadpool/getThreadPoolFixInfo?threadPoolName=asyncExecutor //GET請求
參數:
名稱 | 類型 |
---|---|
threadPoolName | String |
返回:
{ "corePoolSize": 16, //核心線程數 "maximumPoolSize": 32, //最大線程數 "queueCapacity": 500, //隊列大小 "queueType": "ResizeableBlockingQueue", //隊列類型 "threadPoolName": "asyncExecutor" //線程池名稱 }
http://localhost/actuator/threadpool/setThreadPoolFixInfo //POST請求
參數:
名稱 | 類型 | 備註 |
---|---|---|
threadPoolName | String | |
corePoolSize | int | 可變 |
maximumPoolSize | int | 可變 |
queueCapacity | int | 可變 |
queueType | String | 不可變 |
請求類型:json
返回: Boolean
Github
Github
作者水平有限,如有錯誤或遺漏,請指出。
參考文章
1.Java線程池實現原理及其在美團業務中的實踐