首先看下spy memcache的使用demonode
List<InetSocketAddress> serverAddress = new LinkedList<>(); serverAddress.add(new InetSocketAddress("127.0.0.1", 11211)); //構造者 ConnectionFactoryBuilder connectionFactoryBuilder = new ConnectionFactoryBuilder(); connectionFactoryBuilder.setOpTimeout(50); connectionFactoryBuilder.setTimeoutExceptionThreshold(DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD); connectionFactoryBuilder.setReadBufferSize(65535); MemcachedClient client = new MemcachedClient(connectionFactoryBuilder.build(), serverAddress); System.out.println(client.set("zhurui", 2, "zhurui").get());
首先設置ip、port,client能夠鏈接多個memcache node。而後建立ConnectionFactoryBuilder,設置一些基礎屬性,而後buid,返回一個DefaultConnectionFactory,並實現了抽象方法。而後new MemcachedClient。異步
接下來看下MemcachedClient的構造方法:socket
public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs) throws IOException { if (cf == null) { throw new NullPointerException("Connection factory required"); } if (addrs == null) { throw new NullPointerException("Server list required"); } if (addrs.isEmpty()) { throw new IllegalArgumentException("You must have at least one server to" + " connect to"); } if (cf.getOperationTimeout() <= 0) { throw new IllegalArgumentException("Operation timeout must be positive."); } connFactory = cf; //初始化異步轉換編碼器,內部建立一個線程池 tcService = new TranscodeService(cf.isDaemon()); //返回默認的object-》byte array序列化qi器SerializingTranscoder transcoder = cf.getDefaultTranscoder(); //返回AsciiOperationFactory opFact = cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; //建立MemcachedConnection,核心,待會分析 mconn = cf.createConnection(addrs); assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); executorService = cf.getListenerExecutorService(); if (authDescriptor != null) { addObserver(this); } }
下面看cf.createConnection(addrs)ide
public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException { return new MemcachedConnection(getReadBufSize(), this, addrs, getInitialObservers(), getFailureMode(), getOperationFactory()); }
getFailureMode( )返回DEFAULT_FAILURE_MODE,即FailureMode.Redistribute,策略是當前節點處理失敗能夠轉移其餘的節點處理memcached
getOperationFactory( )返回AsciiOperationFactory對象優化
下面分析怎麼構造MemcachedConnection對象的:ui
/** * Construct a {@link MemcachedConnection}. * * @param bufSize the size of the buffer used for reading from the server. * @param f the factory that will provide an operation queue. * @param a the addresses of the servers to connect to. * @param obs the initial observers to add. * @param fm the failure mode to use. * @param opfactory the operation factory. * @throws IOException if a connection attempt fails early */ public MemcachedConnection(final int bufSize, final ConnectionFactory f, final List<InetSocketAddress> a, final Collection<ConnectionObserver> obs, final FailureMode fm, final OperationFactory opfactory) throws IOException { connObservers.addAll(obs); //重連隊列,key是過多久能夠嘗試重連的時間 reconnectQueue = new TreeMap<Long, MemcachedNode>(); //用來記錄排隊到節點的操做 addedQueue = new ConcurrentLinkedQueue<MemcachedNode>(); //請求失敗策略 failureMode = fm; //是否須要優化多個連續的get操做,默認false,不作優化處理 shouldOptimize = f.shouldOptimize(); //重連最大等待時間,這裏是30s maxDelay = TimeUnit.SECONDS.toMillis(f.getMaxReconnectDelay()); //clone,create operation工廠,這裏是AsciiOperationFactory opFact = opfactory; //最大的鏈接超時致使異常的次數 996 timeoutExceptionThreshold = f.getTimeoutExceptionThreshold(); //選擇器 selector = Selector.open(); //存放須要被重試的操做 retryOps = Collections.synchronizedList(new ArrayList<Operation>()); //存放須要被定時關閉的節點 nodesToShutdown = new ConcurrentLinkedQueue<MemcachedNode>(); //回調用的鏈接池 listenerExecutorService = f.getListenerExecutorService(); //從服務端讀取數據的buffer size ,65535 this.bufSize = bufSize; //建立 MemcachedNode的工廠 this.connectionFactory = f; String verifyAlive = System.getProperty("net.spy.verifyAliveOnConnect"); if(verifyAlive != null && verifyAlive.equals("true")) { verifyAliveOnConnect = true; } else { verifyAliveOnConnect = false; } wakeupDelay = Integer.parseInt( System.getProperty("net.spy.wakeupDelay", Integer.toString(DEFAULT_WAKEUP_DELAY))); //根據提供的地址建立memcache鏈接 List<MemcachedNode> connections = createConnections(a); //建立locator,這邊使用Native hash (String.hashCode())取模的方法進行多節點負載 locator = f.createLocator(connections); metrics = f.getMetricCollector(); metricType = f.enableMetrics(); registerMetrics(); setName("Memcached IO over " + this); setDaemon(f.isDaemon()); //啓動MemcachedConnection的run方法 start(); }
接下來看下怎麼建立MemcachedConnection的this
protected List<MemcachedNode> createConnections( final Collection<InetSocketAddress> addrs) throws IOException { List<MemcachedNode> connections = new ArrayList<MemcachedNode>(addrs.size()); for (SocketAddress sa : addrs) { //打開一個SocketChannel SocketChannel ch = SocketChannel.open(); //設置成非阻塞 ch.configureBlocking(false); //建立一個memcachedNode,實現類是AsciiMemcachedNodeImpl, //內部初始化了讀操做、寫操做、鏈接操做隊列,最大阻塞操等待做完成時間(10s),操做(50ms)、驗證超時設置 MemcachedNode qa = connectionFactory.createMemcachedNode(sa, ch, bufSize); qa.setConnection(this); int ops = 0; //設置NoDelay爲true ch.socket().setTcpNoDelay(!connectionFactory.useNagleAlgorithm()); try { //鏈接服務端,若是當即鏈接成功則返回true if (ch.connect(sa)) { getLogger().info("Connected to %s immediately", qa); connected(qa); } else { getLogger().info("Added %s to connect queue", qa); ops = SelectionKey.OP_CONNECT; } //喚醒select()方法 selector.wakeup(); //註冊CONNECT事件並把selection保存到node qa.setSk(ch.register(selector, ops, qa)); assert ch.isConnected() || qa.getSk().interestOps() == SelectionKey.OP_CONNECT : "Not connected, and not wanting to connect"; } catch (SocketException e) { getLogger().warn("Socket error on initial connect", e); queueReconnect(qa); } connections.add(qa); } return connections; }