建立完連接後,就開始監聽並處理請求了。查看MemcachedConnection的run方法,可以看到它是在一個死循環中處理IO請求。下面分析handleIO方法:
/** * Handle all IO that flows through the connection. * * This method is called in an endless loop, listens on NIO selectors and * dispatches the underlying read/write calls if needed. */ public void handleIO() throws IOException { //若是鏈接關閉,則當即中止 if (shutDown) { getLogger().debug("No IO while shut down."); return; } //處理客戶端的全部請求,下面會分析 handleInputQueue(); ==》 getLogger().debug("Done dealing with queue."); long delay = 1000; if (!reconnectQueue.isEmpty()) { long now = System.currentTimeMillis(); long then = reconnectQueue.firstKey(); delay = Math.max(then - now, 1); } getLogger().debug("Selecting with delay of %sms", delay); //檢查selectors是否正確 assert selectorsMakeSense() : "Selectors don't make sense."; int selected = selector.select(delay); if (shutDown) { return; } else if (selected == 0 && addedQueue.isEmpty()) { //沒有請求的時候調用,空方法。 handleWokenUpSelector(); } else if (selector.selectedKeys().isEmpty()) { //註冊事件爲空時,檢測次數超過DOUBLE_CHECK_EMPTY時可能會釋放鏈接,放入重連隊列 handleEmptySelects(); } else { getLogger().debug("Selected %d, selected %d keys", selected, selector.selectedKeys().size()); emptySelects = 0; Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey sk = iterator.next(); //處理io請求 handleIO(sk); ==》 iterator.remove(); } } handleOperationalTasks(); }
處理客戶端請求,主要的操作是把inputQ中的ops轉移到writeQ中:
/** * Handle any requests that have been made against the client. */ private void handleInputQueue() { if (!addedQueue.isEmpty()) { getLogger().debug("Handling queue"); Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>(); Collection<MemcachedNode> todo = new HashSet<MemcachedNode>(); MemcachedNode qaNode; while ((qaNode = addedQueue.poll()) != null) { todo.add(qaNode); } for (MemcachedNode node : todo) { boolean readyForIO = false; if (node.isActive()) { //判斷當前是否有可用的write operation if (node.getCurrentWriteOp() != null) { readyForIO = true; getLogger().debug("Handling queued write %s", node); } } else { toAdd.add(node); } //get set操做的時候會向inputQueue中放入op將node中 //inputQueue中部分op轉移到writeQ中, //op的數量根據writeQ可接受大小而定,若是writeQ不爲空, //在下面 node.fixupOps()中則會註冊write事件 node.copyInputQueue(); if (readyForIO) { try { if (node.getWbuf().hasRemaining()) { //處理寫操做,後面分析 handleWrites(node); } } catch (IOException e) { getLogger().warn("Exception handling write", e); lostConnection(node); } } //若是readQ不爲空,則註冊read事件 //若是writeQ不爲空,則註冊write事件 node.fixupOps(); } addedQueue.addAll(toAdd); } } /** * Handle IO for a specific selector. * * Any IOException will cause a reconnect. Note that this code makes sure * that the corresponding node is not only able to connect, but also able to * respond in a correct fashion (if verifyAliveOnConnect is set to true * through a property). This is handled by issuing a dummy * version/noop call and making sure it returns in a correct and timely * fashion. * * @param sk the selector to handle IO against. 
*/ private void handleIO(final SelectionKey sk) { MemcachedNode node = (MemcachedNode) sk.attachment(); try { getLogger().debug("Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment()); //是否能夠鏈接 if (sk.isConnectable() && belongsToCluster(node)) { getLogger().debug("Connection state changed for %s", sk); final SocketChannel channel = node.getChannel(); if (channel.finishConnect()) { //鏈接成功後,將node添加到addedQueue finishConnect(sk, node); } else { assert !channel.isConnected() : "connected"; } } else { //處理讀寫事件 handleReadsAndWrites(sk, node); } } catch (ClosedChannelException e) { if (!shutDown) { getLogger().info("Closed channel and not shutting down. Queueing" + " reconnect on %s", node, e); lostConnection(node); } } catch (ConnectException e) { getLogger().info("Reconnecting due to failure to connect to %s", node, e); queueReconnect(node); } catch (OperationException e) { node.setupForAuth(); getLogger().info("Reconnection due to exception handling a memcached " + "operation on %s. This may be due to an authentication failure.", node, e); lostConnection(node); } catch (Exception e) { node.setupForAuth(); getLogger().info("Reconnecting due to exception on %s", node, e); lostConnection(node); } node.fixupOps(); } private void handleReadsAndWrites(final SelectionKey sk, final MemcachedNode node) throws IOException { if (sk.isValid()) { if (sk.isReadable()) { handleReads(node); } if (sk.isWritable()) { handleWrites(node); } } }
首先看 handleWrites:
/** * Handle pending writes for the given node. * * @param node the node to handle writes for. * @throws IOException can be raised during writing failures. */ private void handleWrites(final MemcachedNode node) throws IOException { //填充writeBuffer,shouldOptimize爲false node.fillWriteBuffer(shouldOptimize); ==> //判斷toWrite的值,大於零說明有數據須要write boolean canWriteMore = node.getBytesRemainingToWrite() > 0; while (canWriteMore) { //想memcache server發送消息 int wrote = node.writeSome(); metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote); //試圖再次填充writeBuffer node.fillWriteBuffer(shouldOptimize); canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0; } } //不斷從writeQ中獲取op,將op中的cmd buffer轉移到wbuf,直到writeQ沒有元素或者wbuf填滿 public final void fillWriteBuffer(boolean shouldOptimize) { if (toWrite == 0 && readQ.remainingCapacity() > 0) { getWbuf().clear(); //op的初始狀態是OperationState.WRITE_QUEUED,便可寫狀態 //1.op是中止的,則會從writeQ中移除,從writeQ中獲取下一個op //2.op是超時的,則會從writeQ中移除,從writeQ中獲取下一個op //3.op正常,狀態轉換成OperationState.WRITING,即開始發送數據狀態,並將o加入readQ中,準備讀取響應 Operation o=getNextWritableOp(); while(o != null && toWrite < getWbuf().capacity()) { synchronized(o) { assert o.getState() == OperationState.WRITING; //獲取write buffer ByteBuffer obuf = o.getBuffer(); assert obuf != null : "Didn't get a write buffer from " + o; int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining()); byte[] b = new byte[bytesToCopy]; obuf.get(b); getWbuf().put(b); getLogger().debug("After copying stuff from %s: %s", o, getWbuf()); //cmd中沒有數據則說明所有轉移完成 if (!o.getBuffer().hasRemaining()) { //OperationState.WRITING狀態轉換成OperationState.READING o.writeComplete(); //從writeQ中刪除此op transitionWriteItem(); //準備即將發生的操做,將inputQ中的op轉移到writeQ中,並刪除cancel狀態的op preparePending(); if (shouldOptimize) { //優化操做,TODO optimize(); } o=getNextWritableOp(); } toWrite += bytesToCopy; } } getWbuf().flip(); assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this; assert toWrite == getWbuf().remaining() : "Expected 
" + toWrite + " remaining, got " + getWbuf().remaining(); } else { getLogger().debug("Buffer is full, skipping"); } }
下面分析spymemcached如何處理memcached的響應。如果channel可讀,則會觸發handleReads方法:
private void handleReads(final MemcachedNode node) throws IOException { //從readQ中獲取op Operation currentOp = node.getCurrentReadOp(); if (currentOp instanceof TapAckOperationImpl) { node.removeCurrentReadOp(); return; } ByteBuffer rbuf = node.getRbuf(); final SocketChannel channel = node.getChannel(); //讀取請求響應 int read = channel.read(rbuf); metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read); //沒有數據可讀了 TODO if (read < 0) { currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf); } while (read > 0) { getLogger().debug("Read %d bytes", read); rbuf.flip(); while (rbuf.remaining() > 0) { if (currentOp == null) { throw new IllegalStateException("No read operation."); } long timeOnWire = System.nanoTime() - currentOp.getWriteCompleteTimestamp(); metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, (int)(timeOnWire / 1000)); metrics.markMeter(OVERALL_RESPONSE_METRIC); synchronized(currentOp) { readBufferAndLogMetrics(currentOp, rbuf, node); ==》 } currentOp = node.getCurrentReadOp(); } rbuf.clear(); read = channel.read(rbuf); node.completedRead(); } } /** * Read from the buffer and add metrics information. * * @param currentOp the current operation to read. * @param rbuf the read buffer to read from. * @param node the node to read from. * @throws IOException if reading was not successful. 
*/ private void readBufferAndLogMetrics(final Operation currentOp, final ByteBuffer rbuf, final MemcachedNode node) throws IOException { //讀取數據並進行協議解析 currentOp.readFromBuffer(rbuf); //讀取成功 if (currentOp.getState() == OperationState.COMPLETE) { getLogger().debug("Completed read op: %s and giving the next %d " + "bytes", currentOp, rbuf.remaining()); //移除當前的readOps Operation op = node.removeCurrentReadOp(); assert op == currentOp : "Expected to pop " + currentOp + " got " + op; if (op.hasErrored()) { metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC); } else { metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC); } } else if (currentOp.getState() == OperationState.RETRY) { handleRetryInformation(currentOp.getErrorMsg()); getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: " + "%s ", currentOp); ((VBucketAware) currentOp).addNotMyVbucketNode( currentOp.getHandlingNode()); Operation op = node.removeCurrentReadOp(); assert op == currentOp : "Expected to pop " + currentOp + " got " + op; retryOps.add(currentOp); metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC); } } # 下面分析下memcache的get協議: get <鍵>*/r/n <鍵>* - key key是一個不爲空的字符串組合,發送這個指令之後,等待服務器的返回。 若是服務器端沒有任何數據,則是返回: END/r/n 證實沒有不存在這個key,沒有任何數據,若是存在數據,則 返回指定格式: VALUE <鍵> <標記> <數據長度>/r/n <數據塊>/r/n END/r/n 返回的數據是以VALUE開始的,後面跟着key和flags,以及數據長度,第二行跟着數據塊。 <鍵> -key 是發送過來指令的key內容 <標記> - flags 是調用set指令保存數據時候的flags標記 <數據長度> - bytes 是保存數據時候定位的長度 <數據塊> - data block 數據長度下一行就是提取的數據塊內容 END 結束 public void readFromBuffer(ByteBuffer data) throws IOException { // Loop while there's data remaining to get it all drained. 
while (getState() != OperationState.COMPLETE && data.remaining() > 0) { if (readType == OperationReadType.DATA) { //讀取數據塊數據,並返回客戶端數據 handleRead(data); } else { //提取命令,直到/r/n結束,獲取命令行line int offset = -1; for (int i = 0; data.remaining() > 0; i++) { byte b = data.get(); if (b == '\r') { foundCr = true; } else if (b == '\n') { assert foundCr : "got a \\n without a \\r"; offset = i; foundCr = false; break; } else { assert !foundCr : "got a \\r without a \\n"; byteBuffer.write(b); } } if (offset >= 0) { String line = new String(byteBuffer.toByteArray(), CHARSET); byteBuffer.reset(); OperationErrorType eType = classifyError(line); if (eType != null) { errorMsg = line.getBytes(); handleError(eType, line); } else { //處理命令行,咱們分析下BaseGetOpImpl handleLine(line); ==》 } } } } } //BaseGetOpImpl public final void handleLine(String line) { if (line.equals("END")) { //get響應結束 getLogger().debug("Get complete!"); if (hasValue) { //有返回值,即數據塊有值 getCallback().receivedStatus(END); } else { //沒有獲取到數據,key不存在等 getCallback().receivedStatus(NOT_FOUND); } //處理完成 transitionState(OperationState.COMPLETE); data = null; } else if (line.startsWith("VALUE ")) { //讀取命令行VALUE。。。 getLogger().debug("Got line %s", line); String[] stuff = line.split(" "); assert stuff[0].equals("VALUE"); currentKey = stuff[1]; currentFlags = Integer.parseInt(stuff[2]); data = new byte[Integer.parseInt(stuff[3])]; if (stuff.length > 4) { casValue = Long.parseLong(stuff[4]); } readOffset = 0; //設置有數據庫標識 hasValue = true; getLogger().debug("Set read type to data"); setReadType(OperationReadType.DATA); } else if (line.equals("LOCK_ERROR")) { getCallback().receivedStatus(LOCK_ERROR); transitionState(OperationState.COMPLETE); } else { assert false : "Unknown line type: " + line; } }