spymemcached源碼深刻分析

spymemcached深刻分析 html

author:智深 前端

version0.7 java

日誌:http://my.oschina.net/astute node

QQ2548921609(技術交流) git


1、簡介

spymemcached 是一個 memcache 的客戶端, 使用 NIO 實現 github

分析 spymemcached 須要瞭解 NIOmemcached使用,memcached協議,參考資料中列出了有用的資源鏈接。 算法

NIONew I/O的縮寫,Java裏邊你們通常稱爲異步IO,實際上對應Linux系統編程中的事件驅動IOevent-driven IO),是對 epoll 的封裝。其它的IO模型還包括同步,阻塞,非阻塞,多路複用(selectpoll)。阻塞/非阻塞是 fd 的屬性,同步會跟阻塞配合,這樣的應用會一直 sleep,直到IO完成被內核喚醒;同步非阻塞的話,第一次讀取時,若是沒有數據,應用線程會馬上返回,可是應用須要肯定以什麼樣的策略進行後面的系統調用,若是是簡單的while循環會致使CPU 100%,複雜的相似自旋的策略增長了應用編程的難度,所以同步非阻塞不多使用。多路複用是Linux早期的一個進程監控多個fd的方式,性能比較低,每次調用涉及3次循環遍歷,具體分析見 http://my.oschina.net/astute/blog/92433 event-driven IO,應用註冊 感興趣的socket IO事件(READ,WRITE),調用wait開始sleep,當條件成立時,如數據到達(可讀),寫緩衝區可用(可寫),內核喚醒應用線程,應用線程根據獲得的socket執行同步的調用讀/寫 數據。 編程


2、協議簡介

memcachded服務器和客戶端之間採用 TCP 的方式通訊,自定義了一套字節流的格式。本文分析的文本協議的構建和其它文本協議相似,mc裏面分紅命令行和數據塊行,命令行裏指明數據塊的字節數目,命令行和數據塊後都跟隨\r\n。重要的一點是服務器在讀取數據塊時是根據命令行裏指定的字節數目,所以數據塊中含有\r\n並不影響服務器讀塊操做。數據塊後必須跟隨\r\n 數組

存儲命令

發送 tomcat

<command name> <key> <flags> <exptime> <bytes> [noreply]\r\n

cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n

<data block>\r\n

command name = "set", "add", "replace", "append" or "prepend"

flags - 32位整數 server並不操做這個數據 get時返回給客戶端

exptime - 過時時間,能夠是unix時間戳或偏移量,偏移量的話最大爲30*24*60*60, 超過這個值,服務器會認爲是unix時間戳

bytes - 數據塊字節的個數

響應

<data block>\r\n

STORED\r\n - 成功

NOT_STORED\r\n - addreplace命令沒有知足條件

EXISTS\r\n - cas命令 代表item已經被修改

NOT_FOUND\r\n - cas命令 item不存在

獲取命令

發送

get <key>*\r\n

gets <key>*\r\n

<key>* - 空格分割的一個或多個字符串

響應

VALUE <key> <flags> <bytes> [<cas unique>]\r\n

<data block>\r\n

VALUE <key> <flags> <bytes> [<cas unique>]\r\n

<data block>\r\n

END\r\n

本文以 get 操做爲例;key = someKey  value=abcdABC中文

以字節流的形式最終發送的數據

[103, 101, 116, 32, 115, 111, 109, 101, 75, 101, 121, 13, 10, 0]

103 101 116 - "get"

32 - "" 空格

115 111 109 101 75 101 121 - someKey

13 10 - \r\n

接收到的數據

VALUE someKey 0 13

61 62 63 64 41 42 43 E4 B8 AD E6 96 87\r\n

END\r\n

刪除命令

發送

delete <key> [noreply]\r\n

響應

DELETED\r\n - 成功刪除

NOT_FOUND\r\n - 刪除的條目不存在

其它命令

詳見參考資料 mc 協議

3、spymemcached中的重要對象
簡介

spymc的客戶端,所以spy中全部對象須要基於它要完成的 功能 和 mc服務器的通訊協議來進行設計。最重要的MemcachedClient表示mc集羣的client,應用中單例便可。spy中的每個mc節點,用MemcachedNode表示,這個對象內部含有一個channel,網絡鏈接到mc節點。要根據key的哈希值查找某個mc節點,spy中使用NodeLocator,默認locatorArrayModNodeLocator,這個對象內部含有全部的MemcachedNodespy使用的hash算法都在對象DefaultHashAlgorithm中,默認使用NATIVE_HASH,也就是String.hashCode()locatorclient中間還有一個對象,叫MemcachedConnection ,它表示到mc集羣的鏈接,內部持有locatorclent內部持有MemcachedConnection(mconn)spy使用NIO實現,所以有一個selector,這個對象存在於mconn中。要和服務器進行各類操做的通訊,協議數據發送,數據解析,spy中抽象爲Operation,文本協議的get操做最終實現爲net.spy.memcached.protocol.ascii.GetOperationImpl。爲了實現工做線程和IO線程之間的調度,spy抽象出了一個 GetFuture,內部持有一個OperationFuture

TranscodeService執行字節數據和對象之間的轉換,spy中實現方式爲任務隊列+線程池,這個對象的實例在client中。

對象詳解

SpyObject - spy中的基類 定義 Logger

MemcachedConnection 表示到多臺 mc 節點的鏈接

MemcachedConnection 詳細屬性

    shouldOptimize - 是否須要優化多個連續的get操做 --> gets 默認true

    addedQueue - 用來記錄排隊到節點的操做

    selector - 監控到多個 mc 服務器的讀寫事件

    locator - 定位某個 mc 服務器

GetFuture 前端線程和工做線程交互的對象

    --> OperationFuture

ConnectionFactory 建立 MemcachedConnection 實例;建立操做隊列;建立 OperationFactory;制定 Hash 算法。

DefaultConnectionFactory 默認鏈接工廠

DefaultHashAlgorithm - Hash算法的實現類

MemcachedNode 定義到 單個memcached 服務器的鏈接

    TCPMemcachedNodeImpl - 

        AsciiMemcachedNodeImpl - 

        BinaryMemcachedNodeImpl - 

TCPMemcachedNodeImpl 重要屬性

    socketAddress - 服務器地址

    rbuf - 讀緩衝區 默認大小 16384

    wbuf - 寫緩衝區 默認大小 16384

    writeQ - 寫隊列

    readQ - 讀隊列

    inputQueue - 輸入隊列 memcachclient添加操做時先添加到 inputQueue

    opQueueMaxBlockTime - 操做的最大阻塞時間 默認10

    reconnectAttempt - 重連嘗試次數 volatile

    channel - socket 通道

    toWrite - 要向socket發送的字節數

    optimizedOp - 優化後的Operation 實現類是OptimizedGetImpl

       sk - channel註冊到selector後的key

    shouldAuth - 是否須要認證 默認 false

    authLatch - 認證須要的Latch

    reconnectBlocked - 

    defaultOpTimeout - 操做默認超時時間 默認值 2.5

    continuousTimeout - 連續超時次數

    opFact - 操做工廠

MemcachedClient 重要屬性

    mconn - MemcachedConnection 

    opFact - 操做工廠

    transcoder - 解碼器

    tcService - 解碼線程池服務

    connFactory - 鏈接工廠

Operation 全部操做的基本接口

    BaseOperationImpl

        OperationImpl

            BaseGetOpImpl - initialize 協議解析 構建緩衝區

                GetOperationImpl

OperationFactory 爲協議構建操做 好比生成 GetOperation

    BaseOperationFactory

        AsciiOperationFactory - 文本協議的操做工廠 默認的操做工廠

        BinaryOperationFactory - 二進制協議的操做工廠

OperationFactory 根據 protocol handlers 構建操做

    BaseOperationFactory

        AsciiOperationFactory - 支持 ascii protocol

        BinaryOperationFactory - 支持 binary operations

NodeLocator 根據 key hash 值查找節點

    ArrayModNodeLocator - hash 值和節點列表長度取模,做爲下標,簡單的數組查詢

    KetamaNodeLocator - Ketama一致性hash的實現

Transcoder 對象和字節數組之間的轉換接口

    BaseSerializingTranscoder

        SerializingTranscoder - 默認的transcoder

TranscodeService 異步的解碼服務,含有一個線程池

FailureMode - node失效的模式

    Redistribute - 節點失效後移動到下一個有效的節點  默認模式

    Retry - 重試失效節點 直至恢復

    Cancel - 取消操做

4、總體流程
初始化

客戶端執行new MemcachedClient(new InetSocketAddress("192.168.56.101", 11211))。初始化 MemcachedClient,內部初始化MemcachedConnection,建立selector,註冊channelselector,啓動IO線程。

線程模型

初始化完成後,把監聽mc節點事件的線程,也就是調用select的線程,稱爲IO線程;應用執行 c.get("someKey"),把應用所在的線程稱爲工做線程。工做線程一般由tomcat啓動,負責建立操做,加入節點的操做隊列,工做線程一般有多個;IO線程負責從隊列中拿到操做,執行操做。

工做線程

工做線程最終會調用asyncGet,方法內部會建立CountDownLatch(1), GetFutureGetOperationImpl(持有一個內部類,工做線程執行完成後,最終會調用 latch.countDown()),選擇mc節點,操做op初始化(生成寫緩衝區),把op放入節點等待隊列inputQueue中,同時會把當前節點放入mc鏈接(mconn)addedQueue屬性中,最後喚醒selector。最終工做線程在latch上等待(默認超時2.5秒)IO線程的執行結果。

IO線程

IO線程被喚醒後

一、handleInputQueue()。移動OperationinputQueuewriteQ中。對添加到addedQueue中的每個MemcachedNode分別進行處理。這個函數會處理全部節點上的全部操做,所有發送到mc服務器(以前節點上就有寫操做的才這麼處理,不然只是註冊寫事件)。

二、循環過程當中,若是當前node中沒有寫操做,則判斷writeQreadQ中有操做,在SK上註冊讀/寫事件;若是有寫操做,須要執行handleWrites函數。這個函數內部首先作的是填充緩衝區fillWriteBuffer():從writeQ中取出一個可寫的操做(remove掉取消的和超時的),改變操做的狀態爲WRITING,把操做的數據複製到寫緩衝區(寫緩衝區默認16K,操做的字節數從十幾字節到1M,這個地方有複雜的處理,後面會詳細分析,如今只考慮簡單狀況),複製完成後把操做狀態變爲READING,從writeQremove當前操做,把操做addreadQ當中,這個地方會再去複製pending的操做;、發送寫緩衝區的內容,所有發送完成後,會再次去填充緩衝區fillWriteBuffer()(好比說一個大的命令,一個緩衝區不夠)。循環,直到全部的寫操做都處理完。ƒ、判斷writeQreadQ是否有操做,更新sk註冊的讀寫事件。get操做的話,如今已經註冊了讀事件。

三、selector.select()

四、數據到達時,執行handleIO(sk),處理讀事件;執行channel.read(rbuf);執行readFromBuffer(),解析數據,讀取到END\r\n將操做狀態置爲COMPLETE

5、初始化詳細流程

一、默認鏈接工廠爲 DefaultConnectionFactory。接着建立TranscodeService(解碼的線程池,默認線程最多爲10),建立AsciiOperationFactory(支持ascii協議的操做工廠,負責生成各類操做,好比 GetOperationImpl),建立MemcachedConnection,設置操做超時時間(默認2.5秒)。

二、DefaultConnectionFactory建立MemcachedConnection詳細過程:建立reconnectQueueaddedQueue,設置shouldOptimizetrue,設置maxDelay30秒,設置opFact,設置timeoutExceptionThreshold1000(超過這個值,關閉到 mc node 的鏈接),打開 Selector,建立nodesToShutdown,設置bufSize16384字節,建立到每一個node的 MemcachedNode(默認是AsciiMemcachedNodeImpl,這一步建立SocketChannel,鏈接到mc節點,註冊到selector,設置sk爲剛註冊獲得的SelectionKey),最後啓動 MemcachedConnection 線程,進入事件處理的循環代碼 

while(running) handleIO()

6、核心流程代碼
一、工做線程

一切從工做線程調用 c.get("someKey") 方法開始
基本流程是:建立操做(Operation),操做初始化,查找節點,把操做加入節點的等待隊列,喚醒IO線程,而後工做線程在Future上等待IO線程的執行結果

// 默認等待2.5秒
return asyncGet(key, tc).get(2500, TimeUnit.MILLISECONDS)
// 內部類GetOperation.Callback,是工做線程和IO線程交互的類
// IO線程獲得全部的操做響應數據後,調用gotData方法
// IO線程接收到END\r\n後,調用receivedStatus和complete方法
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
	final CountDownLatch latch = new CountDownLatch(1);
	final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key);
	Operation op = opFact.get(key, new GetOperation.Callback() {
		private Future<T> val = null;
		public void receivedStatus(OperationStatus status) {
			rv.set(val, status);
		}
		public void gotData(String k, int flags, byte[] data) {
			val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
		}
		public void complete() {
			latch.countDown();
		}
	});
	rv.setOperation(op);
	mconn.enqueueOperation(key, op);
	return rv;	// 最終會在rv上調用get方法
}
// 向節點中隊列中添加操做 一、查找節點 二、放入隊列
// 查找節點,根據key的hash值,對節點數取模
protected void addOperation(final String key, final Operation o) {
	MemcachedNode placeIn = null;
	MemcachedNode primary = locator.getPrimary(key);
	if (primary.isActive() || failureMode == FailureMode.Retry) {
		placeIn = primary;
	} else if (failureMode == FailureMode.Cancel) {
		o.cancel();
	} else {
		for (Iterator<MemcachedNode> i = locator.getSequence(key); placeIn == null
		&& i.hasNext();) {
			MemcachedNode n = i.next();
			if (n.isActive()) {
				placeIn = n;
			}
		}
		if (placeIn == null) {
			placeIn = primary;
		}
	}
	if (placeIn != null) {
		addOperation(placeIn, o);
	} else {
	}
}
// 最重要的方法 
protected void addOperation(final MemcachedNode node, final Operation o) {
	o.setHandlingNode(node);
	o.initialize();  // 操做初始化,生成要發送的字節流數據,放到緩衝區中
	node.addOp(o);   // 添加到節點的inputQueue中
	addedQueue.offer(node);   // 有操做的節點放入 addedQueue中
	Selector s = selector.wakeup(); // 喚醒IO線程
}
工做線程和IO線程之間傳遞的Future對象,結構以下
GetFuture ---> OperationFuture ---> latch
---> 表示依賴關係


// 最終工做線程在 OperationFuture的get方法上等待latch
public T get(long duration, TimeUnit units) {
	if (!latch.await(duration, units)) { // 等待2.5秒
		MemcachedConnection.opTimedOut(op);
		if (op != null) {
			op.timeOut(); //2.5秒後,操做沒有執行完,設置超時(IO線程會判斷,若是超時,就remove)
		}
		// throw exception
	}
	return objRef.get(); // objRef是一個原子引用,來保證對象的安全發佈(線程安全)
}
// objRef引用的是一個TranscodeService.Task(自己是個FutureTask)對象,若是沒有壓縮和序列化的話,最終工做線程會調用tc.decode方法,獲得返回值。


二、IO線程

IO線程的操做循環
處理輸入隊列,註冊寫事件;執行寫操做,註冊讀事件;處理讀操做,解析結果。

public void run() {
  while (running) {
  handleIO();
  }
}
public void handleIO() throws IOException {
	handleInputQueue();
	int selected = selector.select(delay);
	Set<SelectionKey> selectedKeys = selector.selectedKeys();

	if (selectedKeys.isEmpty() && !shutDown) {
		// some code
	} else {
		for (SelectionKey sk : selectedKeys) {
			handleIO(sk);
		}
		selectedKeys.clear();
	}
}
handleInputQueue
處理addedQueue中的全部節點,對每個節點複製inputQueue中的操做到writeQ中。註冊讀寫事件。
private void handleInputQueue() {
	if (!addedQueue.isEmpty()) {
		Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
		Collection<MemcachedNode> todo = new HashSet<MemcachedNode>();
		MemcachedNode qaNode = null;
		while ((qaNode = addedQueue.poll()) != null) {
			todo.add(qaNode);
		}
		for (MemcachedNode qa : todo) {
			boolean readyForIO = false;
			if (qa.isActive()) {
				if (qa.getCurrentWriteOp() != null) {
					readyForIO = true;
				}
			} else {
				toAdd.add(qa);
			}
			qa.copyInputQueue();
			if (readyForIO) {
				try {
					if (qa.getWbuf().hasRemaining()) {
						handleWrites(qa.getSk(), qa);
					}
				} catch (IOException e) {
					lostConnection(qa);
				}
			}
			qa.fixupOps();
		}
		addedQueue.addAll(toAdd);
	}
}
spy中註冊讀寫事件的函數
readQ不爲空註冊讀事件;writeQ不爲空註冊寫事件;網絡沒有鏈接上註冊鏈接事件。
public final void fixupOps() {
	SelectionKey s = sk;
	if (s != null && s.isValid()) {
		int iops = getSelectionOps();
		s.interestOps(iops);
	} else {
	}
}
public final int getSelectionOps() {
	int rv = 0;
	if (getChannel().isConnected()) {
		if (hasReadOp()) {
			rv |= SelectionKey.OP_READ;
		}
		if (toWrite > 0 || hasWriteOp()) {
			rv |= SelectionKey.OP_WRITE;
		}
	} else {
		rv = SelectionKey.OP_CONNECT;
	}
	return rv;
}
public final boolean hasReadOp() {
	return !readQ.isEmpty();
}

public final boolean hasWriteOp() {
	return !(optimizedOp == null && writeQ.isEmpty());
}

三、handleWrites(SelectionKey sk, MemcachedNode qa)
我可以想到的一些場景,這個狀態機代碼必須處理的
⑴ 當前隊列中有1個操做,操做要發送的字節數目小於16K
⑵ 當前隊列中有1個操做,操做要發送的字節數目大於16K(很大的set操做)
⑶ 當前隊列中有多個操做,操做要發送的字節數目小於16K
⑷ 當前隊列中有多個操做,操做要發送的字節數目大於16K
⑸ 任意一次寫操做wrote爲0

summary:處理節點中writeQ和inputQueue中的全部操做。每次循環會盡可能填滿發送緩衝區,而後將發送緩衝區的內容所有發送到網絡上,循環往復,沒有異常的狀況下,直至發送完數據。操做中發送的內容只要放入到發送緩衝區後,就把操做加入到readQ(spy中根據writeQ和readQ中有沒有數據,來註冊讀寫事件)。

執行時機:IO線程在select上休眠,被工做線程喚醒後,處理輸入隊列,把操做複製到writeQ 中,註冊寫事件;再次調用select,返回後,就會調用handleWrites(),數據所有發送後,會註冊讀事件。處理輸入隊列時,若是wbuf還有東西沒有發送,那麼會在select調用前,調用handleWrites函數。

private void handleWrites(SelectionKey sk, MemcachedNode qa) throws IOException {
	qa.fillWriteBuffer(shouldOptimize); --->
	boolean canWriteMore = qa.getBytesRemainingToWrite() > 0;
	while (canWriteMore) {
		int wrote = qa.writeSome(); --->
		qa.fillWriteBuffer(shouldOptimize);
		canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
	}
}

 -- 發送數據;執行一次後,wbuf可能還有數據未寫完
public final int writeSome() throws IOException {
	int wrote = channel.write(wbuf);
	toWrite -= wrote;
	return wrote;
}
-- 填充緩衝區
toWrite=0 代表 寫緩衝區之前的內容已經所有寫入到網絡中,這樣纔會進行下一次的填充寫緩衝區
操做會盡可能填滿16K的緩衝區(單一操做數據量很大好比500K;或多個操做數據量500K)
當一個操做中的數據徹底寫入緩衝區後,操做的狀態變成READING,從writeQ中移除當前操做。
public final void fillWriteBuffer(boolean shouldOptimize) {
	if (toWrite == 0 && readQ.remainingCapacity() > 0) {
		getWbuf().clear();
		Operation o=getNextWritableOp(); --->

		while(o != null && toWrite < getWbuf().capacity()) {
			synchronized(o) {
				ByteBuffer obuf = o.getBuffer();
				int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
				byte[] b = new byte[bytesToCopy];
				obuf.get(b);
				getWbuf().put(b);
				if (!o.getBuffer().hasRemaining()) {
					o.writeComplete();
					transitionWriteItem();
					preparePending(); -- copyInputQueue()
					if (shouldOptimize) {
						optimize();
					}
					o=getNextWritableOp();
				}
				toWrite += bytesToCopy;
			}
		}
		getWbuf().flip();
	} else {
	}
}
-- 獲取節點寫隊列中下一個可寫的操做
若是操做已經取消(前端線程等待超時,取消操做),或超時(IO線程沒有來得及執行操做,操做超時),那麼把操做從隊列中移除,繼續查找下一個操做。把可寫的操做的狀態從WRITE_QUEUED變成WRITING,同時把操做放入讀隊列中。
private Operation getNextWritableOp() {
	Operation o = getCurrentWriteOp(); --->④
	while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
		synchronized(o) {
			if (o.isCancelled()) {
				Operation cancelledOp = removeCurrentWriteOp();--->⑤
			} else if (o.isTimedOut(defaultOpTimeout)) {
				Operation timedOutOp = removeCurrentWriteOp();
			} else {
				o.writing();
				if (!(o instanceof TapAckOperationImpl)) {
					readQ.add(o);
				}
				return o;
			}
			o = getCurrentWriteOp();
		}
	}
	return o;
}
④ -- 拿到當前寫操做(並不從隊列中移除)
public final Operation getCurrentWriteOp() {
	return optimizedOp == null ? writeQ.peek() : optimizedOp;
}

⑤ -- remove當前寫操做
public final Operation removeCurrentWriteOp() {
	Operation rv = optimizedOp;
	if (rv == null) {
		rv = writeQ.remove();
	} else {
		optimizedOp = null;
	}
	return rv;
}
四、handleReads
handleReads(SelectionKey sk, MemcachedNode qa)
從網絡中讀取數據,放入rbuf。解析rbuf,獲得結果;
private void handleReads(SelectionKey sk, MemcachedNode qa) throws IOException {
	Operation currentOp = qa.getCurrentReadOp();
	if (currentOp instanceof TapAckOperationImpl) { // no response
		qa.removeCurrentReadOp();
		return;
	}
	ByteBuffer rbuf = qa.getRbuf();
	final SocketChannel channel = qa.getChannel();
	int read = channel.read(rbuf);
	if (read < 0) {
		// some code
	}
	while (read > 0) { // 從網絡中讀數據一直讀到0爲止
		rbuf.flip();
		while (rbuf.remaining() > 0) { // 只要緩衝區有數據 就去解析操做
			synchronized(currentOp) {
				currentOp.readFromBuffer(rbuf); // 從rbuf中解析響應
				if (currentOp.getState() == OperationState.COMPLETE) {
					Operation op = qa.removeCurrentReadOp(); // 操做解析成功,移除
				} else if (currentOp.getState() == OperationState.RETRY) {
					((VBucketAware) currentOp).addNotMyVbucketNode(currentOp.getHandlingNode());
					Operation op = qa.removeCurrentReadOp();
					retryOps.add(currentOp);
				}
			}
			currentOp=qa.getCurrentReadOp();
		}
		rbuf.clear();
		read = channel.read(rbuf);
	}
}
// 解析rbuf;readType有兩種取值,LINE 和 DATA,用來區分正在操做的數據是命令行仍是數據塊。解析過程當中,分別調用工做線程傳入到操做中的回調對象的三個方法,分別是:receivedStatus,gotData,complete。
public void readFromBuffer(ByteBuffer data) throws IOException {
	while (getState() != OperationState.COMPLETE && data.remaining() > 0) {
		if (readType == OperationReadType.DATA) {
			handleRead(data);
		} else {
			int offset = -1;
			for (int i = 0; data.remaining() > 0; i++) {
				byte b = data.get();
				if (b == '\r') {
					foundCr = true;
				} else if (b == '\n') {
					assert foundCr : "got a \\n without a \\r";
					offset = i;
					foundCr = false;
					break;
				} else {
					assert !foundCr : "got a \\r without a \\n";
					byteBuffer.write(b);
				}
			}
			if (offset >= 0) {
				String line = new String(byteBuffer.toByteArray(), CHARSET);
				byteBuffer.reset();
				OperationErrorType eType = classifyError(line);
				if (eType != null) {
					handleError(eType, line);
				} else {
					handleLine(line); // 取到完整的一行後 調用這個函數 解析行數據
				}
			}
		}
	}
}
// 處理命令行和END行
// 解析命令行時,分析各類參數 data爲數據塊的字節數
public final void handleLine(String line) {
	if (line.equals("END")) {
		if (hasValue) {
			getCallback().receivedStatus(END);
		} else {
			getCallback().receivedStatus(NOT_FOUND);
		}
		transitionState(OperationState.COMPLETE);
		data = null;
	} else if (line.startsWith("VALUE ")) {
		String[] stuff = line.split(" ");
		currentKey = stuff[1];
		currentFlags = Integer.parseInt(stuff[2]);
		data = new byte[Integer.parseInt(stuff[3])];
		if (stuff.length > 4) {
			casValue = Long.parseLong(stuff[4]);
		}
		readOffset = 0;
		hasValue = true;
		setReadType(OperationReadType.DATA);
	} else if (line.equals("LOCK_ERROR")) {
		getCallback().receivedStatus(LOCK_ERROR);
		transitionState(OperationState.COMPLETE);
	} else {
		assert false : "Unknown line type: " + line;
	}
}

五、那個著名的bug

JAVA NIO bug 會致使 CPU 100%

http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 

int selected = selector.select(delay);

    Set<SelectionKey> selectedKeys = selector.selectedKeys();

    if (selectedKeys.isEmpty() && !shutDown) {

      if (++emptySelects > DOUBLE_CHECK_EMPTY) {

        for (SelectionKey sk : selector.keys()) {

          if (sk.readyOps() != 0) {

            handleIO(sk);

          } else {

            lostConnection((MemcachedNode) sk.attachment());

          }

DOUBLE_CHECK_EMPTY = 256,當連續的select返回爲空時,++emptySelects,超過256,鏈接到當前mc節點的socket channel關閉,放入重連隊列。

7、調試 spymemcached

調試 spymemcached IO線程的過程當中,工做線程放入到節點隊列的操做很容易超時,所以須要繼承DefaultConnectionFactory 複寫相關方法。

public class AstuteConnectionFactory extends DefaultConnectionFactory {
@Override
public boolean shouldOptimize() {
return false;
}
@Override
public long getOperationTimeout() {
return 3000000L; // 3000S
}
}

8、參考資料

NIOhttp://www.ibm.com/developerworks/cn/education/java/j-nio/index.html 

memcachedhttp://memcached.org/ 

protocolhttps://github.com/memcached/memcached/blob/master/doc/protocol.txt

相關文章
相關標籤/搜索