Lettuce之RedisClusterClient使用以及源碼分析

  

  Redis Cluster模式簡介

    redis集羣並無使用一致性hash算法而引入了哈希槽概念,Redis 集羣有16384個哈希槽,每一個key經過CRC16校驗後對16384取模來決定放置哪一個槽.集羣的每一個節點負責一部分hash槽.也就是說若是key是不變的對應的slot也是不變的html

  Redis 服務器命令    

  • cluster info

能夠經過cluster info 命令查看集羣信息java

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
  • cluster nodes 

經過cluster nodes命令查看當前節點以及該節點分配的slot,以下圖能夠發現當前redis集羣有12個節點,每一個節點大約管理1365個slotnode

xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

  

 

  • client list

Redis Client List 命令用於返回全部鏈接到服務器的客戶端信息和統計數據。redis

redis 127.0.0.1:6379> CLIENT LIST 
addr=127.0.0.1:43143 fd=6 age=183 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client 
addr=127.0.0.1:43163 fd=5 age=35 idle=15 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
addr=127.0.0.1:43167 fd=7 age=24 idle=6 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get

  

 

  •  cluster slots 

  Redis Client Slots 命令用於當前的集羣狀態算法

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   2) (integer) 4095
   3) 1) "127.0.0.1"
      2) (integer) 7000
   4) 1) "127.0.0.1"
      2) (integer) 7004
2) 1) (integer) 12288
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 7003
   4) 1) "127.0.0.1"
      2) (integer) 7007
3) 1) (integer) 4096
   2) (integer) 8191
   3) 1) "127.0.0.1"
      2) (integer) 7001
   4) 1) "127.0.0.1"
      2) (integer) 7005
4) 1) (integer) 8192
   2) (integer) 12287
   3) 1) "127.0.0.1"
      2) (integer) 7002
   4) 1) "127.0.0.1"
      2) (integer) 7006

  

  • cluster keyslot

cluster keyslot key  返回一個整數,用於標識指定鍵所散列到的哈希槽緩存

cluster keyslot test
(integer) 6918

  

請求重定向

因爲每一個節點只負責部分slot,以及slot可能從一個節點遷移到另外一節點,形成客戶端有可能會向錯誤的節點發起請求。所以須要有一種機制來對其進行發現和修正,這就是請求重定向。有兩種不一樣的重定向場景:服務器

  • MOVED

         聲明的是slot全部權的轉移,收到的客戶端須要更新其key-node映射關係異步

  • ASK

         申明的是一種臨時的狀態.在從新進行分片期間,源節點向目標節點遷移一個slot過程當中,可能會出現這樣一種狀況:屬於被遷移slot的一部分鍵值對保存在源節點裏面,一部分保存在目標節點裏面.當客戶端向源節點發送一個與鍵有關的命令,而且這個鍵企剛好被遷移到目標節點,則向客戶端返回一個ASK錯誤.由於這個節點還在處於遷移過程當中,全部權尚未轉移,因此客戶端在接收到ASK錯誤後,須要在目標節點執行命令前,先發送一個ASKING命令,若是不發放該命令到話,則會返回MOVED錯誤,ASKING表示已經知道遷移狀態,則會執行該命令.async

 

 

經過集羣查詢數據key爲test的值 redis-cli爲單機模式;若是爲集羣模式時(redis-cli -c) 接收到MOVED 錯誤時是不會打印MOVED錯誤,而是根據MOVED信息自動重定向到正確節點,並打印出重定向信息ide

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此時返回的結果表示該key在6956這個實例上,經過這個實例能夠獲取到緩存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  經過上文的示例能夠發現獲取緩存值的過程須要訪問cluster兩次,既然key到slot值的算法是已知的,若是能夠經過key直接計算slot,在經過每一個節點的管理的slot範圍就能夠知道這個key對應哪一個節點了,這樣不就能夠一次獲取到了嗎?其實lettuce中就是這樣處理的.下文會有詳細介紹

    若是mget操做值跨slot時會怎樣呢? 

mget test test1
(error) CROSSSLOT Keys in request don't hash to the same slot

 

Lettuce使用

    @Bean(name="clusterRedisURI")
    RedisURI clusterRedisURI(){
        return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
    }
  //配置集羣選項,自動重連,最多重定型1次
    @Bean
    ClusterClientOptions clusterClientOptions(){
        return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
    }

//建立集羣客戶端 @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } /** * 集羣鏈接 */ @Bean(destroyMethod = "close") StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

  Lettuce在Spring 中的使用經過上文中的配置方式進行配置後就可使用了

  1. 經過StatefulRedisClusterConnection獲取命令處理方式,同步,異步以及響應式
  2. 執行redis相關命令

  

Lettuce相關源碼

     lettuce的使用方式仍是很簡單的那麼它的處理過程究竟是怎樣的呢?下面將經過源碼進行解析.

經過上文能夠知道鏈接是經過RedisClusterClient建立的,它默認使用了StringCodec(LettuceCharsets.UTF8)做爲編碼器建立鏈接

 public StatefulRedisClusterConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

  

     在建立鏈接時就會主動發現集羣拓撲信息,在第一次建立的時候partitions必定爲null則此時須要初始化分區信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
         //若是分區信息爲null則初始化分區信息
        if (partitions == null) {
            initializePartitions();
        }
        //若是須要就激活拓撲刷新
        activateTopologyRefreshIfNeeded();

 初始化集羣分片信息,就是將加載分片信息賦值給partitions屬性 

 protected void initializePartitions() {
        this.partitions = loadPartitions();
    }

  具體加載分片信息處理過程以下:

  protected Partitions loadPartitions() {
        //獲取拓撲刷新信息,
        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            //加載拓撲信息
            Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

第一次能夠知道partitions爲null則此時須要初始化種子節點的,那麼它的種子節點又是什麼呢?經過代碼能夠發現種子節點就是初始化的URI,那麼它又是何時設置的呢?

protected Iterable<RedisURI> getTopologyRefreshSource() {

        //是否初始化種子節點
        boolean initialSeedNodes = !useDynamicRefreshSources();

        Iterable<RedisURI> seed;
        //若是須要初始化種子節點或分區信息爲null或分區信息爲空 則將初始URI賦值給種子
        if (initialSeedNodes || partitions == null || partitions.isEmpty()) {
            seed = RedisClusterClient.this.initialUris;
        } else {//不須要初始化種子節點
            List<RedisURI> uris = new ArrayList<>();
            for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) {
                uris.add(partition.getUri());
            }
            seed = uris;
        }
        return seed;
    }

  經過以下代碼能夠發現種子節點是在建立redisClusterClient的時候指定的

 protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {

        super(clientResources);

        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        //初始化節點
        this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
         //根據第一個URI的超時時間做爲默認超時時間
        setDefaultTimeout(getFirstUri().getTimeout());
        setOptions(ClusterClientOptions.builder().build());
    }

  默認使用動態刷新

 protected boolean useDynamicRefreshSources() {

        //若是集羣客戶端選項不爲null
        if (getClusterClientOptions() != null) {
            //獲取集羣拓撲刷新選項
            ClusterTopologyRefreshOptions topologyRefreshOptions = getClusterClientOptions().getTopologyRefreshOptions();
            //返回集羣拓撲刷新選項中配置到是否使用動態刷新
            return topologyRefreshOptions.useDynamicRefreshSources();
        }
        //默認動態刷新
        return true;
    }

  下面看看加載分區信息的處理過程,第一次則根據種子節點的鏈接獲取整個集羣的拓撲信息

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

        //獲取超時時間,默認60秒
        long commandTimeoutNs = getCommandTimeoutNs(seed);

        Connections connections = null;
        try {
            //獲取全部種子鏈接
            connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);
            //Requests將異步執行命令封裝到多個節點
   //cluster nodes Requests requestedTopology = connections.requestTopology();
//client list Requests requestedClients = connections.requestClients(); //獲取節點拓撲視圖 NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); if (discovery) {//是否查找額外節點 //獲取集羣節點 Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes(); //排除種子節點,獲得須要發現節點 Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed)); //若是須要發現節點不爲空 if (!discoveredNodes.isEmpty()) { //須要發現節點鏈接 Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); //合併鏈接 connections = connections.mergeWith(discoveredConnections); //合併請求 requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology()); requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients()); //獲取節點視圖 nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); //返回uri對應分區信息 return nodeSpecificViews.toMap(); } } return nodeSpecificViews.toMap(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisCommandInterruptedException(e); } finally { if (connections != null) { connections.close(); } } }

  

 

     這樣在建立connection的時候就已經知道集羣中的全部有效節點.根據以前的文章能夠知道對於集羣命令的處理是在ClusterDistributionChannelWriter中處理的.其中有一些信息在初始化writer的時候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {
    //默認寫入器
    private final RedisChannelWriter defaultWriter;
    //集羣事件監聽器
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    //集羣鏈接提供器
    private ClusterConnectionProvider clusterConnectionProvider;
    //異步集羣鏈接提供器
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    //是否關閉
    private boolean closed = false;
    //分區信息
    private volatile Partitions partitions;

  寫命令的處理以下,會根據key計算出slot,進而找到這個slot對應的node,直接訪問這個node,這樣能夠有效減小訪問cluster次數

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");
        //若是鏈接已經關閉則拋出異常
        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //若是是集羣命令且命令沒有處理完畢
        if (command instanceof ClusterCommand && !command.isDone()) {
            //類型轉換, 轉換爲ClusterCommand
            ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
            if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

                HostAndPort target;
                boolean asking;
                //若是集羣命令已經遷移,此時經過ClusterCommand中到重試操做進行到此
                if (clusterCommand.isMoved()) {
                    //獲取命令遷移目標節點
                    target = getMoveTarget(clusterCommand.getError());
                    //觸發遷移事件
                    clusterEventListener.onMovedRedirection();
                    asking = false;
                } else {//若是是ask
                    target = getAskTarget(clusterCommand.getError());
                    asking = true;
                    clusterEventListener.onAskRedirection();
                }

                command.getOutput().setError((String) null);
                //鏈接遷移後的目標節點
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                        .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
                //成功創建鏈接,則向該節點發送命令
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(command, asking, connectFuture.join(), null);
                } else {
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
                }

                return command;
            }
        }
        //不是集羣命令就是RedisCommand,第一個請求命令就是非ClusterCommand
         //將當前命令包裝爲集羣命令
        ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
        //獲取命令參數
        CommandArgs<K, V> args = command.getArgs();

        //排除集羣路由的cluster命令
        if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
            //獲取第一個編碼後的key
            ByteBuffer encodedKey = args.getFirstEncodedKey();
            //若是encodedKey不爲null
            if (encodedKey != null) {
                //獲取slot值
                int hash = getSlot(encodedKey);
                //根據命令類型獲取命令意圖 是讀仍是寫
                ClusterConnectionProvider.Intent intent = getIntent(command.getType());
                //根據意圖和slot獲取鏈接
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                        .getConnectionAsync(intent, hash);
                //若是成功獲取鏈接
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(commandToSend, false, connectFuture.join(), null);
                } else {//若是鏈接還沒有處理完,或有異常,則添加完成處理器
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                            throwable));
                }

                return commandToSend;
            }
        }

        writeCommand(commandToSend, defaultWriter);

        return commandToSend;
    }

  可是若是計算出的slot由於集羣擴展致使這個slot已經不在這個節點上lettuce是如何處理的呢?經過查閱ClusterCommand源碼能夠發如今complete方法中對於該問題進行了處理;若是響應是MOVED則會繼續訪問MOVED目標節點,這個重定向的此時能夠指定的,默認爲5次,經過上文的配置能夠發現,在配置中只容許一次重定向

 @Override
    public void complete() {
        //若是響應是MOVED或ASK
        if (isMoved() || isAsk()) {
            //若是最大重定向次數大於當前重定向次數則能夠進行重定向
            boolean retryCommand = maxRedirections > redirections;
            //重定向次數自增
            redirections++;

            if (retryCommand) {
                try {
                    //重定向
                    retry.write(this);
                } catch (Exception e) {
                    completeExceptionally(e);
                }
                return;
            }
        }
        super.complete();
        completed = true;
    }

  若是是ask向重定向目標發送命令前須要同步發送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
            StatefulRedisConnection<K, V> connection, Throwable throwable) {

        if (throwable != null) {
            command.completeExceptionally(throwable);
            return;
        }

        try {
            //若是須要發送asking請求,即接收到ASK錯誤消息,則在重定向到目標主機後須要發送asking命令
            if (asking) {
                connection.async().asking();
            }
            //發送命令
            writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
        } catch (Exception e) {
            command.completeExceptionally(e);
        }
    }

  

  上文主要介紹了lettuce對於單個key的處理,若是存在多個key,如mget lettuce又是如何處理的呢?其主要思路是將key根據slot進行分組,將在同一個slot的命令一塊兒發送到對應的節點,再將全部請求的返回值合併做爲最終結果.源碼以下:

  @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        //獲取分區和key的映射關係
        Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
        //若是分區數小於2也就是隻有一個分區即全部key都落在一個分區就直接獲取
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        //每一個key與slot映射關係
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);

        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
        //遍歷分片信息,逐個發送
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
            executions.put(entry.getKey(), mget);
        }

        //恢復key的順序
        return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
            List<KeyValue<K, V>> result = new ArrayList<>();
            for (K opKey : keys) {
                int slot = slots.get(opKey);

                int position = partitioned.get(slot).indexOf(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
            }

            return result;
        });
    }
相關文章
相關標籤/搜索