This article was first published in 泊浮目's column: https://segmentfault.com/blog...
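In ZStack, the most basic unit of execution is not only a function; it can also be a task (Task, which in essence implements Java's Callable interface). A reasonably sized thread pool schedules these tasks and consumes them in parallel, which lets ZStack, an IaaS software, run in an orderly way inside large data centers.
Readers who are not familiar with thread pools may want to read one of my earlier blog posts first: Java Multithreading Notes (3): Thread Pools
Here, the most commonly used methods of ZStack's ThreadFacade serve as the demonstration.
syncSubmit submits a synchronous task: the thread waits for the result to complete before it moves on to the next task.
Let's first look at ApiMediatorImpl in ZStack, which contains a piece of logic for dispatching API messages.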
@Override
public void handleMessage(final Message msg) {
    thdf.syncSubmit(new SyncTask<Object>() {
        @Override
        public String getSyncSignature() {
            return "api.worker";
        }

        @Override
        public int getSyncLevel() {
            return apiWorkerNum;
        }

        @Override
        public String getName() {
            return "api.worker";
        }

        @MessageSafe
        public void handleMessage(Message msg) {
            if (msg instanceof APIIsReadyToGoMsg) {
                handle((APIIsReadyToGoMsg) msg);
            } else if (msg instanceof APIGetVersionMsg) {
                handle((APIGetVersionMsg) msg);
            } else if (msg instanceof APIGetCurrentTimeMsg) {
                handle((APIGetCurrentTimeMsg) msg);
            } else if (msg instanceof APIMessage) {
                dispatchMessage((APIMessage) msg);
            } else {
                logger.debug("Not an APIMessage.Message ID is " + msg.getId());
            }
        }

        @Override
        public Object call() throws Exception {
            handleMessage(msg);
            return null;
        }
    });
}
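Each API message is consumed by one thread, and the maximum concurrency is 5 (apiWorkerNum=5). Each thread waits for the reply to its API message and hands the reply to the user once it arrives.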
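The concurrency limit described above can be sketched with a pending queue plus a worker counter. This is a minimal illustration under stated assumptions, not ZStack code: the class name BoundedQueueDemo, its fields and the demo method are all hypothetical, and the limit of 5 simply mirrors apiWorkerNum.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not ZStack code: at most `level` queued tasks run at once.
class BoundedQueueDemo {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final ExecutorService pool;
    private final int level;
    private int workers = 0; // guarded by `pending`

    BoundedQueueDemo(ExecutorService pool, int level) {
        this.pool = pool;
        this.level = level;
    }

    void submit(Runnable task) {
        synchronized (pending) {
            pending.add(task);
            if (workers >= level) {
                return; // enough workers are already draining the queue
            }
            workers++;
        }
        pool.execute(this::drain);
    }

    private void drain() {
        while (true) {
            Runnable task;
            synchronized (pending) {
                task = pending.poll();
                if (task == null) {
                    workers--; // queue is empty, this worker retires
                    return;
                }
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // a failing task must not stop the worker
            }
        }
    }

    // Returns {highest concurrency observed, number of tasks completed}.
    static int[] demo(int level, int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedQueueDemo queue = new BoundedQueueDemo(pool, level);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            queue.submit(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new int[] {max.get(), completed.get()};
    }

    public static void main(String[] args) {
        int[] r = demo(5, 20);
        System.out.println("max concurrency = " + r[0] + ", completed = " + r[1]);
    }
}
```

Even though the backing pool in this sketch is unbounded, the observed concurrency never exceeds the level, because a new worker is only started while the counter is below the limit.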
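chainSubmit submits an asynchronous task: once a task has been executed, the next task in the queue is executed without waiting for the result.
See the code in VmInstanceBase that handles starting, rebooting and stopping a virtual machine: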
// Stop the VM
protected void handle(final APIStopVmInstanceMsg msg) {
    thdf.chainSubmit(new ChainTask(msg) {
        @Override
        public String getName() {
            return String.format("stop-vm-%s", self.getUuid());
        }

        @Override
        public String getSyncSignature() {
            return syncThreadName;
        }

        @Override
        public void run(SyncTaskChain chain) {
            stopVm(msg, chain);
        }
    });
}

// Reboot the VM
protected void handle(final APIRebootVmInstanceMsg msg) {
    thdf.chainSubmit(new ChainTask(msg) {
        @Override
        public String getName() {
            return String.format("reboot-vm-%s", self.getUuid());
        }

        @Override
        public String getSyncSignature() {
            return syncThreadName;
        }

        @Override
        public void run(SyncTaskChain chain) {
            rebootVm(msg, chain);
        }
    });
}

// Start the VM
protected void handle(final APIStartVmInstanceMsg msg) {
    thdf.chainSubmit(new ChainTask(msg) {
        @Override
        public String getName() {
            return String.format("start-vm-%s", self.getUuid());
        }

        @Override
        public String getSyncSignature() {
            return syncThreadName;
        }

        @Override
        public void run(SyncTaskChain chain) {
            startVm(msg, chain);
        }
    });
}
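getSyncSignature specifies the key of the task's queue; the set of task queues is essentially a Map. Tasks that share a key are put into the map as values and executed in order. Looking only at the business logic here, this effectively keeps the state of a virtual machine from getting mixed up.
A ChainTask has a default concurrency of 1, which means tasks in the same queue run serially. We will see this in the source code analysis later.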
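The idea of one ordered queue per key can be sketched as follows. This is an illustration under assumptions rather than the ZStack implementation: KeyedSerialExecutor, its methods and the key "vm-1" are hypothetical names chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not ZStack code: tasks sharing a key run one at a time, in order.
class KeyedSerialExecutor {
    private final Map<String, Queue<Runnable>> queues = new HashMap<>();
    private final ExecutorService pool;

    KeyedSerialExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    void submit(String key, Runnable task) {
        synchronized (queues) {
            Queue<Runnable> queue = queues.get(key);
            if (queue != null) {
                queue.add(task); // a worker is already draining this key
                return;
            }
            queue = new ArrayDeque<>();
            queue.add(task);
            queues.put(key, queue);
        }
        pool.execute(() -> drain(key));
    }

    private void drain(String key) {
        while (true) {
            Runnable next;
            synchronized (queues) {
                next = queues.get(key).poll();
                if (next == null) {
                    queues.remove(key); // drop the empty queue to free memory
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // keep draining even if one task fails
            }
        }
    }

    // Submits three operations for the same key and returns the order they ran in.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedSerialExecutor executor = new KeyedSerialExecutor(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (String op : new String[] {"start", "reboot", "stop"}) {
            executor.submit("vm-1", () -> log.add(op));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one worker drains a given key at a time, operations on the same virtual machine cannot interleave, while operations on different keys can still run in parallel on the shared pool.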
Let's start with the ThreadFacade interface and look at its method signatures:
public interface ThreadFacade extends Component {
    // submit a task
    <T> Future<T> submit(Task<T> task);

    // submit a task that returns a value
    <T> Future<T> syncSubmit(SyncTask<T> task);

    // submit a task that returns no value
    Future<Void> chainSubmit(ChainTask task);

    // submit a periodic task that starts after the given delay
    Future<Void> submitPeriodicTask(PeriodicTask task, long delay);

    // submit a periodic task
    Future<Void> submitPeriodicTask(PeriodicTask task);

    // submit a periodic task that can be cancelled
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task);

    // submit a periodic task that can be cancelled and starts after the given delay
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay);

    // register a hook
    void registerHook(ThreadAroundHook hook);

    // unregister a hook
    void unregisterHook(ThreadAroundHook hook);

    // submit a task that is considered timed out after the given time
    ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay);

    // submit a timer task
    void submitTimerTask(TimerTask task, TimeUnit unit, long delay);
}
Then there are a few member variables of DispatchQueueImpl, the class that implements the logic of several of these methods.
private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class);

@Autowired
ThreadFacade _threadFacade;

private final HashMap<String, SyncTaskQueueWrapper> syncTasks = new HashMap<String, SyncTaskQueueWrapper>();
private final HashMap<String, ChainTaskQueueWrapper> chainTasks = new HashMap<String, ChainTaskQueueWrapper>();
private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class);

public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";
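The key ones are syncTasks (the synchronous queues) and chainTasks (the asynchronous queues), which store the two kinds of task queues.
So when we submit a ChainTask, we must remember to call the next method explicitly; otherwise the tasks behind it will never be scheduled.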
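To see why a missing next call stalls the queue, consider this single-threaded sketch. It is a simplified model under assumptions, not ZStack's ChainTask: the ChainDemo class and its Chain and ChainJob interfaces are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch, not ZStack code: the next job starts only when the
// current job calls chain.next().
class ChainDemo {
    interface Chain {
        void next();
    }

    interface ChainJob {
        void run(Chain chain);
    }

    private final Queue<ChainJob> pending = new ArrayDeque<>();
    private boolean running = false;

    void submit(ChainJob job) {
        pending.add(job);
        if (!running) {
            runNext();
        }
    }

    private void runNext() {
        ChainJob job = pending.poll();
        if (job == null) {
            running = false; // nothing left, the queue becomes idle
            return;
        }
        running = true;
        job.run(this::runNext);
    }

    // Runs jobs a, b, c; job b calls next() only if secondCallsNext is true.
    static List<String> demo(boolean secondCallsNext) {
        List<String> log = new ArrayList<>();
        ChainDemo queue = new ChainDemo();
        queue.submit(chain -> {
            log.add("a");
            chain.next();
        });
        queue.submit(chain -> {
            log.add("b");
            if (secondCallsNext) {
                chain.next();
            }
        });
        queue.submit(chain -> {
            log.add("c");
            chain.next();
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

In the second run, job c stays in the pending queue forever because job b never hands control back.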
Next, let's read the code, starting with the most commonly used methods.
ThreadFacadeImpl is the entry point
@Override public Future<Void> chainSubmit(ChainTask task) { return dpq.chainSubmit(task); }
The logic in DispatchQueue
// Public method, one of the entry points
@Override
public Future<Void> chainSubmit(ChainTask task) {
    return doChainSyncSubmit(task);
}

// Internal logic
private <T> Future<T> doChainSyncSubmit(final ChainTask task) {
    assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???";
    DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least "));

    synchronized (chainTasks) {
        final String signature = task.getSyncSignature();
        ChainTaskQueueWrapper wrapper = chainTasks.get(signature);
        if (wrapper == null) {
            wrapper = new ChainTaskQueueWrapper();
            chainTasks.put(signature, wrapper);
        }

        ChainFuture cf = new ChainFuture(task);
        wrapper.addTask(cf);
        wrapper.startThreadIfNeeded();
        return cf;
    }
}
This logic is roughly as follows:
First, the task's signature is checked: the code asserts that it is not null and that getSyncLevel() returns at least 1.
Next, HashMap<String, ChainTaskQueueWrapper> chainTasks is locked and the queue with the same signature is looked up. If there is none, a new queue is created for that signature; its thread limit and its signature are initialized when the first task is added. Either way, the task is put into the queue.
Finally, startThreadIfNeeded is called. "ifNeeded" means that the queue still has spare threads within its limit. In that case a task is submitted to the thread pool, and that task does the following: it takes a Future from the pending queue and runs it; if the pending queue is empty and no other worker is left, it removes this queue from the Map.
private class ChainTaskQueueWrapper {
    LinkedList pendingQueue = new LinkedList();
    final LinkedList runningQueue = new LinkedList();
    AtomicInteger counter = new AtomicInteger(0);
    int maxThreadNum = -1;
    String syncSignature;

    void addTask(ChainFuture task) {
        pendingQueue.offer(task);

        if (maxThreadNum == -1) {
            maxThreadNum = task.getSyncLevel();
        }
        if (syncSignature == null) {
            syncSignature = task.getSyncSignature();
        }
    }

    void startThreadIfNeeded() {
        // do not start another thread if the number of running threads has reached the limit
        if (counter.get() >= maxThreadNum) {
            return;
        }

        counter.incrementAndGet();
        _threadFacade.submit(new Task<Void>() {
            @Override
            public String getName() {
                return "sync-chain-thread";
            }

            // start a new thread every time to avoid stack overflow
            @AsyncThread
            private void runQueue() {
                ChainFuture cf;
                synchronized (chainTasks) {
                    // remove from pending queue and add to running queue later
                    cf = (ChainFuture) pendingQueue.poll();

                    if (cf == null) {
                        if (counter.decrementAndGet() == 0) {
                            // this was the last thread (it exits after this run), so remove
                            // the queue for this signature to avoid holding memory
                            chainTasks.remove(syncSignature);
                        }
                        // the queue is empty, there is no task left: return
                        return;
                    }
                }

                synchronized (runningQueue) {
                    // add to running queue
                    runningQueue.offer(cf);
                }

                // once the task completes, move it out of the running queue
                cf.run(new SyncTaskChain() {
                    @Override
                    public void next() {
                        synchronized (runningQueue) {
                            runningQueue.remove(cf);
                        }

                        runQueue();
                    }
                });
            }

            // this method is called by the thread pool and serves as the entry point
            @Override
            public Void call() throws Exception {
                runQueue();
                return null;
            }
        });
    }
}
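The internal logic of syncSubmit is very similar to that of chainSubmit, which we analyzed above; the tasks simply go into a different queue.
Likewise, ThreadFacadeImpl is the entry point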
@Override public <T> Future<T> syncSubmit(SyncTask<T> task) { return dpq.syncSubmit(task); }
Then comes the implementation in DispatchQueue
@Override public <T> Future<T> syncSubmit(SyncTask<T> task) { if (task.getSyncLevel() <= 0) { return _threadFacade.submit(task); } else { return doSyncSubmit(task); } }
The internal logic, a private method
private <T> Future<T> doSyncSubmit(final SyncTask<T> syncTask) {
    assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???";

    SyncTaskFuture f;
    synchronized (syncTasks) {
        SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature());
        if (wrapper == null) {
            wrapper = new SyncTaskQueueWrapper();
            // put the new queue into syncTasks
            syncTasks.put(syncTask.getSyncSignature(), wrapper);
        }
        f = new SyncTaskFuture(syncTask);
        wrapper.addTask(f);
        wrapper.startThreadIfNeeded();
    }

    return f;
}
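Submitting a periodic task is essentially implemented with the thread pool's scheduleAtFixedRate. This method schedules a task periodically at a fixed rate: it takes the time at which the previous execution started as the reference point and schedules the next execution one period later. If an execution takes longer than the period, the next execution starts immediately after the previous one finishes.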
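A minimal sketch of fixed-rate scheduling with the plain JDK API is shown below. The class FixedRateDemo, the 50 ms period and the run count of 3 are illustrative assumptions, not values taken from ZStack.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run a task every 50 ms and wait until it has run three times.
class FixedRateDemo {
    static boolean runThreeTimes() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(3);
        // initial delay 0, then one execution per 50 ms measured from each start time
        ScheduledFuture<?> future =
                pool.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(false); // stop further executions
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times: " + runThreeTimes());
    }
}
```

Note that in the JDK, an exception escaping a periodic task suppresses all later executions, which is one reason to wrap the task body in a try/catch as the ZStack code does.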
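Calling this method puts the task into the map of periodic tasks. When the task throws an exception, its Future is cancelled and removed from the map.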
public Future<Void> submitPeriodicTask(final PeriodicTask task, long delay) {
    assert task.getInterval() != 0;
    assert task.getTimeUnit() != null;

    ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
        public void run() {
            try {
                task.run();
            } catch (Throwable e) {
                _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                final Map<PeriodicTask, ScheduledFuture<?>> periodicTasks = getPeriodicTasks();
                final ScheduledFuture<?> ft = periodicTasks.get(task);
                if (ft != null) {
                    ft.cancel(true);
                    periodicTasks.remove(task);
                } else {
                    _logger.warn("Not found feature for task " + task.getName()
                            + ", the exception happened too soon, will try to cancel the task next time the exception happens");
                }
            }
        }
    }, delay, task.getInterval(), task.getTimeUnit());
    _periodicTasks.put(task, ret);
    return ret;
}
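submitCancelablePeriodicTask, in contrast, checks on every run whether the task asks to be cancelled (the boolean returned by task.run()), and cancels its ScheduledFuture if so.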
@Override
public Future<Void> submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) {
    ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
        private void cancelTask() {
            ScheduledFuture<?> ft = cancelablePeriodicTasks.get(task);
            if (ft != null) {
                ft.cancel(true);
                cancelablePeriodicTasks.remove(task);
            } else {
                _logger.warn("cannot find feature for task " + task.getName()
                        + ", the exception happened too soon, will try to cancel the task next time the exception happens");
            }
        }

        public void run() {
            try {
                boolean cancel = task.run();
                if (cancel) {
                    cancelTask();
                }
            } catch (Throwable e) {
                _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                cancelTask();
            }
        }
    }, delay, task.getInterval(), task.getTimeUnit());
    cancelablePeriodicTasks.put(task, ret);
    return ret;
}
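The self-cancelling pattern can be sketched with the plain JDK as follows. This is an illustration under assumptions: CancelableDemo is a hypothetical class, and an AtomicReference stands in for the task-to-future map used in the ZStack code. As in the original, the future may not be stored yet when the first run happens, in which case cancellation is retried on the next run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a periodic task that cancels itself after `limit` runs.
class CancelableDemo {
    // Returns how many times the task ran before it was cancelled.
    static int runUntilCancelled(int limit) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        CountDownLatch cancelled = new CountDownLatch(1);

        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            // the "task" reports whether it wants to be cancelled
            boolean cancel = runs.incrementAndGet() >= limit;
            ScheduledFuture<?> ft = self.get();
            if (cancel && ft != null) {
                ft.cancel(false); // no further executions are scheduled
                cancelled.countDown();
            }
            // if ft is still null the future was not stored yet; retry on the next run
        }, 20, 20, TimeUnit.MILLISECONDS);
        self.set(future);

        try {
            cancelled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs before cancel: " + runUntilCancelled(3));
    }
}
```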
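Unlike an ordinary ZStack component, it implements the Component interface but the logic in its start method is not complete; its initialization relies on the Spring bean lifecycle instead. See the ThreadFacade bean definition.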
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:zstack="http://zstack.org/schema/zstack"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
    http://zstack.org/schema/zstack
    http://zstack.org/schema/zstack/plugin.xsd"
    default-init-method="init" default-destroy-method="destroy">

    <bean id="ThreadFacade" class="org.zstack.core.thread.ThreadFacadeImpl">
        <property name="totalThreadNum" value="500" />
        <!-- don't declare Component extension, it's specially handled -->
    </bean>

    <bean id="ThreadAspectj" class="org.zstack.core.aspect.ThreadAspect" factory-method="aspectOf" />
</beans>
Now let's look back at the init and destroy operations of ThreadFacadeImpl.
// init
public void init() {
    // read the maximum number of pool threads from the global configuration
    totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM;
    if (totalThreadNum < 10) {
        _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM));
        totalThreadNum = 10;
    }
    // build a thread pool that supports delayed tasks
    _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this);
    _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum));
    // build a DispatchQueue
    dpq = new DispatchQueueImpl();

    jmxf.registerBean("ThreadFacade", this);
}

// destroy
public void destroy() {
    _pool.shutdownNow();
}
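At this point you may wonder whether this way of shutting down is too brutal (every thread that is executing a task gets interrupted). As mentioned earlier, the class implements the Component interface. This interface has a start and a stop method, which make it easy to register hooks for a component's lifecycle in ZStack.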
// stop
@Override
public boolean stop() {
    _pool.shutdown();
    timerPool.stop();
    return true;
}
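The difference between the two shutdown calls can be observed with a small experiment. This is a sketch using a plain JDK executor; ShutdownDemo and its method are hypothetical names, and the 200 ms sleep only stands in for a long-running task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: does a running task get interrupted when the pool is closed?
class ShutdownDemo {
    static boolean taskInterrupted(boolean forceful) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(200); // stands in for real work
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        try {
            started.await(); // make sure the task is running before closing the pool
            if (forceful) {
                pool.shutdownNow(); // interrupts running tasks, drops queued ones
            } else {
                pool.shutdown(); // accepts no new tasks, lets running ones finish
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdownNow interrupted the task: " + taskInterrupted(true));
        System.out.println("shutdown interrupted the task: " + taskInterrupted(false));
    }
}
```

With shutdown, the running task completes normally, which matches the gentler behaviour of the stop method above.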
ThreadFacadeImpl also implements ThreadFactory, which lets it do some work whenever a thread is created.
@Override public Thread newThread(Runnable arg0) { return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement())); }
Here we can see that ZStack gives every new thread a name.
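A standalone version of such a naming factory might look like the sketch below. NamedThreadFactory and the prefix "demo-thread-" are hypothetical; only the ThreadFactory interface and the sequence-number idea come from the code above.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a ThreadFactory that names threads with a prefix and a sequence number.
class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger seqNum = new AtomicInteger(0);
    private final String prefix;

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + seqNum.getAndIncrement());
    }

    public static void main(String[] args) {
        NamedThreadFactory factory = new NamedThreadFactory("demo-thread-");
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

Named threads make thread dumps and log lines much easier to attribute to the pool that owns them.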
ZStack extends the JDK thread pool to some degree: hook functions run before and after each task is executed, and other code can register its own hooks.
package org.zstack.core.thread;

import org.apache.logging.log4j.ThreadContext;
import org.zstack.utils.logging.CLogger;
import org.zstack.utils.logging.CLoggerImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor {
    private static final CLogger _logger = CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class);

    List<ThreadAroundHook> _hooks = new ArrayList<ThreadAroundHook>(8);

    public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
        this.setMaximumPoolSize(corePoolSize);
    }

    public void registerHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.add(hook);
        }
    }

    public void unregisterHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.remove(hook);
        }
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }

        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.beforeExecute(t, r);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }

        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.afterExecute(r, t);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }
}
In addition, ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor. It is essentially a task-scheduling thread pool, and the work queue it uses is a delayed work queue.
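The hook mechanism can be reproduced on a plain ThreadPoolExecutor, whose beforeExecute and afterExecute methods exist for exactly this purpose. The sketch below is not the ZStack class: HookedPool and AroundHook are hypothetical names, and a CopyOnWriteArrayList replaces the synchronized copy of the hook list used in the original.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a thread pool that runs registered hooks around every task.
class HookedPool extends ThreadPoolExecutor {
    interface AroundHook {
        void before(Thread t, Runnable r);

        void after(Runnable r, Throwable error);
    }

    // safe to iterate while other threads register or unregister hooks
    private final List<AroundHook> hooks = new CopyOnWriteArrayList<>();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    void registerHook(AroundHook hook) {
        hooks.add(hook);
    }

    void unregisterHook(AroundHook hook) {
        hooks.remove(hook);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        for (AroundHook hook : hooks) {
            try {
                hook.before(t, r);
            } catch (RuntimeException e) {
                // a failing hook must not prevent the task from running
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable error) {
        super.afterExecute(r, error);
        for (AroundHook hook : hooks) {
            try {
                hook.after(r, error);
            } catch (RuntimeException e) {
                // a failing hook must not break the worker thread
            }
        }
    }

    // Runs one task and returns the order of hook and task events.
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        HookedPool pool = new HookedPool(1);
        pool.registerHook(new AroundHook() {
            @Override
            public void before(Thread t, Runnable r) {
                events.add("before");
            }

            @Override
            public void after(Runnable r, Throwable error) {
                events.add("after");
            }
        });
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```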
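This article analyzed one of ZStack's core, production-proven components: the thread pool. With it, parallel programming becomes much less complicated.
Of course, there are also a few things that could be improved: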