Lettuce 鏈接被設計爲線程安全,因此一個鏈接能夠被多個線程共享,同時lettuce鏈接默認是自動重連.雖然鏈接池在大多數狀況下是沒必要要的,但在某些用例中多是有用的.lettuce提供通用的鏈接池支持. 若有疏漏後續會更新 https://www.cnblogs.com/wei-zw/p/9163687.htmlhtml
鏈接池是否有必要?java
Lettuce被線程安全的,它知足了多數場景需求. 全部Redis用戶的操做是單線程執行的.使用多鏈接並不能改善一個應用的性能. 阻塞操做的使用一般與得到專用鏈接的工做線程結合在一塊兒.
使用Redis事務是使用動態鏈接池的典型場景,由於須要專用鏈接的線程數趨於動態.也就是說,動態鏈接池的需求是有限的.鏈接池老是伴隨着複雜性和維護成本提高.react
同步鏈接池apache
使用命令式編程,同步鏈接池是正確的選擇,由於它在用於執行執行Redis命令的線程上執行全部操做.編程
前提條件
Lettuce須要依賴 Apache的 common-pool2(至少是2.2)提供鏈接池. 確認在你的classpath下包含這個依賴.不然你就不能使用鏈接池.
若是使用Maven,向你的pom.xml添加以下依賴緩存
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.4.3</version> </dependency>
鏈接池支持安全
Lettuce提供通用鏈接池支持,它須要一個用於建立任何支持類型鏈接(單個,發佈訂閱,哨兵,主從,集羣)的提供者. ConnectionPoolSupport 將根據你的需求建立一個 GenericObjectPool或SoftReferenceObjectPool. 鏈接池能夠分配包裝類型或直接鏈接async
- 包裝實例在調用StatefulConnection.close()時,會將鏈接歸還到鏈接池
- 直接鏈接須要調用GenericObjectPool.returnObject(...)歸還到鏈接池
基本用法:ide
包裝鏈接源碼分析
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxIdle(2); GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool( () -> client.connect(), poolConfig); for (int i = 0; i < 10; i++) { StatefulRedisConnection<String, String> connection = pool.borrowObject(); RedisCommands<String, String> sync = connection.sync(); sync.ping(); connection.close(); }
直接鏈接
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool( () -> client.connect(), new GenericObjectPoolConfig(), false); for (int i = 0; i < 10; i++) { StatefulRedisConnection<String, String> connection = pool.borrowObject(); RedisCommands<String, String> sync = connection.sync(); sync.ping();
//主動將鏈接歸還到鏈接池 pool.returnObject(connection); }
相關源碼分析
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool( Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) { LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null"); LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null"); AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>(); GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) { @Override public T borrowObject() throws Exception { //若是wrapConnection 設置爲true,則對鏈接建立動態代理 return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject(); } @Override public void returnObject(T obj) { if (wrapConnections && obj instanceof HasTargetConnection) { super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection()); return; } super.returnObject(obj); } }; poolRef.set(pool); return pool; }
建立一個包裝類型到鏈接
private static <T> T wrapConnection(T connection, ObjectPool<T> pool) { //建立調用處理器 ReturnObjectOnCloseInvocationHandler<T> handler = new ReturnObjectOnCloseInvocationHandler<T>(connection, pool); Class<?>[] implementedInterfaces = connection.getClass().getInterfaces(); Class[] interfaces = new Class[implementedInterfaces.length + 1]; interfaces[0] = HasTargetConnection.class; System.arraycopy(implementedInterfaces, 0, interfaces, 1, implementedInterfaces.length); //建立代理鏈接 T proxiedConnection = (T) Proxy.newProxyInstance(connection.getClass().getClassLoader(), interfaces, handler); //向鏈接調用處理器設置代理鏈接 handler.setProxiedConnection(proxiedConnection); //返回代理鏈接 return proxiedConnection; }
包裝類型鏈接的動態調用處理器
private static class ReturnObjectOnCloseInvocationHandler<T> extends AbstractInvocationHandler { //被代理對鏈接 private T connection; private T proxiedConnection; private Map<Method, Object> connectionProxies = new ConcurrentHashMap<>(5, 1); //鏈接池 private final ObjectPool<T> pool; ReturnObjectOnCloseInvocationHandler(T connection, ObjectPool<T> pool) { this.connection = connection; this.pool = pool; } //設置代理鏈接 void setProxiedConnection(T proxiedConnection) { this.proxiedConnection = proxiedConnection; } @Override protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable { //若是調用方法是 getStatefulConnection則返回代理鏈接 if (method.getName().equals("getStatefulConnection")) { return proxiedConnection; } //若是調用的方法是getTargetConnection 則返回真實鏈接 if (method.getName().equals("getTargetConnection")) { return connection; } //若是真實鏈接爲null則拋出異常 if (connection == null) { throw new RedisException("Connection is deallocated and cannot be used anymore."); } //若是調用的方法是close則將代理鏈接歸還到鏈接池,並將真實鏈接設置和代理鏈接設置爲null if (method.getName().equals("close")) { pool.returnObject(proxiedConnection); connection = null; proxiedConnection = null; connectionProxies.clear(); return null; } try { //若是調用方法是獲取鏈接則從代理鏈接池中獲取,若是沒有則建立代理鏈接並放入緩存 if (method.getName().equals("sync") || method.getName().equals("async") || method.getName().equals("reactive")) { return connectionProxies.computeIfAbsent( method, m -> getInnerProxy(method, args)); } //其它方法不在多任何攔截 return method.invoke(connection, args); } catch (InvocationTargetException e) { throw e.getTargetException(); } } @SuppressWarnings("unchecked") private Object getInnerProxy(Method method, Object[] args) { try { Object result = method.invoke(connection, args); result = Proxy.newProxyInstance(getClass().getClassLoader(), result.getClass().getInterfaces(), new DelegateCloseToConnectionInvocationHandler<>((AutoCloseable) proxiedConnection, result)); return result; } catch (IllegalAccessException e) { throw new RedisException(e); } catch (InvocationTargetException e) { throw new RedisException(e.getTargetException()); } } public T getConnection() { return connection; } }