ZStack源碼剖析之核心庫鑑賞——FlowChain

本文首發於泊浮目的專欄: https://segmentfault.com/blog...

前言

在ZStack(或者說產品化的IaaS軟件)中的任務一般有很長的執行路徑,錯誤可能發生在路徑的任意一處。爲了保證系統的正確性,需提供一種較爲完善的回滾機制——在ZStack中,經過一個工做流引擎,ZStack的每個步驟都被包裹在獨立的工做流中,能夠在出錯的時候回滾。此外,經過在配置文件中組裝工做流的方式,關鍵的執行路徑能夠被配置,這使得架構的耦合度進一步下降。java

系統解耦合的手段除了以前文章所提到的分層、分割、分佈等,還有一個重要手段是異步,業務之間的消息傳遞不是同步調用,而是將一個業務操做分紅多個階段,每一個階段之間經過共享數據的方式異步執行進行協做。

這便是一種在業務設計原則中——流程可定義原則的具象化。接觸過金融行業的同窗確定知道,不一樣的保險理賠流程是不同的。而承保流程和理賠流程是分離的,在須要時進行關聯,從而能夠複用一些理賠流程,並提供一些個性化理賠流程。git

演示代碼

就以建立VM爲例,在ZStack中大體能夠分如下幾個步驟:github

<bean id="VmInstanceManager" class="org.zstack.compute.vm.VmInstanceManagerImpl">
        <property name="createVmWorkFlowElements">
            <list>
                <value>org.zstack.compute.vm.VmImageSelectBackupStorageFlow</value>
                <value>org.zstack.compute.vm.VmAllocateHostFlow</value>
                <value>org.zstack.compute.vm.VmAllocatePrimaryStorageFlow</value>
                <value>org.zstack.compute.vm.VmAllocateVolumeFlow</value>
                <value>org.zstack.compute.vm.VmAllocateNicFlow</value>
                <value>org.zstack.compute.vm.VmInstantiateResourcePreFlow</value>
                <value>org.zstack.compute.vm.VmCreateOnHypervisorFlow</value>
                <value>org.zstack.compute.vm.VmInstantiateResourcePostFlow</value>
            </list>
        </property>
<!--  還有不少,介於篇幅再也不列出 -->

能夠說是代碼即文檔了。在這裏,ZStack顯式聲明這些Flow在Spring XML中,這些屬性將會被注入到createVmWorkFlowElements中。每個Flow都被拆成了一個個較小的單元,好處不只是將業務操做分紅了多個階段易於回滾,仍是能夠有效複用這些Flow。這也是編程思想中「組合」的體現。spring

如何使用

除了這種配置型聲明,還能夠在代碼中靈活的使用這些FlowChain。在這裏,咱們將以Case來講明這些FlowChain的用法,避免對ZStack業務邏輯不熟悉的讀者看的一頭霧水。編程

一共有兩種可用的FlowChain:segmentfault

  • SimpleFlowChain
  • ShareFlowChain

SimpleFlowChain

咱們先來看一個Casepromise

@Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }

咱們能夠看到,這就是一個工做流。完成一個工做流的時候(回調觸發時)執行下一個工做流——由trigger.next觸發。不只如此,還能夠添加Rollback屬性架構

@Test
    public void test() throws WorkFlowException {
        final int[] count = {0};

        new SimpleFlowChain()
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        chain.fail(null);
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .start();

        Assert.assertEquals(-1, count[0]);
    }

rollback由FlowTrigger的fail觸發。這樣咱們能夠保證在發生一些錯誤的時候及時回滾,防止咱們的系統處於一個有髒數據的中間狀態。同時,Map也能夠用來在Flow之間傳遞上下文。異步

ShareFlowChain

public class TestShareFlow {
    int[] count = {0};
    boolean success;

    private void increase() {
        count[0]++;
    }

    private void decrease() {
        count[0]--;
    }

    private void expect(int ret) {
        Assert.assertEquals(count[0], ret);
    }

    @Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }


    @Before
    public void setUp() throws Exception {
        new BeanConstructor().build();
    }
}

比起SimpleFlowChain,ShareFlowChain則是一個Inner class,在相同的做用域裏,傳遞數據變得更加的方便了。ide

它的實現

在ZStack中,FlowChain做爲核心庫,其實現也是很是的簡單(能夠直接參考SimpleFlowChainShareFlowChain),本質就是將任務放入List中,由內部方法進行迭代,在此基礎上作了一系列操做。下面將開始分析它的源碼。

從接口提及

public interface FlowChain {
    List<Flow> getFlows();

    FlowChain insert(Flow flow);

    FlowChain insert(int pos, Flow flow);

    FlowChain setFlowMarshaller(FlowMarshaller marshaller);

    FlowChain then(Flow flow);

    FlowChain done(FlowDoneHandler handler);

    FlowChain error(FlowErrorHandler handler);

    FlowChain Finally(FlowFinallyHandler handler);

    FlowChain setData(Map data);

    FlowChain putData(Map.Entry... es);

    FlowChain setName(String name);

    void setProcessors(List<FlowChainProcessor> processors);

    Map getData();

    void start();

    FlowChain noRollback(boolean no);

    FlowChain allowEmptyFlow();
}

接口的名字很是的易懂,那麼在這裏就很少做解釋了。FlowChain僅僅定義了一個Flow最小應有的行爲。

//定義了Flow的回滾操做接口
public interface FlowRollback extends AsyncBackup {
    //回滾操做
    void rollback();
    //設置跳過回滾操做
    void skipRestRollbacks();
}
//定義了觸發器的行爲接口
public interface FlowTrigger extends AsyncBackup {
    //觸發失敗,調用errorHandle
    void fail(ErrorCode errorCode);
    //觸發下一個flow
    void next();
    //setError後,在下次調用next的時纔會調用errorHandle
    void setError(ErrorCode error);
}

源碼解析

Flow

public interface Flow {
    void run(FlowTrigger trigger, Map data);

    void rollback(FlowRollback trigger, Map data);
}

Flow的定義其實很是的簡單——一組方法。執行和對應的回滾,通常在ZStack中都以匿名內部類的方式傳入。

Chain的用法

在以前的SimpleFlowChain的case中。咱們能夠看到一系列的鏈式調用,大體以下:

new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();

then本質是往List<flow> flows裏添加一個flow。

public SimpleFlowChain then(Flow flow) {
        flows.add(flow);
        return this;
    }

再來看看start

@Override
    public void start() {
        // 檢測flow中是否設置了processors。通常用來打trace
        if (processors != null) {
            for (FlowChainProcessor p : processors) {
                p.processFlowChain(this);
            }
        }
        //若是flows爲空可是以前在設置中容許爲空,那麼就直接直接done部分的邏輯。否則就報錯
        if (flows.isEmpty() && allowEmptyFlow) {
            callDoneHandler();
            return;
        }

        if (flows.isEmpty()) {
            throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose");
        }
        //每一個flow必須有一個map,用來傳遞上下文
        if (data == null) {
            data = new HashMap<String, Object>();
        }
        //標記爲已經開始
        isStart = true;
        //若是沒有名字的話給flow 取一個名字,由於頗有多是匿名使用的flow
        if (name == null) {
            name = "anonymous-chain";
        }

        logger.debug(String.format("[FlowChain(%s): %s] starts", id, name));
        //打印trace,方便調試
        if (logger.isTraceEnabled()) {
            List<String> names = CollectionUtils.transformToList(flows, new Function<String, Flow>() {
                @Override
                public String call(Flow arg) {
                    return String.format("%s[%s]", arg.getClass(), getFlowName(arg));
                }
            });
            logger.trace(String.format("execution path:\n%s", StringUtils.join(names, " -->\n")));
        }
        //生成一個迭代器
        it = flows.iterator();
        //從it中獲取一個不須要跳過的flow開始執行。若是沒有獲取到,就執行done邏輯
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // all flows are skipped
            callDoneHandler();
        } else {
            runFlow(flow);
        }
    }

再來看一下runFlow中的代碼

private void runFlow(Flow flow) {
        try {
            //看報錯信息就能夠猜到在作什麼防護措施了:若是一個transaction在一個flow中沒有被關閉而跳到下一個flow時,會拋出異常。這個防護機制來自於一個實習生寫的bug,當時被排查出來的時候花了很是大的力氣——現象很是的詭異。因此如今被寫在了這裏。
            if (TransactionSynchronizationManager.isActualTransactionActive()) {
                String flowName = null;
                String flowClassName = null;
                if (currentFlow != null) {
                    flowName = getFlowName(currentFlow);
                    flowClassName = currentFlow.getClass().getName();
                }

                throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName));
            }
            //toRun就是一個當前要run的flow
            Flow toRun = null;
            if (flowMarshaller != null) {
            //flowMarshaller 其實是一個很是噁心的玩意兒。尤爲在一些配置好掉的xml flow忽然由於一些條件而改變接下來執行的flow使人很無語...可是也提供了一些靈活性。
                toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(),
                        flow.getClass().getName(), this, data);
                if (toRun != null) {
                    logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]",
                            id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass()));
                }
            }
       
            if (toRun == null) {
                toRun = flow;
            }

            if (CoreGlobalProperty.PROFILER_WORKFLOW) {
                //對flow的監視。好比flow的執行時間等
                stopWatch.start(toRun);
            }

            currentFlow = toRun;

            String flowName = getFlowName(currentFlow);
            String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName);
            logger.debug(info);
            //在flow中還容許定義afterDone afterError afterFinal的行爲。稍後將會介紹
            collectAfterRunnable(toRun);
            //終於到了run,這裏就是調用者傳入的行爲來決定run中的邏輯
            toRun.run(this, data);
             //fail的邏輯稍後解析
        } catch (OperationFailureException oe) {
            String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : "";
            logger.warn(errInfo, oe);
            fail(oe.getErrorCode());
        } catch (FlowException fe) {
            String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : "";
            logger.warn(errInfo, fe);
            fail(fe.getErrorCode());
        } catch (Throwable t) {
            logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback",
                    id, name, flow.getClass().getName()), t);
            fail(errf.throwableToInternalError(t));
        }
    }

fail

@Override
    public void fail(ErrorCode errorCode) {
        isFailCalled = true;
        setErrorCode(errorCode);
        //放入Stack中,以後Rollback會根據Stack中的flow順序來
        rollBackFlows.push(currentFlow);
        //rollback會對this.rollBackFlows中flow按照順序調用rollback
        rollback();
    }

FlowTrigger

//定義了觸發器的行爲接口
public interface FlowTrigger extends AsyncBackup {
    //觸發失敗,調用errorHandle
    void fail(ErrorCode errorCode);
    //觸發下一個flow
    void next();
    //setError後,在下次調用next的時纔會調用errorHandle
    void setError(ErrorCode error);
}

以前已經看過fail的代碼。接下來來看看nextsetError

@Override
    public void next() {
        //若是flow沒有run起來的狀況下,是不能調用next的
        if (!isStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()",
                            id, name));
        }
        //當rollback開始的時候也不容許next
        if (isRollbackStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] rollback has started, you can't call next()", id, name));
        }
        //將當前flow的push進rollback用的stack
        rollBackFlows.push(currentFlow);

        logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow)));
        //獲取下一個flow。在這裏纔是真正意義上的next
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }
    }

能夠看一下getFirstNotSkippedFlow,本質上是利用了迭代器的特性。

private Flow getFirstNotSkippedFlow() {
        Flow flow = null;
        while (it.hasNext()) {
            flow = it.next();
            if (!isSkipFlow(flow)) {
                break;
            }
        }

        return flow;
    }

接下來是setError

@Override
    public void setError(ErrorCode error) {
        setErrorCode(error);
    }

//往下看
    private void setErrorCode(ErrorCode errorCode) {
        this.errorCode = errorCode;
    }

根據以前的next邏輯:

if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }

咱們能夠大體猜測到,若是在next的時候當前error不爲空,則調用錯誤handle。這樣在setError後還能夠作一些事情。

不管是調用errorHandle仍是doneHandle,都會調用finalHandle。finalHandle也容許用戶定義這部分的邏輯,使flow更加的靈活。

更好的選擇

因爲該庫是爲ZStack定製而生,故此有一些防護性判斷,源碼顯得略爲verbose。若是有同窗對此感興趣,想將其應用到本身的系統中,筆者推薦使用:jdeferred

Java Deferred/Promise library similar to JQuery

因爲JavaScript 中的代碼都是異步調用的。簡單說,它的思想是,每個異步任務返回一個Promise對象,該對象有一個then方法,容許指定回調函數。

在這裏列出幾個較爲簡單的示範,或者有興趣的讀者也能夠參考這裏

import org.jdeferred.DeferredManager;
import org.jdeferred.Promise;
import org.jdeferred.impl.DefaultDeferredManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;


public class deferSimpleTest {

    private static int var = 0;
    final DeferredManager dm = new DefaultDeferredManager();

    @After
    public void cleanUp() {
        var = 0;
    }


    @Test
    public void test() {
        Promise p1 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        Promise p2 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        dm.when(p1, p2).done(Void -> var += 1);
        Assert.assertEquals(5, var);
    }

    @Test
    public void test2() {
        final DeferredManager dm = new DefaultDeferredManager();

        Promise promise = dm.when(() -> {
                var += 1;
            }).then(result -> {
                var += 1;
            });

        dm.when(promise).done(Void -> var += 1);
        Assert.assertEquals(3, var);
    }

    @Test
    public void testBadCallback() {
        Promise promise = dm.when(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        dm.when(promise).done(Void -> {
                    var += 1;
                    throw new RuntimeException("this exception is expected");
                }
        ).fail(Void -> {
            System.out.print("fail!");
            var -= 1;
        });
        Assert.assertEquals(0, var);

    }
}

若是你在使用Java8,那麼也能夠經過CompletableFuture來獲得「相似」的支持。

相關文章
相關標籤/搜索