在ConcurrencyThrottleSupport類中,簡單的經過synchronized和wati and notify達到控制線程數量的效果,從而實現限流的策略。html
1、類圖java
2、主要方法react
先看ConcurrencyThrottleInterceptor.java類的源碼:spring
看該攔截器中的invoke()方法中,在執行目標方法的先後分別執行beforeAccess()和 afterAccess()方法,異步
public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport implements MethodInterceptor, Serializable { public ConcurrencyThrottleInterceptor() { setConcurrencyLimit(1); } @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { beforeAccess(); try { return methodInvocation.proceed(); } finally { afterAccess(); } } }
beforeAccess()實現(在父類ConcurrencyThrottleSupport中實現)ide
protected void beforeAccess() { if (this.concurrencyLimit == NO_CONCURRENCY) { throw new IllegalStateException( "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY"); } if (this.concurrencyLimit > 0) { boolean debug = logger.isDebugEnabled(); synchronized (this.monitor) { boolean interrupted = false; while (this.concurrencyCount >= this.concurrencyLimit) { if (interrupted) { throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " + "but concurrency limit still does not allow for entering"); } if (debug) { logger.debug("Concurrency count " + this.concurrencyCount + " has reached limit " + this.concurrencyLimit + " - blocking"); } try { this.monitor.wait(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. Thread.currentThread().interrupt(); interrupted = true; } } if (debug) { logger.debug("Entering throttle at concurrency count " + this.concurrencyCount); } this.concurrencyCount++; } } }
beforeAccess()實現(在父類ConcurrencyThrottleSupport中實現)post
protected void afterAccess() { if (this.concurrencyLimit >= 0) { synchronized (this.monitor) { this.concurrencyCount--; if (logger.isDebugEnabled()) { logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount); } this.monitor.notify(); } } }
使用場景見《spring異步線程池-SimpleAsyncTaskExecutor》this