Lettuce同步源碼分析html
在上一篇分享中分享了單機模式異步鏈接建立過程Lettuce建立鏈接過程源碼分析; 在本次分享內容主要介紹同步命令的處理過程.java
Lettuce是基於Netty的Redis高級客戶端,對於異步命令來講是自然的,那麼lettuce中是如何處理同步命令的呢?實際上同步鏈接仍是對異步命令的一次封裝;下面咱們就經過源碼進行分析看看Lettuce中的具體實現.react
經過上一篇文章中能夠知道在StatefulRedisConnectionImpl中建立 異步模式,同步模式以及響應式模式命令處理模式,那麼咱們就從 該處看起redis
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) { super(writer, timeout); this.codec = codec; //建立異步redis命令處理模式 this.async = newRedisAsyncCommandsImpl(); //建立redis命令同步處理模式 this.sync = newRedisSyncCommandsImpl(); //建立redis命令響應式處理模式 this.reactive = newRedisReactiveCommandsImpl(); }
經過這裏彷佛看不出同步處理模式同異步處理模式有什麼關聯,那麼咱們在深刻進去看一下api
protected RedisCommands<K, V> newRedisSyncCommandsImpl() { return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class); }
在這段代碼中能夠看到async(),這個就是redis命令異步處理模式,那麼它是如何封裝的呢?異步
protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) { //對異步API建立調用處理器 FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces); //建立動態代理 return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h); }
經過上面對源碼能夠發現原來是對異步api建立了一個JDK動態代理;那麼關鍵的邏輯仍是在FutureSyncInvocationHandler中,對於動態代理的知識就不在展開了.async
在invoke處理是在AbstractInvocationHandler中完成的,它將一些基本公用的抽象在了基類中,將特殊的實現延遲到子類中實現.源碼分析
public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //若是參數爲null則 將args設置爲"{}" if (args == null) { args = NO_ARGS; } //若是參數長度爲0同時方法名稱爲hashCode則直接返回hashCode if (args.length == 0 && method.getName().equals("hashCode")) { return hashCode(); } //若是是equals if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) { Object arg = args[0]; if (arg == null) { return false; } if (proxy == arg) { return true; } return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg)); } //若是是toString if (args.length == 0 && method.getName().equals("toString")) { return toString(); } return handleInvocation(proxy, method, args); }
在FutureSyncInvocationHandler中實現了同步命令處理過程,其源碼以下:post
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable { try { //獲取當前method在asyncApi 中對應的方法 Method targetMethod = this.translator.get(method); //調用異步接口 Object result = targetMethod.invoke(asyncApi, args); //若是返回結果是RedisFuture類型 if (result instanceof RedisFuture<?>) { //類型強轉 RedisFuture<?> command = (RedisFuture<?>) result; //若是不是事務控制方法 同時還在事務中則返回null if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) { return null; } //是事務控制方法,或不在事務中則進行以下處理 //等待超時或取消 LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS); //返回結果,這裏處理不是很好 上一步中就能夠直接返回了 return command.get(); } //若是不是RedisFuture類型則直接返回 return result; } catch (InvocationTargetException e) { throw e.getTargetException(); } }
在上文中有一段是獲取獲取指定方法在delegate中對應方法的處理,下面就看看這個處理是如何實現的this
/** * 方法翻譯器 */ protected static class MethodTranslator { private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32); //真實方法和代理類中方法映射表 private final Map<Method, Method> map; private MethodTranslator(Class<?> delegate, Class<?>... methodSources) { map = createMethodMap(delegate, methodSources); } /** * 經過指定代理類,和目標類建立方法翻譯器 */ public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) { //同步代碼塊 synchronized (TRANSLATOR_MAP) { //若是翻譯器映射表中不存在delegate的翻譯器則建立一個新的 return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources)); } } private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) { Map<Method, Method> map; List<Method> methods = new ArrayList<>(); //遍歷源類,找到全部public方法 for (Class<?> sourceClass : methodSources) { methods.addAll(getMethods(sourceClass)); } map = new HashMap<>(methods.size(), 1.0f); //建立方法和代理類的方法的映射表 for (Method method : methods) { try { map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes())); } catch (NoSuchMethodException ignore) { } } return map; } //獲取目標方法中的全部方法 private Collection<? extends Method> getMethods(Class<?> sourceClass) { //目標方法集合 Set<Method> result = new HashSet<>(); Class<?> searchType = sourceClass; while (searchType != null && searchType != Object.class) { //將目標類中全部public方法添加到集合中 result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods()))); //若是souceClass是接口類型 if (sourceClass.isInterface()) { //獲取souceClass的全部接口 Class<?>[] interfaces = sourceClass.getInterfaces(); //遍歷接口,將接口的public方法也添加到方法集合中 for (Class<?> interfaceClass : interfaces) { result.addAll(getMethods(interfaceClass)); } searchType = null; } else {//若是不是接口則查找父類 searchType = searchType.getSuperclass(); } } return result; } //獲取給定方法集合中全部public方法 private Collection<? extends Method> filterPublicMethods(List<Method> methods) { List<Method> result = new ArrayList<>(methods.size()); for (Method method : methods) { if (Modifier.isPublic(method.getModifiers())) { result.add(method); } } return result; } public Method get(Method key) { //從方法映射表中獲取目標方法 Method result = map.get(key); //若是目標方法不爲null則返回,不然拋出異常 if (result != null) { return result; } throw new IllegalStateException("Cannot find source method " + key); } } }