Spring's concurrency-throttling utilities: ConcurrencyThrottleSupport and ConcurrencyThrottleInterceptor

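ConcurrencyThrottleSupport uses nothing more than synchronized together with wait and notify to cap the number of threads that may run at the same time, which gives a simple throttling (rate-limiting) strategy.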

1. Class diagram

2. Main methods

Start with the source of ConcurrencyThrottleInterceptor.java:

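In the interceptor's invoke() method, beforeAccess() is called before the target method runs and afterAccess() is called after it, inside a finally block so that it runs even when the target throws: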

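  • beforeAccess compares the internal counter concurrencyCount with the configured threshold concurrencyLimit. If the counter has already reached the limit, the calling thread blocks; otherwise concurrencyCount is incremented.
  • afterAccess decrements concurrencyCount and wakes up one waiting thread.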
public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport
        implements MethodInterceptor, Serializable {

    public ConcurrencyThrottleInterceptor() {
        setConcurrencyLimit(1);
    }

    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        beforeAccess();
        try {
            return methodInvocation.proceed();
        }
        finally {
            afterAccess();
        }
    }

}
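The before/try/finally shape of invoke() can be tried without Spring on the classpath. The following is a minimal sketch, assuming a JDK dynamic proxy as a stand-in for Spring AOP's MethodInterceptor; the names AroundDemo, Service and run are hypothetical and exist only for this illustration. It records the order of events to show that the "after" step still runs when the target throws.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; a JDK proxy stands in for the Spring AOP interceptor.
class AroundDemo {

    // Hypothetical target interface.
    interface Service {
        String call(boolean fail);
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Service target = fail -> {
            events.add("target");
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        };

        // Same shape as invoke(): "before", then the target, then "after" in finally.
        InvocationHandler handler = (proxy, method, args) -> {
            events.add("before");               // plays the role of beforeAccess()
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();             // rethrow the target's own exception
            } finally {
                events.add("after");            // plays the role of afterAccess()
            }
        };

        Service proxied = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);

        proxied.call(false);
        try {
            proxied.call(true);
        } catch (IllegalStateException expected) {
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [before, target, after, before, target, after, caught]
        System.out.println(run());
    }
}
```

Because the release step sits in finally, a failing target cannot leak a slot of the concurrency limit; without it, every exception would permanently reduce the number of callers allowed in.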

 

Implementation of beforeAccess() (defined in the parent class ConcurrencyThrottleSupport):

    protected void beforeAccess() {
        if (this.concurrencyLimit == NO_CONCURRENCY) {
            throw new IllegalStateException(
                    "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
        }
        if (this.concurrencyLimit > 0) {
            boolean debug = logger.isDebugEnabled();
            synchronized (this.monitor) {
                boolean interrupted = false;
                while (this.concurrencyCount >= this.concurrencyLimit) {
                    if (interrupted) {
                        throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
                                "but concurrency limit still does not allow for entering");
                    }
                    if (debug) {
                        logger.debug("Concurrency count " + this.concurrencyCount +
                                " has reached limit " + this.concurrencyLimit + " - blocking");
                    }
                    try {
                        this.monitor.wait();
                    }
                    catch (InterruptedException ex) {
                        // Re-interrupt current thread, to allow other threads to react.
                        Thread.currentThread().interrupt();
                        interrupted = true;
                    }
                }
                if (debug) {
                    logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
                }
                this.concurrencyCount++;
            }
        }
    }

Implementation of afterAccess() (also defined in the parent class ConcurrencyThrottleSupport):

    protected void afterAccess() {
        if (this.concurrencyLimit >= 0) {
            synchronized (this.monitor) {
                this.concurrencyCount--;
                if (logger.isDebugEnabled()) {
                    logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
                }
                this.monitor.notify();
            }
        }
    }
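The counter-plus-monitor mechanism in the two methods above can be reproduced in a few lines. This is a simplified sketch rather than Spring's code: SimpleThrottle and ThrottleDemo are hypothetical names, logging is omitted, and an interrupt is propagated as InterruptedException instead of being turned into an IllegalStateException. The demo sends several threads through the throttle and records the highest number that were inside at once.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified throttle modelled on the wait/notify logic shown above.
class SimpleThrottle {
    private final Object monitor = new Object();
    private final int limit;
    private int count = 0;

    SimpleThrottle(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    void beforeAccess() throws InterruptedException {
        synchronized (monitor) {
            // Loop instead of "if": re-check the condition after every wake-up.
            while (count >= limit) {
                monitor.wait();
            }
            count++;
        }
    }

    void afterAccess() {
        synchronized (monitor) {
            count--;
            monitor.notify(); // one slot was freed, so waking one waiter is enough
        }
    }
}

class ThrottleDemo {
    // Runs `workers` threads through the throttle; returns the peak number inside at once.
    static int maxObserved(int limit, int workers) {
        SimpleThrottle throttle = new SimpleThrottle(limit);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    throttle.beforeAccess();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // never entered, so nothing to release
                }
                try {
                    max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    throttle.afterAccess();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The printed peak never exceeds the limit of 2.
        System.out.println("max concurrent = " + maxObserved(2, 8));
    }
}
```

The while loop around wait() matters: a woken thread has to compete for the monitor again, and another caller may have taken the freed slot in the meantime, so the condition must be re-checked before the counter is incremented.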

For a usage scenario, see the article "Spring asynchronous thread pool: SimpleAsyncTaskExecutor".
