ZooKeeper In Depth (6): What Happens to a Client Request Inside the Server

This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
Version  Date       Notes
1.0      2020.5.23  Initial publication

1. Foreword

When we send a data update request to ZooKeeper, what processing pipeline does that request go through? And what consensus algorithm does ZooKeeper use to guarantee consistency? With these questions in mind, let's get into today's topic.

2. Design Pattern: Chain of Responsibility

Before diving into the source code, a quick primer on the Chain of Responsibility pattern, because it is central to this article. In short: the pattern links multiple objects into a chain, and a request travels along the chain in order until the object responsible for handling it is found.

So what does it buy us? If you think about it, the pattern looks a lot like an object-oriented version of if...else if...else (which is procedural); thanks to object orientation, though, each link can be reused and recombined far more easily.
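To make the analogy concrete, below is a minimal, self-contained Java sketch of the pattern in the same shape as ZooKeeper's processor chain. The Processor interface and the two classes are simplified stand-ins invented for illustration, not the real ZooKeeper types:

// A minimal Chain of Responsibility sketch; the names are illustrative, not ZooKeeper's real classes.
interface Processor {
    void process(String request);
}

class LoggingProcessor implements Processor {
    private final Processor next;
    LoggingProcessor(Processor next) { this.next = next; }
    @Override
    public void process(String request) {
        System.out.println("logging: " + request);  // do this link's own work...
        if (next != null) {
            next.process(request);                  // ...then hand off to the next link
        }
    }
}

class ReplyProcessor implements Processor {
    @Override
    public void process(String request) {
        System.out.println("replying to: " + request);  // end of the chain
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        // The chain is wired back to front, which is exactly how the
        // setupRequestProcessors() methods we are about to read build theirs.
        Processor chain = new LoggingProcessor(new ReplyProcessor());
        chain.process("create /foo");
    }
}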

3. Tracing the Request Path

Let's start from the ZooKeeperServer class and look at its subclasses. The ones we care about (matching the common ZooKeeper server roles) are:

  • LeaderZooKeeperServer
  • FollowerZooKeeperServer
  • ObserverZooKeeperServer

3.1 LeaderZooKeeperServer

@Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

Putting the order together:

  1. LeaderRequestProcessor
  2. PrepRequestProcessor
  3. ProposalRequestProcessor

The chain then splits into two branches:

  • Transactional (write) requests additionally go through: SyncRequestProcessor -> AckRequestProcessor
  • The main chain continues with: CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor

3.1.1 LeaderRequestProcessor

@Override
    public void processRequest(Request request)
            throws RequestProcessorException {
        // Check if this is a local session and we are trying to create
        // an ephemeral node, in which case we upgrade the session
        Request upgradeRequest = null;
        try {
            upgradeRequest = lzks.checkUpgradeSession(request);
        } catch (KeeperException ke) {
            if (request.getHdr() != null) {
                LOG.debug("Updating header");
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(ke.code().intValue()));
            }
            request.setException(ke);
            LOG.info("Error creating upgrade request " + ke.getMessage());
        } catch (IOException ie) {
            LOG.error("Unexpected error in upgrade", ie);
        }
        if (upgradeRequest != null) {
            nextProcessor.processRequest(upgradeRequest);
        }

        nextProcessor.processRequest(request);
    }

The logic is clear: it checks whether the request comes from a local session and is trying to create an ephemeral node; if so, the session is upgraded and the upgrade request is forwarded first. If the upgrade fails, the exception is recorded on the request. Either way, the original request is passed on to the next processor.

3.1.2 PrepRequestProcessor

This class runs to more than 1,000 lines of code, so we will only pick out a few representative snippets to dissect. Before that, let's read its Javadoc:

This request processor is generally at the start of a RequestProcessor
change. It sets up any transactions associated with requests that change the
state of the system. It counts on ZooKeeperServer to update
outstandingRequests, so that it can take into account transactions that are
in the queue to be applied when generating a transaction.

In short, it usually sits at the head of the request processing chain and sets up the transactions for requests that change the state of the system.

OpCode.create2

For create-type requests the logic is roughly:

case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;

Jumping into pRequest2Txn:

protected void pRequest2Txn(int type, long zxid, Request request,
                                Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));

        switch (type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                pRequest2TxnCreate(type, request, record, deserialize);
                break;
            }
//.... remaining cases omitted

Then into pRequest2TxnCreate:

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            CreateRequest createRequest = (CreateRequest)record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        ChangeRecord parentRecord = getRecordForPath(parentPath);

        checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {
            if (getRecordForPath(path) != null) {
                throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        int newCversion = parentRecord.stat.getCversion()+1;
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                    newCversion));
        }
        StatPersisted s = new StatPersisted();
        if (createMode.isEphemeral()) {
            s.setEphemeralOwner(request.sessionId);
        }
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        addChangeRecord(parentRecord);
        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
    }

The logic can be roughly summarized as:

  1. Deserialize and assemble the request
  2. Validate the request: reject undefined request types and unreasonable parameters
  3. Check that the parent path exists
  4. Check the ACL
  5. Check that the path is legal (for sequential nodes the final name is built by appending the parent's cversion; see the sketch after this list)
  6. Add the resulting change records to the outstandingChanges queue
  7. Pass the request on to the next processor
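One detail from that flow worth isolating: for sequential nodes, pRequest2TxnCreate builds the final name by appending the parent's cversion, zero-padded to 10 digits, to the requested path. A tiny standalone illustration (the path and counter value below are made up):

import java.util.Locale;

public class SequentialNameDemo {
    public static void main(String[] args) {
        String requestedPath = "/app/lock-";  // hypothetical path sent by the client
        int parentCVersion = 42;              // hypothetical cversion of the parent znode
        // Same formatting rule as in pRequest2TxnCreate above.
        String actualPath = requestedPath
                + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        System.out.println(actualPath);       // prints /app/lock-0000000042
    }
}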

OpCode.multi

A transactional (multi-op) request:

case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting" +
                                        " remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(), e.getPath(), e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

The code looks rather ugly, but the logic is actually quite simple:

  • Iterate over all the sub-operations and prepare them one by one (each has to pass the same series of checks: valid request, existing parent path, ACL, legal path). If nothing fails along the way, they are assembled into one request that carries all the transaction records. Otherwise the request becomes one marked as an error, and the pending change records captured in the local scope (a Map) are rolled back. Either way, the request is sent on to the next processor. From the client's perspective this is what makes a multi all-or-nothing, as the sketch after this list shows.
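Below is a hedged client-side sketch of issuing a multi; the connection string, paths and data are placeholder values, not anything prescribed by this article:

import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MultiDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string and session timeout.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        try {
            // Both ops are prepared by PrepRequestProcessor under one zxid; if any op
            // fails validation, the whole multi is aborted and rolled back server-side.
            List<OpResult> results = zk.multi(Arrays.asList(
                    Op.create("/demo", new byte[0],
                              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.setData("/demo", "v1".getBytes(), -1)));
            System.out.println("multi succeeded with " + results.size() + " results");
        } catch (KeeperException e) {
            // On failure none of the ops took effect.
            System.out.println("multi aborted: " + e.code());
        } finally {
            zk.close();
        }
    }
}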

OpCode.sync

//All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;

This is a non-transactional request: the processor just validates the session and passes it on to the next processor.

3.1.3 PrepRequestProcessor: Summary

In essence, it pre-processes transactional requests: creating the transaction header and body, checking the session, the ACL, the version, and so on.
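To give a flavor of what the ACL check conceptually does, here is a deliberately simplified sketch (not ZooKeeper's actual checkACL implementation): it only handles the world:anyone scheme, whereas the real check also matches the request's authInfo against schemes such as digest, ip and sasl. The required permission is tested as a bitmask:

import java.util.List;

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

public class AclCheckSketch {

    // Simplified permission test: does any ACL entry that applies to everyone
    // carry the required permission bit?
    static boolean isPermitted(List<ACL> acls, int requiredPerm) {
        for (ACL acl : acls) {
            Id id = acl.getId();
            boolean matchesEveryone =
                    "world".equals(id.getScheme()) && "anyone".equals(id.getId());
            if (matchesEveryone && (acl.getPerms() & requiredPerm) != 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // OPEN_ACL_UNSAFE grants ALL to world:anyone, so CREATE is permitted.
        System.out.println(isPermitted(ZooDefs.Ids.OPEN_ACL_UNSAFE, ZooDefs.Perms.CREATE));
    }
}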

3.1.4 ProposalRequestProcessor

For transactional requests it initiates a Proposal and forwards the request to the CommitProcessor. In addition, ProposalRequestProcessor hands the transactional request over to the SyncRequestProcessor.

public void processRequest(Request request) throws RequestProcessorException {
        // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = " + request.sessionId);
        // request.addRQRec(">prop");


        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't
         * contain the handler. In this case, we add it to syncHandler, and
         * call processRequest on the next processor.
         */

        if (request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
            nextProcessor.processRequest(request);
            if (request.getHdr() != null) {
                // We need to sync and get consensus on any transactions
                try {
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                syncProcessor.processRequest(request);
            }
        }
    }

Next, look at propose:

/**
     * create a proposal and send it out to all the members
     *
     * @param request
     * @return the proposal that is queued to send to all the members
     */
    public Proposal propose(Request request) throws XidRolloverException {
        /**
         * Address the rollover issue. All lower 32bits set indicate a new leader
         * election. Force a re-election instead. See ZOOKEEPER-1277
         */
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                    "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException(msg);
        }

        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;                
        
        synchronized(this) {
           p.addQuorumVerifier(self.getQuorumVerifier());
                   
           if (request.getHdr().getType() == OpCode.reconfig){
               self.setLastSeenQuorumVerifier(request.qv, true);                       
           }
           
           if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
               p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
           }
                   
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }

            lastProposed = p.packet.getZxid();
            outstandingProposals.put(lastProposed, p);
            sendPacket(pp);
        }
        return p;
    }

What gets sent here is a QuorumPacket, which implements the Record interface and has its type set to PROPOSAL. Its comment reads:

/**
     * This message type is sent by a leader to propose a mutation.
     */
    public final static int PROPOSAL = 2;

Clearly, this is a mutation message that only the Leader can send. The logic, briefly:

  1. Assemble the QuorumPacket to be sent
  2. Put the Proposal into the outstandingProposals map, keyed by zxid
  3. Broadcast the packet to the quorum via sendPacket

3.1.5 CommitProcessor

As the name suggests, this is the transaction committer. It only cares about transactional requests, holding them until the Proposal has collected enough votes in the cluster to be committed. Thanks to the CommitProcessor, every server can process transactions in strict order.

The code of this processor is rather tangled, so we won't spend space analyzing it; with the information above you can still follow how the whole request chain fits together.
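To give the flavor anyway, here is a heavily simplified, hedged sketch of the idea: queue requests in arrival order, and hold a transactional request back until its commit (matched by zxid) arrives. The class and field names are invented for illustration and are not the real CommitProcessor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CommitProcessorSketch {

    // A request is "transactional" here if it carries a non-zero zxid to be committed.
    static final class Req {
        final long zxid;
        final String op;
        Req(long zxid, String op) { this.zxid = zxid; this.op = op; }
    }

    private final BlockingQueue<Req> queuedRequests = new LinkedBlockingQueue<>();
    private final BlockingQueue<Long> committedZxids = new LinkedBlockingQueue<>();

    void submit(Req r)     { queuedRequests.add(r); }
    void commit(long zxid) { committedZxids.add(zxid); }

    // One step of the single-threaded loop: preserves arrival order, and holds a
    // pending write back until the matching commit (by zxid) shows up.
    void runOnce() throws InterruptedException {
        Req r = queuedRequests.take();
        if (r.zxid != 0) {
            long committed;
            do {
                committed = committedZxids.take();  // wait for the quorum's decision
            } while (committed != r.zxid);
        }
        System.out.println("passing to next processor: " + r.op);
    }

    public static void main(String[] args) throws InterruptedException {
        CommitProcessorSketch cp = new CommitProcessorSketch();
        cp.submit(new Req(0, "getData /a"));  // read: flows straight through
        cp.submit(new Req(1, "create /a"));   // write: must wait for commit(1)
        cp.commit(1);
        cp.runOnce();
        cp.runOnce();
    }
}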

3.1.6 SyncRequestProcessor

The logic is very simple: append the request to the transaction log and, when the time comes, trigger a snapshot.

public void processRequest(Request request) {
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }

   // The thread's core loop: it drains the queuedRequests queue
    @Override
    public void run() {
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

3.1.7 ToBeAppliedRequestProcessor

The core of this processor is the toBeApplied queue, which stores the committable Proposals that the CommitProcessor has already handled; they are only removed once the FinalRequestProcessor has finished with them.

/*
         * (non-Javadoc)
         *
         * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
         */
        public void processRequest(Request request) throws RequestProcessorException {
            next.processRequest(request);

            // The only requests that should be on toBeApplied are write
            // requests, for which we will have a hdr. We can't simply use
            // request.zxid here because that is set on read requests to equal
            // the zxid of the last write op.
            if (request.getHdr() != null) {
                long zxid = request.getHdr().getZxid();
                Iterator<Proposal> iter = leader.toBeApplied.iterator();
                if (iter.hasNext()) {
                    Proposal p = iter.next();
                    if (p.request != null && p.request.zxid == zxid) {
                        iter.remove();
                        return;
                    }
                }
                LOG.error("Committed request not found on toBeApplied: "
                          + request);
            }
        }

3.1.8 FinalRequestProcessor

The core logic of this class runs to 500-odd lines, so to save space we will only describe it. As the last processor in the chain, it is the one that finally replies to the request, and it is responsible for making transactional requests take effect, i.e. changing the state of the in-memory database.
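As a rough illustration of that "apply, then reply" idea, here is a hedged sketch; the map below stands in for the in-memory database (the real processor works against ZKDatabase/DataTree and the connection layer), and all names are invented for illustration:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FinalStepSketch {

    // Stand-in for the in-memory database (the real one is the DataTree inside ZKDatabase).
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    // Apply a committed "create" transaction, then build the reply for the client.
    String applyCreateAndReply(String path, byte[] data) {
        byte[] previous = store.putIfAbsent(path, data);  // mutate server state
        if (previous != null) {
            return "error: node exists " + path;          // would map to a NodeExists error
        }
        return "created " + path;                         // would be serialized as the response
    }

    public static void main(String[] args) {
        FinalStepSketch f = new FinalStepSketch();
        System.out.println(f.applyCreateAndReply("/demo", "v1".getBytes()));
        System.out.println(f.applyCreateAndReply("/demo", "v2".getBytes()));
    }
}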

3.2 FollowerZooKeeperServer

First, the code that assembles its processors:

@Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

As you can see, there are two request chains here:

  1. FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
  2. SyncRequestProcessor -> SendAckRequestProcessor

So when a request comes in, which processor handles it? We can roughly trace it:

  • firstProcessor (i.e. FollowerRequestProcessor) is the main handling path; it is driven by the parent class ZooKeeperServer and handles client requests.
  • syncProcessor (i.e. SyncRequestProcessor) is entered through logRequest; it is driven by Learner and handles requests coming from the leader.

At this point you might ask: this is a Follower, so why do those calls come in through Learner? The class declaration explains it:

/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share 
 * a good deal of code which is moved into Peer to avoid duplication. 
 */
public class Learner {

To avoid duplicated code, the shared logic was pulled up into Learner.

3.2.1 FollowerRequestProcessor

This is the Follower's normal request processor: it checks whether the request is transactional; if it is, it forwards it to the Leader, otherwise it handles it itself.

FollowerRequestProcessor.run

@Override
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("FollowerRequestProcessor exited loop!");
    }

The comment in the code explains clearly why the request is queued to the CommitProcessor before being shipped to the leader: so that the follower is ready to receive the response. As described earlier, that processor waits until the Proposal has gathered enough votes in the cluster to be committed; so at this point the Follower's CommitProcessor is already waiting for the Proposal's vote to finish.

3.2.2 SendAckRequestProcessor

public void processRequest(Request si) {
        if(si.type != OpCode.sync){
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                null);
            try {
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }

The logic is very simple: it sends an ACK back to the leader, signalling that this server has finished recording the transaction in its log.

3.3 ObserverZooKeeperServer

/**
     * Set up the request processors for an Observer:
     * firstProcesor->commitProcessor->finalProcessor
     */
    @Override
    protected void setupRequestProcessors() {      
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }

The logic is clear (probably because this code was added after 3.3.0). The normal request chain is:

  1. ObserverRequestProcessor
  2. CommitProcessor
  3. FinalRequestProcessor

If syncRequestProcessorEnabled is turned on (it is on by default), the Observer will also write the transaction log and take periodic snapshots; this costs some performance and increases the memory requirements.

Then look at ObserverRequestProcessor: it is practically identical to FollowerRequestProcessor. Couldn't this code have been reused?

@Override
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this Observer has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getObserver().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getObserver().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {
                        zks.getObserver().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("ObserverRequestProcessor exited loop!");
    }

That concludes the source code walkthrough, which is based on version 3.5.7.

4. Distributed Transactions: How ZooKeeper Handles a Transaction

Having walked through the source above, you should now have a decent picture of ZooKeeper's request handling flow. Next, let's lay out the journey of a transactional request. Starting from the Leader's ProposalRequestProcessor, it falls roughly into three phases:

  1. Sync
  2. Proposal
  3. Commit

4.1 Sync

This is mainly driven by the ProposalRequestProcessor: every machine taking part in the Proposal (Leader and Followers) must record the transaction in its log.

4.2 Proposal

Every transactional request must be acknowledged by more than half of the voting members (Leader + Followers).

  1. The Leader checks that the server-side ZXID is still usable; if so, it initiates a Proposal, otherwise it throws XidRolloverException. (see org.apache.zookeeper.server.quorum.Leader.propose)
  2. A Proposal is built from the request header, the transaction body and the ZXID. (see org.apache.zookeeper.server.quorum.Leader.propose)
  3. It is broadcast to all Follower servers. (see org.apache.zookeeper.server.quorum.Leader.sendPacket)
  4. Each Follower logs the transaction and ACKs back to the Leader. Once more than half have acknowledged, processing moves on to Commit; otherwise it waits until timeout.
  5. The request is put into the toBeApplied queue. (see org.apache.zookeeper.server.quorum.Leader.tryToCommit)
  6. The commit is broadcast: Followers get a COMMIT packet while Observers get an INFORM packet, which makes them commit the Proposal. (see org.apache.zookeeper.server.quorum.Leader.commit && inform)

Up to this point, the SyncRequestProcessor -> AckRequestProcessor leg of the chain has done its job.
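Here is a hedged sketch of the leader-side bookkeeping described above: track the ACKs received per zxid and declare the proposal committable once a majority is reached. It is illustrative only; the real logic lives in Leader.processAck/tryToCommit and uses a QuorumVerifier rather than a simple majority count:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class QuorumAckSketch {

    private final int ensembleSize;  // total number of voting servers
    private final Map<Long, Set<Long>> acksByZxid = new HashMap<>();

    QuorumAckSketch(int ensembleSize) { this.ensembleSize = ensembleSize; }

    // Record an ACK from serverId for zxid; return true once a majority has acked.
    boolean processAck(long zxid, long serverId) {
        Set<Long> acks = acksByZxid.computeIfAbsent(zxid, z -> new HashSet<>());
        acks.add(serverId);
        boolean hasQuorum = acks.size() > ensembleSize / 2;
        if (hasQuorum) {
            acksByZxid.remove(zxid);  // the proposal is now committable
        }
        return hasQuorum;
    }

    public static void main(String[] args) {
        QuorumAckSketch leader = new QuorumAckSketch(5);  // 5 voters -> 3 acks needed
        long zxid = 0x100000001L;
        System.out.println(leader.processAck(zxid, 1));   // false
        System.out.println(leader.processAck(zxid, 2));   // false
        System.out.println(leader.processAck(zxid, 3));   // true -> broadcast COMMIT / INFORM
    }
}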

4.3 Commit

Next comes the CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor leg.

  1. When a request reaches the CommitProcessor, it is put into a queue and taken out one at a time by a worker thread. When the element taken out is a transactional request, a pending reference is set until the vote finishes. This guarantees the ordering of transactions and also lets the CommitProcessor easily know whether the cluster has a transaction in flight.
  2. Once the vote passes, the commit path is woken up: the request is put into the committedRequests queue and then handed, one by one, to the ToBeAppliedRequestProcessor.
  3. The ToBeAppliedRequestProcessor waits until the FinalRequestProcessor has finished, and then removes the Proposal from the toBeApplied queue.
  4. The FinalRequestProcessor first checks whether the zxid of the change record at the head of the queue is less than or equal to that of the current request, and removes the element if so. Equal is the normal case, since the element was added back in PrepRequestProcessor; strictly smaller means an earlier transaction reached this point out of order (it may have failed along the way), so it is removed as well. The transaction is then applied to the in-memory database, whose instance also maintains a committedLog capped at 500 entries by default, holding the most recently committed Proposals so that lagging servers can be synced quickly (see the sketch after this list).
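Here is a hedged sketch of such a bounded recent-commit log; the 500 cap mirrors the default mentioned above, but the class itself is invented for illustration and is not the real ZKDatabase:

import java.util.ArrayDeque;
import java.util.Deque;

public class CommittedLogSketch {

    private static final int COMMIT_LOG_COUNT = 500;  // default cap mentioned above
    private final Deque<Long> committedZxids = new ArrayDeque<>();

    // Remember a committed zxid, evicting the oldest entry once the cap is reached.
    void addCommitted(long zxid) {
        if (committedZxids.size() == COMMIT_LOG_COUNT) {
            committedZxids.removeFirst();             // drop the oldest proposal
        }
        committedZxids.addLast(zxid);
    }

    long minZxid() { return committedZxids.isEmpty() ? -1 : committedZxids.peekFirst(); }
    long maxZxid() { return committedZxids.isEmpty() ? -1 : committedZxids.peekLast(); }

    public static void main(String[] args) {
        CommittedLogSketch log = new CommittedLogSketch();
        for (long z = 1; z <= 600; z++) {
            log.addCommitted(z);
        }
        // A server whose last seen zxid falls inside this window can be caught up
        // with a DIFF of recent proposals instead of a full snapshot.
        System.out.println(log.minZxid() + " .. " + log.maxZxid());  // 101 .. 600
    }
}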