spring線程池(同步、異步)

1、spring異步線程池類圖

2、簡單介紹

2.1. TaskExecutor---Spring異步線程池的接口類,其實質是java.util.concurrent.Executor

如下是官方已經實現的所有7個TaskExecuter。Spring宣稱對於任何場景,這些TaskExecuter徹底夠用了:html

名字 特色
SimpleAsyncTaskExecutor 每次請求新開線程,沒有最大線程數設置.不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程--【1】
SyncTaskExecutor 不是異步的線程.同步能夠用SyncTaskExecutor,但這個能夠說不算一個線程池,由於還在原線程執行。這個類沒有實現異步調用,只是一個同步操做。
ConcurrentTaskExecutor Executor的適配類,不推薦使用。若是ThreadPoolTaskExecutor不知足要求時,才用考慮使用這個類。
SimpleThreadPoolTaskExecutor 監聽Spring’s lifecycle callbacks,而且能夠和Quartz的Component兼容.是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才須要使用此類。
ThreadPoolTaskExecutor 最經常使用。要求jdk版本大於等於5。能夠在程序而不是xml裏修改線程池的配置.其實質是對java.util.concurrent.ThreadPoolExecutor的包裝。
TimerTaskExecutor  
WorkManagerTaskExecutor

 

3、Spring中的同步執行器

1. SyncTaskExecutor:同步能夠用SyncTaskExecutor,但這個能夠說不算一個線程池,由於還在原線程執行。這個類沒有實現異步調用,只是一個同步操做。java

2.也能夠用ThreadPoolTaskExecutor結合FutureTask作到同步。spring

3.2. SyncTaskExecutor與ThreadPoolTaskExecutor區別

前者是同步執行器,執行任務同步,後者是線程池,執行任務異步。併發

 

4、Spring中的異步執行器

異步執行用戶任務的SimpleAsyncTaskExecutor。每次執行客戶提交給它的任務時,它會啓動新的線程,並容許開發者控制併發線程的上限(concurrencyLimit),從而起到必定的資源節流做用。默認時,concurrencyLimit取值爲-1,即不啓用資源節流。app

SimpleAsyncTaskExecutor
<bean id="simpleAsyncTaskExecutor"   
    class="org.springframework.core.task.SimpleAsyncTaskExecutor">  
    <property name="daemon" value="true"/>  
    <property name="concurrencyLimit" value="2"/>  
    <property name="threadNamePrefix" value="simpleAsyncTaskExecutor"/>
</bean>  

 主要實現:1.支持限流處理 2.異步註冊線程返回結果框架

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {
    //限流主要實現
    private final SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter concurrencyThrottle = new SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter();
    private ThreadFactory threadFactory;
    //設置最大的線程數量
    public void setConcurrencyLimit(int concurrencyLimit) {
            this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }
    //是否開啓了限流 限流數量大於0?
    public final boolean isThrottleActive() {
            return this.concurrencyThrottle.isThrottleActive();
    }
    //1.是否開啓限流 不然不開啓限流處理
    //2.執行開始以前檢測是否能夠知足要求 當前數量++
    //3.開啓限流將執行的Runable進行封裝,執行完成調用final方法 當前數量--
    public void execute(Runnable task, long startTimeout) {
            Assert.notNull(task, "Runnable must not be null");
            if(this.isThrottleActive() && startTimeout > 0L) {
                this.concurrencyThrottle.beforeAccess();
                this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));
            } else {
                this.doExecute(task);
            }
      }
     //異步提交有返回值
    public Future<?> submit(Runnable task) {
          FutureTask future = new FutureTask(task, (Object)null);
          this.execute(future, 9223372036854775807L);
          return future;
      }

      public <T> Future<T> submit(Callable<T> task) {
          FutureTask future = new FutureTask(task);
          this.execute(future, 9223372036854775807L);
          return future;
      }

      public ListenableFuture<?> submitListenable(Runnable task) {
          ListenableFutureTask future = new ListenableFutureTask(task, (Object)null);
          this.execute(future, 9223372036854775807L);
          return future;
      }

      public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
          ListenableFutureTask future = new ListenableFutureTask(task);
          this.execute(future, 9223372036854775807L);
          return future;
      }
      //擁有工廠?沒有的話調用父類能夠設置各類參數的建立線程
      protected void doExecute(Runnable task) {
          Thread thread = this.threadFactory != null?this.threadFactory.newThread(task):this.createThread(task);
          thread.start();
      }
      //父類的方法,方便配置線程,方便xml設置線程參數CustomizableThreadCreator 
      public Thread createThread(Runnable runnable) {
            Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
            thread.setPriority(getThreadPriority());
            thread.setDaemon(isDaemon());
            return thread;
        }


 }

 4.2.限流處理其實就是在執行任務以前和以後對於當前線程數量進行統計

內部類的實現異步

//下面只是對於操做進行簡單的封裝,最真的實現仍是抽象的ConcurrencyThrottleSupport
  private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
        private ConcurrencyThrottleAdapter() {
        }

        protected void beforeAccess() {
            super.beforeAccess();
        }

        protected void afterAccess() {
            super.afterAccess();
        }
    }

更多關於限流功能源碼見:《spring控制併發數的工具類ConcurrencyThrottleSupport和ConcurrencyThrottleInterceptorasync

//這裏是對於Runable對象執行在次封裝,在執行完畢後處理限流操做
private class ConcurrencyThrottlingRunnable implements Runnable {
        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        public void run() {
            try {
                this.target.run();
            } finally {
                SimpleAsyncTaskExecutor.this.concurrencyThrottle.afterAccess();
            }

        }
    }

簡單的經過synchronized和wati and notify達到控制線程數量的效果,從而實現限流的策略。工具

4.3.SimpleAsyncTaskExecutor中,執行任務時,每次都會新建一個線程,不使用線程池

看SimpleAsyncTaskExecutor.java的關鍵源代碼:post

 

    protected void doExecute(Runnable task) {
        Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
        thread.start();
    }

 

 

 

createThread()在父類中CustomizableThreadCreator.java中

    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

 

4.4.步監聽獲取線程的結果,其實這個不算這裏面的實現

ListenableFutureTask 其實主要是依靠FutureTask這個JDK的封裝,覆蓋了原始的run方法,在run中封裝能夠獲取到線程的返回值。 
ListenableFutureTask 在次封裝,因爲FutureTask執行完成以後會調用done()空方法,ListenableFutureTask覆蓋done方法能夠獲取到執行的結果,而後在調用前期註冊的錯誤處理或者成功處理的方法,便可到達異步處理的效果。 
相似於回調的效果

public interface SuccessCallback<T> {

    /**
     * Called when the {@link ListenableFuture} successfully completes.
     * @param result the result
     */
    void onSuccess(T result);
}
public interface FailureCallback {

    /**
     * Called when the {@link ListenableFuture} fails to complete.
     * @param ex the exception that triggered the failure
     */
    void onFailure(Throwable ex);
}

public interface ListenableFuture<T> extends Future<T> {
    //成功和失敗的集合
    void addCallback(ListenableFutureCallback<? super T> var1);

    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}

實現類(ListenableFutureTask)可有返回值,可被監聽的,註冊監聽,這裏能夠註冊監聽者放在一個單獨的類中去處理,很好的分配工做ListenableFutureCallbackRegistry

public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
    private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry();

    public ListenableFutureTask(Callable<T> callable) {
        super(callable);
    }

    public ListenableFutureTask(Runnable runnable, T result) {
        super(runnable, result);
    }

    public void addCallback(ListenableFutureCallback<? super T> callback) {
        this.callbacks.addCallback(callback);
    }

    public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
        this.callbacks.addSuccessCallback(successCallback);
        this.callbacks.addFailureCallback(failureCallback);
    }
    //FutureTask執行完成後的回調,調用監聽接口的實現類的方法
    protected final void done() {
        Object cause;
        try {
            Object ex = this.get();
            //回調實現類的方法
            this.callbacks.success(ex);
            return;
        } catch (InterruptedException var3) {
            Thread.currentThread().interrupt();
            return;
        } catch (ExecutionException var4) {
            cause = var4.getCause();
            if(cause == null) {
                cause = var4;
            }
        } catch (Throwable var5) {
            cause = var5;
        }

        this.callbacks.failure((Throwable)cause);
    }
}

註冊監聽,還維護了一個狀態量的信息,很標準的寫法,維護隊列的添加和成功消息和失敗消息的處理

public class ListenableFutureCallbackRegistry<T> {

    private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();

    private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();

    private State state = State.NEW;

    private Object result = null;

    private final Object mutex = new Object();


    /**
     * Add the given callback to this registry.
     * @param callback the callback to add
     */
    public void addCallback(ListenableFutureCallback<? super T> callback) {
        Assert.notNull(callback, "'callback' must not be null");
        synchronized (this.mutex) {
            switch (this.state) {
                case NEW:
                    this.successCallbacks.add(callback);
                    this.failureCallbacks.add(callback);
                    break;
                case SUCCESS:
                    callback.onSuccess((T) this.result);
                    break;
                case FAILURE:
                    callback.onFailure((Throwable) this.result);
                    break;
            }
        }
    }

    /**
     * Add the given success callback to this registry.
     * @param callback the success callback to add
     * @since 4.1
     */
    public void addSuccessCallback(SuccessCallback<? super T> callback) {
        Assert.notNull(callback, "'callback' must not be null");
        synchronized (this.mutex) {
            switch (this.state) {
                case NEW:
                    this.successCallbacks.add(callback);
                    break;
                case SUCCESS:
                    callback.onSuccess((T) this.result);
                    break;
            }
        }
    }

    /**
     * Add the given failure callback to this registry.
     * @param callback the failure callback to add
     * @since 4.1
     */
    public void addFailureCallback(FailureCallback callback) {
        Assert.notNull(callback, "'callback' must not be null");
        synchronized (this.mutex) {
            switch (this.state) {
                case NEW:
                    this.failureCallbacks.add(callback);
                    break;
                case FAILURE:
                    callback.onFailure((Throwable) this.result);
                    break;
            }
        }
    }

    /**
     * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all
     * added callbacks with the given result.
     * @param result the result to trigger the callbacks with
     */
    public void success(T result) {
        synchronized (this.mutex) {
            this.state = State.SUCCESS;
            this.result = result;
            while (!this.successCallbacks.isEmpty()) {
                this.successCallbacks.poll().onSuccess(result);
            }
        }
    }

    public void failure(Throwable ex) {
        synchronized (this.mutex) {
            this.state = State.FAILURE;
            this.result = ex;
            while (!this.failureCallbacks.isEmpty()) {
                this.failureCallbacks.poll().onFailure(ex);
            }
        }
    }

    private enum State {NEW, SUCCESS, FAILURE}

}

5、使用ThreadPoolTaskExecutor

5.一、(傳統方式)

比起從線程池取一個線程再執行, 你僅僅須要把你的Runnable類加入到隊列中,而後TaskExecutor用它內置的規則決定什麼時候開始取一個線程並執行該Runnable類

先在xml中添加bean的配置:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="5" />
  <property name="maxPoolSize" value="10" />
  <property name="queueCapacity" value="25" />
</bean>

<bean id="taskExecutorExample" class="TaskExecutorExample">
  <constructor-arg ref="taskExecutor" />
</bean>

配置解釋 
當一個任務經過execute(Runnable)方法欲添加到線程池時: 
一、 若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。 
二、 若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。 
三、若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。 
四、 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。也就是:處理任務的優先級爲:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。 
五、 當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。

具體調用:

import org.springframework.core.task.TaskExecutor;

public class TaskExecutorExample {

  private class MessagePrinterTask implements Runnable {

    private String message;

    public MessagePrinterTask(String message) {
      this.message = message;
    }

    public void run() {
      System.out.println(message);
    }

  }

  private TaskExecutor taskExecutor;

  public TaskExecutorExample(TaskExecutor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }

  public void printMessages() {
    for(int i = 0; i < 25; i++) {
      taskExecutor.execute(new MessagePrinterTask("Message" + i));
    }
  }
}

 

5.2.推薦 - 使用ThreadPoolTaskExecutor(註解方式)

首先,爲了以註解方式使用異步功能,咱們須要在Spring的xml配置中定義相關的bean:

1.必須在*-servlet.xml而不是applicationContext.xml中定義@Async相關配置

引自http://stackoverflow.com/questions/6610563/spring-async-not-working

2 不使用線程池版本

引自http://www.springframework.org/schema/task/spring-task-3.2.xsd

因此,若是咱們僅僅添加<task:annotation-driven/>,也可使用@Async標籤。然而,此時使用的是SimpleAsyncTaskExecutor。如「官方文檔27章:Task Execution」中所述,SimpleAsyncTaskExecutor不會使用線程池,僅僅是爲每個請求新開一個線程。這樣在大併發的業務場景下,發生OutOfMemory是不足爲奇的。

<?xml version="1.0" encoding="UTF-8"?>
<!--Spring框架的xml標籤訂義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標籤文檔-->
<beans xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
                http://www.springframework.org/schema/task
                http://www.springframework.org/schema/task/spring-task-3.2.xsd">
    <!--掃描項目實例化@Component,@Service,@Controller修飾的類-->
    <context:component-scan base-package="com.your_app" /> 

    <!--create a SimpleAsyncTaskExecutor instance-->
    <task:annotation-driven/>
</beans>

 3 推薦 - 使用線程池版本

<?xml version="1.0" encoding="UTF-8"?>
<!--Spring框架的xml標籤訂義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標籤文檔-->
<beans xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
                http://www.springframework.org/schema/task
                http://www.springframework.org/schema/task/spring-task-3.2.xsd">

    <!--掃描項目實例化@Component,@Service,@Controller修飾的類-->
    <context:component-scan base-package="com.your_app" />

    <!-- 在代碼中@Async不加參數就會使用task:annotation-driven標籤訂義的executor-->
    <task:annotation-driven executor="myExecutor"/>
    <!-- 在代碼中@Async("myExecutor")能夠顯式指定executor爲"myExecutor"-->
    <task:executor id="myExecutor"  
               pool-size="5-25"
               queue-capacity="100"
               rejection-policy="CALLER_RUNS"/>
</beans>

 

其中,注意到屬性pool-size的值」5-25」是一個範圍,這對應的是線程池的min和max容量,它們的意義請參考本文上一節的「配置說明」裏的第三、4點。若是隻有一個值,如pool-size=n, 意味着minSize==maxSize==n

而關於rejection-policy,官方文檔裏說:

總結以下:

池滿時的拒絕策略 效果
AbortPolicy(默認) 拋異常
DiscardPolicy or DiscardOldestPolicy 放棄該線程
CallerRunsPolicy 通知該線程的建立者,讓其不要提交新的線程

 

轉自:https://blog.csdn.net/caib1109/article/details/51623089

相關文章
相關標籤/搜索