Using Lettuce in Spring Boot

Lettuce is a scalable, thread-safe Redis client. Multiple threads can share a single RedisConnection. This article is based on Lettuce 5 and covers the following topics:

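  1. Configuring Lettuce in Spring Boot
  2. Synchronous, asynchronous, and reactive usage
  3. Subscribing to events
  4. Publishing custom events
  5. Read/write splitting
  6. Source code behind the read/write splitting strategies
  7. Client-side sharding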

import java.time.Duration;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LettuceConfig {

    /**
     * Configures the client resources (I/O and computation thread pools).
     */
    @Bean(destroyMethod = "shutdown")
    ClientResources clientResources() {
        return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build();
    }

    /**
     * Configures the socket options:
     * keepAlive=true
     * tcpNoDelay=true
     * connectTimeout=5 seconds
     */
    @Bean
    SocketOptions socketOptions() {
        return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build();
    }

    /**
     * Configures the client options.
     */
    @Bean
    ClientOptions clientOptions(SocketOptions socketOptions) {
        return ClientOptions.builder().socketOptions(socketOptions).build();
    }

    /**
     * Creates the RedisClient.
     * @param clientResources client resources
     * @param clientOptions client options
     */
    @Bean(destroyMethod = "shutdown")
    RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) {
        RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build();
        RedisClient client = RedisClient.create(clientResources, uri);
        client.setOptions(clientOptions);
        return client;
    }

    /**
     * Opens the connection.
     * @param redisClient the client to connect with
     */
    @Bean(destroyMethod = "close")
    StatefulRedisConnection<String, String> connection(RedisClient redisClient) {
        return redisClient.connect();
    }
}

  

Basic usage

public Mono<ServerResponse> hello(ServerRequest request) throws Exception {
    // Reactive usage
    Mono<String> resp = redisConnection.reactive().get("gxt_new");
    // Synchronous usage
    redisConnection.sync().get("test");
    // Asynchronous usage: wait at most 5 seconds for the result
    redisConnection.async().get("test").get(5, TimeUnit.SECONDS);
    return ServerResponse.ok().body(resp, String.class);
}
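
Calling get(5, TimeUnit.SECONDS) on the asynchronous result blocks the calling thread until the value arrives. Lettuce's RedisFuture also implements CompletionStage, so the result can be transformed without blocking. The sketch below shows both styles with a plain CompletableFuture standing in for the future returned by async().get(...). That stand-in is an assumption made so the example runs without a Redis server, and the names AsyncSketch, fetch and lengthOf are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncSketch {

    // Hypothetical stand-in for redisConnection.async().get(key):
    // completes on another thread with a value derived from the key.
    static CompletableFuture<String> fetch(String key) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + key);
    }

    // Non-blocking style: transform the value when it arrives and
    // fall back to -1 if the lookup completes exceptionally.
    static CompletableFuture<Integer> lengthOf(String key) {
        return fetch(key)
                .thenApply(String::length)
                .exceptionally(error -> -1);
    }

    public static void main(String[] args) throws Exception {
        // Blocking style, as in the handler above: wait at most 5 seconds.
        String value = fetch("test").get(5, TimeUnit.SECONDS);
        System.out.println(value);

        // Composed style: block only at the very edge of the program.
        System.out.println(lengthOf("test").get(5, TimeUnit.SECONDS));
    }
}
```

Inside a WebFlux handler the reactive API is usually the better fit, since the returned Mono can be handed to the response directly, as the handler above already does with resp.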

 

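Subscribing to client events

The client uses an event bus to deliver the events produced at runtime. The EventBus can be configured on, and obtained from, the client resources, and it carries both client events and custom events.

The client can emit the following events:

  • Connection events
  • Metrics events
  • Cluster topology events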

  

client.getResources().eventBus().get().subscribe(e -> {
            System.out.println("client received event: " + e);
        });

Sample output:

client received event: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]
client received event: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]
client received event: ConnectedEvent [/xx:49912 -> /xx:6018]

Publishing events

Events are also published through the EventBus. The Event interface is only a marker interface.

 eventBus.publish(new Event() {
            @Override
            public String toString() {
                return "custom event";
            }
        });

The subscriber then receives this custom event:

client received event: custom event
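
Because Event is only a marker, subscribers usually tell events apart by their concrete type. The following is a minimal, synchronous model of that idea in plain Java. It is not Lettuce's EventBus (which is built on Reactor), and EventBusSketch and CacheWarmedEvent are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class EventBusSketch {

    // Marker interface: no methods, it only tags a type as an event.
    interface Event {
    }

    // Hypothetical custom event that carries a payload.
    static final class CacheWarmedEvent implements Event {
        final int keys;

        CacheWarmedEvent(int keys) {
            this.keys = keys;
        }

        @Override
        public String toString() {
            return "CacheWarmedEvent [keys=" + keys + "]";
        }
    }

    private final List<Consumer<Event>> subscribers = new CopyOnWriteArrayList<>();

    // Receives every published event.
    void subscribe(Consumer<Event> subscriber) {
        subscribers.add(subscriber);
    }

    // Receives only events of the given type; everything else is skipped.
    <T extends Event> void subscribe(Class<T> type, Consumer<? super T> handler) {
        subscribers.add(event -> {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        });
    }

    // Delivers the event to all subscribers on the calling thread.
    void publish(Event event) {
        for (Consumer<Event> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

With the real EventBus the same filtering would be expressed on the Flux returned by get(), for example with a filter on the event type before subscribing.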

Read/write splitting

 

@Bean(destroyMethod = "close")
    StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
        StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
        connection.setReadFrom(ReadFrom.NEAREST);
        return connection;
    }

StatefulRedisMasterSlaveConnection supports read/write splitting. The ReadFrom setting controls which node reads are served from.

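Setting             Meaning
MASTER              Read from the master node
SLAVE               Read from slave nodes
MASTER_PREFERRED    Read from the master node; if the master is unavailable, read from a slave node
SLAVE_PREFERRED     Read from slave nodes; if no slave is available, fall back to the master node
NEAREST             Read from the nearest node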

How is this implemented? Let's look at the relevant source code of MasterSlaveConnectionProvider.

 // Get a connection according to the intent
    public StatefulRedisConnection<K, V> getConnection(Intent intent) {

        if (debugEnabled) {
            logger.debug("getConnection(" + intent + ")");
        }
        // If readFrom is set and the intent is READ
        if (readFrom != null && intent == Intent.READ) {
            // Select candidate node descriptions from the known nodes according to readFrom
            List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {
                @Override
                public List<RedisNodeDescription> getNodes() {
                    return knownNodes;
                }

                @Override
                public Iterator<RedisNodeDescription> iterator() {
                    return knownNodes.iterator();
                }
            });
            // Throw if the selection is empty
            if (selection.isEmpty()) {
                throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
                        knownNodes, readFrom));
            }
            try {
                // Iterate over all candidate nodes
                for (RedisNodeDescription redisNodeDescription : selection) {
                    // Get the connection to the node
                    StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);
                    // If the connection is not open, try the next candidate
                    if (!readerCandidate.isOpen()) {
                        continue;
                    }
                    // Return the usable connection
                    return readerCandidate;
                }
                // No open connection was found: fall back to the first candidate
                return getConnection(selection.get(0));
            } catch (RuntimeException e) {
                throw new RedisException(e);
            }
        }
        // If readFrom is not set or the intent is not READ, return the master connection
        return getConnection(getMaster());
    }
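
The read path above boils down to "take the first open candidate, otherwise fall back to the first one". A minimal model of just that rule, with Conn as a hypothetical stand-in for a node connection (it is not a Lettuce type), could look like this:

```java
import java.util.List;

class ReadSelectionSketch {

    // Hypothetical stand-in for a connection to one node.
    static final class Conn {
        final String name;
        final boolean open;

        Conn(String name, boolean open) {
            this.name = name;
            this.open = open;
        }
    }

    // Mirrors the loop above: the first open candidate wins;
    // if none is open, the first candidate is returned anyway.
    static Conn pick(List<Conn> selection) {
        if (selection.isEmpty()) {
            throw new IllegalStateException("Cannot determine a node to read");
        }
        for (Conn candidate : selection) {
            if (candidate.open) {
                return candidate;
            }
        }
        return selection.get(0);
    }
}
```

The consequence is that the order of the selection list is the whole policy: whatever a ReadFrom strategy puts first is what reads will normally hit.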

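 As we can see, the connection-selection logic is generic; the strategies differ only in how they build the selection. Let's look at how each ReadFrom strategy does that.

ReadFromSlavePreferred and ReadFromMasterPreferred both have a notion of priority. Here is the relevant logic: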

static final class ReadFromSlavePreferred extends ReadFrom {

        @Override
        public List<RedisNodeDescription> select(Nodes nodes) {

            List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
            // Add the slave nodes first
            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.SLAVE) {
                    result.add(node);
                }
            }
            // Add the master node last
            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.MASTER) {
                    result.add(node);
                }
            }

            return result;
        }
    }

  

 static final class ReadFromMasterPreferred extends ReadFrom {

        @Override
        public List<RedisNodeDescription> select(Nodes nodes) {

            List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
            // Add the master node first
            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.MASTER) {
                    result.add(node);
                }
            }
            // Then add the slave nodes
            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.SLAVE) {
                    result.add(node);
                }
            }

            return result;
        }
    }

ReadFromMaster and ReadFromSlave only select nodes with the given role:

 static final class ReadFromSlave extends ReadFrom {

        @Override
        public List<RedisNodeDescription> select(Nodes nodes) {

            List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
            // Keep only the slave nodes
            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.SLAVE) {
                    result.add(node);
                }
            }

            return result;
        }
    }

  

static final class ReadFromMaster extends ReadFrom {

        @Override
        public List<RedisNodeDescription> select(Nodes nodes) {

            for (RedisNodeDescription node : nodes) {
                if (node.getRole() == RedisInstance.Role.MASTER) {
                    return LettuceLists.newList(node);
                }
            }

            return Collections.emptyList();
        }
    }
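
The four role-based strategies above differ only in which roles they keep and in what order. As a sketch of that observation (Role, Node and select are hypothetical stand-ins, not the Lettuce types), a single helper that takes the roles in priority order reproduces the same orderings:

```java
import java.util.ArrayList;
import java.util.List;

class RoleOrderSketch {

    enum Role { MASTER, SLAVE }

    // Hypothetical stand-in for a node description.
    static final class Node {
        final String name;
        final Role role;

        Node(String name, Role role) {
            this.name = name;
            this.role = role;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Keeps only nodes whose role is listed in `roles`, grouped in the
    // order the roles are given; the original order is kept within a role.
    static List<Node> select(List<Node> nodes, Role... roles) {
        List<Node> result = new ArrayList<>(nodes.size());
        for (Role role : roles) {
            for (Node node : nodes) {
                if (node.role == role) {
                    result.add(node);
                }
            }
        }
        return result;
    }
}
```

Under this model SLAVE_PREFERRED corresponds to select(nodes, Role.SLAVE, Role.MASTER), MASTER_PREFERRED to select(nodes, Role.MASTER, Role.SLAVE), and SLAVE to select(nodes, Role.SLAVE). MASTER is close to select(nodes, Role.MASTER), except that ReadFromMaster returns only the first master it finds.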

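  ReadFromNearest is a bit special: it does no processing on the known nodes and returns their node descriptions as they are, so whichever node comes first in the list is used first.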

static final class ReadFromNearest extends ReadFrom {

        @Override
        public List<RedisNodeDescription> select(Nodes nodes) {
            return nodes.getNodes();
        }
    }

  SentinelTopologyProvider shows that the node list always contains the master first, followed by the slaves, so with this provider NEAREST behaves the same as MASTER_PREFERRED.

public List<RedisNodeDescription> getNodes() {

        logger.debug("lookup topology for masterId {}", masterId);

        try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) {

            RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId);
            RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId);

            List<RedisNodeDescription> result = new ArrayList<>();
            try {
                Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
                List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
                // Add the master node
                result.add(toNode(master, RedisInstance.Role.MASTER));
                // Add all available slave nodes
                result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable)
                        .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));

            } catch (ExecutionException | InterruptedException | TimeoutException e) {
                throw new RedisException(e);
            }

            return result;
        }
    }

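Custom load balancing

From the above it follows that extending ReadFrom (an abstract class) is all that is needed to balance reads across the master and the slaves. The example below does so by shuffling the node list: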

 @Bean(destroyMethod = "close")
    StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
        StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
        connection.setReadFrom(new ReadFrom() {
            @Override
            public List<RedisNodeDescription> select(Nodes nodes) {
                // Shuffle a copy so the provider's own list of known nodes is left untouched
                List<RedisNodeDescription> list = new ArrayList<>(nodes.getNodes());
                Collections.shuffle(list);
                return list;
            }
        });
        return connection;
    }
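
Shuffling spreads reads randomly. If an even spread is preferred, a round-robin order is a possible alternative. This is a different technique from the shuffle above, sketched here in plain Java over a generic list. RoundRobinSketch is a hypothetical name, and in a real setup the same logic would sit inside the select(Nodes) method of a ReadFrom subclass.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch<T> {

    // Counts how many selections have been made so far.
    private final AtomicInteger counter = new AtomicInteger();

    // Returns a rotated copy of the node list so that successive calls
    // start at successive nodes; the input list is never modified.
    List<T> select(List<T> nodes) {
        List<T> copy = new ArrayList<>(nodes);
        if (copy.isEmpty()) {
            return copy;
        }
        // floorMod keeps the offset non-negative even after the counter overflows.
        int offset = Math.floorMod(counter.getAndIncrement(), copy.size());
        Collections.rotate(copy, -offset);
        return copy;
    }
}
```

Since the connection provider takes the first open candidate, rotating the list is enough to move the preferred node from call to call, while the remaining entries still serve as fallbacks.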

  

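Client-side sharding

At large scale, several master/slave groups are deployed, and client-side sharding can route each request to a specific group. Lettuce does not provide this out of the box; below is a custom implementation: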

public class Sharded< C extends StatefulRedisConnection,V> {

    private TreeMap<Long, String> nodes;
    private final Hashing algo = Hashing.MURMUR_HASH;
    private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>();
    private RedisClient redisClient;
    private String password;
    private Set<HostAndPort> sentinels;
    private RedisCodec<String, V> codec;

    public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) {
        this.redisClient = redisClient;
        this.password = password;
        this.sentinels = sentinels;
        this.codec = codec;
        initialize(masters);
    }

    private void initialize(List<String> masters) {
        nodes = new TreeMap<>();

        for (int i = 0; i != masters.size(); ++i) {
            final String master = masters.get(i);
            for (int n = 0; n < 160; n++) {
                nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master);
            }
            RedisURI.Builder builder = RedisURI.builder();
            for (HostAndPort hostAndPort : sentinels) {
                builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort());
            }

            RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build();
            resources.put(master, MasterSlave.connect(redisClient, codec, redisURI));
        }

    }

    public StatefulRedisConnection getConnectionBy(String key) {
        return resources.get(getShardInfo(SafeEncoder.encode(key)));
    }

    public Collection<StatefulRedisConnection> getAllConnection(){
        return Collections.unmodifiableCollection(resources.values());
    }

    public String getShardInfo(byte[] key) {
        SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key));
        if (tail.isEmpty()) {
            return nodes.get(nodes.firstKey());
        }
        return tail.get(tail.firstKey());
    }


    public void close(){
       for(StatefulRedisConnection connection:  getAllConnection()){
            connection.close();
        }
    }

    private static class SafeEncoder {

        // Encodes the key as UTF-8; null keys are rejected
        static byte[] encode(final String str) {
            if (str == null) {
                throw new IllegalArgumentException("value sent to redis cannot be null");
            }
            return str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    }
    private interface Hashing {
        Hashing MURMUR_HASH = new MurmurHash();

        long hash(String key);

        long hash(byte[] key);
    }


    private static  class MurmurHash implements Hashing {

         static long hash64A(byte[] data, int seed) {
            return hash64A(ByteBuffer.wrap(data), seed);
        }


         static long hash64A(ByteBuffer buf, int seed) {
            ByteOrder byteOrder = buf.order();
            buf.order(ByteOrder.LITTLE_ENDIAN);

            long m = 0xc6a4a7935bd1e995L;
            int r = 47;

            long h = seed ^ (buf.remaining() * m);

            long k;
            while (buf.remaining() >= 8) {
                k = buf.getLong();

                k *= m;
                k ^= k >>> r;
                k *= m;

                h ^= k;
                h *= m;
            }

            if (buf.remaining() > 0) {
                ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
                // for big-endian version, do this first:
                // finish.position(8-buf.remaining());
                finish.put(buf).rewind();
                h ^= finish.getLong();
                h *= m;
            }

            h ^= h >>> r;
            h *= m;
            h ^= h >>> r;

            buf.order(byteOrder);
            return h;
        }

        public long hash(byte[] key) {
            return hash64A(key, 0x1234ABCD);
        }

        public long hash(String key) {
            return hash(SafeEncoder.encode(key));
        }
    }




}

  

 @Bean(destroyMethod = "close")
    Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) {

        Set<HostAndPort> hostAndPorts=new HashSet<>();
        hostAndPorts.add(HostAndPort.parse("1xx:26009"));
        hostAndPorts.add(HostAndPort.parse("1xx:26009"));


        return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec());
    }

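  Usage

  // Look up the connection of the shard that owns the key
  // (call setReadFrom(ReadFrom.SLAVE) on it to read only from slave nodes)
            StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key");
            // Fetch the cached value synchronously
            System.out.println(redisConnection.sync().get("key"));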
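
The routing inside Sharded is a consistent-hash ring: every master gets 160 virtual nodes on a sorted map, and a key is served by the first virtual node at or after its hash. The stripped-down sketch below shows only that lookup, without any Redis connections. Two things in it are assumptions that differ from the class above: it hashes with CRC32 purely to stay dependency-free (not the MurmurHash used above), and it derives virtual-node names from the shard name rather than from the list index. HashRingSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

class HashRingSketch {

    // Hash position on the ring -> shard name.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> shards, int virtualNodes) {
        for (String shard : shards) {
            for (int n = 0; n < virtualNodes; n++) {
                ring.put(hash(shard + "-NODE-" + n), shard);
            }
        }
    }

    // CRC32 keeps the sketch self-contained; it is a stand-in, not MurmurHash.
    static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // First virtual node at or after the key's hash, wrapping around to the start.
    String shardFor(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(Arrays.asList("te009", "test68", "test67"), 160);
        System.out.println("key -> " + ring.shardFor("key"));
    }
}
```

Naming virtual nodes after the shard means that removing one shard only remaps the keys it owned. In the Sharded class above the virtual-node names depend on the position in the masters list ("SHARD-" + i), so the list has to keep the same order across all clients and restarts for keys to land on the same group.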