ZStack源碼剖析之模塊鑑賞——LongJob

本文首發於泊浮目的專欄: https://segmentfault.com/blog...

前言

在ZStack中,當用戶在UI上發起操做時,前端會調用後端的API對實際的資源發起操做請求。但在一個分佈式系統中,咱們不能假設網絡是可靠的(一樣要面對的還有單點故障等)——這每每致使API會超時。ZStack有默認的API超時機制,爲30mins。但從UI上看來,用戶的體驗不是很好,以下:
前端

若是API遇到什麼狀況而一直沒有響應,在這裏用戶也只能默默等到其超時。由於這個狀態下,API是交給一個線程在執行的,見ZStack源碼剖析之核心庫鑑賞——ThreadFacade中的分析
。最可怕的是,因爲隊列的存在,對該資源操做的API將所有處於隊列中而成爲等待狀態。java

解決方案

在ZStack 2.3版本開始引入了一個新的概念——LongJob。這基於ZStack的原有設計——FlowChain(我在個人博客中詳細分析過FlowChain,若是不懂的小夥伴能夠點這裏),依靠FlowChain,咱們把業務邏輯拆成一個個個Flow,並設置對應的RollBack。爲了不以後講起來有點迷,先解釋一下技術名詞。linux

LongJob的狀態是用於被APIQuery的,也提供了進度條。而且容許start、stop、cancel等行爲。spring

名詞

LongJob

長任務。以API可操做的概念具現。上面提到過,容許運行、暫停、取消等行爲。數據庫

LongJobInstance

長任務實例。每一個做業執行時,都會生成一個實例,實例會存放在LongJobVO這個數據庫表中。便於UI調用API查看各個LongJobInstance的狀態。segmentfault

Flow

最小的一個業務單元。LongJob的組成,前面說過,LongJob基於FlowChain。後端

LongJob Parameters

LongJob參數。用於提交LongJob的參數,不一樣的參數能夠區分不一樣的Job。api

數據結構

LongJobVO

@Entity
@Table
public class LongJobVO extends ResourceVO {
    @Column
    private String name;

    @Column
    private String description;

    @Column
    private String apiId;

    @Column
    private String jobName;

    @Column
    private String jobData;

    @Column
    private String jobResult;

    @Column
    @Enumerated(EnumType.STRING)
    private LongJobState state;

    @Column
    private String targetResourceUuid;

    @Column
    @ForeignKey(parentEntityClass = ManagementNodeVO.class, onDeleteAction = ForeignKey.ReferenceOption.SET_NULL)
    private String managementNodeUuid;

    @Column
    private Timestamp createDate;

    @Column
    private Timestamp lastOpDate;
//忽略get set方法
}

該數據結構描述了以下關鍵信息:網絡

  • targeResourceUuid - 用以描述 job 針對的資源,對於分類查找比較有用。經過 resourceUuid 能夠在 ResourceVO 裏找到類型。
  • apiId - 用以查詢該 job 在 TaskProgressVO 中的進度信息。
  • jobName - 執行該 job 的 class 名字。參見下面的 JobExecution (相似現有的 AbstractSchedulerJob)
  • jobData - 存放執行該 job 須要的額外參數信息。

LongJob

public interface LongJob {
    void start(LongJobVO job, Completion completion);
    void cancel(LongJobVO job, Completion completion);
}

全部LongJob都必須實現該接口,並實現start/cancel等方法。session

LongJobFor

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface LongJobFor {
    Class<?> value();
}

爲具體的LongJob增長該註解,表示該LongJob針對哪一個APIMessage。

好比爲BackupStorageMigrateImageJob增長註解:@LongJobFor(APIBackupStorageMigrateImageMsg.class)

LongJobData

interface LongJobData {
}

因爲LongJob要複用現有邏輯以及保證可維護性,這裏處理的代碼和原先邏輯爲同一處。handleApiMessage和handleLongJobMessage必需要將全部的參數抽出來再傳到共用的邏輯層。不只如此,以後定時任務也有可能作成LongJob,故此定義這個接口。

LongJobMessageData

public class LongJobMessageData implements LongJobData {
    protected final NeedReplyMessage needReplyMessage;
 
    public LongJobMessageData(NeedReplyMessage msg){
        this.needReplyMessage = msg;
    }
 
    public NeedReplyMessage getNeedReplyMessage() {
        return needReplyMessage;
    }
}

該接口實現了LongJobData(這裏LongJobData僅僅用於標識一個類型),用於完成目前的需求——部分LongJob Feature來自於APIMessage的改進。而InnerMessage和APIMessage都繼承於NeedReplyMessage,爲增強代碼可讀性,將公用數據結構抽取了出來,方便調用。

LongJobFactory

根據jobName獲取LongJob實例。

好比當jobName爲APIBackupStorageMigrateImageMsg時,獲取BackupStorageMigrateImageJob實例。

LongJobManager

用以處理 Job 相關的 API,好比 APICancelJobMsg,APIRestartJobMsg 等等。維護 jobUuid 和相應的 CancellableSharedFlowChain 之間的關係。

CancellableShareFlowChain

繼承 ShareFlowChain,實現 Cancellable。每一個 Job 底層邏輯都必須用 CancellableSharedFlowChain 實現。

詳解

LongJob相關的API


在圖中咱們能夠看到LongJob提供了幾個API,較爲重要的是QueryAPI——用戶可使用它來查詢LongJob的一個進度狀態。

從白話講起

LongJob則是基於FlowChain之上擴展的,首先,每一個LongJob調用與原有APIMessage行爲相同的內部Message。咱們以APIAddImageMsg爲例,看一下它的邏輯。

在這裏,咱們能夠看到Msg們將其的參數都Copy到了相應的LongJobData中,並進行傳參,進入了一個統一的入口。這樣便於邏輯的維護,免於和原有的API handle處分爲兩段邏輯。

再看調用實例

那麼是如何調用的呢?按照老規矩,咱們來看一個TestCase——AddImageLongJobCase

void testAddImage() {
        int oldSize = Q.New(ImageVO.class).list().size()
        int flag = 0
        myDescription = "my-test"

        env.afterSimulator(SftpBackupStorageConstant.DOWNLOAD_IMAGE_PATH) { Object response ->
            //DownloadImageMsg
            LongJobVO vo = Q.New(LongJobVO.class).eq(LongJobVO_.description, myDescription).find()
            assert vo.state == LongJobState.Running
            flag += 1
            return response
        }

        APIAddImageMsg msg = new APIAddImageMsg()
        msg.setName("TinyLinux")
        msg.setBackupStorageUuids(Collections.singletonList(bs.uuid))
        msg.setUrl("http://192.168.1.20/share/images/tinylinux.qcow2")
        msg.setFormat(ImageConstant.QCOW2_FORMAT_STRING)
        msg.setMediaType(ImageConstant.ImageMediaType.RootVolumeTemplate.toString())
        msg.setPlatform(ImagePlatform.Linux.toString())

        LongJobInventory jobInv = submitLongJob {
            sessionId = adminSession()
            jobName = "APIAddImageMsg"
            jobData = gson.toJson(msg)
            description = myDescription
        } as LongJobInventory

        assert jobInv.getJobName() == "APIAddImageMsg"
        assert jobInv.state == org.zstack.sdk.LongJobState.Running

        retryInSecs() {
            LongJobVO job = dbFindByUuid(jobInv.getUuid(), LongJobVO.class)
            assert job.state == LongJobState.Succeeded
        }

        int newSize = Q.New(ImageVO.class).count().intValue()
        assert newSize > oldSize
        assert 1 == flag
    }

能夠看到本質是將原來的APIMsg轉爲字符串做爲LongJob的Data傳入,調用起來很方便。

實現

再來看看它的實現,當APISubmitLongJobMsg被髮送出去後,handle的地方作了什麼呢?見LongJobManagerImpl

private void handle(APISubmitLongJobMsg msg) {
        // create LongJobVO
        LongJobVO vo = new LongJobVO();
        if (msg.getResourceUuid() != null) {
            vo.setUuid(msg.getResourceUuid());
        } else {
            vo.setUuid(Platform.getUuid());
        }
        if (msg.getName() != null) {
            vo.setName(msg.getName());
        } else {
            vo.setName(msg.getJobName());
        }
        vo.setDescription(msg.getDescription());
        vo.setApiId(msg.getId());
        vo.setJobName(msg.getJobName());
        vo.setJobData(msg.getJobData());
        vo.setState(LongJobState.Waiting);
        vo.setTargetResourceUuid(msg.getTargetResourceUuid());
        vo.setManagementNodeUuid(Platform.getManagementServerId());
        vo = dbf.persistAndRefresh(vo);
        logger.info(String.format("new longjob [uuid:%s, name:%s] has been created", vo.getUuid(), vo.getName()));
        tagMgr.createTagsFromAPICreateMessage(msg, vo.getUuid(), LongJobVO.class.getSimpleName());
        acntMgr.createAccountResourceRef(msg.getSession().getAccountUuid(), vo.getUuid(), LongJobVO.class);
        msg.setJobUuid(vo.getUuid());

        // wait in line
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getSyncSignature() {
                return "longjob-" + msg.getJobUuid();
            }

            @Override
            public void run(SyncTaskChain chain) {
                APISubmitLongJobEvent evt = new APISubmitLongJobEvent(msg.getId());
                LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);
                vo.setState(LongJobState.Running);
                vo = dbf.updateAndRefresh(vo);
                // launch the long job right now
                ThreadContext.put(Constants.THREAD_CONTEXT_API, vo.getApiId());
                ThreadContext.put(Constants.THREAD_CONTEXT_TASK_NAME, vo.getJobName());
                LongJob job = longJobFactory.getLongJob(vo.getJobName());
                job.start(vo, new Completion(msg) {
                    LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);

                    @Override
                    public void success() {
                        vo.setState(LongJobState.Succeeded);
                        vo.setJobResult("Succeeded");
                        dbf.update(vo);
                        logger.info(String.format("successfully run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                    }

                    @Override
                    public void fail(ErrorCode errorCode) {
                        vo.setState(LongJobState.Failed);
                        vo.setJobResult("Failed : " + errorCode.toString());
                        dbf.update(vo);
                        logger.info(String.format("failed to run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                    }
                });


                evt.setInventory(LongJobInventory.valueOf(vo));
                logger.info(String.format("longjob [uuid:%s, name:%s] has been started", vo.getUuid(), vo.getName()));
                bus.publish(evt);

                chain.next();
            }

            @Override
            public String getName() {
                return getSyncSignature();
            }
        });
    }

這段邏輯大體爲:

  • 建立一個LongJob記錄,以及相關的SystemTag和帳戶資源管理引用
  • 提交至線程池。使用LongJobFactory獲取一個LongJob實例。並執行LongJob對應實現的start,在合適的時機進行狀態變化。

LongJobFactory

public class LongJobFactoryImpl implements LongJobFactory, Component {
    private static final CLogger logger = Utils.getLogger(LongJobFactoryImpl.class);
    /**
     * Key:LongJobName
     */
    private TreeMap<String, LongJob> allLongJob = new TreeMap<>();

    @Override
    public LongJob getLongJob(String jobName) {
        LongJob job = allLongJob.get(jobName);
        if (null == job) {
            throw new OperationFailureException(operr("%s has no corresponding longjob", jobName));
        }
        return job;
    }

    @Override
    public boolean start() {
        LongJob job = null;
        List<Class> longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
        for (Class it : longJobClasses) {
            LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
            try {
                job = (LongJob) it.newInstance();
            } catch (InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            }
            if (null == job) {
                logger.warn(String.format("[LongJob] class name [%s] but get LongJob instance is null ", at.getClass().getSimpleName()));
                continue;
            }
            logger.debug(String.format("[LongJob] collect class [%s]", job.getClass().getSimpleName()));
            allLongJob.put(at.value().getSimpleName(), job);
        }
        return true;
    }

    @Override
    public boolean stop() {
        allLongJob.clear();
        return true;
    }
}

該FactoryImpl繼承了Component接口。在ZStack Start的時候會利用反射收集帶有LongJobFor這個Annotation的Class。在原先的版本中則是每一次調用的時候利用反射去尋找,會形成一個沒必要要的開銷。故此這裏也是作了一個Cache般的改進,由於在Application起來後是不會動態的去添加一種LongJob的。

回來,仍是以AddImageLongJob爲例,咱們來看看start時會作什麼,見AddImageLongJob

package org.zstack.image;

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.Platform;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.header.core.Completion;
import org.zstack.header.image.APIAddImageMsg;
import org.zstack.header.image.AddImageMsg;
import org.zstack.header.image.ImageConstant;
import org.zstack.header.longjob.LongJobFor;
import org.zstack.header.longjob.LongJobVO;
import org.zstack.header.message.MessageReply;
import org.zstack.longjob.LongJob;
import org.zstack.utils.gson.JSONObjectUtil;


/**
 * Created by on camile 2018/2/2.
 */
@LongJobFor(APIAddImageMsg.class)
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public class AddImageLongJob implements LongJob {
    @Autowired
    protected CloudBus bus;
    @Autowired
    protected DatabaseFacade dbf;

    @Override
    public void start(LongJobVO job, Completion completion) {
        AddImageMsg msg = JSONObjectUtil.toObject(job.getJobData(), AddImageMsg.class);
        bus.makeLocalServiceId(msg, ImageConstant.SERVICE_ID);
        bus.send(msg, new CloudBusCallBack(null) {
            @Override
            public void run(MessageReply reply) {
                if (reply.isSuccess()) {
                    completion.success();
                } else {
                    completion.fail(reply.getError());
                }
            }
        });
    }

    @Override
    public void cancel(LongJobVO job, Completion completion) {
        // TODO
        completion.fail(Platform.operr("not supported"));
    }
}

這裏則是發送了一個inner msg出去,咱們看一下handle處的邏輯:

private void handle(AddImageMsg msg) {
        AddImageReply evt = new AddImageReply();
        AddImageLongJobData data = new AddImageLongJobData(msg);
        BeanUtils.copyProperties(msg, data);
        handleAddImageMsg(data, evt);
    }

能夠看到這裏將msg的參數所有取了出來,放入一個公共結構裏,並傳入了真正的handle處。

APIAddImageMsg也是這麼作的:

private void handle(final APIAddImageMsg msg) {
        APIAddImageEvent evt = new APIAddImageEvent(msg.getId());
        AddImageLongJobData data = new AddImageLongJobData(msg);
        BeanUtils.copyProperties(msg, data);
        handleAddImageMsg(data, evt);
    }

在前面提到過,爲了更好的可維護性,這兩個Msg共用了一段邏輯。

複用Intercepter

瞭解ZStack的同窗都知道,任何一條APIMsg發送的時候會進入Intercepter。那麼LongJob的submit實際上是把APIMsg做爲參數傳入了,那麼如何複用以前的Intercepter呢?咱們來看看LongJobApiInterceptor

public class LongJobApiInterceptor implements ApiMessageInterceptor, Component {
    private static final CLogger logger = Utils.getLogger(LongJobApiInterceptor.class);

    /**
     * Key:LongJobName
     */
    private TreeMap<String, Class<APIMessage>> apiMsgOfLongJob = new TreeMap<>();

    @Override
    public APIMessage intercept(APIMessage msg) throws ApiMessageInterceptionException {
        if (msg instanceof APISubmitLongJobMsg) {
            validate((APISubmitLongJobMsg) msg);
        } else if (msg instanceof APICancelLongJobMsg) {
            validate((APICancelLongJobMsg) msg);
        } else if (msg instanceof APIDeleteLongJobMsg) {
            validate((APIDeleteLongJobMsg) msg);
        }

        return msg;
    }

    private void validate(APISubmitLongJobMsg msg) {
        Class<APIMessage> apiClass = apiMsgOfLongJob.get(msg.getJobName());
        if (null == apiClass) {
            throw new ApiMessageInterceptionException(argerr("%s is not an API", msg.getJobName()));
        }
        // validate msg.jobData
        Map<String, Object> config = new HashMap<>();
        List<String> serviceConfigFolders = new ArrayList<>();
        serviceConfigFolders.add("serviceConfig");
        config.put("serviceConfigFolders", serviceConfigFolders);
        ApiMessageProcessor processor = new ApiMessageProcessorImpl(config);
        APIMessage jobMsg = JSONObjectUtil.toObject(msg.getJobData(), apiClass);
        jobMsg.setSession(msg.getSession());
        jobMsg = processor.process(jobMsg);                     // may throw ApiMessageInterceptionException
        msg.setJobData(JSONObjectUtil.toJsonString(jobMsg));    // msg may be changed during validation
    }

    private void validate(APICancelLongJobMsg msg) {
        LongJobState state = Q.New(LongJobVO.class)
                .select(LongJobVO_.state)
                .eq(LongJobVO_.uuid, msg.getUuid())
                .findValue();

        if (state == LongJobState.Succeeded) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is succeeded"));
        }
        if (state == LongJobState.Canceled) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is already canceled"));
        }
        if (state == LongJobState.Failed) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is failed"));
        }
    }

    private void validate(APIDeleteLongJobMsg msg) {
        LongJobState state = Q.New(LongJobVO.class)
                .select(LongJobVO_.state)
                .eq(LongJobVO_.uuid, msg.getUuid())
                .findValue();

        if (state != LongJobState.Succeeded && state != LongJobState.Canceled && state != LongJobState.Failed) {
            throw new ApiMessageInterceptionException(argerr("delete longjob only when it's succeeded, canceled, or failed"));
        }
    }

    @Override
    public boolean start() {
        Class<APIMessage> apiClass = null;
        List<Class> longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
        for (Class it : longJobClasses) {
            LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
            try {
                apiClass = (Class<APIMessage>) Class.forName(at.value().getName());
            } catch (ClassNotFoundException | ClassCastException e) {
                //ApiMessage and LongJob are not one by one corresponding ,so we skip it
                e.printStackTrace();
                continue;
            }
            logger.debug(String.format("[LongJob] collect api class [%s]", apiClass.getSimpleName()));
            apiMsgOfLongJob.put(at.value().getSimpleName(), apiClass);
        }
        return true;
    }

    @Override
    public boolean stop() {
        apiMsgOfLongJob.clear();
        return true;
    }
}

邏輯很簡單,經過LongJob的name找出了對應的APIMsg,並將APIMsg發向了對應Intercepter。

在查找APIMsg這一步也是採用了Cache的思想,在Start的時候就進行了收集。

展望

在前面的定義中,咱們提到了LongJob是容許暫停和取消行爲的。這在接口中也能夠看到相似的期許:

public interface LongJob {
    void start(LongJobVO job, Completion completion);
    void cancel(LongJobVO job, Completion completion);
}

那麼該如何實現它呢?在這裏咱們僅僅作一個展望,到時仍是以釋放出來的代碼爲準。

Stop

首先,在CancellableSharedFlowChain 定義一個必須被實現的接口。如`stop
Condition`,返回一個boolean。在每一個Flow執行前會判斷該boolean是否爲true,若是爲true。則保存context到db,並中止執行。

Cancel

一樣,也是在CancellableSharedFlowChain 定義一個必須被實現的接口。如cancelCondition,返回一個boolean。在每一個Flow執行前會判斷該boolean是否爲true,若是爲true。則中止執行並觸發以前的全部rollback。

Rollback的特殊技巧

那麼可能會有同窗問了,在這樣的設計下,若是發生瞭如斷電的狀況,必然致使沒法Rollback。這種狀況若是發生在一個數據中心,能夠說是災難也不爲過。可是咱們能夠考慮一下如何實現更具備原子性Rollback。

淺談數據庫事務的實現

數據庫的事務主要是經過Undo日誌來實現。在一條記錄更新前(更新到硬盤),必定要把相關的Undo日誌寫入硬盤;而「提交事務」這種記錄,要在記錄更新完畢後再寫入硬盤。所謂的Undo日誌,就是沒有操做前的日誌。若是同窗們聽完仍是以爲有點迷,能夠看這篇文章:

能夠考慮的方案

在瞭解了數據庫事務的實現後,咱們能夠大體設計出一種方案,用於保證斷電後Rollback的完整性:

  1. 在一個FlowChain執行前,在DB裏存入一個相似Start FlowChain的標記
  2. 定義每個Flow的Number號,如第一個Flow爲1。在Flow執行前,記錄當前Flow Number到數據庫,寫Flow1開始執行。Flow執行完以前,寫Flow1執行完畢。
  3. Flow執行完了,在DB裏存入一個相似Done FlowChian的標記。這裏咱們把Done的那部分也看作一個Flow。

那麼在任何以步驟出問題的時候,基本均可以完成一個Rollback。咱們來看一看:

還沒執行Flow的時候斷電

DB中的記錄爲Start FlowChain,那麼是不須要Rollback的。

執行一個Flow的時候斷電

DB中的最新記錄爲Flow1開始執行的話,不須要Rollback。這種分佈式場景下若是須要作到強一致性,只能對每行代碼作相似Undo日誌的記錄了。

可是若是記錄爲Flow1執行完畢,開始Rollback。

以後執行幾個Flow都是參考這裏的一個作法。

小結

在本文中,筆者和你們瞭解了ZStack在2.3引入的新模塊——LongJob。並對其的出現的背景、解決的痛點和實現進行了分析,最後展望了一下接下來版本中可能會加強的功能。

相關文章
相關標籤/搜索