分佈式存儲-Redis實戰&常見問題解決

分佈式存儲-Redis實戰&常見問題解決

前面講了一些Redis的使用場景和數據類型,本章會聊到:redis

  • 一個抽獎的例子來闡述redis如何被應用去解決實際問題(代碼有點多,不適合放在博文中,如需請留言,我能夠發送),而且會使用到前面併發模塊聊的CountDownLatch和springBoot中的事件去異步緩存數據,和異步等待。
  • 常見的一些使用redis帶來的問題,好比緩存穿透、緩存雪崩、以及數據庫和redis中數據不一致的問題,和布隆過濾器的一些底層思想(位圖)
  • 經常使用的redis客戶端和他們的底層實現
  • 本身動手實現一個Redisclient

 Redis抽獎實現算法

總體流程:spring

  設計思路:當一個請求過來的時候,會攜帶一個活動id數據庫

  • 緩存獎品信息:咱們會使用這個活動id去查詢數據庫,並把查詢到的數據緩存在redis中(這個步驟是異步的,咱們用一個CountDownLatch去對他進行控制,緩存完成後,給count-1,後續須要redis中數據的流程就能夠繼續處理)
  • 開始抽獎:這是一個簡單的區間算法,在lottery_item中有對於每一個獎品的機率比。從redis中拿到全部的獎項,若是沒有則從數據庫中獲取(由於上面緩存的那一步驟是異步的,可能這個時候還有緩存成功)。咱們根據隨機數去落到咱們設置好的機率區間中(區間越大,抽到某個獎品的機率越大
  • 發放獎品:咱們的獎品類型不一樣  (lottery_prize#prize_type)根據不一樣獎品的類型,走不一樣的邏輯,好比咱們有的獎品要發送短信,有的獎品不要,咱們就定一個模板方法,而後不一樣類型的獎品走不一樣類型的發送邏輯。
    • 扣減庫存:咱們前面已經異步緩存了數據到redis中,那這裏直接使用incur的命令(以前說,這個命令是原子的,因此不會產生安全問題),咱們能夠先扣減redis中,而後進行數據的內存扣除 

 SpringBoot中使用方法(Lettuce)api

<dependency>
         <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  redis:
    port: 6379
    host: ip
    lettuce:
      pool:
        max-active: -1
        max-idle: 2000
        max-wait: -1
        min-idle: 1
        time-between-eviction-runs: 5000
@Autowired
 RedisTemplate<String,String> redisTemplate;

Redis的客戶端緩存

常見的:Jedis、Redission、Lettuce(上面給的pom)安全

咱們發送一個命令(set n v)到redis上的時候,使用抓包工具發現數據結構

 

這裏的$*表示key的長度 ,好比:$3表示set的長度是3 *3 表示咱們傳遞了三個參數給redis多線程

那解析下來的命令就是:*3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv,** 那是否是證實只要咱們符合這樣的編碼協議就能夠和redis交流了呢併發

定義get set 命令

public class CommandConstant {

    public static final String START="*";

    public static final String LENGTH="$";

    public static final String LINE="\r\n";

    //這裏提供兩個命令
    public enum CommandEnum{
        SET,
        GET
    }
}
View Code

封裝api

public class CustomerRedisClient {

    private CustomerRedisClientSocket customerRedisClientSocket;

    //鏈接的redis地址和端口
    public CustomerRedisClient(String host,int port) {
        customerRedisClientSocket=new CustomerRedisClientSocket(host,port);
    }

    //封裝一個api 發送指令
    public String set(String key,String value){
        //傳遞給redis,同時格式化成redis認識的數據
        customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.SET,key.getBytes(),value.getBytes()));
        return customerRedisClientSocket.read(); //在等待返回結果的時候,是阻塞的
    }

    //獲取指令
    public String get(String key){
        customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.GET,key.getBytes()));
        return customerRedisClientSocket.read();
    }

    //這裏按照redis的要求格式化 就是前面抓包後拿到的格式 *3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv
    public static String convertToCommand(CommandConstant.CommandEnum commandEnum,byte[]... bytes){
        StringBuilder stringBuilder=new StringBuilder();
        stringBuilder.append(CommandConstant.START).append(bytes.length+1).append(CommandConstant.LINE);
        stringBuilder.append(CommandConstant.LENGTH).append(commandEnum.toString().length()).append(CommandConstant.LINE);
        stringBuilder.append(commandEnum.toString()).append(CommandConstant.LINE);
        for (byte[] by:bytes){
            stringBuilder.append(CommandConstant.LENGTH).append(by.length).append(CommandConstant.LINE);
            stringBuilder.append(new String(by)).append(CommandConstant.LINE);
        }
        return stringBuilder.toString();
    }
}
View Code

鏈接redis

public class CustomerRedisClientSocket {

    //這裏可使用nio
    private Socket socket;

    private InputStream inputStream;

    private OutputStream outputStream;

    public CustomerRedisClientSocket(String ip,int port){
        try {
            socket=new Socket(ip,port);
            inputStream=socket.getInputStream();
            outputStream=socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //發送指令
    public void send(String cmd){
        try {
            outputStream.write(cmd.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //讀取數據
    public String read(){
        byte[] bytes=new byte[1024];
        int count=0;
        try {
            count=inputStream.read(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new String(bytes,0,count);
    }
}
View Code

測試

public class MainClient {
    public static void main(String[] args) {
        CustomerRedisClient customerRedisClient=new CustomerRedisClient("ip",6379);

        System.out.println(customerRedisClient.set("customer","Define"));
        System.out.println(customerRedisClient.get("customer"));
    }
}
View Code

結果 +ok(redis返回的成功報文) $6(返回的value是6的長度)

根據上面咱們本身實現的中間件,咱們能夠悟出,從這些層面選擇中間件:

  • 通訊層面的優化:當咱們獲取返回結果的時候是阻塞的(咱們本身實現的中間件)
  • 是否採用異步通訊:多線程(效率高)
  • 針對key和value的優化 :傳遞的報文越小,傳遞的速度確定更快
  • 鏈接的優化(鏈接池)

咱們發現redisson是提供這些功能作的比較好的,集成網上不少例子,這裏不聊了。

 使用redis中碰見的問題

數據庫和redis的數據一致性問題:實際上很難解決強一致性問題,常見有兩種操做

  • 先更新數據庫,再刪除緩存(刪除緩存就等於更新)推薦
    •  更新數據庫成功,可是刪除緩存失敗
      • 當數據庫更新成功後,把更新redis的消息放在mq中,這個時候必定能保證都更新成功。
      • 解析數據庫的binary log,而後更新緩存
  • 先刪除緩存,再更新數據庫(不推薦)
    • 刪除緩存成功,更新數據庫失敗(看起來沒有什麼問題,可是看下面的場景):線程A先去刪除一個key,線程B去獲取這個Key,發現沒有數據,那他就去更新Redis緩存,這個時候線程A去更新數據庫 。那就會致使數據庫的數據是最新的,可是緩存不是最新的

緩存雪崩

  • 緣由大量的熱點數據同時失效,由於設置了相同的過時時間,恰好這個時候請求量又很大,那這個時候壓力就到了數據庫上,從而就致使了雪崩
    • 【方案】  這是幾個設置過時的命令,(咱們能夠給key設置不一樣的過時時間,這樣就能有效地避免雪崩,或者熱點數據不設置過時時間 
      • expire key seconds # 設置鍵在給定秒後過時
        pexpire key milliseconds # 設置鍵在給定毫秒後過時
        expireat key timestamp # 到達指定秒數時間戳以後鍵過時
        pexpireat key timestamp # 到達指定毫秒數時間戳以後鍵過時
    • 【redis key 過時實現原理】想一下redis是如何實現過時的,若是咱們存儲的數據庫十分巨大,redis怎麼精確的知道那個key過時了?而且對他進行刪除呢?
      • 想法:咱們給去key每一個key設置一個定時器,一個個進行輪詢。性能太差了!!
      • Redis對過時key的作法:
        • 存儲:實際上redis使用了一個hash的結構進行存儲,對你設置的過時的key單獨用一個value存儲了一個過時時間
        • 刪除
          • 被動刪除:當咱們使用get命令的時候,他去查詢他存儲的咱們傳遞的過時時間和電腦時間對比,若是過時,則進行刪除
          • 主動刪除:隨機抽取20個key,刪除這20key中已通過期的key,若是發現這20個key中有20%的key已通過期了,那麼就再次抽取20個key,用這個方式循環。

【緩存穿透】:

  • 【緣由】:Redis和數據庫中都不存在查詢的數據,那這就是一次無效查詢,若是有人僞造了不少請求,那可能會引起數據庫宕機,由於redis中沒有數據,請求確定就請求到數據庫,這就叫緩存穿透
  • 【方案】:使用布隆過濾器  
    • 流程】:
      • 項目在啓動的時候,把全部的數據加載到布隆過濾器中
      • 當客戶端有請求過來時,先到布隆過濾器中查詢一下當前訪問的key是否存在,若是布隆過
        濾器中沒有該key,則不須要去數據庫查詢直接反饋便可
    • 【實現】:
      • 使用guava  
      • <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>21.0</version>
        </dependency>
        View Code
      • 程序初始的時候加載數據到布隆過濾器中
      • @Slf4j
        @Component
        public class BloomFilterDataLoadApplicationRunner implements ApplicationRunner {
        
            @Autowired
            ICityService cityService;
        
            @Override
            public void run(ApplicationArguments args) {
                List<City> cities=cityService.list();
                //第一個參數指的是要存儲的數據,第二個數據是容許的容錯率
                BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03);
                cities.parallelStream().forEach(city-> bloomFilter.put(RedisKeyConstants.CITY_KEY+":"+city.getId()));
                BloomFilterCache.cityBloom=bloomFilter;
            }
        }
        View Code
      • 客戶端訪問時增長驗證
      •     @GetMapping("/bloom/{id}")
            public String filter(@PathVariable("id")Integer id){
                String key=RedisKeyConstants.CITY_KEY+":"+id;
                if(BloomFilterCache.cityBloom.mightContain(key)){
                    return redisTemplate.opsForValue().get(key).toString();
                }
                return "數據不存在";
            }
            public class BloomFilterCache {
        
            public static BloomFilter<String> cityBloom;
        }
        View Code
    • 剖析布隆過濾器:布隆過濾器是一種空間利用率極高的數據結構,他的底層實際上並不存儲咱們緩存的元素的內容,而是存儲緩存元素的標記 都是0/1。好比:一個int類型是32位4個字節,32位意味着咱們能夠存儲32個0或者1,那我如今若是要存儲32個條數據,只須要一個int類型,到底這是怎麼作到的?底層用到了位圖
    • 一個例子解釋位圖:
      • 如今有32位 【0000 0000 0000 0000 0000 0000 0000 0000】
      • 好比存儲5這個數字 ->5 的二進制是101 【0000 0000 0000 0000 0000 0000 0010 1000】
      • 第二個數字是9 ->9的二進制是 1001      【0000 0000 0000 0000 0000 0010 0110 1000】
    • 布隆過濾器引入了多個函數去生成hash數值,咱們傳入的數據經過這些函數計算,則落入了這32位中。好比 有x y  z 三個函數,咱們傳入了一個的數據,經過這三個函數進行hash換算,落到了 32位中的5 6 9 ,那就把這5 6 9 這幾個地方變爲1,當咱們要查詢時候咱們以前傳遞的數據是否存在的時候,再次用這些函數進行換算,若是相關位置是1 則說明數據存在,不然數據則不存在。
    • 解析布隆過濾器的參數:傳遞的 100000000是咱們要構建多少個int類型(一個int類型能夠存儲32位),0.03是誤判率(誤判率指的就是對咱們傳遞的數據進行hash換算的函數)
    • BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03);
       
    • 不少地方都運用了這種思想,好比
      • Redis的HyperLogLog
        bitmap
        protobuf中的zigzag壓縮算法
        線程池中的線程狀態和線程數量(高低位擴容咱們以前聊過)
        ConcurrentHashMap中的數據遷移的線程數量存儲(線性探索咱們也聊過)

 

相關文章
相關標籤/搜索