Lettuce同步命令源碼分析

  Lettuce同步源碼分析html

    在上一篇分享中分享了單機模式異步鏈接建立過程Lettuce建立鏈接過程源碼分析; 在本次分享內容主要介紹同步命令的處理過程.java

Lettuce是基於Netty的Redis高級客戶端,對於異步命令來講是自然的,那麼lettuce中是如何處理同步命令的呢?實際上同步鏈接仍是對異步命令的一次封裝;下面咱們就經過源碼進行分析看看Lettuce中的具體實現.react

   經過上一篇文章中能夠知道在StatefulRedisConnectionImpl中建立 異步模式,同步模式以及響應式模式命令處理模式,那麼咱們就從 該處看起redis

    public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        //建立異步redis命令處理模式
        this.async = newRedisAsyncCommandsImpl();
        //建立redis命令同步處理模式
        this.sync = newRedisSyncCommandsImpl();
        //建立redis命令響應式處理模式
        this.reactive = newRedisReactiveCommandsImpl();
    }

   經過這裏彷佛看不出同步處理模式同異步處理模式有什麼關聯,那麼咱們在深刻進去看一下api

protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
        return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
    }

  在這段代碼中能夠看到async(),這個就是redis命令異步處理模式,那麼它是如何封裝的呢?異步

protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
        //對異步API建立調用處理器
        FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
        //建立動態代理
        return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
    }

  經過上面對源碼能夠發現原來是對異步api建立了一個JDK動態代理;那麼關鍵的邏輯仍是在FutureSyncInvocationHandler中,對於動態代理的知識就不在展開了.async

在invoke處理是在AbstractInvocationHandler中完成的,它將一些基本公用的抽象在了基類中,將特殊的實現延遲到子類中實現.源碼分析

 public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //若是參數爲null則 將args設置爲"{}"
        if (args == null) {
            args = NO_ARGS;
        }
        //若是參數長度爲0同時方法名稱爲hashCode則直接返回hashCode
        if (args.length == 0 && method.getName().equals("hashCode")) {
            return hashCode();
        }
        //若是是equals
        if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) {
            Object arg = args[0];
            if (arg == null) {
                return false;
            }
            if (proxy == arg) {
                return true;
            }
            return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg));
        }
        //若是是toString
        if (args.length == 0 && method.getName().equals("toString")) {
            return toString();
        }
        return handleInvocation(proxy, method, args);
    }

  在FutureSyncInvocationHandler中實現了同步命令處理過程,其源碼以下:post

 protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

        try {
            //獲取當前method在asyncApi 中對應的方法
            Method targetMethod = this.translator.get(method);
            //調用異步接口
            Object result = targetMethod.invoke(asyncApi, args);
            //若是返回結果是RedisFuture類型
            if (result instanceof RedisFuture<?>) {
               //類型強轉
                RedisFuture<?> command = (RedisFuture<?>) result;
                  //若是不是事務控制方法 同時還在事務中則返回null
                if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
                    return null;
                }
                //是事務控制方法,或不在事務中則進行以下處理
                //等待超時或取消
                LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
               //返回結果,這裏處理不是很好 上一步中就能夠直接返回了
                return command.get();
            }
            //若是不是RedisFuture類型則直接返回
            return result;
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

  在上文中有一段是獲取獲取指定方法在delegate中對應方法的處理,下面就看看這個處理是如何實現的this

/**
     * 方法翻譯器
     */
    protected static class MethodTranslator {

        private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);
        
        //真實方法和代理類中方法映射表
        private final Map<Method, Method> map;

        private MethodTranslator(Class<?> delegate, Class<?>... methodSources) {

            map = createMethodMap(delegate, methodSources);
        }

        /**
         * 經過指定代理類,和目標類建立方法翻譯器
         */
        public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) {
            //同步代碼塊
            synchronized (TRANSLATOR_MAP) {
                //若是翻譯器映射表中不存在delegate的翻譯器則建立一個新的
                return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));
            }
        }

        private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) {

            Map<Method, Method> map;
            List<Method> methods = new ArrayList<>();
            //遍歷源類,找到全部public方法
            for (Class<?> sourceClass : methodSources) {
                methods.addAll(getMethods(sourceClass));
            }

            map = new HashMap<>(methods.size(), 1.0f);

            //建立方法和代理類的方法的映射表
            for (Method method : methods) {

                try {
                    map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes()));
                } catch (NoSuchMethodException ignore) {
                }
            }
            return map;
        }
       //獲取目標方法中的全部方法
        private Collection<? extends Method> getMethods(Class<?> sourceClass) {

            //目標方法集合
            Set<Method> result = new HashSet<>();

            Class<?> searchType = sourceClass;
            while (searchType != null && searchType != Object.class) {
                 //將目標類中全部public方法添加到集合中
                result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods())));
                //若是souceClass是接口類型
                if (sourceClass.isInterface()) {
                    //獲取souceClass的全部接口
                    Class<?>[] interfaces = sourceClass.getInterfaces();
                    //遍歷接口,將接口的public方法也添加到方法集合中
                    for (Class<?> interfaceClass : interfaces) {
                        result.addAll(getMethods(interfaceClass));
                    }

                    searchType = null;
                } else {//若是不是接口則查找父類

                    searchType = searchType.getSuperclass();
                }
            }

            return result;
        }

        //獲取給定方法集合中全部public方法
        private Collection<? extends Method> filterPublicMethods(List<Method> methods) {
            List<Method> result = new ArrayList<>(methods.size());

            for (Method method : methods) {
                if (Modifier.isPublic(method.getModifiers())) {
                    result.add(method);
                }
            }

            return result;
        }

        public Method get(Method key) {
           //從方法映射表中獲取目標方法
            Method result = map.get(key);
            //若是目標方法不爲null則返回,不然拋出異常
            if (result != null) {
                return result;
            }
            throw new IllegalStateException("Cannot find source method " + key);
        }
    }
}
相關文章
相關標籤/搜索