客戶端和服務器經過 TCP 鏈接來進行數據交互, 服務器默認的端口號爲 6379 。
客戶端和服務器發送的命令或數據一概以 \r\n (CRLF 回車+換行)結尾。html
若是使用 wireshark 對 jedis 抓包:
環境:Jedis 鏈接到虛擬機 202,運行 main,對 VMnet8 抓包。
過濾條件:ip.dst==192.168.8.202 and tcp.port in {6379}
set qingshan 抓包:java
能夠看到實際發出的數據包是:node
*3\r\n$3\r\nSET\r\n$8\r\nqingshan\r\n$4\r\n2673\r\n
get qingshan 抓包:python
*2\r\n$3\r\nGET\r\n$8\r\nqingshan\r\n
客戶端跟 Redis 之間 使用一種特殊的編碼格式(在 AOF 文件裏面咱們看到了),叫作 Redis Serialization Protocol (Redis 序列化協議)。特色:容易實現、解析快、可讀性強。客戶端發給服務端的消息須要通過編碼,服務端收到以後會按約定進行解碼,反之亦然。mysql
基於此,咱們能夠本身實現一個 Redis 客戶端。
參考:myclient.MyClient.java
一、創建 Socket 鏈接
二、OutputStream 寫入數據(發送到服務端)
三、InputStream 讀取數據(從服務端接口)
基於這種協議,咱們能夠用 Java 實現全部的 Redis 操做命令。固然,咱們不須要這麼作,由於已經有不少比較成熟的 Java 客戶端,實現了完整的功能和高級特性,而且提供了良好的性能。react
https://redis.io/clients#java
官網推薦的 Java 客戶端有 3 個 Jedis,Redisson 和 Luttuce。git
客戶端 | 描述 |
---|---|
Jedis | A blazingly small and sane redis java client |
lettuce | Advanced Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel,Pipelining, and codecs |
Redisson | distributed and scalable Java data structures on top of Redis server |
Spring 鏈接 Redis 用的是什麼?RedisConnectionFactory 接口支持多種實現,例如 : JedisConnectionFactory 、 JredisConnectionFactory 、LettuceConnectionFactory、SrpConnectionFactory。github
https://github.com/xetorthio/jedis面試
Jedis 是咱們最熟悉和最經常使用的客戶端。輕量,簡潔,便於集成和改造。redis
public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.set("qingshan", "2673"); System.out.println(jedis.get("qingshan")); jedis.close(); }
Jedis 多個線程使用一個鏈接的時候線程不安全。可使用鏈接池,爲每一個請求建立不一樣的鏈接,基於 Apache common pool 實現。跟數據庫同樣,能夠設置最大鏈接數等參數。Jedis 中有多種鏈接池的子類。
例如:
public class ShardingTest { public static void main(String[] args) { JedisPoolConfig poolConfig = new JedisPoolConfig(); // Redis服務器 JedisShardInfo shardInfo1 = new JedisShardInfo("127.0.0.1", 6379); JedisShardInfo shardInfo2 = new JedisShardInfo("192.168.8.205", 6379); // 鏈接池 List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2); ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList); ShardedJedis jedis = null; try{ jedis = jedisPool.getResource(); for(int i=0; i<100; i++){ jedis.set("k"+i, ""+i); } for(int i=0; i<100; i++){ Client client = jedis.getShard("k"+i).getClient(); System.out.println("取到值:"+jedis.get("k"+i)+","+"當前key位於:" + client.getHost() + ":" + client.getPort()); } }finally{ if(jedis!=null) { jedis.close(); } } } }
Jedis 有 4 種工做模式:單節點、分片、哨兵、集羣。
3 種請求模式:Client、Pipeline、事務。Client 模式就是客戶端發送一個命令,阻塞等待服務端執行,而後讀取 返回結果。Pipeline 模式是一次性發送多個命令,最後一次取回全部的返回結果,這種模式經過減小網絡的往返時間和 io 讀寫次數,大幅度提升通訊性能。第三種是事務模式。Transaction 模式即開啓 Redis 的事務管理,事務模式開啓後,全部的命令(除了 exec,discard,multi 和 watch)到達服務端之後不會當即執行,會進入一個等待隊列。
問題:Jedis 鏈接 Sentinel 的時候,咱們配置的是所有哨兵的地址。Sentinel 是如何返回可用的 master 地址的呢?
在構造方法中:
pool = new JedisSentinelPool(masterName, sentinels);
調用了:
HostAndPort master = initSentinels(sentinels, masterName);
查看:
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; log.info("Trying to find master from available Sentinels..."); // 有多個 sentinels,遍歷這些個 sentinels for (String sentinel : sentinels) { // host:port 表示的 sentinel 地址轉化爲一個 HostAndPort 對象。 final HostAndPort hap = HostAndPort.parseString(sentinel); log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { // 鏈接到 sentinel jedis = new Jedis(hap.getHost(), hap.getPort()); // 根據 masterName 獲得 master 的地址,返回一個 list,host= list[0], port =// list[1] List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); // connected to sentinel... sentinelAvailable = true; if (masterAddr == null || masterAddr.size() != 2) { log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } // 若是在任何一個 sentinel 中找到了 master,再也不遍歷 sentinels master = toHostAndPort(masterAddr); log.fine("Found Redis master at " + master); break; } catch (JedisException e) { // resolves #1036, it should handle JedisException there's another chance // of raising JedisDataException log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } // 到這裏,若是 master 爲 null,則說明有兩種狀況,一種是全部的 sentinels 節點都 down 掉了,一種是 master節點沒有被存活的 sentinels 監控到 if (master == null) { if (sentinelAvailable) { // can connect to sentinel, but master name seems to not // monitored throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } // 若是走到這裏,說明找到了 master 的地址 log.info("Redis master running at " + master + ", starting Sentinel listeners..."); // 啓動對每一個 sentinels 的監聽爲每一個 sentinel 都啓動了一個監聽者 MasterListener。MasterListener 自己是一個線程,它會去訂閱 sentinel 上關於 master 節點地址改變的消息。 for (String sentinel : sentinels) { final HostAndPort hap = HostAndPort.parseString(sentinel); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); // whether MasterListener threads are alive or not, process can be stopped masterListener.setDaemon(true); masterListeners.add(masterListener); masterListener.start(); } return master; }
問題:使用 Jedis 鏈接 Cluster 的時候,咱們只須要鏈接到任意一個或者多個 redisgroup 中的實例地址,那咱們是怎麼獲取到須要操做的 Redis Master 實例的?
關鍵問題:在於如何存儲 slot 和 Redis 鏈接池的關係。
一、程序啓動初始化集羣環境,讀取配置文件中的節點配置,不管是主從,不管多少個,只拿第一個,獲取 redis 鏈接實例(後面有個 break)。
// redis.clients.jedis.JedisClusterConnectionHandler#initializeSlotsCache private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { // 獲取一個 Jedis 實例 Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { // 獲取 Redis 節點和 Slot 虛擬槽 cache.discoverClusterNodesAndSlots(jedis); // 直接跳出循環 break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } }
二、用獲取的 redis 鏈接實例執行 clusterSlots ()方法,實際執行 redis 服務端 clusterslots 命令,獲取虛擬槽信息。
該集合的基本信息爲[long, long, List, List], 第一,二個元素是該節點負責槽點的起始位置,第三個元素是主節點信息,第四個元素爲主節點對應的從節點信息。該 list 的基本信息爲[string,int,string],第一個爲 host 信息,第二個爲 port 信息,第三個爲惟一id。
三、獲取有關節點的槽點信息後,調用 getAssignedSlotArray(slotinfo)來獲取全部的槽點值。
四、再獲取主節點的地址信息,調用 generateHostAndPort(hostInfo)方法,生成一個 ostAndPort 對象。
五、再根據節點地址信息來設置節點對應的 JedisPool,即設置 Map<String,JedisPool> nodes 的值。
接下來判斷若此時節點信息爲主節點信息時,則調用 assignSlotsToNodes 方法,設置每一個槽點值對應的鏈接池,即設置 Map<Integer, JedisPool> slots 的值。
public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { reset(); // 獲取節點集合 List<Object> slots = jedis.clusterSlots(); // 遍歷 3 個 master 節點 for (Object slotInfoObj : slots) { // slotInfo 槽開始,槽結束,主,從 // {[0,5460,7291,7294],[5461,10922,7292,7295],[10923,16383,7293,7296]} List<Object> slotInfo = (List<Object>) slotInfoObj; // 若是<=2,表明沒有分配 slot if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 獲取分配到當前 master 節點的數據槽,例如 7291 節點的{0,1,2,3……5460} List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); // size 是 4,槽最小最大,主,從 // 第 3 位和第 4 位是主從端口的信息 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } // 根據 IP 端口生成 HostAndPort 實例 HostAndPort targetNode = generateHostAndPort(hostInfos); // 據HostAndPort解析出ip:port的key值,再根據key從緩存中查詢對應的jedisPool實例。若是沒有jedisPool實例,就建立 JedisPool 實例,最後放入緩存中。nodeKey 和 nodePool 的關係 setupNodeIfNotExist(targetNode); // 把 slot 和 jedisPool 緩存起來(16384 個),key 是 slot 下標,value 是鏈接池 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } finally { w.unlock(); } }
從集羣環境存取值:
一、把 key 做爲參數,執行 CRC16 算法,獲取 key 對應的 slot 值。
二、經過該 slot 值,去 slots 的 map 集合中獲取 jedisPool 實例。
三、經過 jedisPool 實例獲取 jedis 實例,最終完成 redis 數據存取工做。
咱們看到 set 2 萬個 key 用了好幾分鐘,這個速度太慢了,徹底沒有把 Redis 10萬的 QPS 利用起來。可是單個命令的執行到底慢在哪裏?
Redis 使用的是客戶端/服務器(C/S)模型和請求/響應協議的 TCP 服務器。這意味着一般狀況下一個請求會遵循如下步驟:
客戶端向服務端發送一個查詢請求,並監聽 Socket 返回,一般是以阻塞模式,等待服務端響應。
服務端處理命令,並將結果返回給客戶端。
Redis 客戶端與 Redis 服務器之間使用 TCP 協議進行鏈接,一個客戶端能夠經過一個 socket 鏈接發起多個請求命令。每一個請求命令發出後 client 一般會阻塞並等待 redis服務器處理,redis 處理完請求命令後會將結果經過響應報文返回給 client,所以當執行多條命令的時候都須要等待上一條命令執行完畢才能執行。執行過程如圖:
Redis 自己提供了一些批量操做命令,好比 mget,mset,能夠減小通訊的時間,可是大部分命令是不支持 multi 操做的,例如 hash 就沒有.
因爲通訊會有網絡延遲,假如 client 和 server 之間的包傳輸時間須要 10 毫秒,一次交互就是 20 毫秒(RTT:Round Trip Time)。這樣的話,client 1 秒鐘也只能也只能發送 50 個命令。這顯然沒有充分利用 Redis 的處理能力。另一個,Redis 服務端執行 I/O 的次數過多。
https://redis.io/topics/pipelining
那咱們能不能像數據庫的 batch 操做同樣,把一組命令組裝在一塊兒發送給 Redis 服務端執行,而後一次性得到返回結果呢?這個就是 Pipeline 的做用。Pipeline 經過一個隊列把全部的命令緩存起來,而後把多個命令在一次鏈接中發送給服務器。
先來看一下效果(先 flushall):
PipelineSet.java,PipelineGet.java
要實現 Pipeline,既要服務端的支持,也要客戶端的支持。對於服務端來講,須要可以處理客戶端經過一個 TCP 鏈接發來的多個命令,而且逐個地執行命令一塊兒返回 。
對於客戶端來講,要把多個命令緩存起來,達到必定的條件就發送出去,最後才處理 Redis 的應答(這裏也要注意對客戶端內存的消耗)。
jedis-pipeline 的 client-buffer 限制:8192bytes,客戶端堆積的命令超過 8192bytes 時,會發送給服務端。
源碼:redis.clients.util.RedisOutputStream.java
public RedisOutputStream(final OutputStream out) { this(out, 8192); }
pipeline 對於命令條數沒有限制,可是命令可能會受限於 TCP 包大小。
若是 Jedis 發送了一組命令,而發送請求尚未結束,Redis 響應的結果會放在接緩衝區。若是接收緩衝區滿了,jedis 會通知 redis win=0,此時 redis 不會再發送結果給 jedis 端,轉而把響應結果保存在 Redis 服務端的輸出緩衝區中。
輸出緩衝區的配置:redis.conf
client-output-buffer-limit
client-output-buffer-limit normal 0 0 0 client-output-buffer-limit replica 256mb 64mb 60 client-output-buffer-limit pubsub 32mb 8mb 60
配置 | 做用 |
---|---|
class | 客戶端類型,分爲三種。a)normal:普通客戶端;b)slave:slave 客戶端,用於複製;c)pubsub:發佈訂閱客戶端 |
hard limit | 若是客戶端使用的輸出緩衝區大於
|
soft limit soft seconds |
若是客戶端使用的輸出緩衝區超過了
|
每一個客戶端使用的輸出緩衝區的大小能夠用 client list 命令查看
redis> client list
id=5 addr=192.168.8.1:10859 fd=8 name= age=5 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=5 qbuf-free=32763 obl=16380 oll=227 omem=4654408 events=rw cmd=set
Pipeline 適用於什麼場景呢?
若是某些操做須要立刻獲得 Redis 操做是否成功的結果,這種場景就不適合。
有些場景,例如批量寫入數據,對於結果的實時性和成功性要求不高,就能夠用Pipeline
原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html
分佈式鎖的基本特性或者要求:
一、互斥性:只有一個客戶端可以持有鎖。
二、不會產生死鎖:即便持有鎖的客戶端崩潰,也能保證後續其餘客戶端能夠獲取鎖。
三、只有持有這把鎖的客戶端才能解鎖。
distlock.DistLock.java
/** * 嘗試獲取分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { // set支持多個參數 NX(not exist) XX(exist) EX(seconds) PX(million seconds) String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; }
參數解讀:
一、lockKey 是 Redis key 的名稱,也就是誰添加成功這個 key 表明誰獲取鎖成功。
二、requestId 是客戶端的 ID(設置成 value),若是咱們要保證只有加鎖的客戶端才能釋放鎖,就必須得到客戶端的 ID(保證第 3 點)。
三、SET_IF_NOT_EXIST 是咱們的命令裏面加上 NX(保證第 1 點)。
四、SET_WITH_EXPIRE_TIME,PX 表明以毫秒爲單位設置 key 的過時時間(保證第 2 點)。expireTime 是自動釋放鎖的時間,好比 5000 表明 5 秒。
釋放鎖,直接刪除 key 來釋放鎖能夠嗎?就像這樣:
public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
沒有對客戶端 requestId 進行判斷,可能會釋放其餘客戶端持有的鎖。
先判斷後刪除呢?
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判斷加鎖與解鎖是否是同一個客戶端 if (requestId.equals(jedis.get(lockKey))) { // 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖 jedis.del(lockKey); } }
若是在釋放鎖的時候,這把鎖已經不屬於這個客戶端(例如已通過期,而且被別的客戶端獲取鎖成功了),那就會出現釋放了其餘客戶端的鎖的狀況。
因此咱們把判斷客戶端是否相等和刪除 key 的操做放在 Lua 腳本里面執行。
/** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; }
這個是 Jedis 裏面分佈式鎖的實現。
https://lettuce.io/
與 Jedis 相比,Lettuce 則徹底克服了其線程不安全的缺點:Lettuce 是一個可伸縮的線程安全的 Redis 客戶端,支持同步、異步和響應式模式(Reactive)。多個線程能夠共享一個鏈接實例,而沒必要擔憂多線程併發問題。
同步調用:
public class LettuceSyncTest { public static void main(String[] args) { // 建立客戶端 RedisClient client = RedisClient.create("redis://127.0.0.1:6379"); // 線程安全的長鏈接,鏈接丟失時會自動重連 StatefulRedisConnection<String, String> connection = client.connect(); // 獲取同步執行命令,默認超時時間爲 60s RedisCommands<String, String> sync = connection.sync(); // 發送get請求,獲取值 sync.set("gupao:sync","lettuce-sync-666" ); String value = sync.get("gupao:sync"); System.out.println("------"+value); //關閉鏈接 connection.close(); //關掉客戶端 client.shutdown(); } }
異步的結果使用 RedisFuture 包裝,提供了大量回調的方法。
異步調用:
import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class LettuceASyncTest { public static void main(String[] args) { RedisClient client = RedisClient.create("redis://127.0.0.1:6379"); // 線程安全的長鏈接,鏈接丟失時會自動重連 StatefulRedisConnection<String, String> connection = client.connect(); // 獲取異步執行命令api RedisAsyncCommands<String, String> commands = connection.async(); // 獲取RedisFuture<T> commands.set("gupao:async","lettuce-async-666"); RedisFuture<String> future = commands.get("gupao:async"); try { String value = future.get(60, TimeUnit.SECONDS); System.out.println("------"+value); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } } }
它基於 Netty 框架構建,支持 Redis 的高級功能,如 Pipeline、發佈訂閱,事務、Sentinel,集羣,支持鏈接池。
Lettuce 是 Spring Boot 2.x 默認的客戶端,替換了 Jedis。集成以後咱們不須要單獨使用它,直接調用 Spring 的 RedisTemplate 操做,鏈接和建立和關閉也不須要咱們操心。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
https://redisson.org/
https://github.com/redisson/redisson/wiki/目錄
Redisson 是一個在 Redis 的基礎上實現的 Java 駐內存數據網格(In-Memory
Data Grid),提供了分佈式和可擴展的 Java 數據結構。
基於 Netty 實現,採用非阻塞 IO,性能高
支持異步請求
支持鏈接池、pipeline、LUA Scripting、Redis Sentinel、Redis Cluster
不支持事務,官方建議以 LUA Scripting 代替事務
主從、哨兵、集羣都支持。Spring 也能夠配置和注入 RedissonClient。
在 Redisson 裏面提供了更加簡單的分佈式鎖的實現。
public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateAccount"); // 最多等待 100 秒、上鎖 10s 之後自動解鎖 if(rLock.tryLock(100,10, TimeUnit.SECONDS)){ System.out.println("獲取鎖成功"); } // do something rLock.unlock(); }
在得到 RLock 以後,只須要一個 tryLock 方法,裏面有 3 個參數:
一、watiTime:獲取鎖的最大等待時間,超過這個時間再也不嘗試獲取鎖
二、leaseTime:若是沒有調用 unlock,超過了這個時間會自動釋放鎖
三、TimeUnit:釋放時間的單位
Redisson 的分佈式鎖是怎麼實現的呢?
在加鎖的時候,在 Redis 寫入了一個 HASH,key 是鎖名稱,field 是線程名稱,value是 1(表示鎖的重入次數)
源碼:
tryLock()——tryAcquire()——tryAcquireAsync()——tryLockInnerAsync()
最終也是調用了一段 Lua 腳本。裏面有一個參數,兩個參數的值
佔位 | 填充 | 含義 | 實際值 |
---|---|---|---|
KEYS[1] | getName() | 鎖的名稱(key) | updateAccount |
ARGV[1] | internalLockLeaseTime | 鎖釋放時間(毫秒) | 10000 |
ARGV[2] | getLockName(threadId) | 線程名稱 | b60a9c8c-92f8-4bfe-b0e7-308967346336:1 |
// KEYS[1] 鎖名稱 updateAccount // ARGV[1] key 過時時間 10000ms // ARGV[2] 線程名稱 // 鎖名稱不存在 if (redis.call('exists', KEYS[1]) == 0) then // 建立一個 hash,key=鎖名稱,field=線程名,value=1 redis.call('hset', KEYS[1], ARGV[2], 1); // 設置 hash 的過時時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 鎖名稱存在,判斷是否當前線程持有的鎖 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then // 若是是,value+1,表明重入次數+1 redis.call('hincrby', KEYS[1], ARGV[2], 1); // 從新得到鎖,須要從新設置 Key 的過時時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 鎖存在,可是不是當前線程持有,返回過時時間(毫秒) return redis.call('pttl', KEYS[1])
釋放鎖,源碼:
unlock——unlockInnerAsync
佔位 | 填充 | 含義 | 實際值 |
---|---|---|---|
KEYS[1] | getName() | 鎖名稱 | updateAccount |
KEYS[2] | getChannelName() | 頻道名稱 | redisson_lock__channel:{updateAccount} |
KEYS[3] | LockPubSub.unlockMessage | 解鎖時的消息 | 0 |
KEYS[4] | internalLockLeaseTime | 釋放鎖的時間 | 10000 |
KEYS[5] | getLockName(threadId) | 線程名稱 | b60a9c8c-92f8-4bfe-b0e7-308967346336:1 |
// KEYS[1] 鎖的名稱 updateAccount // KEYS[2] 頻道名稱 redisson_lock__channel:{updateAccount} // ARGV[1] 釋放鎖的消息 0 // ARGV[2] 鎖釋放時間 10000 // ARGV[3] 線程名稱 // 鎖不存在(過時或者已經釋放了) if (redis.call('exists', KEYS[1]) == 0) then // 發佈鎖已經釋放的消息 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; // 鎖存在,可是不是當前線程加的鎖 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; // 鎖存在,是當前線程加的鎖 // 重入次數-1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // -1 後大於 0,說明這個線程持有這把鎖還有其餘的任務須要執行 if (counter > 0) then // 從新設置鎖的過時時間 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else // -1 以後等於 0,如今能夠刪除鎖了 redis.call('del', KEYS[1]); // 刪除以後發佈釋放鎖的消息 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; // 其餘狀況返回 nil return nil;
這個是 Redisson 裏面分佈式鎖的實現,咱們在調用的時候很是簡單。
Redisson 跟 Jedis 定位不一樣,它不是一個單純的 Redis 客戶端,而是基於 Redis 實現的分佈式的服務,若是有須要用到一些分佈式的數據結構,好比咱們還能夠基於Redisson 的分佈式隊列實現分佈式事務,就能夠引入 Redisson 的依賴實現。
針對讀多寫少的高併發場景,咱們可使用緩存來提高查詢速度。
當咱們使用 Redis 做爲緩存的時候,通常流程是這樣的:
一、若是數據在 Redis 存在,應用就能夠直接從 Redis 拿到數據,不用訪問數據庫。
二、若是 Redis 裏面沒有,先到數據庫查詢,而後寫入到 Redis,再返回給應用
由於這些數據是不多修改的,因此在絕大部分的狀況下能夠命中緩存。可是,一旦被緩存的數據發生變化的時候,咱們既要操做數據庫的數據,也要操做 Redis 的數據,因此問題來了。如今咱們有兩種選擇:
一、先操做 Redis 的數據再操做數據庫的數據
二、先操做數據庫的數據再操做 Redis 的數據
到底選哪種?
首先須要明確的是,無論選擇哪種方案, 咱們確定是但願兩個操做要麼都成功,要麼都一個都不成功。否則就會發生 Redis 跟數據庫的數據不一致的問題。
可是,Redis 的數據和數據庫的數據是不可能經過事務達到統一的,咱們只能根據相應的場景和所須要付出的代價來採起一些措施下降數據不一致的問題出現的機率,在數據一致性和性能之間取得一個權衡。
對於數據庫的實時性一致性要求不是特別高的場合,好比 T+1 的報表,能夠採用定時任務查詢數據庫數據同步到 Redis 的方案。
因爲咱們是以數據庫的數據爲準的,因此給緩存設置一個過時時間,是保證最終一致性的解決方案。
這裏咱們先要補充一點,當存儲的數據發生變化,Redis 的數據也要更新的時候,咱們有兩種方案,一種就是直接更新,調用 set;還有一種是直接刪除緩存,讓應用在下次查詢的時候從新寫入。
這兩種方案怎麼選擇呢?這裏咱們主要考慮更新緩存的代價。
更新緩存以前,是否是要通過其餘表的查詢、接口調用、計算才能獲得最新的數據,而不是直接從數據庫拿到的值。若是是的話,建議直接刪除緩存,這種方案更加簡單,並且避免了數據庫的數據和緩存不一致的狀況。在通常狀況下,咱們也推薦使用刪除的方案。
這一點明確以後,如今咱們就剩一個問題:
一、究竟是先更新數據庫,再刪除緩存
二、仍是先刪除緩存,再更新數據庫
咱們先看第一種方案。
正常狀況:
更新數據庫,成功。
刪除緩存,成功。
異常狀況:
一、更新數據庫失敗,程序捕獲異常,不會走到下一步,因此數據不會出現不一致。
二、更新數據庫成功,刪除緩存失敗。數據庫是新數據,緩存是舊數據,發生了不一致的狀況。
這種問題怎麼解決呢?咱們能夠提供一個重試的機制。
好比:若是刪除緩存失敗,咱們捕獲這個異常,把須要刪除的 key 發送到消息隊列。讓後本身建立一個消費者消費,嘗試再次刪除這個 key。
這種方式有個缺點,會對業務代碼形成入侵。
因此咱們又有了第二種方案(異步更新緩存):
由於更新數據庫時會往 binlog 寫入日誌,因此咱們能夠經過一個服務來監聽 binlog的變化(好比阿里的 canal),而後在客戶端完成刪除 key 的操做。若是刪除失敗的話,再發送到消息隊列。
總之,對於後刪除緩存失敗的狀況,咱們的作法是不斷地重試刪除,直到成功。
不管是重試仍是異步刪除,都是最終一致性的思想。
正常狀況:
刪除緩存,成功。
更新數據庫,成功。
異常狀況:
一、刪除緩存,程序捕獲異常,不會走到下一步,因此數據不會出現不一致。
二、刪除緩存成功,更新數據庫失敗。 由於以數據庫的數據爲準,因此不存在數據不一致的狀況。
看起來好像沒問題,可是若是有程序併發操做的狀況下:
1)線程 A 須要更新數據,首先刪除了 Redis 緩存
2)線程 B 查詢數據,發現緩存不存在,到數據庫查詢舊值,寫入 Redis,返回
3)線程 A 更新了數據庫
這個時候,Redis 是舊的值,數據庫是新的值,發生了數據不一致的狀況。
那問題就變成了:能不能讓對同一條數據的訪問串行化呢?代碼確定保證不了,由於有多個線程,即便作了任務隊列也可能有多個服務實例。數據庫也保證不了,由於會有多個數據庫的鏈接。只有一個數據庫只提供一個鏈接的狀況下,才能保證讀寫的操做是串行的,或者咱們把全部的讀寫請求放到同一個內存隊列當中,可是這種狀況吞吐量過低了。
因此咱們有一種延時雙刪的策略,在寫入數據以後,再刪除一次緩存。
A 線程:
1)刪除緩存
2)更新數據庫
3)休眠 500ms(這個時間,依據讀取數據的耗時而定)
4)再次刪除緩存
僞代碼:
public void write(String key,Object data){ redis.delKey(key); db.updateData(data); Thread.sleep(500); redis.delKey(key); }
在 Redis 存儲的全部數據中,有一部分是被頻繁訪問的。有兩種狀況可能會致使熱點問題的產生,一個是用戶集中訪問的數據,好比搶購的商品,明星結婚和明星出軌的微博。還有一種就是在數據進行分片的狀況下,負載不均衡,超過了單個服務器的承受能力。熱點問題可能引發緩存服務的不可用,最終形成壓力堆積到數據庫。
出於存儲和流量優化的角度,咱們必需要找到這些熱點數據。
除了自動的緩存淘汰機制以外,怎麼找出那些訪問頻率高的 key 呢?或者說,咱們能夠在哪裏記錄 key 被訪問的狀況呢?
第一個固然是在客戶端了,好比咱們可不能夠在全部調用了 get、set 方法的地方,加上 key 的計數。可是這樣的話,每個地方都要修改,重複的代碼也多。若是咱們用的是 Jedis 的客戶端,咱們能夠在 Jedis 的 Connection 類的 sendCommand()裏面,用一個 HashMap 進行 key 的計數。
可是這種方式有幾個問題:
一、不知道要存多少個 key,可能會發生內存泄露的問題。
二、會對客戶端的代碼形成入侵。
三、只能統計當前客戶端的熱點 key。
第二種方式就是在代理端實現,好比 TwemProxy 或者 Codis,可是不是全部的項目都使用了代理的架構。
第三種就是在服務端統計,Redis 有一個 monitor 的命令,能夠監控到全部 Redis執行的命令。
代碼:
jedis.monitor(new JedisMonitor() { @Override public void onCommand(String command) { System.out.println("#monitor: " + command); } });
Facebook 的 開 源 項 目 redis-faina(https://github.com/facebookarchive/redis-faina.git)就是基於這個原理實現的。它是一個 python 腳本,能夠分析 monitor 的數據。
redis-cli -p 6379 monitor | head -n 100000 | ./redis-faina.py
這種方法也會有兩個問題:1)monitor 命令在高併發的場景下,會影響性能,因此不適合長時間使用。
只能統計一個 Redis 節點的熱點 key。
還有一種方法就是機器層面的,經過對 TCP 協議進行抓包,也有一些開源的方案,
好比 ELK 的 packetbeat 插件。
當咱們發現了熱點 key 以後,咱們來看下熱點數據在高併發的場景下可能會出現的
問題,以及怎麼去解決。
緩存雪崩就是 Redis 的大量熱點數據同時過時(失效),由於設置了相同的過時時間,恰好這個時候 Redis 請求的併發量又很大,就會致使全部的請求落到數據庫。
1)加互斥鎖或者使用隊列,針對同一個 key 只容許一個線程到數據庫查詢
2)緩存定時預先更新,避免同時失效
3)經過加隨機數,使 key 在不一樣的時間過時
4)緩存永不過時
咱們已經知道了 Redis 使用的場景了。在緩存存在和緩存不存在的狀況下的什麼狀況咱們都瞭解了。
還有一種狀況,數據在數據庫和 Redis 裏面都不存在,多是一次條件錯誤的查詢。在這種狀況下,由於數據庫值不存在,因此確定不會寫入 Redis,那麼下一次查詢相同的key 的時候,確定仍是會再到數據庫查一次。那麼這種循環查詢數據庫中不存在的值,而且每次使用的是相同的 key 的狀況,咱們有沒有什麼辦法避免應用到數據庫查詢呢?
(1)緩存空數據 (2)緩存特殊字符串,好比&&
咱們能夠在數據庫緩存一個空字符串,或者緩存一個特殊的字符串,那麼在應用裏面拿到這個特殊字符串的時候,就知道數據庫沒有值了,也沒有必要再到數據庫查詢了。可是這裏須要設置一個過時時間,否則的話數據庫已經新增了這一條記錄,應用也仍是拿不到值。
這個是應用重複查詢同一個不存在的值的狀況,若是應用每一次查詢的不存在的值是不同的呢?即便你每次都緩存特殊字符串也沒用,由於它的值不同,好比咱們的用戶系統登陸的場景,若是是惡意的請求,它每次都生成了一個符合 ID 規則的帳號,可是這個帳號在咱們的數據庫是不存在的,那 Redis 就徹底失去了做用
這種由於每次查詢的值都不存在致使的 Redis 失效的狀況,咱們就把它叫作緩存穿透。這個問題咱們應該怎麼去解決呢?
其實它也是一個通用的問題,關鍵就在於咱們怎麼知道請求的 key 在咱們的數據庫裏面是否存在,若是數據量特別大的話,咱們怎麼去快速判斷。
這也是一個很是經典的面試題:
如何在海量元素中(例如 10 億無序、不定長、不重複)快速判斷一個元素是否存在?
若是是緩存穿透的這個問題,咱們要避免到數據庫查詢不存的數據,確定要把這 10億放在別的地方。這些數據在 Redis 裏面也是沒有的,爲了加快檢索速度,咱們要把數據放到內存裏面來判斷,問題來了:
若是咱們直接把這些元素的值放到基本的數據結構(List、Map、Tree)裏面,好比一個元素 1 字節的字段,10 億的數據大概須要 900G 的內存空間,這個對於普通的服務器來講是承受不了的。
因此,咱們存儲這幾十億個元素,不能直接存值,咱們應該找到一種最簡單的最節省空間的數據結構,用來標記這個元素有沒有出現。
這個東西咱們就把它叫作位圖,他是一個有序的數組,只有兩個值,0 和 1。0 表明不存在,1 表明存在。
那咱們怎麼用這個數組裏面的有序的位置來標記這10億個元素是否存在呢?咱們是否是必需要有一個映射方法,把元素映射到一個下標位置上?
對於這個映射方法,咱們有幾個基本的要求:
1)由於咱們的值長度是不固定的,我但願不一樣長度的輸入,能夠獲得固定長度的輸出。
2)轉換成下標的時候,我但願他在個人這個有序數組裏面是分佈均勻的,否則的話所有擠到一對去了,我也無法判斷到底哪一個元素存了,哪一個元素沒存。
這個就是哈希函數,好比 MD五、SHA-1 等等這些都是常見的哈希算法。
好比,這 6 個元素,咱們通過哈希函數和位運算,獲得了相應的下標。
這個時候,Tom 和 Mic 通過計算獲得的哈希值是同樣的,那麼再通過位運算獲得的下標確定是同樣的,咱們把這種狀況叫作哈希衝突或者哈希碰撞。
若是發生了哈希碰撞,這個時候對於咱們的容器存值確定是有影響的,咱們能夠經過哪些方式去下降哈希碰撞的機率呢?
第一種就是擴大維數組的長度或者說位圖容量。由於咱們的函數是分佈均勻的,因此,位圖容量越大,在同一個位置發生哈希碰撞的機率就越小。
是否是位圖容量越大越好呢?無論存多少個元素,都建立一個幾萬億大小的位圖,能夠嗎?固然不行,由於越大的位圖容量,意味着越多的內存消耗,因此咱們要建立一個合適大小的位圖容量。
除了擴大位圖容量,咱們還有什麼下降哈希碰撞機率的方法呢?
若是兩個元素通過一次哈希計算,獲得的相同下標的機率比較高,我能夠不能夠計算屢次呢? 原來我只用一個哈希函數,如今我對於每個要存儲的元素都用多個哈希函數計算,這樣每次計算出來的下標都相同的機率就小得多了。
一樣的,咱們能不能引入不少個哈希函數呢?好比都計算 100 次,均可以嗎?固然也會有問題,第一個就是它會填滿位圖的更多空間,第二個是計算是須要消耗時間的。
因此總的來講,咱們既要節省空間,又要很高的計算效率,就必須在位圖容量和函數個數之間找到一個最佳的平衡。
好比說:咱們存放 100 萬個元素,到底須要多大的位圖容量,須要多少個哈希函數呢?
固然,這個事情早就有人研究過了,在 1970 年的時候,有一個叫作布隆的前輩對於判斷海量元素中元素是否存在的問題進行了研究,也就是到底須要多大的位圖容量和多少個哈希函數,它發表了一篇論文,提出的這個容器就叫作布隆過濾器。
咱們來看一下布隆過濾器的工做原理。
首先,布隆過濾器的本質就是咱們剛纔分析的,一個位數組,和若干個哈希函數。
集合裏面有 3 個元素,要把它存到布隆過濾器裏面去,應該怎麼作?首先是 a 元素,這裏咱們用 3 次計算。b、c 元素也同樣。
元素已經存進去以後,如今我要來判斷一個元素在這個容器裏面是否存在,就要使用一樣的三個函數進行計算。
好比 d 元素,我用第一個函數 f1 計算,發現這個位置上是 1,沒問題。第二個位置也是 1,第三個位置也是 1 。
若是通過三次計算獲得的下標位置值都是 1,這種狀況下,能不能肯定 d 元素必定在這個容器裏面呢? 其實是不能的。好比這張圖裏面,這三個位置分別是把 a,b,c 存進去的時候置成 1 的,因此即便 d 元素以前沒有存進去,也會獲得三個 1,判斷返回 true。
因此,這個是布隆過濾器的一個很重要的特性,由於哈希碰撞不可避免,因此它會存在必定的誤判率。這種把原本不存在布隆過濾器中的元素誤判爲存在的狀況,咱們把它叫作假陽性(False Positive Probability,FPP)。
咱們再來看另外一個元素,e 元素。咱們要判斷它在容器裏面是否存在,同樣地要用這三個函數去計算。第一個位置是 1,第二個位置是 1,第三個位置是 0。
e 元素是否是必定不在這個容器裏面呢? 能夠肯定必定不存在。若是說當時已經把e 元素存到布隆過濾器裏面去了,那麼這三個位置確定都是 1,不可能出現 0。
總結:布隆過濾器的特色:
從容器的角度來講:
一、若是布隆過濾器判斷元素在集合中存在,不必定存在
二、若是布隆過濾器判斷不存在,必定不存在從元素的角度來講:
三、若是元素實際存在,布隆過濾器必定判斷存在
四、若是元素實際不存在,布隆過濾器可能判斷存在利用,第二個特性,咱們是否是就能解決持續從數據庫查詢不存在的值的問題?
谷歌的 Guava 裏面就提供了一個現成的布隆過濾器。
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>
建立布隆過濾器:
BloomFilter<String> bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), insertions);
布隆過濾器提供的存放元素的方法是 put()。
布隆過濾器提供的判斷元素是否存在的方法是 mightContain()。
if (bf.mightContain(data)) { if (sets.contains(data)) { // 判斷存在實際存在的時候,命中 right++; continue; } // 判斷存在卻不存在的時候,錯誤 wrong++; }
布隆過濾器把誤判率默認設置爲 0.03,也能夠在建立的時候指定。
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) { return create(funnel, expectedInsertions, 0.03D); }
位圖的容量是基於元素個數和誤判率計算出來的。
long numBits = optimalNumOfBits(expectedInsertions, fpp);
根據位數組的大小,咱們進一步計算出了哈希函數的個數。
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
存儲 100 萬個元素只佔用了 0.87M 的內存,生成了 5 個哈希函數。
https://hur.st/bloomfilter/?n=1000000&p=0.03&m=&k=
布隆過濾器的工做位置:
由於要判斷數據庫的值是否存在,因此第一步是加載數據庫全部的數據。在去 Redis查詢以前,先在布隆過濾器查詢,若是 bf 說沒有,那數據庫確定沒有,也不用去查了。若是 bf 說有,才走以前的流程。
import com.google.common.base.Charsets; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import com.gupaoedu.entity.User; import com.gupaoedu.service.UserService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest @EnableAutoConfiguration public class BloomTestsConcurrency { @Resource private RedisTemplate redisTemplate; @Autowired private UserService userService; private static final int THREAD_NUM = 1000; // 併發線程數量,Windows機器不要設置過大 static BloomFilter<String> bf; static List<User> allUsers; @PostConstruct public void init() { // 從數據庫獲取數據,加載到布隆過濾器 long start = System.currentTimeMillis(); allUsers = userService.getAllUser(); if (allUsers == null || allUsers.size() == 0) { return; } // 建立布隆過濾器,默認誤判率0.03,即3% bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size()); // 誤判率越低,數組長度越長,須要的哈希函數越多 // bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size(), 0.0001); // 將數據存入布隆過濾器 for (User user : allUsers) { bf.put(user.getAccount()); } long end = System.currentTimeMillis(); System.out.println("查詢並加載"+allUsers.size()+"條數據到布隆過濾器完畢,總耗時:"+(end -start ) +"毫秒"); } @Test public void cacheBreakDownTest() { long start = System.currentTimeMillis(); allUsers = userService.getAllUser(); CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM); ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM); for (int i = 0; i < THREAD_NUM; i++){ executorService.execute(new BloomTestsConcurrency().new MyThread(cyclicBarrier, redisTemplate, userService)); } executorService.shutdown(); //判斷是否全部的線程已經運行完 while (!executorService.isTerminated()) { } long end = System.currentTimeMillis(); System.out.println("併發數:"+THREAD_NUM + ",新建線程以及過濾總耗時:"+(end -start ) +"毫秒,演示結束"); } public class MyThread implements Runnable { private CyclicBarrier cyclicBarrier; private RedisTemplate redisTemplate; private UserService userService; public MyThread(CyclicBarrier cyclicBarrier, RedisTemplate redisTemplate, UserService userService) { this.cyclicBarrier = cyclicBarrier; this.redisTemplate = redisTemplate; this.userService = userService; } @Override public void run() { //全部子線程等待,當子線程所有建立完成再一塊兒併發執行後面的代碼 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } // 1.1 (測試:布隆過濾器判斷不存在,攔截——若是沒有布隆過濾器,將形成緩存穿透) // 隨機產生一個字符串,在布隆過濾器中不存在 String randomUser = UUID.randomUUID().toString(); // 1.2 (測試:布隆過濾器判斷存在,從Redis緩存取值,若是Redis爲空則查詢數據庫並寫入Redis) // 從List中獲取一個存在的用戶 // String randomUser = allUsers.get(new Random().nextInt(allUsers.size())).getAccount(); String key = "Key:" + randomUser; Date date1 = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 若是布隆過濾器中不存在這個用戶直接返回,將流量擋掉 /* if (!bf.mightContain(randomUser)) { System.out.println(sdf.format(date1)+" 布隆過濾器中不存在,非法請求"); return; }*/ // 查詢緩存,若是緩存中存在直接返回緩存數據 ValueOperations<String, String> operation = (ValueOperations<String, String>) redisTemplate.opsForValue(); Object cacheUser = operation.get(key); if (cacheUser != null) { Date date2 = new Date(); System.out.println(sdf.format(date2)+" 命中redis緩存"); return; } // TODO 防止併發重複寫緩存,加鎖 synchronized (randomUser) { // 若是緩存不存在查詢數據庫 List<User> user = userService.getUserByAccount(randomUser); if (user == null || user.size() == 0) { // 很容易發生鏈接池不夠用的狀況 HikariPool-1 - Connection is not available, request timed out after System.out.println(" Redis緩存不存在,查詢數據庫也不存在,發生緩存穿透!!!"); return; } // 將mysql數據庫查詢到的數據寫入到redis中 Date date3 = new Date(); System.out.println(sdf.format(date3)+" 從數據庫查詢並寫入Reids"); operation.set("Key:" + user.get(0).getAccount(), user.get(0).getAccount()); } } } }
布隆過濾器解決的問題是什麼?如何在海量元素中快速判斷一個元素是否存在。因此除了解決緩存穿透的問題以外,咱們還有不少其餘的用途。
好比爬數據的爬蟲,爬過的 url 咱們不須要重複爬,那麼在幾十億的 url 裏面,怎麼判斷一個 url 是否是已經爬過了?
還有咱們的郵箱服務器,發送垃圾郵件的帳號咱們把它們叫作 spamer,在這麼多的郵箱帳號裏面,怎麼判斷一個帳號是否是 spamer 等等一些場景,咱們均可以用到布隆過濾器。
若是你們想要實時關注我更新的文章以及分享的乾貨的話,能夠關注個人公衆號。