下面分析get請求邏輯:
//根據key獲取相應的值,並使用解碼器進行解碼 public Object get(String key) { return get(key, transcoder); } //異步獲取future,並經過get方法設置超時時間,獲取結果 public <T> T get(String key, Transcoder<T> tc) { try { return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { if(e.getCause() instanceof CancellationException) { throw (CancellationException) e.getCause(); } else { throw new RuntimeException("Exception waiting for value", e); } } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value: " + buildTimeoutMessage(operationTimeout, TimeUnit.MILLISECONDS), e); } } public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) { final CountDownLatch latch = new CountDownLatch(1); final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key, executorService); //讀取到END/n/r的時候前後調用receivedStatus,complete //獲取到數據後調用gotData //執行順序是gotData,receivedStatus,complete Operation op = opFact.get(key, new GetOperation.Callback() { private Future<T> val; @Override public void receivedStatus(OperationStatus status) { rv.set(val, status); } @Override public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())); } @Override public void complete() { latch.countDown(); rv.signalComplete(); } }); rv.setOperation(op); mconn.enqueueOperation(key, op); ==》 return rv; } protected void addOperation(final String key, final Operation o) { MemcachedNode placeIn = null; //根據key路由,默認負載算法是根據key hash取模 MemcachedNode primary = locator.getPrimary(key); //節點可用或者失敗處理模式爲重試機制 if (primary.isActive() || failureMode == FailureMode.Retry) { placeIn = primary; } else if (failureMode == FailureMode.Cancel) { //節點不可用且重試模式爲當即中止 o.cancel(); } else { //若是primary不可用,且FailureMode爲Redistribute則選擇一個能夠節點處理 Iterator<MemcachedNode> i = 
locator.getSequence(key); while (placeIn == null && i.hasNext()) { MemcachedNode n = i.next(); if (n.isActive()) { placeIn = n; } } if (placeIn == null) { placeIn = primary; this.getLogger().warn("Could not redistribute to another node, " + "retrying primary node for %s.", key); } } assert o.isCancelled() || placeIn != null : "No node found for key " + key; if (placeIn != null) { //將op添加到該節點的inputQ中,同時也把op放入到addedQueue中 addOperation(placeIn, o); } else { assert o.isCancelled() : "No node found for " + key + " (and not " + "immediately cancelled)"; } }
調用完asyncGet後獲取到GetFuture,然後再調用GetFuture的get方法:
可以看出首先根據rv獲取到Future對象,然後通過Future的get方法獲取最終結果:
/**
 * Waits up to the given duration for the inner future held by {@code rv},
 * then returns that inner future's result.
 *
 * @param duration maximum time to wait on {@code rv}
 * @param units unit of {@code duration}
 * @return the inner future's value, or {@code null} when {@code rv} yields no future
 * @throws InterruptedException propagated from either {@code get} call
 * @throws TimeoutException propagated from {@code rv.get(duration, units)}
 * @throws ExecutionException propagated from either {@code get} call
 */
public T get(long duration, TimeUnit units)
    throws InterruptedException, TimeoutException, ExecutionException {
  final Future<T> decoded = rv.get(duration, units);
  if (decoded == null) {
    return null;
  }
  // The second get carries no timeout of its own.
  return decoded.get();
}
再看rv.get(duration, units)方法:
public T get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { //阻塞,等待GetOperation.Callback()調用complete方法 if (!latch.await(duration, units)) { // whenever timeout occurs, continuous timeout counter will increase by 1. //記錄某個節點操做超時次數+1 MemcachedConnection.opTimedOut(op); if (op != null) { // op can be null on a flush //標識超時 op.timeOut(); } throw new CheckedOperationTimeoutException( "Timed out waiting for operation", op); } else { // continuous timeout counter will be reset //重置某個節點操做超時次數爲0 MemcachedConnection.opSucceeded(op); } if (op != null && op.hasErrored()) { throw new ExecutionException(op.getException()); } if (isCancelled()) { throw new ExecutionException(new CancellationException("Cancelled")); } if (op != null && op.isTimedOut()) { throw new ExecutionException(new CheckedOperationTimeoutException( "Operation timed out.", op)); } /* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */ //獲取GetOperation中Callbac中gotData方法中 //tcService.decode(tc, new CachedData(flags, data,tc.getMaxSize()))生成的Future return objRef.get(); }