The previous articles in this series gave us a macro-level view of how ZooKeeper handles network data.
In this article we start from the details and trace how a TCP packet is converted into ZooKeeper's internal data flow.
NIO-based port listening: obtaining the TCP data stream.
// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#AcceptThread public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException { super("NIOServerCxnFactory.AcceptThread:" + addr); this.acceptSocket = ss; // 只監聽 OP_ACCEPT 事件 this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT); this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads)); selectorIterator = this.selectorThreads.iterator(); } // org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#run public void run() { try { // 死循環一直監聽 while (!stopped && !acceptSocket.socket().isClosed()) { try { select(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } } finally { closeSelector(); // This will wake up the selector threads, and tell the // worker thread pool to begin shutdown. if (!reconfiguring) { NIOServerCnxnFactory.this.stop(); } LOG.info("accept thread exitted run method"); } } // org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#select private void select() { try { // nio select selector.select(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { // 有可用數據,接收數據 if (!doAccept()) { // If unable to pull a new connection off the accept // queue, pause accepting to give us time to free // up file descriptors and so the accept thread // doesn't spin in a tight loop. pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } } // org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#doAccept /** * Accept new socket connections. Enforces maximum number of connections * per client IP address. Round-robin assigns to selector thread for * handling. Returns whether pulled a connection off the accept queue * or not. If encounters an error attempts to fast close the socket. * * @return whether was able to accept a connection or not */ private boolean doAccept() { boolean accepted = false; SocketChannel sc = null; try { sc = acceptSocket.accept(); accepted = true; InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) { throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns); } LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); // Round-robin assign this connection to a selector thread // 選擇一個 selector線程出來進行接收鏈接請求,而後由該線程負責後續處理 if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); if (!selectorThread.addAcceptedConnection(sc)) { throw new IOException("Unable to add connection to selector queue" + (stopped ? 
" (shutdown in progress)" : "")); } acceptErrorLogger.flush(); } catch (IOException e) { // accept, maxClientCnxns, configureBlocking ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); } return accepted; } } // org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection /** * Place new accepted connection onto a queue for adding. Do this * so only the selector thread modifies what keys are registered * with the selector. */ public boolean addAcceptedConnection(SocketChannel accepted) { // 添加到 SelectorThread 的隊列中,而後即返回繼續監聽 if (stopped || !acceptedQueue.offer(accepted)) { return false; } // 喚醒 selector, 使其開始工做 wakeupSelector(); return true; }
Once the accept thread has handed over a connection, the SelectorThread takes care of reading and writing its data.
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread /** * The main loop for the thread selects() on the connections and * dispatches ready I/O work requests, then registers all pending * newly accepted connections and updates any interest ops on the * queue. */ public void run() { try { // 死循環一直處理 while (!stopped) { try { select(); processAcceptedConnections(); processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } // Close connections still pending on the selector. Any others // with in-flight work, let drain out of the work queue. for (SelectionKey key : selector.keys()) { NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment(); if (cnxn.isSelectable()) { cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); } cleanupSelectionKey(key); } SocketChannel accepted; while ((accepted = acceptedQueue.poll()) != null) { fastCloseSock(accepted); } updateQueue.clear(); } finally { closeSelector(); // This will wake up the accept thread and the other selector // threads, and tell the worker thread pool to begin shutdown. NIOServerCnxnFactory.this.stop(); LOG.info("selector thread exitted run method"); } } private void select() { try { // 該selector爲本線程私有,因此其操做將是線程安全的 // 都是由 this.selector = Selector.open(); 得到 // 由 AcceptThread 線程調用 wakeupSelector, selector.wakeup(); 喚醒處理 selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected); Collections.shuffle(selectedList); Iterator<SelectionKey> selectedKeys = selectedList.iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selected.remove(key); if (!key.isValid()) { cleanupSelectionKey(key); continue; } if (key.isReadable() || key.isWritable()) { // 確認進入讀寫事件的處理流程 handleIO(key); } else { LOG.warn("Unexpected ops in select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } } /** * Schedule I/O for processing on the connection associated with * the given SelectionKey. If a worker thread pool is not being used, * I/O is run directly by this thread. */ private void handleIO(SelectionKey key) { IOWorkRequest workRequest = new IOWorkRequest(this, key); NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment(); // Stop selecting this key while processing on its // connection cnxn.disableSelectable(); key.interestOps(0); touchCnxn(cnxn); // 封裝IOWorkRequest 添加到工做線程池中去 workerPool.schedule(workRequest); } // org.apache.zookeeper.server.WorkerService#schedule /** * Schedule work to be done. If a worker thread pool is not being * used, work is done directly by this thread. This schedule API is * for use with non-assignable WorkerServices. For assignable * WorkerServices, will always run on the first thread. */ public void schedule(WorkRequest workRequest) { schedule(workRequest, 0); } /** * Schedule work to be done by the thread assigned to this id. Thread * assignment is a single mod operation on the number of threads. If a * worker thread pool is not being used, work is done directly by * this thread. */ public void schedule(WorkRequest workRequest, long id) { if (stopped) { workRequest.cleanup(); return; } ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest); // If we have a worker thread pool, use that; otherwise, do the work // directly. 
int size = workers.size(); if (size > 0) { try { // make sure to map negative ids as well to [0, size-1] // 此處的 id=0, 因此只會有一個線程池使用,提交到 scheduledWorkRequest 去處理 int workerNum = ((int) (id % size) + size) % size; ExecutorService worker = workers.get(workerNum); worker.execute(scheduledWorkRequest); } catch (RejectedExecutionException e) { LOG.warn("ExecutorService rejected execution", e); workRequest.cleanup(); } } else { // When there is no worker thread pool, do the work directly // and wait for its completion scheduledWorkRequest.run(); } }
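The worker-selection arithmetic ((int) (id % size) + size) % size deserves a second look: in Java the % operator can return a negative value, so the extra "+ size) % size" step is what keeps the index inside [0, size-1]. A tiny standalone illustration (not ZooKeeper code) follows; note that in the path above the caller passes id = 0, so a single worker is effectively used.

// Illustration of the double-mod mapping used in WorkerService.schedule().
public class WorkerMapping {

    static int workerFor(long id, int size) {
        // Map any id, including negative ones, into [0, size - 1].
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        int size = 4;
        // -1 % 4 == -1 in Java, so the "+ size) % size" step is what
        // pulls the result back into range: -1 -> 3, -5 -> 3, 6 -> 2.
        System.out.println(workerFor(-1, size)); // 3
        System.out.println(workerFor(-5, size)); // 3
        System.out.println(workerFor(6, size));  // 2
    }
}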
Next, handling the work submitted by the previous thread.
// org.apache.zookeeper.server.WorkerService#schedule /** * Schedule work to be done by the thread assigned to this id. Thread * assignment is a single mod operation on the number of threads. If a * worker thread pool is not being used, work is done directly by * this thread. */ public void schedule(WorkRequest workRequest, long id) { if (stopped) { workRequest.cleanup(); return; } ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest); // If we have a worker thread pool, use that; otherwise, do the work // directly. int size = workers.size(); if (size > 0) { try { // make sure to map negative ids as well to [0, size-1] int workerNum = ((int) (id % size) + size) % size; ExecutorService worker = workers.get(workerNum); worker.execute(scheduledWorkRequest); } catch (RejectedExecutionException e) { LOG.warn("ExecutorService rejected execution", e); workRequest.cleanup(); } } else { // When there is no worker thread pool, do the work directly // and wait for its completion scheduledWorkRequest.run(); } } // org.apache.zookeeper.server.WorkerService.ScheduledWorkRequest private class ScheduledWorkRequest implements Runnable { private final WorkRequest workRequest; ScheduledWorkRequest(WorkRequest workRequest) { this.workRequest = workRequest; } @Override public void run() { try { // Check if stopped while request was on queue if (stopped) { workRequest.cleanup(); return; } // 調用 WorkRequest workRequest.doWork(); } catch (Exception e) { LOG.warn("Unexpected exception", e); workRequest.cleanup(); } } } // org.apache.zookeeper.server.NIOServerCnxnFactory.IOWorkRequest#doWork public void doWork() throws InterruptedException { if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } // 針對客戶端請求,進行讀寫處理 if (key.isReadable() || key.isWritable()) { // 將key交給 對應的 事務處理服務處理 cnxn.doIO(key); // Check if we shutdown or doIO() closed this connection if (stopped) { cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); return; } if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } // 關聯到對應的channnel, 以即可以進行響應 touchCnxn(cnxn); } // Mark this connection as once again ready for selection cnxn.enableSelectable(); // Push an update request on the queue to resume selecting // on the current set of interest ops, which may have changed // as a result of the I/O operations we just performed. 
if (!selectorThread.addInterestOpsUpdateRequest(key)) { cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED); } } // 處理讀寫數據 // org.apache.zookeeper.server.NIOServerCnxn#doIO void doIO(SelectionKey k) throws InterruptedException { try { if (!isSocketOpen()) { LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId)); return; } if (k.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { handleFailedRead(); } if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword // 數據準備完成,讀取消息體,順便進行數據傳遞到下游 readPayload(); } else { // four letter words take care // need not do anything else return; } } } if (k.isWritable()) { handleWrite(k); if (!initialized && !getReadInterest() && !getWriteInterest()) { throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE); } } } catch (CancelledKeyException e) { LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId)); LOG.debug("CancelledKeyException stack trace", e); close(DisconnectReason.CANCELLED_KEY_EXCEPTION); } catch (CloseRequestException e) { // expecting close to log session closure close(); } catch (EndOfStreamException e) { LOG.warn("Unexpected exception", e); // expecting close to log session closure close(e.getReason()); } catch (ClientCnxnLimitException e) { // Common case exception, print at debug level ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e); close(DisconnectReason.CLIENT_CNX_LIMIT); } catch (IOException e) { LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e); close(DisconnectReason.IO_EXCEPTION); } } // org.apache.zookeeper.server.NIOServerCnxn#readPayload /** Read the request payload (everything following the length prefix) */ private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { handleFailedRead(); } } // 屢次確認數據是否已到齊 if (incomingBuffer.remaining() == 0) { // have we read length bytes? incomingBuffer.flip(); packetReceived(4 + incomingBuffer.remaining()); if (!initialized) { readConnectRequest(); } else { // 對初始化後的數據處理,直接讀取 buffers readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } } // org.apache.zookeeper.server.NIOServerCnxn#readRequest private void readRequest() throws IOException { // 交給 zkServer 處理,其中包含了各類準備好的處理鏈配置 zkServer.processPacket(this, incomingBuffer); } // org.apache.zookeeper.server.ZooKeeperServer#processPacket public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. // // Be aware that we're actually checking the global outstanding // request before this request. 
// // It's fine if the IOException thrown before we decrease the count // in cnxn, since it will close the cnxn anyway. cnxn.incrOutstandingAndCheckThrottle(h); // Through the magic of byte buffers, txn will not be // pointing // to the start of the txn incomingBuffer = incomingBuffer.slice(); // 根據請求類型,簡單分類處理 if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); AuthPacket authPacket = new AuthPacket(); ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; if (ap != null) { try { // handleAuthentication may close the connection, to allow the client to choose // a different server to connect to. authReturn = ap.handleAuthentication( new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth()); } catch (RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); authReturn = KeeperException.Code.AUTHFAILED; } } if (authReturn == KeeperException.Code.OK) { LOG.debug("Authentication succeeded for scheme: {}", scheme); LOG.info("auth success {}", cnxn.getRemoteSocketAddress()); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { LOG.warn( "No authentication provider for scheme: {} has {}", scheme, ProviderRegistry.listProviders()); } else { LOG.warn("Authentication failed for scheme: {}", scheme); } // send a response... ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); cnxn.disableRecv(); } return; } else if (h.getType() == OpCode.sasl) { processSasl(incomingBuffer, cnxn, h); } else { // 通用請求處理 if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) { ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue()); cnxn.sendResponse(replyHeader, null, "response"); cnxn.sendCloseSession(); cnxn.disableRecv(); } else { // 封裝 Request, 至此, 內部數據模型已生成, sessionId, xid, buffers... Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); int length = incomingBuffer.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); si.setLargeRequestSize(length); } si.setOwner(ServerCnxn.me); // 提交到請求隊列中 submitRequest(si); } } } public void submitRequest(Request si) { enqueueRequest(si); } public void enqueueRequest(Request si) { if (requestThrottler == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (requestThrottler == null) { throw new RuntimeException("Not started"); } } } // 提交請求給限流器 // requestThrottler = new RequestThrottler(this); 節流器處理完成後,最終仍會提交給當前 zkServer 處理 requestThrottler.submitRequest(si); } // org.apache.zookeeper.server.RequestThrottler#submitRequest public void submitRequest(Request request) { // 若是已中止,則刪除隊列 if (stopping) { LOG.debug("Shutdown in progress. 
Request cannot be processed"); dropRequest(request); } else { // LinkedBlockingQueue 入隊,最終由該線程去異步處理 submittedRequests.add(request); } }
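The lenBuffer/incomingBuffer dance above is length-prefixed framing: first read a 4-byte length, then switch incomingBuffer to a payload buffer of exactly that size, and only pass the bytes downstream once remaining() reaches zero. Below is a minimal sketch of that framing on its own, with illustrative names; the real NIOServerCnxn also handles four-letter-word commands, connection limits, and the write path.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Sketch only: reusable buffers survive across calls, so partial reads are fine.
public class FrameReader {

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    /** Returns a complete frame, or null if more bytes are still needed. */
    public ByteBuffer readFrame(SocketChannel sock) throws IOException {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new IOException("peer closed the connection");
        }
        if (incomingBuffer.remaining() > 0) {
            return null; // wait for the next readable event
        }
        if (incomingBuffer == lenBuffer) {
            // Length prefix complete: allocate the payload buffer of that size.
            lenBuffer.flip();
            int len = lenBuffer.getInt();
            lenBuffer.clear();
            incomingBuffer = ByteBuffer.allocate(len);
            return null; // payload not read yet
        }
        // Payload complete: hand it to the caller and reset for the next frame.
        incomingBuffer.flip();
        ByteBuffer frame = incomingBuffer;
        incomingBuffer = lenBuffer;
        return frame;
    }
}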
Through the conversion above, the raw TCP bytes have been turned into a Request instance and placed on a queue. That queue is consumed by the RequestThrottler, whose job is to decide whether the configured maximum number of in-flight requests has been exceeded; if it has, the throttler stalls so that the downstream stages are not overwhelmed.
// org.apache.zookeeper.server.RequestThrottler#run @Override public void run() { try { // 死循環一直處理 while (true) { if (killed) { break; } // 阻塞式獲取,即只要數據被提交,就會被當即處理 Request request = submittedRequests.take(); if (Request.requestOfDeath == request) { break; } if (request.mustDrop()) { continue; } // Throttling is disabled when maxRequests = 0 if (maxRequests > 0) { while (!killed) { if (dropStaleRequests && request.isStale()) { // Note: this will close the connection dropRequest(request); ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1); request = null; break; } // 只要沒達到最大限制,直接經過 if (zks.getInProcess() < maxRequests) { break; } throttleSleep(stallTime); } } if (killed) { break; } // A dropped stale request will be null if (request != null) { if (request.isStale()) { ServerMetrics.getMetrics().STALE_REQUESTS.add(1); } // 驗證經過後,提交給 zkServer 處理 zks.submitRequestNow(request); } } } catch (InterruptedException e) { LOG.error("Unexpected interruption", e); } int dropped = drainQueue(); LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped); } // org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow public void submitRequestNow(Request si) { // 確保處理器鏈路已生成 if (firstProcessor == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // 驗證數據類型是否支持 boolean validpacket = Request.isValid(si.type); if (validpacket) { setLocalSessionFlag(si); // 調用第一個處理器進行處理了,進入正式流程,如 LeaderRequestProcessor firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type {}", si.type); // Update request accounting/throttling limits requestFinished(si); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { LOG.debug("Dropping request.", e); // Update request accounting/throttling limits requestFinished(si); } catch (RequestProcessorException e) { LOG.error("Unable to process request", e); // Update request accounting/throttling limits requestFinished(si); } }
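Stripped of metrics and stale-request handling, the throttler is a single consumer thread that stalls while the pipeline is full. The following is a minimal sketch of that idea, under the assumption that a simple counter tracks in-flight requests (ZooKeeper uses zks.getInProcess(), decremented when the processor chain finishes); all names here are illustrative.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: one consumer thread, one submission queue, one stall loop.
public class ThrottleLoop implements Runnable {

    private final BlockingQueue<Runnable> submitted = new LinkedBlockingQueue<>();
    private final AtomicInteger inProcess = new AtomicInteger();
    private final int maxRequests;
    private final long stallTimeMs;

    public ThrottleLoop(int maxRequests, long stallTimeMs) {
        this.maxRequests = maxRequests;
        this.stallTimeMs = stallTimeMs;
    }

    public void submit(Runnable request) {
        submitted.add(request); // producer side: enqueue and return immediately
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Runnable request = submitted.take(); // blocks until work arrives
                if (maxRequests > 0) {
                    // Stall (like throttleSleep) until the pipeline drains below the limit.
                    while (inProcess.get() >= maxRequests) {
                        Thread.sleep(stallTimeMs);
                    }
                }
                inProcess.incrementAndGet();
                try {
                    request.run(); // stands in for zks.submitRequestNow(request)
                } finally {
                    inProcess.decrementAndGet(); // simplification: real count drops later
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}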
At this point, request initialization is complete and we can turn to the processor chain itself, starting with LeaderRequestProcessor as one firstProcessor implementation.
// org.apache.zookeeper.server.quorum.LeaderRequestProcessor#processRequest
@Override
public void processRequest(Request request) throws RequestProcessorException {
    // Screen quorum requests against ACLs first
    if (!lzks.authWriteRequest(request)) {
        return;
    }
    // Check if this is a local session and we are trying to create
    // an ephemeral node, in which case we upgrade the session
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        if (request.getHdr() != null) {
            LOG.debug("Updating header");
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(ke.code().intValue()));
        }
        request.setException(ke);
        LOG.warn("Error creating upgrade request", ke);
    } catch (IOException ie) {
        LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    // Simply hands the request to the next processor; does little work of its own
    nextProcessor.processRequest(request);
}

// org.apache.zookeeper.server.PrepRequestProcessor#processRequest
public void processRequest(Request request) {
    request.prepQueueStartTime = Time.currentElapsedTime();
    // LinkedBlockingQueue: enqueue onto PrepRequestProcessor's own queue,
    // entering the real processing-chain logic
    submittedRequests.add(request);
    // Record monitoring statistics
    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
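The chain-of-responsibility shape is the same for every processor: do your own piece of work, then either call the next processor directly or drop the request onto your own queue and let your thread drain it. Below is a minimal sketch of both styles, with illustrative stand-in types rather than ZooKeeper's Request and RequestProcessor interfaces.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: two chained stages, one synchronous and one queue-backed.
public class ProcessorChainDemo {

    interface RequestProcessor {
        void processRequest(String request);
    }

    // A pass-through stage, similar in spirit to LeaderRequestProcessor:
    // perform a check, then forward to the next processor.
    static class AuthCheckProcessor implements RequestProcessor {
        private final RequestProcessor next;
        AuthCheckProcessor(RequestProcessor next) { this.next = next; }
        @Override
        public void processRequest(String request) {
            System.out.println("check: " + request);
            next.processRequest(request);
        }
    }

    // A queueing stage, similar in spirit to PrepRequestProcessor: enqueue
    // and let the stage's own thread drain the queue asynchronously.
    static class QueueingProcessor implements RequestProcessor {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        QueueingProcessor() {
            Thread drainer = new Thread(() -> {
                try {
                    while (true) {
                        System.out.println("processed: " + queue.take());
                    }
                } catch (InterruptedException ignored) { }
            });
            drainer.setDaemon(true);
            drainer.start();
        }
        @Override
        public void processRequest(String request) {
            queue.add(request); // returns immediately, like submittedRequests.add()
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RequestProcessor first = new AuthCheckProcessor(new QueueingProcessor());
        first.processRequest("create /foo");
        Thread.sleep(100); // give the drainer thread time before the JVM exits
    }
}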
6. A firstProcessor example, the first "passer": FollowerRequestProcessor
// org.apache.zookeeper.server.quorum.FollowerRequestProcessor#processRequest public void processRequest(Request request) { processRequest(request, true); } void processRequest(Request request, boolean checkForUpgrade) { if (!finished) { if (checkForUpgrade) { // Before sending the request, check if the request requires a // global session and what we have is a local session. If so do // an upgrade. Request upgradeRequest = null; try { upgradeRequest = zks.checkUpgradeSession(request); } catch (KeeperException ke) { if (request.getHdr() != null) { request.getHdr().setType(OpCode.error); request.setTxn(new ErrorTxn(ke.code().intValue())); } request.setException(ke); LOG.warn("Error creating upgrade request", ke); } catch (IOException ie) { LOG.error("Unexpected error in upgrade", ie); } if (upgradeRequest != null) { queuedRequests.add(upgradeRequest); } } // FollowerRequestProcessor 有必定的處理邏輯,將其添加到自有隊列 queuedRequests 中 queuedRequests.add(request); } } // org.apache.zookeeper.server.quorum.FollowerRequestProcessor#run @Override public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, ""); } if (request == Request.requestOfDeath) { break; } // Screen quorum requests against ACLs first if (!zks.authWriteRequest(request)) { continue; } // We want to queue the request to be processed before we submit // the request to the leader so that we are ready to receive // the response // 此處將會提交給 CommitProcessor, 進入 Follower 本地處理 nextProcessor.processRequest(request); // We now ship the request to the leader. As with all // other quorum operations, sync also follows this code // path, but different from others, we need to keep track // of the sync operations this follower has pending, so we // add it to pendingSyncs. // 針對寫請求,將其轉發給 Leader 處理 switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.setACL: case OpCode.multi: case OpCode.check: zks.getFollower().request(request); break; case OpCode.createSession: case OpCode.closeSession: // Don't forward local sessions to the leader. if (!request.isLocalSession()) { zks.getFollower().request(request); } break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); } // org.apache.zookeeper.server.quorum.Learner#request /** * send a request packet to the leader * * @param request * the request from the client * @throws IOException */ void request(Request request) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream oa = new DataOutputStream(baos); oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); if (request.request != null) { request.request.rewind(); int len = request.request.remaining(); byte[] b = new byte[len]; request.request.get(b); request.request.rewind(); oa.write(b); } oa.close(); // 轉發請求給 Leader, 詳情待續 QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); writePacket(qp, true); }
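The payload the follower ships to the leader is simply sessionId, cxid, type, and the raw request bytes, as Learner.request() shows. Here is a stripped-down sketch of just that encoding, omitting the QuorumPacket wrapper and the socket write; the class name is illustrative.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch only: serialize the fields a follower forwards to the leader.
public class ForwardEncoder {

    public static byte[] encode(long sessionId, int cxid, int type, ByteBuffer payload)
            throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(sessionId);
        oa.writeInt(cxid);
        oa.writeInt(type);
        if (payload != null) {
            payload.rewind();
            byte[] b = new byte[payload.remaining()];
            payload.get(b);
            payload.rewind(); // leave the buffer reusable for the local pipeline
            oa.write(b);
        }
        oa.close();
        return baos.toByteArray();
    }
}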
7. A firstProcessor example, the first "passer": ObserverRequestProcessor
ObserverRequestProcessor works much like the Follower version.
// org.apache.zookeeper.server.quorum.ObserverRequestProcessor#processRequest /** * Simply queue the request, which will be processed in FIFO order. */ public void processRequest(Request request) { if (!finished) { Request upgradeRequest = null; try { upgradeRequest = zks.checkUpgradeSession(request); } catch (KeeperException ke) { if (request.getHdr() != null) { request.getHdr().setType(OpCode.error); request.setTxn(new ErrorTxn(ke.code().intValue())); } request.setException(ke); LOG.info("Error creating upgrade request", ke); } catch (IOException ie) { LOG.error("Unexpected error in upgrade", ie); } if (upgradeRequest != null) { queuedRequests.add(upgradeRequest); } // 添加到自身的工做隊列中,稍後處理 queuedRequests.add(request); } } // org.apache.zookeeper.server.quorum.ObserverRequestProcessor#run @Override public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, ""); } if (request == Request.requestOfDeath) { break; } // Screen quorum requests against ACLs first if (!zks.authWriteRequest(request)) { continue; } // We want to queue the request to be processed before we submit // the request to the leader so that we are ready to receive // the response nextProcessor.processRequest(request); // We now ship the request to the leader. As with all // other quorum operations, sync also follows this code // path, but different from others, we need to keep track // of the sync operations this Observer has pending, so we // add it to pendingSyncs. switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getObserver().request(request); break; case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.setACL: case OpCode.multi: case OpCode.check: zks.getObserver().request(request); break; case OpCode.createSession: case OpCode.closeSession: // Don't forward local sessions to the leader. if (!request.isLocalSession()) { zks.getObserver().request(request); } break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("ObserverRequestProcessor exited loop!"); }
OK! At this point we have seen, end to end, how a TCP data stream becomes a ZooKeeper request packet, ready to enter the real business processing flow.
To summarize the whole process:
1. A single accept thread takes in client connections;
2. A group of selector threads polls for and dispatches read/write events;
3. A WorkerService thread pool carries out the actual reading and writing of the data;
4. ZooKeeperServer wraps the bytes into a Request;
5. The RequestThrottler acts as a gatekeeper, holding back excess requests before handing them back to ZooKeeperServer to be enqueued;
6. ZooKeeperServer locates the firstProcessor and passes the Request into the processor chain; the preparation phase is now complete;
7. The firstProcessor implementations differ: the Leader's simply passes the Request down the chain, while the Follower/Observer versions do extra work of their own, such as forwarding write requests to the leader.
The whole flow is a continuous hand-off between N threads, each responsible for its own area, and the hard part of understanding how ZooKeeper works lies precisely in how these threads cooperate. The sketch below compresses the chain down to the queues and pools that connect the threads.
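Every name below is an illustrative stand-in for the ZooKeeper classes discussed above; the point is only the shape of the hand-offs.

import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: the pipeline reduced to its connecting queues and pools.
public class PipelineShape {
    // AcceptThread -> SelectorThread: accepted channels
    BlockingQueue<SocketChannel> acceptedQueue = new LinkedBlockingQueue<>();
    // SelectorThread -> WorkerService: I/O work requests
    ExecutorService workerPool = Executors.newFixedThreadPool(4);
    // ZooKeeperServer -> RequestThrottler: parsed Request objects
    BlockingQueue<Object> submittedRequests = new LinkedBlockingQueue<>();
    // RequestThrottler -> firstProcessor -> ...: each processor's own queue
    BlockingQueue<Object> prepQueue = new LinkedBlockingQueue<>();
}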