ZStack源碼剖析之核心庫鑑賞——ThreadFacade

本文首發於泊浮目的專欄: https://segmentfault.com/blog...

前言

在ZStack中,最基本的執行單位不只僅是一個函數,也能夠是一個任務(Task。其本質實現了Java的Callable接口)。經過大小合理的線程池調度來並行的消費這些任務,使ZStack這個Iaas軟件有條不紊運行在大型的數據中內心。java

對線程池不太瞭解的同窗能夠先看個人一篇博客: Java多線程筆記(三):線程池

演示代碼

在這裏,將以ZStack中ThreadFacade最經常使用的方法爲例進行演示。git

syncSubmit

提交同步任務,線程將會等結果完成後才繼續下一個任務。github

這裏先參考ZStack中ApiMediatorImpl ,其中有一段用於API消息調度的邏輯。spring

@Override
    public void handleMessage(final Message msg) {
        thdf.syncSubmit(new SyncTask<Object>() {
            @Override
            public String getSyncSignature() {
                return "api.worker";
            }

            @Override
            public int getSyncLevel() {
                return apiWorkerNum;
            }

            @Override
            public String getName() {
                return "api.worker";
            }

            @MessageSafe
            public void handleMessage(Message msg) {
                if (msg instanceof APIIsReadyToGoMsg) {
                    handle((APIIsReadyToGoMsg) msg);
                } else if (msg instanceof APIGetVersionMsg) {
                    handle((APIGetVersionMsg) msg);
                } else if (msg instanceof APIGetCurrentTimeMsg) {
                    handle((APIGetCurrentTimeMsg) msg);
                } else if (msg instanceof APIMessage) {
                    dispatchMessage((APIMessage) msg);
                } else {
                    logger.debug("Not an APIMessage.Message ID is " + msg.getId());
                }
            }

            @Override
            public Object call() throws Exception {
                handleMessage(msg);
                return null;
            }
        });
    }

每一個API消息都會被一個線程消費,同時最大併發量爲5(apiWorkerNum=5)。每一個線程都會等着API消息的回覆,等到回覆後便給用戶。apache

chainSubmit

提交異步任務,這裏的任務執行後將會執行隊列中的下一個任務,不會等待結果。編程

參考VmInstanceBase關於虛擬機啓動、重啓、暫停相關的代碼:segmentfault

//暫停虛擬機
    protected void handle(final APIStopVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("stop-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                stopVm(msg, chain);
            }
        });
    }
//重啓虛擬機
    protected void handle(final APIRebootVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("reboot-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                rebootVm(msg, chain);
            }
        });
    }
//啓動虛擬機
    protected void handle(final APIStartVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("start-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                startVm(msg, chain);
            }
        });
    }

通用特性

getSyncSignature則指定了其隊列的key,這個任務隊列本質一個Map。根據相同的k,將任務做爲v按照順序放入map執行。單從這裏的業務邏輯來看,能夠有效避免虛擬機的狀態混亂。api

chainTask的默認併發度爲1,這意味着它是同步的。在稍後的源碼解析中咱們將會看到。

它的實現

先從接口ThreadFacade瞭解一下方法簽名:多線程

public interface ThreadFacade extends Component {
    <T> Future<T> submit(Task<T> task);//提交一個任務
    
    <T> Future<T> syncSubmit(SyncTask<T> task); //提交一個有返回值的任務
    
    Future<Void> chainSubmit(ChainTask task); //提交一個沒有返回值的任務
    
    Future<Void> submitPeriodicTask(PeriodicTask task, long delay); //提交一個週期性任務,將在必定時間後執行
    
    Future<Void> submitPeriodicTask(PeriodicTask task); //提交一個週期性任務
    
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task); //提交一個能夠取消的週期性任務
    
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); //提交一個能夠取消的週期性任務,將在必定時間後執行
    
    void registerHook(ThreadAroundHook hook);  //註冊鉤子
    
    void unregisterHook(ThreadAroundHook hook); //取消鉤子
    
    ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); //提交一個過了必定時間就算超時的任務

    void submitTimerTask(TimerTask task, TimeUnit unit, long delay); //提交一個timer任務
}

以及幾個方法邏輯實現類DispatchQueueImpl中的幾個成員變量。併發

private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class);

    @Autowired
    ThreadFacade _threadFacade;

    private final HashMap<String, SyncTaskQueueWrapper> syncTasks = new HashMap<String, SyncTaskQueueWrapper>();
    private final HashMap<String, ChainTaskQueueWrapper> chainTasks = new HashMap<String, ChainTaskQueueWrapper>();
    private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class);

    public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";

關鍵就是syncTasks(同步隊列)和chainTasks(異步隊列) ,用於存儲兩種類型的任務隊列。

所以當咱們提交chainTask時,要注意記得顯示的調用next方法,避免後面的任務調度不到。

接着,咱們從最經常使用的幾個方法開始看它的代碼

chainSubmit方法

從ThreadFacadeImpl做爲入口

@Override
    public Future<Void> chainSubmit(ChainTask task) {
        return dpq.chainSubmit(task);
    }

DispatchQueue中的邏輯

//公有方法,即入口之一
    @Override
    public Future<Void> chainSubmit(ChainTask task) {
        return doChainSyncSubmit(task);
    }
//內部邏輯
    private <T> Future<T> doChainSyncSubmit(final ChainTask task) {
        assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???";
        DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least "));

        synchronized (chainTasks) {
            final String signature = task.getSyncSignature();
            ChainTaskQueueWrapper wrapper = chainTasks.get(signature);
            if (wrapper == null) {
                wrapper = new ChainTaskQueueWrapper();
                chainTasks.put(signature, wrapper);
            }

            ChainFuture cf = new ChainFuture(task);
            wrapper.addTask(cf);
            wrapper.startThreadIfNeeded();
            return cf;
        }
    }

這段邏輯大體爲:

  • 斷言syncSignature不爲空,而且必須並行度必須大於等於1。由於1會被作成隊列,由一個線程完成這些任務。而1以上則指定了能夠有幾個線程來完成同一個signature的任務。
  • 加鎖HashMap<String, ChainTaskQueueWrapper> chainTasks ,嘗試取出相同signature的隊列。若是沒有則新建一個相關signature的隊列,並初始化這個隊列的線程數量和它的signature。不管如何,要將這個任務放置隊列。
  • 接下來就是startThreadIfNeeded。所謂ifNeeded就是指給這個隊列的線程數尚有空餘。而後提交一個任務到線程池中,這個任務的內容是:從等待隊列中取出一個Feture,若是等待隊列爲空,則刪除這個等待隊列的Map。
private class ChainTaskQueueWrapper {
        LinkedList pendingQueue = new LinkedList();
        final LinkedList runningQueue = new LinkedList();
        AtomicInteger counter = new AtomicInteger(0);
        int maxThreadNum = -1;
        String syncSignature;

        void addTask(ChainFuture task) {
            pendingQueue.offer(task);

            if (maxThreadNum == -1) {
                maxThreadNum = task.getSyncLevel();
            }
            if (syncSignature == null) {
                syncSignature = task.getSyncSignature();
            }
        }

        void startThreadIfNeeded() {
            //若是運行線程數量已經大於等於限制,不start
            if (counter.get() >= maxThreadNum) {
                return;
            }

            counter.incrementAndGet();
            _threadFacade.submit(new Task<Void>() {
                @Override
                public String getName() {
                    return "sync-chain-thread";
                }

                // start a new thread every time to avoid stack overflow
                @AsyncThread
                private void runQueue() {
                    ChainFuture cf;
                    synchronized (chainTasks) {
                        // remove from pending queue and add to running queue later
                        cf = (ChainFuture) pendingQueue.poll();

                        if (cf == null) {
                            if (counter.decrementAndGet() == 0) {
                                //而且線程只有一個(跑完就沒了),則將相關的signature隊列移除,避免佔用內存
                                chainTasks.remove(syncSignature);
                            }
                            //若是爲空,則沒有任務,返回
                            return;
                        }
                    }

                    synchronized (runningQueue) {
                        // add to running queue
                        runningQueue.offer(cf);
                    }
                    //完成之後將任務挪出運行隊列
                    cf.run(new SyncTaskChain() {
                        @Override
                        public void next() {
                            synchronized (runningQueue) {
                                runningQueue.remove(cf);
                            }

                            runQueue();
                        }
                    });
                }
                //這個方法將會被線程池調用,做爲入口
                @Override
                public Void call() throws Exception {
                    runQueue();
                    return null;
                }
            });
        }
    }

syncSubmit方法

syncSubmit的內部邏輯與咱們以前分析的chainSubmit極爲類似,只是放入了不一樣的隊列中。

一樣,也是從ThreadFacadeImpl做爲入口

@Override
    public <T> Future<T> syncSubmit(SyncTask<T> task) {
        return dpq.syncSubmit(task);
    }

而後是DispatchQueue中的實現

@Override
    public <T> Future<T> syncSubmit(SyncTask<T> task) {
        if (task.getSyncLevel() <= 0) {
            return _threadFacade.submit(task);
        } else {
            return doSyncSubmit(task);
        }
    }

內部邏輯-私有方法

private <T> Future<T> doSyncSubmit(final SyncTask<T> syncTask) {
        assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???";

        SyncTaskFuture f;
        synchronized (syncTasks) {
            SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature());
            if (wrapper == null) {
                wrapper = new SyncTaskQueueWrapper();
                //放入syncTasks隊列。
                syncTasks.put(syncTask.getSyncSignature(), wrapper);
            }
            f = new SyncTaskFuture(syncTask);
            wrapper.addTask(f);
            wrapper.startThreadIfNeeded();
        }

        return f;
    }

submitPeriodicTask

提交一個定時任務本質上是經過了線程池的scheduleAtFixedRate來實現。這個方法用於對任務進行週期性調度,任務調度的頻率是必定的,它以上一個任務開始執行時間爲起點,以後的period時間後調度下一次任務。若是任務的執行時間大於調度時間,那麼任務就會在上一個任務結束後,當即被調用。

調用這個方法時將會把任務放入定時任務隊列。當任務出現異常時,將會取消這個Futrue,而且挪出隊列。

public Future<Void> submitPeriodicTask(final PeriodicTask task, long delay) {
        assert task.getInterval() != 0;
        assert task.getTimeUnit() != null;

        ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Throwable e) {
                    _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                    final Map<PeriodicTask, ScheduledFuture<?>> periodicTasks = getPeriodicTasks();
                    final ScheduledFuture<?> ft = periodicTasks.get(task);
                    if (ft != null) {
                        ft.cancel(true);
                        periodicTasks.remove(task);
                    } else {
                        _logger.warn("Not found feature for task " + task.getName()
                                + ", the exception happened too soon, will try to cancel the task next time the exception happens");
                    }
                }
            }
        }, delay, task.getInterval(), task.getTimeUnit());
        _periodicTasks.put(task, ret);
        return ret;
    }

submitCancelablePeriodicTask

submitCancelablePeriodicTask則是會在執行時檢測ScheduledFuture是否被要求cancel,若是有要求則取消。

@Override
    public Future<Void> submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) {
        ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
            private void cancelTask() {
                ScheduledFuture<?> ft = cancelablePeriodicTasks.get(task);
                if (ft != null) {
                    ft.cancel(true);
                    cancelablePeriodicTasks.remove(task);
                } else {
                    _logger.warn("cannot find feature for task " + task.getName()
                            + ", the exception happened too soon, will try to cancel the task next time the exception happens");
                }
            }

            public void run() {
                try {
                    boolean cancel = task.run();
                    if (cancel) {
                        cancelTask();
                    }
                } catch (Throwable e) {
                    _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                    cancelTask();
                }
            }
        }, delay, task.getInterval(), task.getTimeUnit());
        cancelablePeriodicTasks.put(task, ret);
        return ret;
    }

初始化操做

不一樣與一般的ZStack組件,它雖然實現了Component接口。可是其start中的邏輯並不全面,初始化邏輯是基於spring bean的生命週期來作的。見ThreadFacade

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/aop
         http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
         http://www.springframework.org/schema/tx 
          http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
          http://zstack.org/schema/zstack 
         http://zstack.org/schema/zstack/plugin.xsd"
    default-init-method="init" default-destroy-method="destroy">

    <bean id="ThreadFacade" class="org.zstack.core.thread.ThreadFacadeImpl">
        <property name="totalThreadNum" value="500" />
        <!-- don't declare Component extension, it's specially handled -->
    </bean>

    <bean id="ThreadAspectj" class="org.zstack.core.aspect.ThreadAspect" factory-method="aspectOf" />

</beans>

再讓回頭看看ThreadFacadeImpl的init與destory操做。

//init 操做
    public void init() {
          //根據全局配置讀入線程池最大線程數量
        totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM;
        if (totalThreadNum < 10) {
            _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM));
            totalThreadNum = 10;
        }
         // 構建一個支持延時任務的線程池
        _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this);
        _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum));
        //構建一個DispatchQueue
        dpq = new DispatchQueueImpl();

        jmxf.registerBean("ThreadFacade", this);
    }
//destory
   public void destroy() {
        _pool.shutdownNow();
    }

看了這裏可能你們會有疑問,這種關閉方式未免關於暴力(執行任務的線程會所有被中斷)。在此以前,咱們曾提到過,它實現了Component接口。這個接口分別有一個startstop方法,使一個組件的生命週期可以方便的在ZStack中註冊相應的鉤子。

//stop 方法
    @Override
    public boolean stop() {
        _pool.shutdown();
        timerPool.stop();
        return true;
    }

線程工廠

ThreadFacadeImpl同時也實現了ThreadFactory,可讓線程在建立時作一些操做。

@Override
    public Thread newThread(Runnable arg0) {
        return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement()));
    }

在這裏能夠看到ZStack爲每個新的線程賦予了一個名字。

線程池

ZStack對JDK中的線程池進行了必定的擴展,對一個任務執行先後都有相應的鉤子函數,同時也開放註冊鉤子。

package org.zstack.core.thread;

import org.apache.logging.log4j.ThreadContext;
import org.zstack.utils.logging.CLogger;
import org.zstack.utils.logging.CLoggerImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;


public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor {
    private static final CLogger _logger =CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class);
    
    List<ThreadAroundHook> _hooks = new ArrayList<ThreadAroundHook>(8);

    public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
        this.setMaximumPoolSize(corePoolSize);
    }
    
    public void registerHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.add(hook);
        }
    }
    
    public void unregisterHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.remove(hook);
        }
    }
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;       
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }
        
        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.beforeExecute(t, r);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }
        
        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.afterExecute(r, t);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }
}

另外,ScheduledThreadPoolExecutorExt是繼承自ScheduledThreadPoolExecutor。本質上是一個任務調度線程池,用的工做隊列也是一個延時工做隊列。

小結

本文分析了ZStack的久經生產考驗的核心組件——線程池。經過線程池,使並行編程變得再也不那麼複雜。

固然,其中也有一些能夠改進的地方:

  • 一些加鎖的地方(synchronized),能夠經過使用併發容器解決。這樣能夠有效提高吞吐量,節省由於競爭鎖而致使的開銷。
  • 在提交大量任務的狀況下,HashMap會由於擴容而致使性能耗損。能夠考慮換一種Map或在不一樣的策略下使HashMap的初始大小有個較爲合理的設置。
  • 隊列是無界的。在大量任務請求時,會對內存形成極大的負擔。
  • 任務隊列無超時邏輯判斷。ZStack中的調用絕大多數都是由MQ完成,每個msg有着對應的超時時間。可是每個任務卻沒有超時斷定,這意味着一個任務執行時間過長時,後面的任務有可能進入了超時狀態,而卻沒有挪出隊列,配合以前提到的無界隊列,就是一場潛在的災難。
相關文章
相關標籤/搜索