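I wanted to add a suspend (pause) feature to my multithreaded tasks and then run a concurrency test together with httpcomponents-asyncclient. The result was embarrassing: CPU usage went straight to 100%.
Sampling the CPU with VisualVM showed that org.apache.http.impl.nio.reactor.AbstractIOReactor.execute() was consistently taking most of the CPU time, yet everything was normal as long as the suspend operation was never invoked.
One step of the suspend operation has to interrupt threads, and those threads all come from a custom ThreadFactory: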
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class GroupThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final AtomicInteger threadNumber;

    public GroupThreadFactory() {
        this.group = new ThreadGroup("WorkerGroup");
        this.threadNumber = new AtomicInteger(1);
    }

    public Thread newThread(Runnable r) {
        // Create the thread inside this factory's group so that group.interrupt() reaches it
        Thread t = new Thread(group, r, "pool-thread-" + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    public ThreadGroup getGroup() {
        return this.group;
    }
}
A ThreadGroup can interrupt every thread that belongs to it.
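As a minimal sketch of that behaviour (the class name, group name and thread names here are made up for illustration and are not from my project), the following starts a few sleeping threads in one group and interrupts them all with a single ThreadGroup.interrupt() call:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class GroupInterruptDemo {

    /** Starts `count` sleeping threads in one group, interrupts the group,
     *  and returns how many of them saw an InterruptedException. */
    static int interruptAll(int count) {
        ThreadGroup group = new ThreadGroup("DemoGroup"); // hypothetical name
        CountDownLatch started = new CountDownLatch(count);
        AtomicInteger interrupted = new AtomicInteger();
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            threads[i] = new Thread(group, () -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for real work
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                }
            }, "demo-" + i);
            threads[i].start();
        }
        try {
            started.await();   // every thread is running by now
            group.interrupt(); // one call reaches all threads in the group
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted threads: " + interruptAll(3));
    }
}
```

The catch is that the group does not care who created a thread, only that the thread is a member.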
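Could asyncclient's internal threads have ended up in this ThreadGroup, and then gone into an endless loop after being interrupted?
I deliberately searched Eclipse for references to "ThreadFactory", and sure enough found the ThreadFactory that asyncclient implements: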
// org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor
static class DefaultThreadFactory implements ThreadFactory {

    private static volatile int COUNT = 0;

    public Thread newThread(final Runnable r) {
        return new Thread(r, "I/O dispatcher " + (++COUNT));
    }

}
This was puzzling... where does any ThreadGroup come into it?
Going back to the Thread source, I found the initialization method:
private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize) {
    Thread parent = currentThread();
    SecurityManager security = System.getSecurityManager();
    if (g == null) {
        /* Determine if it's an applet or not */

        /* If there is a security manager, ask the security manager
           what to do. */
        if (security != null) {
            g = security.getThreadGroup();
        }

        /* If the security doesn't have a strong opinion of the matter
           use the parent thread group. */
        if (g == null) {
            g = parent.getThreadGroup();
        }
    }
    // ... (rest of the method omitted)
}
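The line "g = parent.getThreadGroup();" shows that, as a last resort, a new thread takes the thread group of the thread that creates it. In practice this fallback is what happens almost every time.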
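This inheritance is easy to check. A minimal sketch, assuming no SecurityManager is installed (the class and group names are made up for illustration): a thread running inside "ParentGroup" creates a child thread without passing any ThreadGroup, just as DefaultThreadFactory does.

```java
class InheritedGroupDemo {

    /** Returns the name of the group that a thread created from inside
     *  "ParentGroup" ends up in when no group is passed explicitly. */
    static String childGroupName() {
        ThreadGroup parentGroup = new ThreadGroup("ParentGroup"); // hypothetical name
        String[] result = new String[1];
        Thread parent = new Thread(parentGroup, () -> {
            // No ThreadGroup argument, like new Thread(r, "I/O dispatcher ...")
            Thread child = new Thread(() -> { }, "child");
            result[0] = child.getThreadGroup().getName();
        }, "parent");
        parent.start();
        try {
            parent.join(); // join() makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(childGroupName()); // prints "ParentGroup"
    }
}
```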
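That explains how asyncclient's threads ended up in my own ThreadGroup: its I/O dispatcher threads are evidently created from one of my worker threads, so they inherit that thread's group and get interrupted along with everything else in it.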
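Why an interrupted dispatcher thread would then burn CPU is something I did not verify in the httpcore source, so treat the following as a plausible mechanism rather than a confirmed diagnosis. A java.nio Selector returns from select() immediately when the calling thread's interrupt flag is set, and it leaves the flag set, so any loop built around select() that never clears the flag will spin. The sketch below (hypothetical class name) measures this on the current thread:

```java
import java.io.IOException;
import java.nio.channels.Selector;

class InterruptedSelectDemo {

    /** Calls select(timeoutMs) `rounds` times with the interrupt flag set
     *  and returns the total elapsed time in milliseconds. */
    static long elapsedMillisWhileInterrupted(int rounds, long timeoutMs) {
        try (Selector selector = Selector.open()) {
            Thread.currentThread().interrupt(); // simulate an outside interrupt
            long start = System.nanoTime();
            try {
                for (int i = 0; i < rounds; i++) {
                    // Does not wait for the timeout while the flag is set,
                    // and does not clear the flag either
                    selector.select(timeoutMs);
                }
                return (System.nanoTime() - start) / 1_000_000;
            } finally {
                Thread.interrupted(); // clear the flag so the caller is unaffected
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long ms = elapsedMillisWhileInterrupted(3, 2_000);
        System.out.println("3 x select(2000) took " + ms + " ms");
    }
}
```

Without the interrupt, three calls with a 2 second timeout and no registered channels would take about 6 seconds.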
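I looked into changing the association between a Thread and its ThreadGroup, but had to give up. This is a core Java API, after all, and not something you are allowed to modify casually.
In the end, I tackled the problem from the run method.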
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Give up {@link ThreadGroup}
 * @author Adan
 */
public abstract class InterruptableThreadFactory implements ThreadFactory {

    private final String name;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    private InterruptableThreadFactory(String factoryName) {
        this.name = factoryName + "-thread-";
    }

    private abstract class IThread extends Thread {

        public IThread(ThreadGroup group, Runnable target, String name, long stackSize) {
            super(group, target, name, stackSize);
        }

        protected abstract void started();

        @Override
        public final void run() {
            if (Thread.currentThread() == this) { // the RUNNING thread only
                this.started();
                // code here would be safe absolutely
                try {
                    super.run();
                } finally {
                    this.terminated();
                }
            } else {
                super.run();
            }
        }

        /**
         * invoked by the RUNNING thread
         */
        protected abstract void terminated();
    }

    public final Thread newThread(Runnable r) {
        // The group is deliberately null: threads are tracked by the factory, not by a ThreadGroup
        Thread t = new IThread(null, r, this.name + threadNumber.getAndIncrement(), 0) {
            @Override
            protected void started() {
                hold(this);
            }

            @Override
            protected void terminated() {
                releaseThread();
            }
        };
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    /**
     * Releases the resources that the current thread holds in this factory.
     */
    final void releaseThread() {
        this.release(Thread.currentThread());
    }

    // Beware of lock contention between suspending threads and releasing resources
    abstract void release(Thread t);

    abstract void hold(Thread t);

    public abstract Object[] snapshotThreads(); // copy on read

    abstract int threadSize();

    /**
     * Interrupts all threads held by this factory.<br/>
     * @see #doInterrupt(Object[])
     * @see #doInterrupt(Iterator)
     */
    public void interruptThreads() {
        this.doInterrupt(this.snapshotThreads());
    }

    final void doInterrupt(Object[] threads) {
        for (int i = 0; i < threads.length; i++) {
            this.interrupt((Thread) threads[i]);
        }
    }

    /**
     * @param threadIterator weakly consistent
     */
    final void doInterrupt(Iterator<Thread> threadIterator) { // interruptDirectThreads
        while (threadIterator.hasNext()) {
            this.interrupt(threadIterator.next());
        }
    }

    private final void interrupt(Thread t) {
        if (t.getState() != Thread.State.TERMINATED)
            t.interrupt();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(50);
        sb.append("InterruptableThreadFactory@").append(Integer.toHexString(super.hashCode()))
          .append("(threadSize=").append(this.threadSize()).append(')');
        return sb.toString();
    }

    public static InterruptableThreadFactory newInstance(String factoryName, boolean large) {
        if (large) {
            return new InterruptableThreadFactory(factoryName) {
                ... // elided in the original post
            };
        } else {
            return new InterruptableThreadFactory(factoryName) {

                // ArrayList with the four methods used below made synchronized
                @SuppressWarnings("serial")
                private final Collection<Thread> threadSet = new java.util.ArrayList<Thread>() {
                    @Override
                    public synchronized final boolean remove(Object o) { return super.remove(o); }

                    @Override
                    public synchronized final boolean add(Thread e) { return super.add(e); }

                    @Override
                    public synchronized final Object[] toArray() { return super.toArray(); }

                    @Override
                    public synchronized final int size() { return super.size(); }
                };

                @Override
                final void release(Thread t) {
                    this.threadSet.remove(t);
                }

                @Override
                final void hold(Thread t) {
                    this.threadSet.add(t);
                }

                @Override
                public final Object[] snapshotThreads() {
                    Object[] threads = this.threadSet.toArray();
                    return threads;
                }

                @Override
                final int threadSize() {
                    return this.threadSet.size();
                }
            };
        }
    }
}
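OK, the tests pass.
That said, it does feel as if the "group" concept has been weakened to the point where it is hardly worth keeping.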
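For readers who only want the core idea of InterruptableThreadFactory above, here is a stripped-down sketch. It is not the class from this post: TrackingThreadFactory and TrackingDemo are hypothetical names, it wraps the Runnable instead of subclassing Thread, and it uses a concurrent set instead of the synchronized ArrayList. The demo has a worker spawn a "library" thread and shows that interrupting through the factory reaches the worker only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified stand-in for InterruptableThreadFactory. */
class TrackingThreadFactory implements ThreadFactory {
    private final Set<Thread> live = ConcurrentHashMap.newKeySet();

    @Override
    public Thread newThread(Runnable task) {
        return new Thread(() -> {
            live.add(Thread.currentThread()); // register from the running thread
            try {
                task.run();
            } finally {
                live.remove(Thread.currentThread()); // always deregister on exit
            }
        });
    }

    /** Interrupts only the threads this factory created, not their children. */
    void interruptThreads() {
        for (Thread t : live) {
            t.interrupt();
        }
    }
}

class TrackingDemo {

    /** Returns { workerInterrupted, helperInterrupted }. */
    static boolean[] run() {
        TrackingThreadFactory factory = new TrackingThreadFactory();
        CountDownLatch ready = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean();
        AtomicBoolean helperInterrupted = new AtomicBoolean();
        Thread[] helperRef = new Thread[1];

        Thread worker = factory.newThread(() -> {
            // Stands in for a library that spawns its own thread from our worker
            helperRef[0] = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    helperInterrupted.set(true);
                }
            }, "library-helper");
            helperRef[0].start();
            ready.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
            }
        });
        worker.start();
        try {
            ready.await();              // worker is registered, helper is started
            factory.interruptThreads(); // reaches the worker only
            worker.join();
            release.countDown();        // let the helper finish normally
            helperRef[0].join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { workerInterrupted.get(), helperInterrupted.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("worker interrupted: " + r[0] + ", helper interrupted: " + r[1]);
    }
}
```

The helper still inherits the worker's ThreadGroup here. It is left alone only because the interrupt goes through the factory's own set rather than through the group.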