Lettuce是一個可伸縮線程安全的Redis客戶端。多個線程能夠共享同一個RedisConnection.本文是基於Lettuce5,主要介紹的知識點以下:java
@Configuration public class LettuceConfig { /** * 配置客戶端資源 * @return */ @Bean(destroyMethod = "shutdown") ClientResources clientResources() { return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build(); } /** * 配置Socket選項 * keepAlive=true * tcpNoDelay=true * connectionTimeout=5秒 * @return */ @Bean SocketOptions socketOptions(){ return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build(); } /** * 配置客戶端選項 * @return */ @Bean ClientOptions clientOptions(SocketOptions socketOptions) { return ClientOptions.builder().socketOptions(socketOptions).build(); } /** * 建立RedisClient * @param clientResources 客戶端資源 * @param clientOptions 客戶端選項 * @return */ @Bean(destroyMethod = "shutdown") RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) { RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build(); RedisClient client = RedisClient.create(clientResources, uri); client.setOptions(clientOptions); return client; } /** * 建立鏈接 * @param redisClient * @return */ @Bean(destroyMethod = "close") StatefulRedisConnection<String, String> connection(RedisClient redisClient) { return redisClient.connect(); } }
基本使用node
public Mono<ServerResponse> hello(ServerRequest request) throws Exception { //響應式使用 Mono<String> resp = redisConnection.reactive().get("gxt_new"); //同步使用 redisConnection.sync().get("test"); redisConnection.async().get("test").get(5, TimeUnit.SECONDS); return ServerResponse.ok().body(resp, String.class); }
客戶端訂閱事件react
客戶端使用事件總線傳輸運行期間產生的事件;EventBus能夠從客戶端資源進行配置和獲取,並用於客戶端和自定義事件。 redis
以下事件能夠被客戶端發送:緩存
client.getResources().eventBus().get().subscribe(e -> { System.out.println("client 訂閱事件: " + e); });
client 訂閱事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008] client 訂閱事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018] client 訂閱事件: ConnectedEvent [/xx:49912 -> /xx:6018]
發佈事件安全
發佈使用也是經過使用eventBus進行發佈事件,Event接口只是一個標籤接口服務器
eventBus.publish(new Event() { @Override public String toString() { return "自定義事件"; } });
訂閱者就能夠訂閱到這個自定義事件了 負載均衡
client 訂閱事件: 自定義事件
讀寫分離 異步
@Bean(destroyMethod = "close") StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) { StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI); connection.setReadFrom(ReadFrom.NEAREST); return connection; } }
StatefulRedisMasterSlaveConnection 支持讀寫分離,經過設置ReadFrom控制讀是從哪一個節點讀取.socket
參數 | 含義 |
MASTER | 從master節點讀取 |
SLAVE | 從slave節點讀取 |
MASTER_PREFERRED |
從master節點讀取,若是master節點不能夠則從slave節點讀取 |
SLAVE_PREFERRED |
從slave節點讀取,若是slave節點不可用則倒退到master節點讀取 |
NEAREST |
從最近到節點讀取 |
具體是如何實現到呢? 下面看一下MasterSlaveConnectionProvider相關源碼
//根據意圖獲取鏈接 public StatefulRedisConnection<K, V> getConnection(Intent intent) { if (debugEnabled) { logger.debug("getConnection(" + intent + ")"); } //若是readFrom不爲null且是READ if (readFrom != null && intent == Intent.READ) { //根據readFrom配置從已知節點中選擇可用節點描述 List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() { @Override public List<RedisNodeDescription> getNodes() { return knownNodes; } @Override public Iterator<RedisNodeDescription> iterator() { return knownNodes.iterator(); } }); //若是可選擇節點集合爲空則拋出異常 if (selection.isEmpty()) { throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s", knownNodes, readFrom)); } try { //遍歷全部可用節點 for (RedisNodeDescription redisNodeDescription : selection) { //獲取節點鏈接 StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription); //若是節點鏈接不是打開到鏈接則繼續查找下一個鏈接 if (!readerCandidate.isOpen()) { continue; } //返回可用鏈接 return readerCandidate; } //若是沒有找到可用鏈接,默認返回第一個 return getConnection(selection.get(0)); } catch (RuntimeException e) { throw new RedisException(e); } } //若是沒有配置readFrom或者不是READ 則返回master鏈接 return getConnection(getMaster()); }
咱們能夠看到選擇鏈接到邏輯是通用的,不一樣的處理就是在selection的處理上,下面看一下不一樣readFrom策略對於selection的處理
ReadFromSlavePerferred和ReadFromMasterPerferred都是有優先級到概念,看看相關邏輯的處理
static final class ReadFromSlavePreferred extends ReadFrom { @Override public List<RedisNodeDescription> select(Nodes nodes) { List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size()); //優先添加slave節點 for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.SLAVE) { result.add(node); } } //最後添加master節點 for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.MASTER) { result.add(node); } } return result; }
static final class ReadFromMasterPreferred extends ReadFrom { @Override public List<RedisNodeDescription> select(Nodes nodes) { List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size()); //優先添加master節點 for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.MASTER) { result.add(node); } } //其次在添加slave節點 for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.SLAVE) { result.add(node); } } return result; } }
對於ReadFromMaster和ReadFromSlave都是獲取指定角色的節點
static final class ReadFromSlave extends ReadFrom { @Override public List<RedisNodeDescription> select(Nodes nodes) { List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size()); //只獲取slave節點 for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.SLAVE) { result.add(node); } } return result; } }
static final class ReadFromMaster extends ReadFrom { @Override public List<RedisNodeDescription> select(Nodes nodes) { for (RedisNodeDescription node : nodes) { if (node.getRole() == RedisInstance.Role.MASTER) { return LettuceLists.newList(node); } } return Collections.emptyList(); } }
獲取最近的節點這個就有點特殊了,它對已知對節點沒有作處理,直接返回了它們的節點描述,也就是誰在前面就優先使用誰
static final class ReadFromNearest extends ReadFrom { @Override public List<RedisNodeDescription> select(Nodes nodes) { return nodes.getNodes(); } }
在SentinelTopologyProvider中能夠發現,獲取nodes節點老是優先獲取Master節點,其次是slave節點,這樣Nearest效果就等效與MasterPreferred
public List<RedisNodeDescription> getNodes() { logger.debug("lookup topology for masterId {}", masterId); try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) { RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId); RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId); List<RedisNodeDescription> result = new ArrayList<>(); try { Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS); List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS); //添加master節點 result.add(toNode(master, RedisInstance.Role.MASTER)); //添加全部slave節點 result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable) .map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList())); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new RedisException(e); } return result; } }
自定義負載均衡
經過上文能夠發現只須要實現 ReadFrom接口,就能夠經過該接口實現Master,Slave負載均衡;下面的示例是經過將nodes節點進行打亂,進而實現
@Bean(destroyMethod = "close") StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) { StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI); connection.setReadFrom(new ReadFrom() { @Override public List<RedisNodeDescription> select(Nodes nodes) { List<RedisNodeDescription> list = nodes.getNodes(); Collections.shuffle(list); return list; } }); return connection; }
在大規模使用的時候會使用多組主備服務,能夠經過客戶端分片的方式將部分請求路由到指定的服務器上,可是Lettuce沒有提供這樣的支持,下面是自定義的實現:
public class Sharded< C extends StatefulRedisConnection,V> { private TreeMap<Long, String> nodes; private final Hashing algo = Hashing.MURMUR_HASH; private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>(); private RedisClient redisClient; private String password; private Set<HostAndPort> sentinels; private RedisCodec<String, V> codec; public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) { this.redisClient = redisClient; this.password = password; this.sentinels = sentinels; this.codec = codec; initialize(masters); } private void initialize(List<String> masters) { nodes = new TreeMap<>(); for (int i = 0; i != masters.size(); ++i) { final String master = masters.get(i); for (int n = 0; n < 160; n++) { nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master); } RedisURI.Builder builder = RedisURI.builder(); for (HostAndPort hostAndPort : sentinels) { builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort()); } RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build(); resources.put(master, MasterSlave.connect(redisClient, codec, redisURI)); } } public StatefulRedisConnection getConnectionBy(String key) { return resources.get(getShardInfo(SafeEncoder.encode(key))); } public Collection<StatefulRedisConnection> getAllConnection(){ return Collections.unmodifiableCollection(resources.values()); } public String getShardInfo(byte[] key) { SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } public void close(){ for(StatefulRedisConnection connection: getAllConnection()){ connection.close(); } } private static class SafeEncoder { static byte[] encode(final String str) { try { if (str == null) { throw new IllegalArgumentException("value sent to redis cannot be null"); } return str.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } } private interface Hashing { Hashing MURMUR_HASH = new MurmurHash(); long hash(String key); long hash(byte[] key); } private static class MurmurHash implements Hashing { static long hash64A(byte[] data, int seed) { return hash64A(ByteBuffer.wrap(data), seed); } static long hash64A(ByteBuffer buf, int seed) { ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); long m = 0xc6a4a7935bd1e995L; int r = 47; long h = seed ^ (buf.remaining() * m); long k; while (buf.remaining() >= 8) { k = buf.getLong(); k *= m; k ^= k >>> r; k *= m; h ^= k; h *= m; } if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); // for big-endian version, do this first: // finish.position(8-buf.remaining()); finish.put(buf).rewind(); h ^= finish.getLong(); h *= m; } h ^= h >>> r; h *= m; h ^= h >>> r; buf.order(byteOrder); return h; } public long hash(byte[] key) { return hash64A(key, 0x1234ABCD); } public long hash(String key) { return hash(SafeEncoder.encode(key)); } } }
@Bean(destroyMethod = "close") Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) { Set<HostAndPort> hostAndPorts=new HashSet<>(); hostAndPorts.add(HostAndPort.parse("1xx:26009")); hostAndPorts.add(HostAndPort.parse("1xx:26009")); return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec()); }
使用方式
//只從slave節點中讀取 StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key"); //使用異步模式獲取緩存值 System.out.println(redisConnection.sync().get("key"));