前面講了一些Redis的使用場景和數據類型,本章會聊到:redis
- 一個抽獎的例子來闡述redis如何被應用去解決實際問題(代碼有點多,不適合放在博文中,如需請留言,我能夠發送),而且會使用到前面併發模塊聊的CountDownLatch和springBoot中的事件去異步緩存數據,和異步等待。
- 常見的一些使用redis帶來的問題,好比緩存穿透、緩存雪崩、以及數據庫和redis中數據不一致的問題,和布隆過濾器的一些底層思想(位圖)
- 經常使用的redis客戶端和他們的底層實現
- 本身動手實現一個Redisclient
Redis抽獎實現算法
總體流程:spring
設計思路:當一個請求過來的時候,會攜帶一個活動id數據庫
- 緩存獎品信息:咱們會使用這個活動id去查詢數據庫,並把查詢到的數據緩存在redis中(這個步驟是異步的,咱們用一個CountDownLatch去對他進行控制,緩存完成後,給count-1,後續須要redis中數據的流程就能夠繼續處理)
- 開始抽獎:這是一個簡單的區間算法,在lottery_item中有對於每一個獎品的機率比。從redis中拿到全部的獎項,若是沒有則從數據庫中獲取(由於上面緩存的那一步驟是異步的,可能這個時候還有緩存成功)。咱們根據隨機數去落到咱們設置好的機率區間中(區間越大,抽到某個獎品的機率越大)
- 發放獎品:咱們的獎品類型不一樣 (lottery_prize#prize_type)根據不一樣獎品的類型,走不一樣的邏輯,好比咱們有的獎品要發送短信,有的獎品不要,咱們就定一個模板方法,而後不一樣類型的獎品走不一樣類型的發送邏輯。
- 扣減庫存:咱們前面已經異步緩存了數據到redis中,那這裏直接使用incur的命令(以前說,這個命令是原子的,因此不會產生安全問題),咱們能夠先扣減redis中,而後進行數據的內存扣除
SpringBoot中使用方法(Lettuce)api
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>redis: port: 6379 host: ip lettuce: pool: max-active: -1 max-idle: 2000 max-wait: -1 min-idle: 1 time-between-eviction-runs: 5000
@Autowired RedisTemplate<String,String> redisTemplate;
Redis的客戶端緩存
常見的:Jedis、Redission、Lettuce(上面給的pom)安全
咱們發送一個命令(set n v)到redis上的時候,使用抓包工具發現數據結構
這裏的$*表示key的長度 ,好比:$3表示set的長度是3 *3 表示咱們傳遞了三個參數給redis多線程
那解析下來的命令就是:*3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv,** 那是否是證實只要咱們符合這樣的編碼協議就能夠和redis交流了呢併發
定義get set 命令
View Codepublic class CommandConstant { public static final String START="*"; public static final String LENGTH="$"; public static final String LINE="\r\n"; //這裏提供兩個命令 public enum CommandEnum{ SET, GET } }封裝api
View Codepublic class CustomerRedisClient { private CustomerRedisClientSocket customerRedisClientSocket; //鏈接的redis地址和端口 public CustomerRedisClient(String host,int port) { customerRedisClientSocket=new CustomerRedisClientSocket(host,port); } //封裝一個api 發送指令 public String set(String key,String value){ //傳遞給redis,同時格式化成redis認識的數據 customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.SET,key.getBytes(),value.getBytes())); return customerRedisClientSocket.read(); //在等待返回結果的時候,是阻塞的 } //獲取指令 public String get(String key){ customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.GET,key.getBytes())); return customerRedisClientSocket.read(); } //這裏按照redis的要求格式化 就是前面抓包後拿到的格式 *3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv public static String convertToCommand(CommandConstant.CommandEnum commandEnum,byte[]... bytes){ StringBuilder stringBuilder=new StringBuilder(); stringBuilder.append(CommandConstant.START).append(bytes.length+1).append(CommandConstant.LINE); stringBuilder.append(CommandConstant.LENGTH).append(commandEnum.toString().length()).append(CommandConstant.LINE); stringBuilder.append(commandEnum.toString()).append(CommandConstant.LINE); for (byte[] by:bytes){ stringBuilder.append(CommandConstant.LENGTH).append(by.length).append(CommandConstant.LINE); stringBuilder.append(new String(by)).append(CommandConstant.LINE); } return stringBuilder.toString(); } }鏈接redis
View Codepublic class CustomerRedisClientSocket { //這裏可使用nio private Socket socket; private InputStream inputStream; private OutputStream outputStream; public CustomerRedisClientSocket(String ip,int port){ try { socket=new Socket(ip,port); inputStream=socket.getInputStream(); outputStream=socket.getOutputStream(); } catch (IOException e) { e.printStackTrace(); } } //發送指令 public void send(String cmd){ try { outputStream.write(cmd.getBytes()); } catch (IOException e) { e.printStackTrace(); } } //讀取數據 public String read(){ byte[] bytes=new byte[1024]; int count=0; try { count=inputStream.read(bytes); } catch (IOException e) { e.printStackTrace(); } return new String(bytes,0,count); } }測試
View Codepublic class MainClient { public static void main(String[] args) { CustomerRedisClient customerRedisClient=new CustomerRedisClient("ip",6379); System.out.println(customerRedisClient.set("customer","Define")); System.out.println(customerRedisClient.get("customer")); } }結果 +ok(redis返回的成功報文) $6(返回的value是6的長度)
根據上面咱們本身實現的中間件,咱們能夠悟出,從這些層面選擇中間件:
- 通訊層面的優化:當咱們獲取返回結果的時候是阻塞的(咱們本身實現的中間件)
- 是否採用異步通訊:多線程(效率高)
- 針對key和value的優化 :傳遞的報文越小,傳遞的速度確定更快
- 鏈接的優化(鏈接池)
咱們發現redisson是提供這些功能作的比較好的,集成網上不少例子,這裏不聊了。
使用redis中碰見的問題
【數據庫和redis的數據一致性問題】:實際上很難解決強一致性問題,常見有兩種操做
- 先更新數據庫,再刪除緩存(刪除緩存就等於更新)推薦
- 更新數據庫成功,可是刪除緩存失敗
- 當數據庫更新成功後,把更新redis的消息放在mq中,這個時候必定能保證都更新成功。
- 解析數據庫的binary log,而後更新緩存
- 先刪除緩存,再更新數據庫(不推薦)
- 刪除緩存成功,更新數據庫失敗(看起來沒有什麼問題,可是看下面的場景):線程A先去刪除一個key,線程B去獲取這個Key,發現沒有數據,那他就去更新Redis緩存,這個時候線程A去更新數據庫 。那就會致使數據庫的數據是最新的,可是緩存不是最新的
【緩存雪崩】
- 【緣由】大量的熱點數據同時失效,由於設置了相同的過時時間,恰好這個時候請求量又很大,那這個時候壓力就到了數據庫上,從而就致使了雪崩。
- 【方案】 這是幾個設置過時的命令,(咱們能夠給key設置不一樣的過時時間,這樣就能有效地避免雪崩,或者熱點數據不設置過時時間 )
expire key seconds # 設置鍵在給定秒後過時pexpire key milliseconds # 設置鍵在給定毫秒後過時expireat key timestamp # 到達指定秒數時間戳以後鍵過時pexpireat key timestamp # 到達指定毫秒數時間戳以後鍵過時- 【redis key 過時實現原理】想一下redis是如何實現過時的,若是咱們存儲的數據庫十分巨大,redis怎麼精確的知道那個key過時了?而且對他進行刪除呢?
- 想法:咱們給去key每一個key設置一個定時器,一個個進行輪詢。性能太差了!!
- Redis對過時key的作法:
- 存儲:實際上redis使用了一個hash的結構進行存儲,對你設置的過時的key單獨用一個value存儲了一個過時時間
- 刪除:
- 被動刪除:當咱們使用get命令的時候,他去查詢他存儲的咱們傳遞的過時時間和電腦時間對比,若是過時,則進行刪除
- 主動刪除:隨機抽取20個key,刪除這20key中已通過期的key,若是發現這20個key中有20%的key已通過期了,那麼就再次抽取20個key,用這個方式循環。
【緩存穿透】:
- 【緣由】:Redis和數據庫中都不存在查詢的數據,那這就是一次無效查詢,若是有人僞造了不少請求,那可能會引起數據庫宕機,由於redis中沒有數據,請求確定就請求到數據庫,這就叫緩存穿透
- 【方案】:使用布隆過濾器
- 【流程】:
- 項目在啓動的時候,把全部的數據加載到布隆過濾器中
當客戶端有請求過來時,先到布隆過濾器中查詢一下當前訪問的key是否存在,若是布隆過濾器中沒有該key,則不須要去數據庫查詢直接反饋便可- 【實現】:
- 使用guava
View Code<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>- 程序初始的時候加載數據到布隆過濾器中
View Code@Slf4j @Component public class BloomFilterDataLoadApplicationRunner implements ApplicationRunner { @Autowired ICityService cityService; @Override public void run(ApplicationArguments args) { List<City> cities=cityService.list(); //第一個參數指的是要存儲的數據,第二個數據是容許的容錯率 BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03); cities.parallelStream().forEach(city-> bloomFilter.put(RedisKeyConstants.CITY_KEY+":"+city.getId())); BloomFilterCache.cityBloom=bloomFilter; } }- 客戶端訪問時增長驗證
View Code@GetMapping("/bloom/{id}") public String filter(@PathVariable("id")Integer id){ String key=RedisKeyConstants.CITY_KEY+":"+id; if(BloomFilterCache.cityBloom.mightContain(key)){ return redisTemplate.opsForValue().get(key).toString(); } return "數據不存在"; } public class BloomFilterCache { public static BloomFilter<String> cityBloom; }- 剖析布隆過濾器:布隆過濾器是一種空間利用率極高的數據結構,他的底層實際上並不存儲咱們緩存的元素的內容,而是存儲緩存元素的標記 都是0/1。好比:一個int類型是32位4個字節,32位意味着咱們能夠存儲32個0或者1,那我如今若是要存儲32個條數據,只須要一個int類型,到底這是怎麼作到的?底層用到了位圖
- 一個例子解釋位圖:
- 如今有32位 【0000 0000 0000 0000 0000 0000 0000 0000】
- 好比存儲5這個數字 ->5 的二進制是101 【0000 0000 0000 0000 0000 0000 0010 1000】
- 第二個數字是9 ->9的二進制是 1001 【0000 0000 0000 0000 0000 0010 0110 1000】
- 布隆過濾器引入了多個函數去生成hash數值,咱們傳入的數據經過這些函數計算,則落入了這32位中。好比 有x y z 三個函數,咱們傳入了一個的數據,經過這三個函數進行hash換算,落到了 32位中的5 6 9 ,那就把這5 6 9 這幾個地方變爲1,當咱們要查詢時候咱們以前傳遞的數據是否存在的時候,再次用這些函數進行換算,若是相關位置是1 則說明數據存在,不然數據則不存在。
- 解析布隆過濾器的參數:傳遞的 100000000是咱們要構建多少個int類型(一個int類型能夠存儲32位),0.03是誤判率(誤判率指的就是對咱們傳遞的數據進行hash換算的函數)
BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03); 不少地方都運用了這種思想,好比
Redis的HyperLogLogbitmapprotobuf中的zigzag壓縮算法線程池中的線程狀態和線程數量(高低位擴容咱們以前聊過)ConcurrentHashMap中的數據遷移的線程數量存儲(線性探索咱們也聊過)