Redis 5.0.3 (latest) on .NET Core: Quick Start, Source Code Walkthrough, Cluster Setup, and SDK Usage


1. [Basics] What Redis can do for us

Redis (Remote Dictionary Server) official site: https://redis.io/

Redis commands reference: https://redis.io/commands

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes with radius queries and streams. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

1.1 Where Redis came from

  1. At first sites used the machine-local NativeCache (NativeCache cannot be shared across servers). As sites grew, we needed a distributed cache product, and Memcache was born.
  2. As memcache caching took off and the internet kept scaling (~2009), applications demanded ever more performance and ever more scenarios (in-memory databases, heterogeneous message queues, and so on), and the memcache on the market exposed a few weaknesses:

    1. memcache is essentially one giant hash table: a single data structure, whereas programming languages offer many.
      Data structure types: [List, HashSet, Dictionary, SortedDictionary, BitArray, Queue, Stack, SortedList...] 
    2. memcache cannot persist data, so it can only serve as a cache; everything is lost on restart.
    3. memcache has no real cluster support: you can build a simple memcache cluster with consistent hashing, but it depends entirely on the client and is not lossless.
      set username jack → hash(username) = 800 million; walk clockwise around the hash ring, and the first server node you meet is where the key lives (see the sketch below).

      So we badly wanted something that solves these three problems; building it ourselves would cost too much time and effort, and Redis was created precisely to cure these headaches.
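      To make the ring concrete, here is a minimal consistent-hashing sketch in C# (the node names and the MD5-based hash are illustrative assumptions, not memcache's actual client code):

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Security.Cryptography;
      using System.Text;

      class ConsistentHashRing
      {
          // the ring: position -> server node, kept sorted by position
          private readonly SortedDictionary<uint, string> ring = new SortedDictionary<uint, string>();

          public void AddNode(string node) => ring[Hash(node)] = node;

          // walk clockwise from hash(key) and return the first node we meet
          public string GetNode(string key)
          {
              uint h = Hash(key);
              foreach (var entry in ring)
                  if (entry.Key >= h) return entry.Value;
              return ring.First().Value; // wrapped past the top: back to the start of the ring
          }

          private static uint Hash(string s)
          {
              using (var md5 = MD5.Create())
                  return BitConverter.ToUInt32(md5.ComputeHash(Encoding.UTF8.GetBytes(s)), 0);
          }
      }

      // usage: ring.AddNode("10.0.0.1:11211"); ring.AddNode("10.0.0.2:11211"); ring.GetNode("username");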

1.2 What Redis brings to the table

  • Overview
    As the official site shows, Redis supports a remarkably rich set of data types, with rough C# analogues:

    Redis types: String, Bitmap, List (deque), Set, Geo, Hash, HyperLogLogs, Stream, SortedSet (SkipList)
    C# analogues: String, BitArray, LinkedList+Stack+Queue+List, HashSet, ---, Dictionary, ---, ---, SortedDictionary (red-black tree)


  • Persistence
    AOF append mode, RDB mode, and a mixed mode; since it can persist as well as cache, Redis can serve as a memory db.
    • AOF: replays a long log of write commands to recover the data.
    • RDB: snapshots memory to disk.
    • FixMode: a mix of the two.
  • Clustering
    Redis's built-in Cluster mode, Sentinel, and the third-party Codis cluster from Wandoujia.

2. [Setup] Quick deployment on CentOS and with Docker

CentOS7 VM installation steps: http://www.javashuo.com/article/p-uxyckinu-cp.html

XShell6 (cracked): link: https://pan.baidu.com/s/1YtnkN4_yAOU5Dc1j69ltrg extraction code: nchp

2.1 Deploying on CentOS 7

  • Install
    First grab the latest Redis download link from the official site: http://download.redis.io/releases/redis-5.0.3.tar.gz
    Then install it on CentOS7
    mkdir /data
    cd /data    
    wget http://download.redis.io/releases/redis-5.0.3.tar.gz
    tar xzf redis-5.0.3.tar.gz
    mv redis-5.0.3 redis
    cd redis
    make

    If you hit "gcc: command not found", install gcc and rerun make

    yum -y install gcc automake autoconf libtool make
    //if the command above fails with [Errno 256] No more mirrors to try., run the following and then reinstall gcc
    yum clean all

    If you hit "fatal error: jemalloc/jemalloc.h: No such file or directory", run the command below

    make MALLOC=libc 
  • Now check whether Redis built successfully (redis-cli and redis-server should sit under /data/redis/src/), and copy them up one directory
    cd /data/redis/src/
    cp redis-cli ../
    cp redis-server ../
  • Start Redis
    [root@localhost src]# cd /data/redis/
    [root@localhost redis]# ./redis-server ./redis.conf

  • Check the listening port
    netstat -tlnp

  • Test storing a value
    [root@localhost ~]# cd /data/redis/
    [root@localhost redis]# ./redis-cli 
    127.0.0.1:6379> set username jack
    OK
    127.0.0.1:6379> get username
    "jack"
    127.0.0.1:6379> dbsize
    (integer) 1
    127.0.0.1:6379> keys *
    1) "username"
  • Quit the client
    quit 
  • Configure Redis
    Freshly started, Redis cannot be reached from outside the machine, so we need to edit redis.conf

    protected-mode — the protection switch
    bind — the network interface to bind

    bind 127.0.0.1   =>  bind 0.0.0.0
    protected-mode yes  =>  protected-mode no

    In real deployments, redis sits on the production intranet with its port closed to the outside world...

  • Require a password (optional)
    In redis.conf, change the default commented line # requirepass foobared
    After connecting, authenticate with auth <password>
  • Change where the rdb + logfile + aof files are stored (optional)
    • rdb: change the redis.conf default dir ./ to your directory
    • logfile: change the redis.conf default logfile "" to a name such as "redis.log"
  • Run in the background
    Change the redis.conf default daemonize no to daemonize yes
    A pid file /var/run/redis_6379.pid will then hold the process id
    [root@localhost redis]# ./redis-server ./redis.conf
    [root@localhost redis]# netstat -tlnp
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
    tcp        0      0 0.0.0.0:6379            0.0.0.0:*               LISTEN      66042/./redis-serve 
    tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
    tcp        0      0 0.0.0.0:6000            0.0.0.0:*               LISTEN      7748/X              
    tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      7604/dnsmasq        
    tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      7215/sshd           
    tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      7217/cupsd          
    tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      7432/master         
    tcp        0      0 127.0.0.1:6010          0.0.0.0:*               LISTEN      9283/sshd: root@pts 
    tcp        0      0 127.0.0.1:6011          0.0.0.0:*               LISTEN      11424/sshd: root@pt 
    tcp        0      0 127.0.0.1:6012          0.0.0.0:*               LISTEN      63727/sshd: root@pt 
    tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd           
    tcp6       0      0 :::6000                 :::*                    LISTEN      7748/X              
    tcp6       0      0 :::21                   :::*                    LISTEN      9406/vsftpd         
    tcp6       0      0 :::22                   :::*                    LISTEN      7215/sshd           
    tcp6       0      0 ::1:631                 :::*                    LISTEN      7217/cupsd          
    tcp6       0      0 ::1:25                  :::*                    LISTEN      7432/master         
    tcp6       0      0 ::1:6010                :::*                    LISTEN      9283/sshd: root@pts 
    tcp6       0      0 ::1:6011                :::*                    LISTEN      11424/sshd: root@pt 
    tcp6       0      0 ::1:6012                :::*                    LISTEN      63727/sshd: root@pt 
    [root@localhost redis]# tail /var/run/redis_6379.pid 
    66042
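    Pulling the tweaks above together, the changed lines of redis.conf for this walkthrough would look roughly like this (the directory and password are placeholder values):

    bind 0.0.0.0
    protected-mode no
    daemonize yes
    dir ./mydata
    logfile "redis.log"
    # requirepass yourpassword    (optional; then run auth yourpassword after connecting)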

2.2 Deploying on Docker

Docker installation steps: http://www.javashuo.com/article/p-uxyckinu-cp.html

  • Start Docker
    service docker start
  • List the containers
    docker ps

    The container list is empty, so next we head to DockerHub to fetch and install redis (parts of it may require getting past the firewall)

  • Install redis and bind the port
    Since I already installed Redis in the VM, which occupies port 6379, I map host port 6378 to the container's 6379
    The container starts automatically once installed

    docker run --name some-redis -p 6378:6379 -d redis

    Now list the Docker containers again

    For anything more complex, you should write your own redis.conf and deploy it with docker-compose instead of typing commands by hand; see the sketch below.
    The dockerfile needs to copy redis.conf in

  • Remove redis from docker
    docker kill 90b45b58a571
    docker rm 90b45b58a571
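    As a sketch of that docker-compose approach (the image tag and mount paths are assumptions based on the official redis image docs):

    version: "3"
    services:
      redis:
        image: redis:5.0.3
        ports:
          - "6378:6379"
        volumes:
          - ./redis.conf:/usr/local/etc/redis/redis.conf
        command: ["redis-server", "/usr/local/etc/redis/redis.conf"]

    Then bring it up with docker-compose up -d.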

3. [SDK] Quick C# SDK operations and two GUI tools

3.1 StackExchange.Redis

GitHub: https://github.com/StackExchange/StackExchange.Redis/

Docs: https://stackexchange.github.io/StackExchange.Redis/

Using String

Store user info on the web site to emulate a session.
In .NET Core, use redis as the distributed shared session store. {framework integration}

Using Hash: keep each shop's database connection string (a sharded-database scenario). key: shopid  value: connectionstring
Using Set: check whether a user is on the blacklist, in O(1)
Using List: a message queue — client -> SMS queue <- sending worker -> telecom carrier


  • Install
    Install-Package StackExchange.Redis
  • Usage example
    class Program
    {
        static void Main(string[] args)
            {
                ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
                IDatabase db = redis.GetDatabase(0);
    
                //////cookie(ui,sessionid)
                //////redis(sessionid,userinfo)
                //db.StringSet("sessionid", "jack", TimeSpan.FromSeconds(5));
    
                //while (true)
                //{
                //    var info = db.StringGet("sessionid");
    
                //    Console.WriteLine(info);
    
                //    Thread.Sleep(1000);
                //}
    
                ////key: shopID  value: connectionstring
                //db.HashSet("connetions", "1", "mysql://192.168.1.1/mydb");
                //db.HashSet("connetions", "2", "mysql://192.168.1.2/mydb");
                //db.HashSet("connetions", "3", "mysql://192.168.1.3/mydb");
                //db.HashSet("connetions", "4", "mysql://192.168.1.4/mydb");
                //db.HashSet("connetions", "5", "mysql://192.168.1.5/mydb");
    
                //Console.WriteLine(db.HashGet("connetions", "3"));
    
    
                ////blacklist
                //db.SetAdd("blacklist", "1");
                //db.SetAdd("blacklist", "2");
                //db.SetAdd("blacklist", "3");
                //db.SetAdd("blacklist", "4");
    
                //var r = db.SetContains("blacklist", 40);
    
                ////message queue
                //db.ListLeftPush("sms", "18721073333");
                //db.ListLeftPush("sms", "18521073333");
                //db.ListLeftPush("sms", "18121073033");
    
                //Console.WriteLine(db.ListRightPop("sms"));
                //Console.WriteLine(db.ListRightPop("sms"));
                //Console.WriteLine(db.ListRightPop("sms"));
    
                Console.ReadKey();
            }
    }
  • Using redis as the session store in asp.net core
    Session is something we lean on constantly in web development. By default it lives in the local process, but in ASP.NET Core we can very easily switch the session store to a distributed cache (Redis) or a database (SqlServer). A distributed cache improves the performance and scalability of an ASP.NET Core app, especially when it is hosted in the cloud or on a server farm
    • Add the reference

      Microsoft.Extensions.Caching.Redis
    • Configure the services
      public void ConfigureServices(IServiceCollection services)
      {
          ...
      
          //add redis as the distributed cache
          services.AddDistributedRedisCache(option =>
          {
              option.InstanceName = "session";
              option.Configuration = "192.168.181.131:6379";
          });
      
          //add session support
          services.AddSession(options =>
          {
              //options.IdleTimeout = TimeSpan.FromMinutes(10); //session idle timeout
              //options.Cookie.HttpOnly = true;//make the cookie httponly
          });
      
          ...
      }
      
      
      public void Configure(IApplicationBuilder app, IHostingEnvironment env)
      {
          ...
      
          //enable the session middleware
          app.UseSession();
      
          ...
      }
    • Set a session value
      //using Microsoft.AspNetCore.Http;
      HttpContext.Session.SetString("userinfo", "jack");
    • Display the data
      @using Microsoft.AspNetCore.Http;
      @Context.Session.GetString("userinfo")
      @Context.Session.Id

3.2 GUI tools

4. [SDK] StackExchange typed extensions and hand-rolling a connection pool

4.1 Typed extensions for StackExchange.Redis

Why use typed extensions? First look at this code:

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var userModel = new UserModel()
        {
            UserName = "jack",
            Email = "sdfasdf@qq.com",
            IsVip = true
        };
        db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
        var info = db.StringGet("userinfo");
        var model = JsonConvert.DeserializeObject<UserModel>(info);

        Console.ReadKey();
    }
}

public class UserModel
{
    public string UserName { get; set; }
    public string Email { get; set; }
    public bool IsVip { get; set; }
}

To store an object you must first serialize it to a string, and deserialize it again when reading it back. Is there a better way to handle this? StackExchange.Redis.Extensions provides a nice wrapper.

StackExchange.Redis.Extensions on GitHub: https://github.com/imperugo/StackExchange.Redis.Extensions

  • Install
    Install-Package StackExchange.Redis.Extensions.Core
    Install-Package StackExchange.Redis.Extensions.Newtonsoft  //the serialization provider
  • Usage
    var cacheClient = new StackExchangeRedisCacheClient(redis,new NewtonsoftSerializer());
    cacheClient.Add("userinfo", userModel);
    var model = cacheClient.Get<UserModel>("userinfo");
  • Full example
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
            IDatabase db = redis.GetDatabase(0);
    
            var userModel = new UserModel()
            {
                UserName = "jack",
                Email = "sdfasdf@qq.com",
                IsVip = true
            };
            db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
            var info = db.StringGet("userinfo");
            var model = JsonConvert.DeserializeObject<UserModel>(info);
    
    
    
            var cacheClient = new StackExchangeRedisCacheClient(redis,new NewtonsoftSerializer());
            cacheClient.Add("userinfo", userModel);
            model = cacheClient.Get<UserModel>("userinfo");
    
            Console.ReadKey();
        }
    }
    
    public class UserModel
    {
        public string UserName { get; set; }
        public string Email { get; set; }
        public bool IsVip { get; set; }
    }
  • Drawbacks
    Slower than the raw client, and fewer features; no Stream support yet 

4.2 StackExchange.Redis connection pitfalls

4.2.1 Too many sockets can take the sdk down

  • The cause: what happens if the connection is not kept as an instance field, and every call news one up? Far too many sockets, opened and closed at high frequency.

  • The fixes:
    • one globally unique connection (see the sketch below)
    • a connection pool of your own
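A minimal sketch of the first fix, one global connection shared by the whole process (the host address is the one used throughout this article); ConnectionMultiplexer is explicitly designed to be created once and reused:

using System;
using StackExchange.Redis;

public static class RedisStore
{
    // Lazy<T> gives thread-safe, one-time initialization of the shared connection
    private static readonly Lazy<ConnectionMultiplexer> LazyConnection =
        new Lazy<ConnectionMultiplexer>(() =>
            ConnectionMultiplexer.Connect("192.168.181.131:6379"));

    public static ConnectionMultiplexer Connection => LazyConnection.Value;
}

// usage: var db = RedisStore.Connection.GetDatabase(0);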

4.2.2 A hand-rolled connection pool

  • Create the pool: RedisConnectionPool.cs
    public class RedisConnectionPool
        {
            private static ConcurrentQueue<ConnectionMultiplexer> connectionPoolQueue = new ConcurrentQueue<ConnectionMultiplexer>();
    
            private static int minConnectionNum;
            private static int maxConnectionNum;
    
            private static string host;
            private static int port;
    
            //obtain max/min and host/port via a constructor or from config
            public static void InitializeConnectionPool()
            {
                minConnectionNum = 10;
    
                maxConnectionNum = 100;
    
                host = "192.168.181.131";
                port = 6379;
    
                for (int i = 0; i < minConnectionNum; i++)
                {
                    var client = OpenConnection(host, port);
    
                    PushConnection(client);
                }
    
                Console.WriteLine($"{minConnectionNum} connections initialized!");
            }
    
            /*
             * 1. If the pool is empty, OpenConnection a fresh one
             * 
             * 2. If a pooled connection has IsConnected == false, just close it and keep looking
             * 
             */
            public static ConnectionMultiplexer GetConnection()
            {
                while (connectionPoolQueue.Count > 0)
                {
                    connectionPoolQueue.TryDequeue(out ConnectionMultiplexer client);
    
                    if (!client.IsConnected)
                    {
                        client.Close();
                        continue;
                    }
    
                    return client;
                }
    
                return OpenConnection(host, port);
            }
    
            /// <summary>
            /// 1. If the queue already holds >= max connections, drop this one
            /// 
            /// 2. If the client's IsConnected is false, close it
            /// </summary>
            /// <param name="client"></param>
            /// <returns></returns>
    
            public static void PushConnection(ConnectionMultiplexer client)
            {
                if (connectionPoolQueue.Count >= maxConnectionNum)
                {
                    client.Close();
                    return;
                }
    
                if (!client.IsConnected)
                {
                    client.Close();
                    return;
                }
    
                connectionPoolQueue.Enqueue(client);
            }
    
            public static ConnectionMultiplexer OpenConnection(string host, int port)
            {
                ConnectionMultiplexer client = ConnectionMultiplexer.Connect($"{host}:{port}");
                return client;
            }
        }
  • Usage
    RedisConnectionPool.InitializeConnectionPool();
    for (int m = 0; m < 1000000; m++)
    {
        ConnectionMultiplexer client = null;
    
        try
        {
            client = RedisConnectionPool.GetConnection();
    
            var db = client.GetDatabase(0);
    
            db.StringSet("username", "jack");
    
            Console.WriteLine(db.StringGet("username") + " " + m);
        }
        finally
        {
            if (client != null)
            {
                RedisConnectionPool.PushConnection(client);
            }
        }
        //ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        //Console.WriteLine(m);
    }

5. [Memory layout] Reading the five core objects in the Redis source

Download the source from the Redis official site and unzip it, then open it with VS2017; the code lives under the src folder. Redis's storage structure:

  • RedisServer
    Source: src/server.h 
    A redisServer holds 16 redisDb instances: in main() in src/server.c, look into void initServer(void) and you can see the 16 DBs being created

    You can see that server.dbnum defaults to 16


  • RedisDb
    Source: src/server.h 
    Here you can see the dict *dict data dictionary, the expiry dict, sizes, and so on

  • redisObject
    Source: src/server.h 

    Note the *ptr field, which points at an sds (sds.h), quicklist (quicklist.h), dict (dict.h) or rax (rax.h) 
    You can inspect a redisObject's attributes from redis-cli
  • sds
    Source: sds.h 

    sds is a wrapper over char[] that squeezes memory usage to the extreme
    typedef char *sds;
    
    /* Note: sdshdr5 is never used, we just access the flags byte directly.
     * However is here to document the layout of type 5 SDS strings. */
    struct __attribute__ ((__packed__)) sdshdr5 {
        unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
        char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr8 {
        uint8_t len; /* used */
        uint8_t alloc; /* excluding the header and null terminator */
        unsigned char flags; /* 3 lsb of type, 5 unused bits */
        char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr16 {
        uint16_t len; /* used */
        uint16_t alloc; /* excluding the header and null terminator */
        unsigned char flags; /* 3 lsb of type, 5 unused bits */
        char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr32 {
        uint32_t len; /* used */
        uint32_t alloc; /* excluding the header and null terminator */
        unsigned char flags; /* 3 lsb of type, 5 unused bits */
        char buf[];
    };
    struct __attribute__ ((__packed__)) sdshdr64 {
        uint64_t len; /* used */
        uint64_t alloc; /* excluding the header and null terminator */
        unsigned char flags; /* 3 lsb of type, 5 unused bits */
        char buf[];
    };

  • redisClient
    Source: src/server.h, with three fields that matter most:

    • redisDb *db — the database being operated on

    • int argc — the number of command arguments
    • robj **argv — all of the command arguments

    • Walking a query through

      set name jack
      
      ↓↓↓↓↓↓↓
      
      argv[0]=set
      argv[1]=name
      argv[2]=jack
      
      ↓↓↓↓↓↓↓
      
      commandTables 
      
      [
        {set => setCommand}
        {get => getCommand}
      ]
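To make that dispatch concrete, here is a toy C# analogue of the command-table lookup (the handler bodies are invented for illustration):

using System;
using System.Collections.Generic;

class CommandTableDemo
{
    // maps a command name to its handler, just like commandTables maps set => setCommand
    static readonly Dictionary<string, Action<string[]>> commandTable =
        new Dictionary<string, Action<string[]>>(StringComparer.OrdinalIgnoreCase)
        {
            ["set"] = argv => Console.WriteLine($"setCommand: {argv[1]} = {argv[2]}"),
            ["get"] = argv => Console.WriteLine($"getCommand: {argv[1]}"),
        };

    static void Main()
    {
        // argv[0]=set, argv[1]=name, argv[2]=jack, exactly as parsed above
        string[] argv = { "set", "name", "jack" };
        commandTable[argv[0]](argv);
    }
}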

6. [String] String commands, source reading, and SDK practice for flash sales and duplicate-submit protection

6.1 The common String commands explained

Redis String commands: https://redis.io/commands#string

Redis commands: incr, decr, incrby, decrby
C# equivalents: ++, --, Interlocked.Increment, Interlocked.Decrement

Example session:
redis> SET mykey "10"
"OK"
redis> INCR mykey
(integer) 11
redis> GET mykey
"11"
redis> SET mykey "10"
"OK"
redis> DECR mykey
(integer) 9
redis> SET mykey "234293482390480948029348230948"
"OK"
redis> DECR mykey
ERR ERR value is not an integer or out of range
redis> SET mykey "10"
"OK"
redis> INCRBY mykey 5
(integer) 15
redis> SET mykey "10"
"OK"
redis> DECRBY mykey 3
(integer) 7


  • Where incr shines: a simple fix for flash-sale overload 

    Stock: 1000   Shoppers: 100k

    Admissions: 3000 — only let the first 3000 requests in.

    Purchases: 1000

    Whether stock is deducted at "awaiting payment" or at "purchase succeeded" is the business's call, not redis's!

    Use a max counter to filter the crowd...
    A minimal example:

    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    
            IDatabase db = redis.GetDatabase(0);
            while (true)
            {
                var num = db.StringIncrement("max1");
                if (num>3000)
                {
                    Console.WriteLine("當前請求>3000");
                    break;
                }
                Console.WriteLine(num);
            }
            Console.ReadKey();
        }
    }

  • SetNx + Expire, or a single Set 
    Use case: stopping duplicate submissions of an order.  [SetNx = SET if Not eXists] — if the key already exists, the value is not assigned.

    setnx token 12345 (handled: success)
    setnx token 12345 (handled: failure)

    EXPIRE adds a time-to-live

    Folded into one Set call,

    it means a repeated SET within 10 seconds is not allowed
    C# example:

    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    
            IDatabase db = redis.GetDatabase(0);
            while (true)
            {
                var b = db.StringSet("token", "123456", TimeSpan.FromSeconds(10), When.NotExists);
                Console.WriteLine(b);
    
                Thread.Sleep(TimeSpan.FromSeconds(2));
            }
            Console.ReadKey();
        }
    }

6.2 Reading the source

This section walks through incr; the other commands can be read the same way

First find incr in the redisCommandTable in src/server.c, which maps it to incrCommand, then jump to t_string.c 

void incrCommand(client *c) {
    incrDecrCommand(c,1);//you can see the delta is +1 here
}

Then locate the definition of incrDecrCommand

void incrDecrCommand(client *c, long long incr) {
    long long value, oldvalue;
    robj *o, *new;

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL && checkType(c,o,OBJ_STRING)) return;
    if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

    oldvalue = value;
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    value += incr;

    if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
        (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
        value >= LONG_MIN && value <= LONG_MAX)
    {
        new = o;
        o->ptr = (void*)((long)value);
    } else {
        new = createStringObjectFromLongLongForValue(value);
        if (o) {
            dbOverwrite(c->db,c->argv[1],new);
        } else {
            dbAdd(c->db,c->argv[1],new);
        }
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,new);
    addReply(c,shared.crlf);
}

7. [String] Bitmap commands and a blacklist use case

7.1 The idea behind bitmaps 

  • An example scenario
    Customer ids 1-32 are all blacklisted; how do we store that using the least memory? 
    HashSet<int> hashSet=new HashSet<int>();
    hashSet.Add(customerid=1)
    ...
    hashSet.Add(customerid=32)
    • stored as ints: 32 bits * 32 = 1024 bits
    • stored as bytes: 8 bits * 32 = 256 bits
    • stored as a bitmap: one int = 32 bits

      the customerid acts as the position in the array,

      and the 0/1 at that position marks whether it is set (see the sketch below)

  • Where it really matters
    Suppose there are 5M users, of whom 1M are spam buyers.
    If a shop's spam buyers number < 100k, a plain set will do 
    If a shop's spam buyers number > 100k, use a bitmap 
  • bitmap mainly suits compact id ranges: if keys run up to 2.1 billion, the bitmap needs 2.1 billion/32 = tens of millions of ints,
    while storing a single id the ordinary way takes just one int
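  A small sketch of the position arithmetic (it mirrors how SETBIT addresses one bit inside a byte array; illustrative C#, not Redis's code):

  using System;

  class BitmapDemo
  {
      static byte[] bits = new byte[1]; // grown on demand

      static void SetBit(int position, bool value)
      {
          int byteIndex = position / 8;        // which byte holds this position
          int bitIndex = 7 - position % 8;     // redis stores the most significant bit first
          if (byteIndex >= bits.Length) Array.Resize(ref bits, byteIndex + 1);
          if (value) bits[byteIndex] |= (byte)(1 << bitIndex);
          else       bits[byteIndex] &= (byte)~(1 << bitIndex);
      }

      static bool GetBit(int position)
      {
          int byteIndex = position / 8;
          int bitIndex = 7 - position % 8;
          return byteIndex < bits.Length && (bits[byteIndex] & (1 << bitIndex)) != 0;
      }

      static void Main()
      {
          SetBit(1, true);                  // setbit blacklist 1 1
          SetBit(4, true);                  // setbit blacklist 4 1
          Console.WriteLine(GetBit(4));     // True
      }
  }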

7.2 Using setbit, getbit, bitcount

  • setbit: set the bit at a given position to 0 or 1
  • getbit: read the bit at a given position
  • bitcount: count the set bits, i.e. how many users are blacklisted
  • redis-cli session:
    192.168.181.131:0>setbit blacklist 1 1    //id 1 is blacklisted
    "0"
    
    192.168.181.131:0>setbit blacklist 2 0    //id 2 is not
    "0"
    
    192.168.181.131:0>setbit blacklist 3 0
    "0"
    
    192.168.181.131:0>setbit blacklist 4 1
    "0"
    
    192.168.181.131:0>getbit blacklist 2    //is id 2 blacklisted?
    "0"
    
    192.168.181.131:0>getbit blacklist 4    //is id 4 blacklisted?
    "1"
    
    192.168.181.131:0>bitcount blacklist    //how many are blacklisted
    "2"
  • SDK example:
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    
            IDatabase db = redis.GetDatabase(0);
    
            db.StringSetBit("blacklist", 1, true);
            db.StringSetBit("blacklist", 4, true);
    
            Console.WriteLine(db.StringGetBit("blacklist", 1));
            Console.WriteLine(db.StringGetBit("blacklist", 2));
            Console.WriteLine(db.StringGetBit("blacklist", 3));
            Console.WriteLine(db.StringGetBit("blacklist", 4));
    
            Console.ReadKey();
        }
    }

  • The same idea in plain C# (no SDK):
    class Program
    {
        static void Main(string[] args)
        {
            BitArray bitArray=new BitArray(8);
    
            bitArray[1] = true;
            bitArray[4] = true;
    
            Console.WriteLine(bitArray[1]);
            Console.WriteLine(bitArray[2]);
            Console.WriteLine(bitArray[3]);
            Console.WriteLine(bitArray[4]);
    
            Console.ReadKey();
        }
    }

8. [List] Common commands, source reading, and SDK usage

redis List commands: https://redis.io/commands#list

8.1 List

  • The linked structure
    A List is a non-circular doubly linked list, so reaching an adjacent node is O(1), as the figure below shows
  • Common operations
    lpush (push left), rpop (pop right), rpush, lpop. With these four it can serve as both a stack (Stack) and a linked list (LinkList)

    lpush + rpop gives a queue
    lpush + lpop gives a stack (think bracket matching)

8.2 The blocking bxxx variants

Two ways to fetch data from a queue:

  • write a busy loop (sleep(10ms)), which burns CPU on an empty queue
  • if the queue is empty, let the thread block (holding the client) and wait until data arrives (blocking)
    192.168.181.131:0>lpush sms 1
    "1"
    
    192.168.181.131:0>lpush sms 2
    "2"
    
    192.168.181.131:0>lpush sms 3
    "3"
    
    192.168.181.131:0>llen sms
    "3"
    
    192.168.181.131:0>blpop sms 0
     1)  "sms"
     2)  "3"
    192.168.181.131:0>blpop sms 0
     1)  "sms"
     2)  "2"
    192.168.181.131:0>blpop sms 0
     1)  "sms"
     2)  "1"
    192.168.181.131:0>blpop sms 0
    Connection error:Execution timeout

8.3 SDK practice

Compare blocking and non-blocking: the SDK's multiplexed connection does not expose the blocking pops, so the code below polls non-blockingly.

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");

        IDatabase db = redis.GetDatabase(0);

        db.ListLeftPush("sms", 1);
        db.ListLeftPush("sms", 2);
        db.ListLeftPush("sms", 3);
        db.ListLeftPush("sms", 4);
        while (true)
        {
            var info = db.ListLeftPop("sms");
            Console.WriteLine(info);
            Thread.Sleep(1000);
        }

        Console.ReadKey();
    }
}

8.4 Reading the source

Start with quicklist in src/quicklist.h: you can see it is built from quicklistNode entries and keeps head and tail pointers

typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

Then look at quicklistNode: each node holds pointers to both its previous and next node

typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

Next, the LLEN command: find llen in the redisCommandTable in src/server.c, mapped to llenCommand, then jump to t_list.c 

void llenCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
    addReplyLongLong(c,listTypeLength(o));
}

Next we look at the listTypeLength function

unsigned long listTypeLength(const robj *subject) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        return quicklistCount(subject->ptr);
    } else {
        serverPanic("Unknown list encoding");
    }
}

And then quicklistCount

/* Return cached quicklist count */
unsigned long quicklistCount(const quicklist *ql) { return ql->count; }

So LLEN simply returns the count field cached on the quicklist that ptr points at

Now the LPUSH command: again in src/server.c, lpush maps to lpushCommand

void lpushCommand(client *c) {
    pushGenericCommand(c,LIST_HEAD);
}

Then look at pushGenericCommand

void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        if (!lobj) {
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

Then listTypePush

void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        quicklistPush(subject->ptr, value->ptr, len, pos);
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}

Then quicklistPush, which you can see pushes onto either the head or the tail

/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
                   int where) {
    if (where == QUICKLIST_HEAD) {
        quicklistPushHead(quicklist, value, sz);
    } else if (where == QUICKLIST_TAIL) {
        quicklistPushTail(quicklist, value, sz);
    }
}

Finally, quicklistPushHead, where count gets incremented

/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

9. [Hash] Hash commands, storing shard connection strings, and source reading

9.1 The structure underneath Hash

A redis hash object can be stored as either a ziplist (compressed list) or a hashtable. The ziplist encoding is used only while both of the following hold:

  • every key and value the hash stores is shorter than 64 bytes
  • the hash stores fewer than 512 key-value pairs

Redis's hash is the textbook hashtable design, resolving collisions by chaining.

Think of it in C# terms:

Dictionary<string,string> dict=new Dictionary<string,string>();
dict.Add("username","jack");
//suppose hash(username) = 100
//table[100]=dictEntry(username,jack,next  ) => model
dict.Add("password","12345");
//suppose hash(password) = 100

//the colliding hash gets chained
//table[100]= dictEntry(password,12345,next  ) ->  dictEntry(username,jack,next  ) 
var info= dict["username"];
info=jack;

You can see next exists to chain together the entries whose hashes collide

9.2 Using the common hash commands

Hash commands: https://redis.io/commands#hash

The common ones: hset, hget, hdel, hlen, hexists, hkeys, hvals, hgetall

  • A quick command session
    127.0.0.1:6379> flushdb
    OK
    127.0.0.1:6379> hset conn 1 mysql://1
    (integer) 1
    127.0.0.1:6379> hset conn 2 mysql://2
    (integer) 1
    127.0.0.1:6379> hlen conn
    (integer) 2
    127.0.0.1:6379> hexists conn 2
    (integer) 1
    127.0.0.1:6379> hexists conn 3
    (integer) 0
    127.0.0.1:6379> hget conn 2
    "mysql://2"
    127.0.0.1:6379> hdel conn 2
    (integer) 1
    127.0.0.1:6379> hlen conn
    (integer) 1
    127.0.0.1:6379> hset conn 3 mysql://3
    (integer) 1
    127.0.0.1:6379> hlen conn
    (integer) 2
    127.0.0.1:6379> hkeys conn
    1) "1"
    2) "3"
    127.0.0.1:6379> hvals conn
    1) "mysql://1"
    2) "mysql://3"
    127.0.0.1:6379> hgetall conn
    1) "1"
    2) "mysql://1"
    3) "3"
    4) "mysql://3"

9.3 Using the SDK

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");

        IDatabase db = redis.GetDatabase(0);

        db.HashSet("conn", 10, "mysql://10");

        var info = db.HashGet("conn", 10);

        Console.WriteLine(info);

        var len = db.HashLength("conn");

        Console.WriteLine(len);

        var arr = db.HashKeys("conn");

        Console.WriteLine(string.Join(",", arr));

        Console.ReadKey();
    }
}

9.4 Reading the source

Start with dict in the src/dict.h header: you can see it is built from the dictht array structure

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

Then look at dictht, which holds the dictEntry table 

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;    //the allocated slot count
    unsigned long sizemask;    //the mask used for the modulo
    unsigned long used;    //the slots actually in use
} dictht;

Then look at dictEntry

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; 
    struct dictEntry *next;    //used for the collision chain
} dictEntry;


Next the HLEN command: find hlen in src/server.c, mapped to hlenCommand

void hlenCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_HASH)) return;

    addReplyLongLong(c,hashTypeLength(o));
}

Then we look at hashTypeLength, which returns the hash object's length

/* Return the number of elements in a hash. */
unsigned long hashTypeLength(const robj *o) {
    unsigned long length = ULONG_MAX;

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        length = ziplistLen(o->ptr) / 2;
    } else if (o->encoding == OBJ_ENCODING_HT) {
        length = dictSize((const dict*)o->ptr);
    } else {
        serverPanic("Unknown hash encoding");
    }
    return length;
}

Then the dictSize macro shows the arithmetic

#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

The used counts of the two dictht tables are added together, and that sum is the hlen result


Now the HSET command: find hset in src/server.c, mapped to hsetCommand

void hsetCommand(client *c) {
    int i, created = 0;
    robj *o;

    if ((c->argc % 2) == 1) {
        addReplyError(c,"wrong number of arguments for HMSET");
        return;
    }

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,c->argc-1);

    for (i = 2; i < c->argc; i += 2) //walk hset's field/value argument pairs
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

    /* HMSET (deprecated) and HSET return value is different. */
    char *cmdname = c->argv[0]->ptr;
    if (cmdname[1] == 's' || cmdname[1] == 'S') {
        /* HSET */
        addReplyLongLong(c, created);
    } else {
        /* HMSET */
        addReply(c, shared.ok);
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}

Then we look at hashTypeSet

int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;
    //is this the compressed (ziplist) encoding?
    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = ziplistNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;

                /* Delete value */
                zl = ziplistDelete(zl, &vptr);

                /* Insert new value */
                zl = ziplistInsert(zl, vptr, (unsigned char*)value,
                        sdslen(value));
            }
        }

        if (!update) {
            /* Push new field/value pair onto the tail of the ziplist */
            zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
                    ZIPLIST_TAIL);
            zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
                    ZIPLIST_TAIL);
        }
        o->ptr = zl;

        /* Check if the ziplist needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        dictEntry *de = dictFind(o->ptr,field);//hash(field) picks the slot; check whether the dictEntry already exists there
        if (de) {
            sdsfree(dictGetVal(de));
            if (flags & HASH_SET_TAKE_VALUE) {
                dictGetVal(de) = value;
                value = NULL;
            } else {
                dictGetVal(de) = sdsdup(value);
            }
            update = 1;
        } else {
            sds f,v;
            if (flags & HASH_SET_TAKE_FIELD) {
                f = field;
                field = NULL;
            } else {
                f = sdsdup(field);
            }
            if (flags & HASH_SET_TAKE_VALUE) {
                v = value;
                value = NULL;
            } else {
                v = sdsdup(value);
            }
            dictAdd(o->ptr,f,v);
        }
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free SDS strings we did not referenced elsewhere if the flags
     * want this function to be responsible. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}

Then look at dictFind

static dictEntry *dictFind(dict *ht, const void *key) {
    dictEntry *he;
    unsigned int h;

    if (ht->size == 0) return NULL;
    h = dictHashKey(ht, key) & ht->sizemask;//hash, then mask down to a slot
    he = ht->table[h];//look the slot up in the table
    while(he) {//if occupied, also walk the collision chain
        if (dictCompareHashKeys(ht, key, he->key))
            return he;
        he = he->next;
    }
    return NULL;
}

Then the dictHashKey macro

#define dictHashKey(ht, key) (ht)->type->hashFunction(key)

And the dictAdd function

/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
    int index;
    dictEntry *entry;

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(ht, key)) == -1)
        return DICT_ERR;

    /* Allocates the memory and stores key */
    entry = malloc(sizeof(*entry));//newer entries go to the front of the chain
    entry->next = ht->table[index];
    ht->table[index] = entry;//hang the entry at its slot in the table

    /* Set the hash entry fields. */
    dictSetHashKey(ht, entry, key);
    dictSetHashVal(ht, entry, val);
    ht->used++;        //and finally bump used
    return DICT_OK;
}

10. [Set, HyperLogLog] Common commands and SDK usage

10.1 The data structure underneath Set

Typical Set use case: a blacklist.

Underneath, a Set is simply a dict

holding [key=xxx, value=null]
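In C# terms the idea is a dictionary whose values are never used (a sketch of the concept, not Redis code):

using System;
using System.Collections.Generic;

class SetAsDictDemo
{
    static void Main()
    {
        // a set is a dict in which only the keys matter: [key=xxx, value=null]
        var blacklist = new Dictionary<string, object>();
        blacklist["user-1"] = null;
        blacklist["user-2"] = null;
        Console.WriteLine(blacklist.ContainsKey("user-1")); // True, and O(1)
    }
}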

10.2 The common set commands

sadd (add), sismember (contains?), scard (count), srem (remove), smembers (list the members)

server.natappfree.cc:0>sadd blacklist 1
"1"

server.natappfree.cc:0>sadd blacklist 2
"1"

server.natappfree.cc:0>sismember blacklist 2
"1"

server.natappfree.cc:0>sismember blacklist 3
"0"

server.natappfree.cc:0>scard blacklist
"2"

server.natappfree.cc:0>sadd blacklist 30
"1"

server.natappfree.cc:0>scard blacklist
"3"

server.natappfree.cc:0>smembers blacklist
 1)  "1"
 2)  "2"
 3)  "30"
server.natappfree.cc:0>srem blacklist 2
"1"

server.natappfree.cc:0>smembers blacklist
 1)  "1"
 2)  "30"

10.3 SDK operations

static void Main(string[] args)
{
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server.natappfree.cc:39767");
    IDatabase db = redis.GetDatabase(0);


    db.SetAdd("blacklist", "2");

    var arr = db.SetMembers("blacklist");

    Console.WriteLine(string.Join(",", arr));

    var len = db.SetLength("blacklist");

    Console.WriteLine($"len={len}");

    db.SetRemove("blacklist", "1");

    Console.WriteLine(string.Join(",", db.SetMembers("blacklist")));




    Console.ReadKey();
}

10.4 Reading the source

  • scard (count) source
    For SCARD, find scard in src/server.c, mapped to scardCommand
    void scardCommand(client *c) {
        robj *o;
    
        if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
            checkType(c,o,OBJ_SET)) return;
    
        addReplyLongLong(c,setTypeSize(o));
    }

    Then we look at setTypeSize

    unsigned long setTypeSize(const robj *subject) {
        if (subject->encoding == OBJ_ENCODING_HT) {
            return dictSize((const dict*)subject->ptr);
        } else if (subject->encoding == OBJ_ENCODING_INTSET) {
            return intsetLen((const intset*)subject->ptr);
        } else {
            serverPanic("Unknown set encoding");
        }
    }

    Then the dictSize macro

    #define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

    which confirms from another angle that a set is a dict as well

  • sadd (add) source
    For SADD, find sadd in src/server.c, mapped to saddCommand
    void saddCommand(client *c) {
        robj *set;
        int j, added = 0;
    
        set = lookupKeyWrite(c->db,c->argv[1]);
        if (set == NULL) {
            set = setTypeCreate(c->argv[2]->ptr);
            dbAdd(c->db,c->argv[1],set);
        } else {
            if (set->type != OBJ_SET) {
                addReply(c,shared.wrongtypeerr);
                return;
            }
        }
    
        for (j = 2; j < c->argc; j++) {//walk all the values: several can be added at once
            if (setTypeAdd(set,c->argv[j]->ptr)) added++;
        }
        if (added) {
            signalModifiedKey(c->db,c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
        }
        server.dirty += added;
        addReplyLongLong(c,added);
    }

    Next we look at setTypeAdd

    /* Add the specified value into a set.
     *
     * If the value was already member of the set, nothing is done and 0 is
     * returned, otherwise the new element is added and 1 is returned. */
    int setTypeAdd(robj *subject, sds value) {
        long long llval;
        if (subject->encoding == OBJ_ENCODING_HT) {
            dict *ht = subject->ptr;
            dictEntry *de = dictAddRaw(ht,value,NULL);
            if (de) {
            dictSetKey(ht,de,sdsdup(value));//store the key
            dictSetVal(ht,de,NULL);//store NULL as the value, so this is a hash dict whose values carry nothing
                return 1;
            }
        } else if (subject->encoding == OBJ_ENCODING_INTSET) {
            if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
                uint8_t success = 0;
                subject->ptr = intsetAdd(subject->ptr,llval,&success);
                if (success) {
                    /* Convert to regular set when the intset contains
                     * too many entries. */
                    if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                        setTypeConvert(subject,OBJ_ENCODING_HT);
                    return 1;
                }
            } else {
                /* Failed to get integer from object, convert to regular set. */
                setTypeConvert(subject,OBJ_ENCODING_HT);
    
                /* The set *was* an intset and this value is not integer
                 * encodable, so dictAdd should always work. */
                serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
                return 1;
            }
        } else {
            serverPanic("Unknown set encoding");
        }
        return 0;
    }

10.5 Counting with HyperLogLog

Command docs: https://redis.io/commands#hyperloglog

  • Overview
    Say the stored values are 3,3,1,5: the cardinality (the count after removing duplicates) = 3
    Strength: extraordinarily space-efficient: with roughly 12 KB redis can count up to 2^64 distinct items (strings or numbers)
    It can only do count statistics, and with some error: the error rate is about 0.8%.
    The idea: a probabilistic algorithm; it does not store the data itself, but estimates the cardinality with a probability function f(x)=xxx
    For contrast, 2.5 million raw 4-byte ints already take about 10 MB, and 250 million about 1 GB
  • Using pfadd and pfcount
    server.natappfree.cc:0>pfadd p1 1
    "1"
    
    server.natappfree.cc:0>pfadd p1 2
    "1"
    
    server.natappfree.cc:0>pfadd p1 1
    "0"
    
    server.natappfree.cc:0>pfcount p1
    "2"
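  StackExchange.Redis exposes the same pair of commands as HyperLogLogAdd / HyperLogLogLength; a quick sketch against the host used throughout this article:

  using System;
  using StackExchange.Redis;

  class Program
  {
      static void Main(string[] args)
      {
          ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
          IDatabase db = redis.GetDatabase(0);

          db.HyperLogLogAdd("p1", "1");    // pfadd p1 1
          db.HyperLogLogAdd("p1", "2");    // pfadd p1 2
          db.HyperLogLogAdd("p1", "1");    // duplicate: the estimate is unchanged

          Console.WriteLine(db.HyperLogLogLength("p1"));    // ≈ 2, like pfcount p1

          Console.ReadKey();
      }
  }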

11. [SortedSet] How skip lists work, with a top-K scenario in the SDK

Purpose: range queries, e.g. how many people fall between 10 and 100.

11.1 The structure underneath SortedSet (the skiplist)

The skip list (at heart, an answer to the search problem).
Tree alternatives: AVL tree, red-black tree, splay tree.
Linked alternative: a leveled linked list.

<1> a sorted linked list (enabling binary-search-like jumps)
level 1: O(N)
level 1: 10 - 46 takes 4 hops
level 2: 3 hops
level 3: -

The classic analogy (see the sketch below):

level 1: by bus:   Shanghai - Zhenjiang - Nanjing - Shijiazhuang - Beijing (100 stops)
level 2: by train: Shanghai - Nanjing - Tianjin - Beijing (10 stops)
level 3: by plane: Shanghai - Beijing (1 stop)
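A toy sketch of that search in C# (hand-built levels over a handful of values, not zskiplist itself): start at the highest level, run forward as far as you can, then drop down a level.

using System;
using System.Collections.Generic;

class SkipNode
{
    public int Value;
    public SkipNode[] Forward; // Forward[i] = next node on level i
    public SkipNode(int value, int levels) { Value = value; Forward = new SkipNode[levels]; }
}

class SkipListDemo
{
    static bool Search(SkipNode head, int target)
    {
        var node = head;
        for (int level = node.Forward.Length - 1; level >= 0; level--)
        {
            // run forward on this level while the next value is still <= target
            while (node.Forward[level] != null && node.Forward[level].Value <= target)
                node = node.Forward[level];
            if (node.Value == target) return true; // found: no need to drop further
        }
        return false;
    }

    static void Main()
    {
        // three levels over the sorted values, like the bus / train / plane analogy
        int[] values = { 10, 18, 25, 33, 40, 46 };
        var head = new SkipNode(int.MinValue, 3);
        var nodes = new List<SkipNode>();
        foreach (var v in values) nodes.Add(new SkipNode(v, 3));

        for (int i = 0; i + 1 < nodes.Count; i++) nodes[i].Forward[0] = nodes[i + 1]; // level 0: every stop
        head.Forward[0] = nodes[0];
        head.Forward[1] = nodes[1]; nodes[1].Forward[1] = nodes[3]; nodes[3].Forward[1] = nodes[5]; // level 1
        head.Forward[2] = nodes[5]; // level 2: straight to the end

        Console.WriteLine(Search(head, 33)); // True, in far fewer hops than a level-0 scan
    }
}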

11.2 Matching it to the source

  • zskiplist
    The zskiplist struct is declared in src/server.h
    typedef struct zskiplist {
        struct zskiplistNode *header, *tail; //head and tail nodes
        unsigned long length;
        int level;
    } zskiplist;

    Next we look at the zskiplistNode struct

    /* ZSETs use a specialized version of Skiplists */
    typedef struct zskiplistNode {
        sds ele;    //the element
        double score;    //the score, which works like a weight
        struct zskiplistNode *backward;    //backward pointer
        struct zskiplistLevel {
            struct zskiplistNode *forward;    //forward pointer
            unsigned long span;    //how many nodes this hop spans
        } level[];
    } zskiplistNode;

11.3 Use cases and SDK operations

First, seed every customer's consumption points

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");

        IDatabase db = redis.GetDatabase(0);

        var rand = new Random();
        for (int i = 1; i <= 10000; i++)
        {
            var customerID = i;
            var totalTradeMoney = rand.Next(10, 10000);
            db.SortedSetAdd("shop", i, totalTradeMoney);
        }
        Console.WriteLine("插入成功!");


        Console.ReadKey();
    }
}

Count the customers within a score range

192.168.181.131:0>zcount shop 500 1000
"514"

Add a new user's points

192.168.181.131:0>zadd shop 1 10001
"1"

Remove a user's points

192.168.181.131:0>zrem shop 10001
"1"

Count all the users

192.168.181.131:0>zcard shop
"10000"

Fetch the first 10 in ascending order

192.168.181.131:0>zrange shop 0 9
 1)  "5593"
 2)  "1459"
 3)  "2811"
 4)  "5043"
 5)  "5750"
 6)  "6601"
 7)  "337"
 8)  "7276"
 9)  "2917"
 10)  "6990"
192.168.181.131:0>zrange shop 0 9 withscores
 1)  "5593"
 2)  "11"
 3)  "1459"
 4)  "13"
 5)  "2811"
 6)  "15"
 7)  "5043"
 8)  "15"
 9)  "5750"
 10)  "15"
 11)  "6601"
 12)  "15"
 13)  "337"
 14)  "17"
 15)  "7276"
 16)  "17"
 17)  "2917"
 18)  "18"
 19)  "6990"
 20)  "19"

Fetch the top 10

192.168.181.131:0>zrevrange shop 0 9
 1)  "4907"
 2)  "9796"
 3)  "6035"
 4)  "4261"
 5)  "2028"
 6)  "4611"
 7)  "4612"
 8)  "1399"
 9)  "2786"
 10)  "2696"
192.168.181.131:0>zrevrange shop 0 9 withscores
 1)  "4907"
 2)  "9999"
 3)  "9796"
 4)  "9998"
 5)  "6035"
 6)  "9995"
 7)  "4261"
 8)  "9995"
 9)  "2028"
 10)  "9995"
 11)  "4611"
 12)  "9994"
 13)  "4612"
 14)  "9992"
 15)  "1399"
 16)  "9992"
 17)  "2786"
 18)  "9990"
 19)  "2696"
 20)  "9989"

Query one user's rank

192.168.181.131:0>zrank shop 60
"9223"


Business logic: decide whether a user sits within the top 25% by spend; if so, they count as a premium (long-standing) customer.

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");

        IDatabase db = redis.GetDatabase(0);


        var len = db.SortedSetLength("shop");

        var customerRank = len * 0.25;

        // is this a premium customer?
        var customerID = 60;

        var dbRank = db.SortedSetRank("shop", customerID, Order.Descending);

        var isPremium = dbRank.HasValue && dbRank.Value < customerRank;
        Console.WriteLine($"customer {customerID} premium: {isPremium}");

        Console.ReadKey();
    }
}

Business logic: fetch the top 10% of customers, for dedicated care.

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");

        IDatabase db = redis.GetDatabase(0);

        var len = db.SortedSetLength("shop");
        var top10 = len * 0.1;

        var vals = db.SortedSetRangeByRankWithScores("shop", 0, (long)top10 - 1, Order.Descending);

        Console.ReadKey();
    }
}


12. [Persistence] Understanding redis's three serialization (persistence) mechanisms

12.1 RDB (Redis Database)

Default file: dump.rdb 

A database snapshot: fast to load, but anything written after the last snapshot is lost if redis exits unexpectedly.

snapshot: the full data set as it stood at some moment in history.

The triggers that flush memory to disk are configured in redis.conf:

# Save the DB on disk:
#
#   save <seconds> <changes>
#
#   Will save the DB if both the given number of seconds and the given
#   number of write operations against the DB occurred.
#
#   In the example below the behaviour will be to save:
#   after 900 sec (15 min) if at least 1 key changed
#   after 300 sec (5 min) if at least 10 keys changed
#   after 60 sec if at least 10000 keys changed
#
#   Note: you can disable saving completely by commenting out all "save" lines.
#
#   It is also possible to remove all the previously configured save
#   points by adding a save directive with a single empty string argument
#   like in the following example:
#
#   save ""

save 900 1 //trigger after 900 seconds if at least 1 key changed
save 300 10 //trigger after 300 seconds if at least 10 keys changed
save 60 10000 //trigger after 60 seconds if at least 10000 keys changed

You can also change the dump file name set in redis.conf

# The filename where to dump the DB
dbfilename dump.rdb

Let's simulate losing data

./redis-server ./redis.conf
set username jack
kill -9 50565
./redis-server ./redis.conf


get username
(nil)

Peek inside the dump file

[root@localhost redis]# od -c ./mydata/dump.rdb 
0000000   R   E   D   I   S   0   0   0   9 372  \t   r   e   d   i   s
0000020   -   v   e   r 005   5   .   0   .   3 372  \n   r   e   d   i
0000040   s   -   b   i   t   s 300   @ 372 005   c   t   i   m   e 302
0000060 271 226   N   \ 372  \b   u   s   e   d   -   m   e   m 302   @
0000100 347  \r  \0 372  \f   a   o   f   -   p   r   e   a   m   b   l
0000120   e 300  \0 377   t 344 312   Z   Y 363 323 231
0000134

Check the dump file with the bundled redis tool

[root@localhost redis]# ./src/redis-check-rdb ./mydata/dump.rdb 
[offset 0] Checking RDB file ./mydata/dump.rdb
[offset 26] AUX FIELD redis-ver = '5.0.3'
[offset 40] AUX FIELD redis-bits = '64'
[offset 52] AUX FIELD ctime = '1548654265'
[offset 67] AUX FIELD used-mem = '911168'
[offset 83] AUX FIELD aof-preamble = '0'
[offset 92] Checksum OK
[offset 92] \o/ RDB looks OK! \o/
[info] 0 keys read
[info] 0 expires
[info] 0 already expired

12.2 AOF (Append Only File)

The fsync trigger for appending data is also configured in redis.conf

# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".

# appendfsync always  //fsync every single write to disk
appendfsync everysec   //fsync once per second
# appendfsync no  //let the operating system decide

Next we turn RDB off and AOF on

#save 900 1
#save 300 10
#save 60 10000


appendonly yes

Then store some data

127.0.0.1:6379> set username jack
OK
127.0.0.1:6379> set password 12345
OK

And look at the generated appendonly.aof file

[root@localhost mydata]# cat appendonly.aof 
*2
$6
SELECT
$1
0
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345

AOF: slower to load, loses little data

RDB: fast to load, loses more data

12.3 The mixed rdb + aof mode

This keeps both the fast loading and the small data loss.

How do we turn it on? Again in redis.conf:

# When rewriting the AOF file, Redis is able to use an RDB preamble in the
# AOF file for faster rewrites and recoveries. When this option is turned
# on the rewritten AOF file is composed of two different stanzas:
#
#   [RDB file][AOF tail]
#
# When loading Redis recognizes that the AOF file starts with the "REDIS"
# string and loads the prefixed RDB file, and continues loading the AOF
# tail.
aof-use-rdb-preamble yes

Next we store some data

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> set username jack
OK
127.0.0.1:6379> set password 12345
OK

At this point appendonly.aof keeps getting the command stream appended to it

[root@localhost redis]# cat appendonly.aof 
*2
$6
SELECT
$1
0
*1
$8
flushall
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
*1
$8
flushall
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345

Next, run the bgrewriteaof command to rewrite the aof file with an RDB preamble

127.0.0.1:6379> bgrewriteaof
Background append only file rewriting started

Now look at appendonly.aof again (it starts with a binary RDB section)

[root@localhost redis]# cat appendonly.aof 
REDIS0009    redis-ver5.0.3
redis-bits󿿀򳨭e·­Nused-memÈ
𮤭preamble󿾁password󿿹usernamejackÿȵ

12.4 Reading the source

  • rdb source
    See the rdbSaveRio function, which lives in src/rdb.c
    /* Produces a dump of the database in RDB format sending it to the specified
     * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
     * is returned and part of the output, or all the output, can be
     * missing because of I/O errors.
     *
     * When the function returns C_ERR and if 'error' is not NULL, the
     * integer pointed by 'error' is set to the value of errno just after the I/O
     * error. */
    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        dictIterator *di = NULL;
        dictEntry *de;
        char magic[10];
        int j;
        uint64_t cksum;
        size_t processed = 0;
    
        if (server.rdb_checksum)
            rdb->update_cksum = rioGenericUpdateChecksum;
        snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
        if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    
        for (j = 0; j < server.dbnum; j++) {//loop over the 16 DBs
            redisDb *db = server.db+j;
            dict *d = db->dict;//the dict holding all the keys
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);
    
            /* Write the SELECT DB opcode */
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
            if (rdbSaveLen(rdb,j) == -1) goto werr;
    
            /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
             * is currently the largest type we are able to represent in RDB sizes.
             * However this does not limit the actual size of the DB to load since
             * these sizes are just hints to resize the hash tables. */
            uint64_t db_size, expires_size;
            db_size = dictSize(db->dict);
            expires_size = dictSize(db->expires);
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
            if (rdbSaveLen(rdb,db_size) == -1) goto werr;
            if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
    
            /* Iterate this DB writing every entry */
            while((de = dictNext(di)) != NULL) {
                sds keystr = dictGetKey(de);
                robj key, *o = dictGetVal(de);
                long long expire;
    
                initStaticStringObject(key,keystr);
                expire = getExpire(db,&key);
                if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
    
                /* When this RDB is produced as part of an AOF rewrite, move
                 * accumulated diff from parent to child while rewriting in
                 * order to have a smaller final write. */
                if (flags & RDB_SAVE_AOF_PREAMBLE &&
                    rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
                {
                    processed = rdb->processed_bytes;
                    aofReadDiffFromParent();
                }
            }
            dictReleaseIterator(di);
            di = NULL; /* So that we don't release it again on error. */
        }
    
        /* If we are storing the replication information on disk, persist
         * the script cache as well: on successful PSYNC after a restart, we need
         * to be able to process any EVALSHA inside the replication backlog the
         * master will send us. */
        if (rsi && dictSize(server.lua_scripts)) {
            di = dictGetIterator(server.lua_scripts);
            while((de = dictNext(di)) != NULL) {
                robj *body = dictGetVal(de);
                if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                    goto werr;
            }
            dictReleaseIterator(di);
            di = NULL; /* So that we don't release it again on error. */
        }
    
        /* EOF opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
    
        /* CRC64 checksum. It will be zero if checksum computation is disabled, the
         * loading code skips the check in this case. */
        cksum = rdb->cksum;
        memrev64ifbe(&cksum);
        if (rioWrite(rdb,&cksum,8) == 0) goto werr;
        return C_OK;
    
    werr:
        if (error) *error = errno;
        if (di) dictReleaseIterator(di);
        return C_ERR;
    }
  • aof source
    See the rewriteAppendOnlyFile function, which lives in src/aof.c
    /* Write a sequence of commands able to fully rebuild the dataset into
     * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
     *
     * In order to minimize the number of commands needed in the rewritten
     * log Redis uses variadic commands when possible, such as RPUSH, SADD
     * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
     * are inserted using a single command. */
    int rewriteAppendOnlyFile(char *filename) {
        rio aof;
        FILE *fp;
        char tmpfile[256];
        char byte;
    
        /* Note that we have to use a different temp name here compared to the
         * one used by rewriteAppendOnlyFileBackground() function. */
        snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
            return C_ERR;
        }
    
        server.aof_child_diff = sdsempty();
        rioInitWithFile(&aof,fp);
    
        if (server.aof_rewrite_incremental_fsync)
            rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
    
        if (server.aof_use_rdb_preamble) {//is aof-use-rdb-preamble set to yes?
            int error;
            if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
                errno = error;
                goto werr;
            }
        } else {
            if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
        }
    
        /* Do an initial slow fsync here while the parent is still sending
         * data, in order to make the next final fsync faster. */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
    
        /* Read again a few times to get more data from the parent.
         * We can't read forever (the server may receive data from clients
         * faster than it is able to send data to the child), so we try to read
         * some more data in a loop as soon as there is a good chance more data
         * will come. If it looks like we are wasting time, we abort (this
         * happens after 20 ms without new data). */
        int nodata = 0;
        mstime_t start = mstime();
        while(mstime()-start < 1000 && nodata < 20) {
            if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
            {
                nodata++;
                continue;
            }
            nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                           timeouts. */
            aofReadDiffFromParent();
        }
    
        /* Ask the master to stop sending diffs. */
        if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
        if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
            goto werr;
        /* We read the ACK from the server using a 10 seconds timeout. Normally
         * it should reply ASAP, but just in case we lose its reply, we are sure
         * the child will eventually get terminated. */
        if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
            byte != '!') goto werr;
        serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
    
        /* Read the final diff if any. */
        aofReadDiffFromParent();
    
        /* Write the received diff to the file. */
        serverLog(LL_NOTICE,
            "Concatenating %.2f MB of AOF diff received from parent.",
            (double) sdslen(server.aof_child_diff) / (1024*1024));
        if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
            goto werr;
    
        /* Make sure data will not remain on the OS's output buffers */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
        if (fclose(fp) == EOF) goto werr;
    
        /* Use RENAME to make sure the DB file is changed atomically only
         * if the generate DB file is ok. */
        if (rename(tmpfile,filename) == -1) {
            serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
            unlink(tmpfile);
            return C_ERR;
        }
        serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
        return C_OK;
    
    werr:
        serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
        fclose(fp);
        unlink(tmpfile);
        return C_ERR;
    }

13、【PubSub】Publish/subscribe commands and SDK practice

13.1、Overview

The publish/subscribe pattern is similar to the observer pattern: for example, after a user places an order, Pub/Sub delivers the message to every subscriber of that topic.

13.2、Commands

Command reference: https://redis.io/commands#pubsub

Core commands: publish, subscribe, psubscribe (pattern subscribe)

  •  subscribe (run in 2 client windows)
    127.0.0.1:6379> subscribe order
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "order"
    3) (integer) 1
  •  publish (send from one more client)

    [root@localhost redis]# ./redis-cli 
    127.0.0.1:6379> publish order trade1
    (integer) 2    //delivered to 2 subscribers
  • Now check either subscribing client; the message has arrived
    127.0.0.1:6379> subscribe order
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "order"
    3) (integer) 1
    1) "message"
    2) "order"
    3) "trade1"
  •  psubscribe supports three pattern styles for subscriptions
    •  *  — ord* matches every channel name starting with ord
    •  [] — orde[er] matches order and ordee
    •  ?  — orde? matches orde followed by exactly one character
    • Example 1:
      Subscriber
      127.0.0.1:6379> psubscribe s*
      Reading messages... (press Ctrl-C to quit)
      1) "psubscribe"
      2) "s*"
      3) (integer) 1

      Publisher

      127.0.0.1:6379> publish shop shop1
      (integer) 2
      127.0.0.1:6379> publish order trade1
      (integer) 0

      Subscriber

      127.0.0.1:6379> psubscribe s*
      Reading messages... (press Ctrl-C to quit)
      1) "psubscribe"
      2) "s*"
      3) (integer) 1
      1) "pmessage"
      2) "s*"
      3) "shop"
      4) "shop1"

13.3、SDK usage

First, have two CLI clients subscribe to the trade channel

127.0.0.1:6379> subscribe trade
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "trade"
3) (integer) 1

Then write a third client in C#

  • Plain (non-pattern) subscribe
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    
            IDatabase db = redis.GetDatabase(0);
    
    
            var subscriber = redis.GetSubscriber();
    
    
            //subscribe to the channel "trade"
            //note: StackExchange.Redis treats any channel name containing * as a pattern subscription
            subscriber.Subscribe("trade", (channel, redisValue) =>
            {
                Console.WriteLine($"message={redisValue}");
            });
    
    
    
            Console.ReadKey();
        }
    }

  • publish
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
    
            IDatabase db = redis.GetDatabase(0);
    
    
            var subscriber = redis.GetSubscriber();
    
            for (int i = 0; i < 100; i++)
            {
                subscriber.Publish("trade", "t11111111111111");
            }
            Console.ReadKey();
        }
    }
  • Pattern subscribe
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
            IDatabase db = redis.GetDatabase(0);
    
            var subscriber = redis.GetSubscriber();
            var redisChannel = new RedisChannel("trad[ae]", RedisChannel.PatternMode.Pattern);
    
            //subscribe with a pattern channel: trad[ae] matches both "trada" and "trade"
            //PatternMode.Pattern forces PSUBSCRIBE regardless of the channel text
            subscriber.Subscribe(redisChannel, (channel, redisValue) =>
            {
                Console.WriteLine($"message={redisValue}");
            });
    
            Console.ReadKey();
        }
    }
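One more SDK detail worth knowing: handlers can be detached again. A minimal sketch (reusing the connection settings of the samples above; Unsubscribe and UnsubscribeAll are the standard ISubscriber methods for this):

using System;
using StackExchange.Redis;

class UnsubscribeDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        var subscriber = redis.GetSubscriber();

        subscriber.Subscribe("trade", (channel, value) => Console.WriteLine($"message={value}"));

        // detach every handler registered for this channel on this connection
        subscriber.Unsubscribe("trade");

        // or drop all subscriptions held by this multiplexer at once
        subscriber.UnsubscribeAll();
    }
}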

13.4、Source code analysis

Start with the Pub/Sub state: the fields below live on the redisServer struct (src/server.h)

/* Pubsub */
dict *pubsub_channels;  /* Map channels to list of subscribed clients */
list *pubsub_patterns;  /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                               xor of NOTIFY_... flags. */

Then look at the publish command: the command table in src/server.c maps publish to publishCommand

void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);//deliver the message and count receivers
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

Next, the pubsubPublishMessage function

/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);//look up every client subscribed to this channel
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {//iterate the clients, pushing the message to each
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {//then try every pattern subscription for a match
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

 

Then the subscribeCommand function

void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}

And pubsubSubscribeChannel, which it calls once per channel

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {//record the channel on the client
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {//first subscriber: create the client list for this channel
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);//append this client to the channel's list
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

14、【Transaction】Transaction commands and source code reading

14.1、Commands

Command reference: https://redis.io/commands#transactions

Core commands: multi (begin), exec (execute), discard (abandon)

Example:

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED
127.0.0.1:6379> set password 12345
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
127.0.0.1:6379> keys *
1) "password"
2) "username"
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED
127.0.0.1:6379> set password 12345
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> keys *
(empty list or set)

14.2、Transaction pitfalls

Example:

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username mary
QUEUED
127.0.0.1:6379> lpush username 1 2 3
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value

One of the queued commands failed at execution time while the other still took effect. Redis only syntax-checks commands as they are queued; EXEC neither stops at the first runtime error nor rolls anything back, so atomicity in the rollback sense is broken.

14.3、watch: protecting a transaction against concurrent writes

The point of WATCH is that if a watched key is modified by another client before EXEC runs, the transaction fails instead of executing against stale data.

Example:

//client 1
127.0.0.1:6379> watch username
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED
//client 2
127.0.0.1:6379> set username mary
OK
//client 1
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379> get username
"mary"

Between client 1 queueing its commands and calling EXEC, client 2 modified state the transaction depended on, so EXEC returned (nil) and nothing was applied.

14.4、SDK usage

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var transaction = db.CreateTransaction();

        transaction.StringSetAsync("username", "jack");
        transaction.StringSetAsync("password", "1234512345123451234512345123451234512345123451234512345123451234512345123451234512345123451234512345");

        transaction.Execute();

        Console.WriteLine("提交成功!");

        Console.ReadKey();
    }
}
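StackExchange.Redis multiplexes many callers over one connection, so it does not expose a raw WATCH command; the optimistic check is instead attached to the transaction as a condition, which the library turns into WATCH on the wire. A hedged sketch of the username scenario from 14.3 (Condition.StringEqual is the stock API; key and values are just the demo data):

using System;
using StackExchange.Redis;

class WatchDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        var db = redis.GetDatabase(0);

        db.StringSet("username", "mary");

        var tran = db.CreateTransaction();
        // the equivalent of WATCH: the transaction only runs if the value is still "mary"
        tran.AddCondition(Condition.StringEqual("username", "mary"));
        tran.StringSetAsync("username", "jack");

        bool committed = tran.Execute(); // false if another client changed the key first
        Console.WriteLine(committed ? "committed" : "aborted by CAS check");
    }
}

If a second client changes username between AddCondition and Execute, Execute returns false, mirroring the (nil) reply seen at the CLI.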

Analyzing the Redis transaction with a Wireshark network capture

14.5、Source code analysis

  • client
    The client struct is defined in src/server.h
    /* With multiplexing we need to take per-client state.
     * Clients are taken in a linked list. */
    typedef struct client {
        uint64_t id;            /* Client incremental unique ID. */
        int fd;                 /* Client socket. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        robj *name;             /* As set by CLIENT SETNAME. */
        sds querybuf;           /* Buffer we use to accumulate client queries. */
        size_t qb_pos;          /* The position we have read in querybuf. */
        sds pending_querybuf;   /* If this client is flagged as master, this buffer
                                   represents the yet not applied portion of the
                                   replication stream that we are receiving from
                                   the master. */
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
        int argc;               /* Num of arguments of current command. */
        robj **argv;            /* Arguments of current command. */
        struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
        int reqtype;            /* Request protocol type: PROTO_REQ_* */
        int multibulklen;       /* Number of multi bulk arguments left to read. */
        long bulklen;           /* Length of bulk argument in multi bulk request. */
        list *reply;            /* List of reply objects to send to the client. */
        unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
        size_t sentlen;         /* Amount of bytes already sent in the current
                                   buffer or object being sent. */
        time_t ctime;           /* Client creation time. */
        time_t lastinteraction; /* Time of the last interaction, used for timeout */
        time_t obuf_soft_limit_reached_time;
        int flags;              /* Client flags: CLIENT_* macros. */
        int authenticated;      /* When requirepass is non-NULL. */
        int replstate;          /* Replication state if this is a slave. */
        int repl_put_online_on_ack; /* Install slave write handler on ACK. */
        int repldbfd;           /* Replication DB file descriptor. */
        off_t repldboff;        /* Replication DB file offset. */
        off_t repldbsize;       /* Replication DB file size. */
        sds replpreamble;       /* Replication DB preamble. */
        long long read_reploff; /* Read replication offset if this is a master. */
        long long reploff;      /* Applied replication offset if this is a master. */
        long long repl_ack_off; /* Replication ack offset, if this is a slave. */
        long long repl_ack_time;/* Replication ack time, if this is a slave. */
        long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                           copying this slave output buffer
                                           should use. */
        char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
        int slave_listening_port; /* As configured with: SLAVECONF listening-port */
        char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
        int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
        multiState mstate;      /* MULTI/EXEC state */    //holds every queued command of the transaction
        int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
        blockingState bpop;     /* blocking state */
        long long woff;         /* Last write global replication offset. */
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */
        listNode *client_list_node; /* list node in client list */
    
        /* Response buffer */
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES];
    } client;

    Next, the multiState struct

    typedef struct multiState {
        multiCmd *commands;     /* Array of MULTI commands */    //the array holding the queued commands
        int count;              /* Total number of MULTI commands */
        int cmd_flags;          /* The accumulated command flags OR-ed together.
                                   So if at least a command has a given flag, it
                                   will be set in this field. */
        int minreplicas;        /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;

    Then the multiCmd struct

    /* Client MULTI/EXEC state */
    typedef struct multiCmd {
        robj **argv;    //argument values of the queued command
        int argc;    //argument count
        struct redisCommand *cmd;    //which command will be executed
    } multiCmd;

    A common client flag, also in src/server.h

    #define CLIENT_MULTI (1<<3)   /* This client is in a MULTI context */

     

  • multi
    The command table in src/server.c maps multi to multiCommand
    void multiCommand(client *c) {
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
            return;
        }
        c->flags |= CLIENT_MULTI;//flag the client as being inside a MULTI context
        addReply(c,shared.ok);
    }

     

  • exec
    Likewise, exec maps to execCommand in src/server.c
    void execCommand(client *c) {
        int j;
        robj **orig_argv;
        int orig_argc;
        struct redisCommand *orig_cmd;
        int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
        int was_master = server.masterhost == NULL;
    
        if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"EXEC without MULTI");
            return;
        }
    
        /* Check if we need to abort the EXEC because:
         * 1) Some WATCHed key was touched.
         * 2) There was a previous error while queueing commands.
         * A failed EXEC in the first case returns a multi bulk nil object
         * (technically it is not an error but a special behavior), while
         * in the second an EXECABORT error is returned. */
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                      shared.nullmultibulk);
            discardTransaction(c);
            goto handle_monitor;
        }
    
        /* If there are write commands inside the transaction, and this is a read
         * only slave, we want to send an error. This happens when the transaction
         * was initiated when the instance was a master or a writable replica and
         * then the configuration changed (for example instance was turned into
         * a replica). */
        if (!server.loading && server.masterhost && server.repl_slave_ro &&
            !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)//判斷是否是CAS狀態,若是是的話,則取消
        {
            addReplyError(c,
                "Transaction contains write commands but instance "
                "is now a read-only slave. EXEC aborted.");
            discardTransaction(c);
            goto handle_monitor;
        }
    
        /* Exec all the queued commands */
        unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ //release every key held by WATCH
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyMultiBulkLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;
    
            /* Propagate a MULTI request once we encounter the first command which
             * is not readonly nor an administrative one.
             * This way we'll deliver the MULTI/..../EXEC block as a whole and
             * both the AOF and the replication link will have the same consistency
             * and atomicity guarantees. */
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);    //propagate MULTI to the AOF/replicas before the first write command
                must_propagate = 1;
            }
    
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
    
            /* Commands may alter argc/argv, restore mstate. */        //store the possibly-rewritten args back into mstate
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);
    
        /* Make sure the EXEC command will be propagated as well if MULTI
         * was already propagated. */
        if (must_propagate) {
            int is_master = server.masterhost == NULL;
            server.dirty++;
            /* If inside the MULTI/EXEC block this instance was suddenly
             * switched from master to slave (using the SLAVEOF command), the
             * initial MULTI was propagated into the replication backlog, but the
             * rest was not. We need to make sure to at least terminate the
             * backlog with the final EXEC. */
            if (server.repl_backlog && was_master && !is_master) {
                char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
                feedReplicationBacklog(execcmd,strlen(execcmd));
            }
        }
    
    handle_monitor:
        /* Send EXEC to clients waiting data from MONITOR. We do it here
         * since the natural order of commands execution is actually:
         * MUTLI, EXEC, ... commands inside transaction ...
         * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
         * table, and we do it here with correct ordering. */
        if (listLength(server.monitors) && !server.loading)
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

     

  • watch
    The redisDb struct is defined in src/server.h
    /* Redis database representation. There are multiple databases identified
     * by integers from 0 (the default database) up to the max configured
     * database. The database number is the 'id' field in the structure. */
    typedef struct redisDb {
        dict *dict;                 /* The keyspace for this DB */
        dict *expires;              /* Timeout of keys with a timeout set */
        dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
        dict *ready_keys;           /* Blocked keys that received a PUSH */
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */    //maps each watched key to the clients whose MULTI/EXEC depends on it
        int id;                     /* Database ID */
        long long avg_ttl;          /* Average TTL, just for stats */
        list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    } redisDb;

    The write path runs through setCommand → setGenericCommand in t_string.c, which ends in setKey, defined in db.c

    /* High level Set operation. This function can be used in order to set
     * a key, whatever it was existing or not, to a new object.
     *
     * 1) The ref count of the value object is incremented.
     * 2) clients WATCHing for the destination key notified.
     * 3) The expire time of the key is reset (the key is made persistent).
     *
     * All the new keys in the database should be created via this interface. */
    void setKey(redisDb *db, robj *key, robj *val) {
        if (lookupKeyWrite(db,key) == NULL) {
            dbAdd(db,key,val);
        } else {
            dbOverwrite(db,key,val);
        }
        incrRefCount(val);
        removeExpire(db,key);
        signalModifiedKey(db,key);    //notify watchers that the key changed
    }

    Next, signalModifiedKey

    /*-----------------------------------------------------------------------------
     * Hooks for key space changes.
     *
     * Every time a key in the database is modified the function
     * signalModifiedKey() is called.
     *
     * Every time a DB is flushed the function signalFlushDb() is called.
     *----------------------------------------------------------------------------*/
    
    void signalModifiedKey(redisDb *db, robj *key) {    //hook: every key modification in the DB passes through here
        touchWatchedKey(db,key);
    }

    And touchWatchedKey, in multi.c

    /* "Touch" a key, so that if this key is being WATCHed by some client the
     * next EXEC will fail. */
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    
        if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);    //fetch every client watching this key, e.g. ["username": {client1, client2, client3}]
        if (!clients) return;
    
        /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
        /* Check if we are already watching for this key */
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            c->flags |= CLIENT_DIRTY_CAS;    //mark each watcher dirty-CAS; EXEC checks this flag and aborts
        }
    }

15、【Scan】Deleting among 100 million keys: understanding SCAN

15.1、The trouble with a hard KEYS traversal

keys command reference: https://redis.io/commands#generic

Background: a Redis instance holds roughly 100 million keys; as shops expire (one-year cycle) their keys must be found and deleted to stop memory from ballooning. Running a bare keys * command freezes the instance.

Storage format: key s1c1 => shopid=1, customerid=1; value: total trade amount and total trade count. So the keys look like s1c2, s1c3, s2c1, s2c2, ...

Why it hurts: Redis is single-threaded, and traversing 370k keys already takes about 4s, so at the hundred-million scale keys would block for far too long; avoid keys on large datasets.

Seeding the mock data:

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var rand = new Random();

        for (int i = 1; i < int.MaxValue; i++)
        {
            var customerID = rand.Next(1, 10000);

            var key = $"s{i}c{customerID}";
            var value = "";  //statistics payload

            db.StringSet(key, value);
        }


        Console.WriteLine("提交成功!");

        Console.ReadKey();
    }
}

15.2、SCAN: the soft traversal

SCAN uses a cursor and returns results incrementally instead of returning everything at once like keys: you start from cursor 0, and the iteration is finished when the server hands cursor 0 back.

Syntax notes: COUNT is the number of entries the server examines per call (an upper-bound hint, not a guaranteed return count); entries are read first and only then matched against the pattern.

SCAN cursor [MATCH pattern] [COUNT count]
  • SCAN: iterate the keys of the whole database
  • HSCAN: iterate a Hash
  • ZSCAN: iterate a SortedSet
  • SSCAN: iterate a Set
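These variants surface in the SDK as well: for example HSCAN maps to IDatabase.HashScan, which lazily pages through a hash. A small sketch (the userinfo hash and age* fields are borrowed from section 16 purely for illustration):

using System;
using StackExchange.Redis;

class HashScanDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        var db = redis.GetDatabase(0);

        // lazily issues HSCAN userinfo <cursor> MATCH age* COUNT 10 under the hood
        foreach (HashEntry entry in db.HashScan("userinfo", "age*", pageSize: 10))
            Console.WriteLine($"{entry.Name} = {entry.Value}");
    }
}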

Example:

127.0.0.1:6379> scan 0 match s* count 10    //start at cursor 0, examine ~10 entries and pattern-match them
1) "6553600"    //the cursor to pass to the next call: 6553600
2)  1) "s6320142c103"
    2) "s719732c3086"
    3) "s4214422c4224"
    4) "s6107971c7924"
    5) "s571181c6966"
    6) "s750494c9526"
    7) "s527442c5164"
    8) "s6580725c8456"
    9) "s4791604c5206"
   10) "s1556977c9206"
(1.95s)
127.0.0.1:6379> scan 6553600 match s* count 10        //continue from cursor 6553600
1) "6422528"    //next cursor: 6422528
2)  1) "s4304862c8240"
    2) "s3414324c2227"
    3) "s4356115c2908"
    4) "s236939c720"
    5) "s3866928c1421"
    6) "s4228406c6939"
    7) "s5128352c6328"
    8) "s3357175c9411"
    9) "s2312242c5901"
   10) "s5774711c106"
127.0.0.1:6379> scan 6422528 match s* count 10        //continue from cursor 6422528
1) "7995392"    //next cursor: 7995392
2)  1) "s1823975c1611"
    2) "s244495c2589"
    3) "s1786203c9731"
    4) "s6120152c2581"
    5) "s3939227c1146"
    6) "s2551230c1949"
    7) "s2603224c341"
    8) "s5598259c625"
    9) "s5823184c9255"
   10) "s3871444c9972"
(0.52s)
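The same CLI session can be reproduced from C# by driving the cursor yourself, assuming a StackExchange.Redis version new enough to expose IDatabase.Execute; a minimal sketch:

using System;
using StackExchange.Redis;

class RawScanDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        var db = redis.GetDatabase(0);

        long cursor = 0;
        do
        {
            // SCAN <cursor> MATCH s* COUNT 10
            var reply = (RedisResult[])db.Execute("SCAN", cursor.ToString(), "MATCH", "s*", "COUNT", "10");
            cursor = long.Parse((string)reply[0]);          // element 0: the next cursor
            foreach (var key in (RedisResult[])reply[1])    // element 1: this batch of keys
                Console.WriteLine(key);
        } while (cursor != 0);                              // cursor 0 marks the end
    }
}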

15.3、SDK usage

The SDK folds keys and scan into one API:

  • if the SDK version is old, or the server does not support scan, it falls back to keys
  • if the key count is small it may also just use keys
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var server = redis.GetServer("192.168.181.131:6379");

        var list = server.Keys(0, "s*", 10);

        // under the hood the SDK pulls 10 keys per SCAN call from the server;
        // it effectively re-issues list = server.Keys(cursor, "s*", 10) for you
        var index = 1;

        foreach (var item in list)
        {
            Console.WriteLine(item);

            Console.WriteLine(index++);
        }


        Console.ReadKey();
    }
}
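Tying this back to the motivating problem (purging a year's expired shop keys): since Keys() pages with SCAN internally, it can feed a chunked DEL loop without ever blocking the server the way a bare keys * would. A sketch; the chunk size of 1000 is an arbitrary choice:

using System;
using System.Linq;
using StackExchange.Redis;

class BatchDeleteDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        var db = redis.GetDatabase(0);
        var server = redis.GetServer("192.168.181.131:6379");

        // Keys() iterates with SCAN internally, so the traversal is incremental
        foreach (var chunk in server.Keys(0, "s1c*", pageSize: 1000)
                                    .Select((key, i) => (key, i))
                                    .GroupBy(t => t.i / 1000, t => t.key))
        {
            db.KeyDelete(chunk.ToArray()); // one DEL per 1000 keys
        }
    }
}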

Wireshark capture

 

15.4、A brief look at the source

Start with the SCAN command: the command table in src/server.c maps scan to scanCommand

/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
    unsigned long cursor;
    if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
    scanGenericCommand(c,NULL,cursor);
}

Next, scanGenericCommand

/* This command implements SCAN, HSCAN and SSCAN commands.
 * If object 'o' is passed, then it must be a Hash or Set object, otherwise
 * if 'o' is NULL the command will operate on the dictionary associated with
 * the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
 * In the case of a Hash object the function returns both the field and value
 * of every element on the Hash. */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
    int i, j;
    list *keys = listCreate();
    listNode *node, *nextnode;
    long count = 10;
    sds pat = NULL;
    int patlen = 0, use_pattern = 0;
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
                o->type == OBJ_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */

    /* Step 1: Parse options. */    //step 1: parse and validate the count / match options
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != C_OK)
            {
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
     * representation that is not a hash table, we are sure that it is also
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

    /* Handle the case of a hash table. */        //step 2: iterate the collection
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. */
        long maxiterations = count*10;

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);
    } else if (o->type == OBJ_SET) {
        int pos = 0;
        int64_t ll;

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
            p = ziplistNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        serverPanic("Not handled encoding in SCAN.");
    }

    /* Step 3: Filter elements. */        //step 3: filter the gathered elements
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
        int filter = 0;

        /* Filter element if it does not match the pattern. */        //drop entries that fail the MATCH pattern
        if (!filter && use_pattern) {
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                char buf[LONG_STR_SIZE];
                int len;

                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }

        /* Filter element if it is an expired key. */        //expired keys are filtered out (and lazily deleted)
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associted value if needed. */        //drop the filtered element
        if (filter) {
            decrRefCount(kobj);
            listDelNode(keys, node);
        }

        /* If this is a hash or a sorted set, we have a flat list of
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
        }
        node = nextnode;
    }

    /* Step 4: Reply to the client. */        //step 4: reply with the new cursor plus the keys
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c,cursor);

    addReplyMultiBulkLen(c, listLength(keys));
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
        listDelNode(keys, node);
    }

cleanup:
    listSetFreeMethod(keys,decrRefCountVoid);
    listRelease(keys);
}

The heart of step 2 is dictScan, in dict.c

/* dictScan() is used to iterate over the elements of a dictionary.
 *
 * Iterating works the following way:
 *
 * 1) Initially you call the function using a cursor (v) value of 0.
 * 2) The function performs one step of the iteration, and returns the
 *    new cursor value you must use in the next call.
 * 3) When the returned cursor is 0, the iteration is complete.
 *
 * The function guarantees all elements present in the
 * dictionary get returned between the start and end of the iteration.
 * However it is possible some elements get returned multiple times.
 *
 * For every element returned, the callback argument 'fn' is
 * called with 'privdata' as first argument and the dictionary entry
 * 'de' as second argument.
 *
 * HOW IT WORKS.
 *
 * The iteration algorithm was designed by Pieter Noordhuis.
 * The main idea is to increment a cursor starting from the higher order
 * bits. That is, instead of incrementing the cursor normally, the bits
 * of the cursor are reversed, then the cursor is incremented, and finally
 * the bits are reversed again.
 *
 * This strategy is needed because the hash table may be resized between
 * iteration calls.
 *
 * dict.c hash tables are always power of two in size, and they
 * use chaining, so the position of an element in a given table is given
 * by computing the bitwise AND between Hash(key) and SIZE-1
 * (where SIZE-1 is always the mask that is equivalent to taking the rest
 *  of the division between the Hash of the key and SIZE).
 *
 * For example if the current hash table size is 16, the mask is
 * (in binary) 1111. The position of a key in the hash table will always be
 * the last four bits of the hash output, and so forth.
 *
 * WHAT HAPPENS IF THE TABLE CHANGES IN SIZE?
 *
 * If the hash table grows, elements can go anywhere in one multiple of
 * the old bucket: for example let's say we already iterated with
 * a 4 bit cursor 1100 (the mask is 1111 because hash table size = 16).
 *
 * If the hash table will be resized to 64 elements, then the new mask will
 * be 111111. The new buckets you obtain by substituting in ??1100
 * with either 0 or 1 can be targeted only by keys we already visited
 * when scanning the bucket 1100 in the smaller hash table.
 *
 * By iterating the higher bits first, because of the inverted counter, the
 * cursor does not need to restart if the table size gets bigger. It will
 * continue iterating using cursors without '1100' at the end, and also
 * without any other combination of the final 4 bits already explored.
 *
 * Similarly when the table size shrinks over time, for example going from
 * 16 to 8, if a combination of the lower three bits (the mask for size 8
 * is 111) were already completely explored, it would not be visited again
 * because we are sure we tried, for example, both 0111 and 1111 (all the
 * variations of the higher bit) so we don't need to test it again.
 *
 * WAIT... YOU HAVE *TWO* TABLES DURING REHASHING!
 *
 * Yes, this is true, but we always iterate the smaller table first, then
 * we test all the expansions of the current cursor into the larger
 * table. For example if the current cursor is 101 and we also have a
 * larger table of size 16, we also test (0)101 and (1)101 inside the larger
 * table. This reduces the problem back to having only one table, where
 * the larger one, if it exists, is just an expansion of the smaller one.
 *
 * LIMITATIONS
 *
 * This iterator is completely stateless, and this is a huge advantage,
 * including no additional memory used.
 *
 * The disadvantages resulting from this design are:
 *
 * 1) It is possible we return elements more than once. However this is usually
 *    easy to deal with in the application level.
 * 2) The iterator must return multiple elements per call, as it needs to always
 *    return all the keys chained in a given bucket, and all the expansions, so
 *    we are sure we don't miss keys moving during rehashing.
 * 3) The reverse cursor is somewhat hard to understand at first, but this
 *    comment is supposed to help.
 */
unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       dictScanBucketFunction* bucketfn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de, *next;
    unsigned long m0, m1;

    if (dictSize(d) == 0) return 0;

    if (!dictIsRehashing(d)) {
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;

        /* Emit entries at cursor */
        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0];
        while (de) {
            next = de->next;
            fn(privdata, de);
            de = next;
        }

        /* Set unmasked bits so incrementing the reversed cursor
         * operates on the masked bits */
        v |= ~m0;

        /* Increment the reverse cursor */
        v = rev(v);
        v++;
        v = rev(v);

    } else {
        t0 = &d->ht[0];
        t1 = &d->ht[1];

        /* Make sure t0 is the smaller and t1 is the bigger table */
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }

        m0 = t0->sizemask;
        m1 = t1->sizemask;

        /* Emit entries at cursor */
        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0];
        while (de) {
            next = de->next;
            fn(privdata, de);
            de = next;
        }

        /* Iterate over indices in larger table that are the expansion
         * of the index pointed to by the cursor in the smaller table */
        do {
            /* Emit entries at cursor */
            if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
            de = t1->table[v & m1];
            while (de) {
                next = de->next;
                fn(privdata, de);
                de = next;
            }

            /* Increment the reverse cursor not covered by the smaller mask.*/
            v |= ~m1;
            v = rev(v);
            v++;
            v = rev(v);

            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));
    }

    return v;
}
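The reverse-binary cursor is easiest to understand by running it. Below is a small C# transcription of the increment step (v |= ~m; v = rev(v); v++; v = rev(v)) for a fixed table of size 8; it is only an illustration of the bit trick, not Redis code:

using System;

class ReverseCursorDemo
{
    // bit-reverse a 64-bit value, mirroring rev() in dict.c
    static ulong Rev(ulong v)
    {
        ulong r = 0;
        for (int i = 0; i < 64; i++)
        {
            r = (r << 1) | (v & 1);
            v >>= 1;
        }
        return r;
    }

    static void Main()
    {
        ulong mask = 7;   // table size 8 => sizemask 0b111
        ulong v = 0;
        do
        {
            Console.Write(v + " ");   // prints 0 4 2 6 1 5 3 7
            v |= ~mask;               // set every bit outside the mask
            v = Rev(v);               // reverse, increment, reverse back:
            v++;                      // this walks buckets high-order-bit first,
            v = Rev(v);               // so a table resize never restarts the scan
        } while (v != 0);             // cursor returns to 0 when complete
    }
}

Running it prints the buckets in the order 0 4 2 6 1 5 3 7, the same order SCAN visits them, which is why cursors returned by SCAN look non-sequential at the CLI.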

16、【Lua】Script case studies, and optimizing the SCAN workflow

16.1、A short Lua introduction

Lua scripting was the headline feature of Redis 2.6: by embedding a Lua environment, Redis gained an efficient way to express CAS (check-and-set) logic, and multiple commands can be combined to implement patterns that used to be hard or slow to build. (Think of it as the stored procedure of a relational database.)

Suppose a hash userinfo stores age1 20, age2 25, age3 28, and we want every field whose value is below a given age. With plain commands we could only HGETALL the whole hash, or HKEYS the field names and query them one by one; Lua scripting solves exactly this problem.

16.2、Common commands

Command reference: https://redis.io/commands#scripting

Core commands: EVAL, EVALSHA, SCRIPT LOAD, SCRIPT FLUSH

  • EVAL
    Syntax: EVAL script numkeys key [key ...] arg [arg ...]
    Example:
    //KEYS holds the key arguments, ARGV the extra value arguments
    127.0.0.1:6379> EVAL "return KEYS[1]+KEYS[2]" 2 1 5
    (integer) 6
    (0.57s)
    127.0.0.1:6379> EVAL "return KEYS[1]+ARGV[1]+ARGV[2]" 1 1 10 20
    (integer) 31

    The script can also live in a file and be fed in as file.lua

    //create test.lua
    vim test.lua
    //script body
    return KEYS[1]+ARGV[1]+ARGV[2];
    //then run it (note: with --eval, keys and args are separated by a comma with spaces around it)
    [root@localhost redis]# ./redis-cli --eval ./test.lua 1 , 10 20
    (integer) 31
  • SCRIPT LOAD + EVALSHA
    Cache the script on the redis server so it is not re-sent and re-compiled on every call
    127.0.0.1:6379> SCRIPT LOAD "return KEYS[1]+KEYS[2]"
    "7b23d2a5829679ac50baf7c8e105904a3e9e69bb"
    127.0.0.1:6379> EVALSHA 7b23d2a5829679ac50baf7c8e105904a3e9e69bb 2 1 5
    (integer) 6
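    The same load-once / call-by-hash pattern works from the SDK, assuming IServer.ScriptLoad and the ScriptEvaluate overload taking the SHA1 bytes (both present in current StackExchange.Redis):

    using System;
    using StackExchange.Redis;

    class EvalShaDemo
    {
        static void Main()
        {
            var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
            var db = redis.GetDatabase(0);
            var server = redis.GetServer("192.168.181.131:6379");

            // SCRIPT LOAD: compile once on the server, get back the SHA1
            byte[] sha = server.ScriptLoad("return KEYS[1]+KEYS[2]");

            // EVALSHA <sha> 2 1 5
            var result = db.ScriptEvaluate(sha, new RedisKey[] { "1", "5" });
            Console.WriteLine((int)result); // 6
        }
    }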

16.3、Lua scripts in practice

Scenario: the earlier SCAN-based cleanup filters on the server but still ships every matched key back into a List<string> on the client and deletes them one by one, which means constant client/server round-trips.

A Lua script eliminates those round-trips entirely.

16.3.1、Deleting keys by pattern

Seed test data:

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> set s1c1 1
OK
127.0.0.1:6379> set s1c2 2
OK
127.0.0.1:6379> set s1c3 3
OK
127.0.0.1:6379> set s1c4 4
OK
127.0.0.1:6379> set s1c5 5
OK
127.0.0.1:6379> set s1c6 6
OK
127.0.0.1:6379> set s1c7 7
OK
127.0.0.1:6379> set s1c8 8
OK
127.0.0.1:6379> set s1c9 9
OK
127.0.0.1:6379> set s1c10 10
OK
127.0.0.1:6379> set s2c1 1
OK

Verify with a query:

127.0.0.1:6379> scan 0 match s1c* count 5
1) "10"
2) 1) "s1c8"
   2) "s1c10"
   3) "s1c3"
   4) "s1c2"
   5) "s1c1"

Contents of test.lua:

local pattern=KEYS[1];
local result={};
local cursor=0;

while (true) do

    -- match s1c*
    -- redis.call: invoke a redis command on the server side
    -- here redis.call returns a table => roughly a C# Dictionary
    local dict=redis.call("scan",cursor,"match",pattern);

    -- 1. the next cursor
    cursor=dict[1];
    -- 2. the table of returned keys
    local keyslist=dict[2];

    -- 3. delete the matched keys
    for idx,value in pairs(keyslist) do
        local isSuccess=redis.call("del",value);
        if(isSuccess==1)then
            table.insert(result,isSuccess); -- append to result (comma, not dot)
        end
    end

    print(cursor);

    if(cursor=="0")then
        break;
    end

end

return result;

You can write the script locally and move it to the Linux box with the rz/sz commands (note: when sending with rz, make sure the target path has no file of the same name or the transfer fails; rz -y forces an overwrite).

sz: send the selected file to the local machine
rz: pops up a file picker to upload a local file to the Linux server

Install:
yum install lrzsz

Send a file from server to client:
sz filename
Upload a file from client to server:
rz
pick the file in the dialog; it is owned by the currently logged-in user/group

Set the default receive path in Xshell:
right-click the session -> Properties -> ZMODEM -> receive folder

Now run it to delete every key starting with "s1c"; afterwards only one key survives

[root@localhost redis]# ./redis-cli --eval ./test.lua "s1c*"
 1) (integer) 1
 2) (integer) 1
 3) (integer) 1
 4) (integer) 1
 5) (integer) 1
 6) (integer) 1
 7) (integer) 1
 8) (integer) 1
 9) (integer) 1
10) (integer) 1
127.0.0.1:6379> keys *
1) "s2c1"

16.3.2、Keeping only the hash fields below a given age

Goal: delete every field of userinfo whose value is greater than 25

Seed data:

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> hset userinfo age1 20
(integer) 1
127.0.0.1:6379> hset userinfo age2 25
(integer) 1
127.0.0.1:6379> hset userinfo age3 28
(integer) 1
127.0.0.1:6379> hset userinfo age4 30
(integer) 1

Verify with a query:

127.0.0.1:6379> hkeys userinfo
1) "age1"
2) "age2"
3) "age3"
4) "age4"

Contents of hash.lua:

local userinfo=KEYS[1];  -- the hash key
local age=KEYS[2];

local hkeys=redis.call("hkeys",userinfo);

for k,v in pairs(hkeys) do
    local hval= redis.call("hget",userinfo,v);

    -- if hval exceeds the given age, delete the field
    if(tonumber(hval) > tonumber(age)) then
        redis.call("hdel",userinfo,v);
        print (v .. " del ok");
    end
end

return 1;

Run it to delete every userinfo field above 25; two fields remain

[root@localhost redis]# ./redis-cli --eval ./hash.lua userinfo 25
(integer) 1
[root@localhost redis]# ./redis-cli
127.0.0.1:6379> hkeys userinfo
1) "age1"
2) "age2"

16.4、SDK usage

On the SDK side the logic is simply to read the script text from the local .lua file and submit it to redis-server for execution

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var script = File.ReadAllText(@"hash.lua", Encoding.Default);
        var result = db.ScriptEvaluate(script, new RedisKey[2] { "userinfo", "25" });

        Console.WriteLine("執行成功");
        Console.ReadKey();
    }
}

17、【Performance】Four ways to bulk-insert large amounts of data fast

17.1、Scenario

How do we push a large volume of data into redis quickly? The need comes from a personalized-recommendation scenario that stores s*c* keys (per-shop, per-customer statistics).

  • Naive mode: snail-paced inserts
    100k keys: around 50s
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
            IDatabase db = redis.GetDatabase(0);
    
            Stopwatch sw=new Stopwatch();
            sw.Start();
    
            for (int i = 0; i < 100000; i++)
            {
                db.StringSet(i.ToString(), i.ToString());
            }
    
            sw.Stop();
            Console.WriteLine(sw.ElapsedMilliseconds);
            Console.WriteLine("執行成功");
    
            Console.ReadKey();
        }
    }

  • Cause analysis and optimization (round-trips)

    The fix is to cut round-trips: 100k requests means 100k network round-trips, and that latency, not server-side work, dominates the 50s.

17.2、SDK speed showdown

The batch.lua script

--  KEYS[1] carries a JSON array of {k,v} pairs; decode it
local str=KEYS[1];
local arr=cjson.decode(str);

local result={};

for idx,v in pairs(arr) do
    local isSuccess= redis.call("set",v.k,v.v);
    table.insert(result,isSuccess);
end

return result;

The C# benchmark

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

       
        var dict=new Dictionary<int, List<KeyValuePair<RedisKey, RedisValue>>>();
        // 11 batches (i = 0..10), each standing in for a shop
        for (int i = 0; i <= 10; i++)
        {

            //10k keys per batch
            var smallList = Enumerable.Range(0, 10000).Select(m => KeyValuePair.Create<RedisKey, RedisValue>(Guid.NewGuid().ToString(),
                Guid.NewGuid().ToString())).ToList();

            dict.Add(i,smallList);
        }

        var stopwatch = Stopwatch.StartNew();

        //1. transaction (10k inserts per EXEC)
        foreach (var item in dict)
        {
            var transaction = db.CreateTransaction();

            foreach (var model in item.Value)
            {
                transaction.StringSetAsync(model.Key, model.Value);
            }

            transaction.Execute();

            Console.WriteLine($"transaction {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"transaction 耗費的時間:{stopwatch.ElapsedMilliseconds}");

        stopwatch.Restart();

        //2. mset (10k keys per MSET)
        foreach (var item in dict)
        {
            db.StringSet(item.Value.ToArray());

            Console.WriteLine($"mset {item.Key} 批次執行完畢");
        }

        Console.WriteLine($"mset耗費的時間:{stopwatch.ElapsedMilliseconds}");

        stopwatch.Restart();

        //3. pipeline (10k commands per batch)
        foreach (var item in dict)
        {
            var batch = db.CreateBatch();

            foreach (var model in item.Value)
            {
                batch.StringSetAsync(model.Key, model.Value);
            }

            batch.Execute();

            Console.WriteLine($"batch {item.Key} 批次執行完畢");
        }

        Console.WriteLine($"batch 耗費的時間:{stopwatch.ElapsedMilliseconds}");

        
        stopwatch.Restart();

        //4. lua script (10k keys per EVAL)
        foreach (var item in dict)
        {
            var list = item.Value.Select(i => new model() { k = i.Key, v = i.Value });

            db.ScriptEvaluate(File.ReadAllText(@"batch.lua", Encoding.Default),
                new RedisKey[] { JsonConvert.SerializeObject(list) });

            Console.WriteLine($"lua {item.Key} 批次執行完畢");
        }

        Console.WriteLine($"lua 耗費的時間:{stopwatch.ElapsedMilliseconds}");

        stopwatch.Restart();

        //5. normal (one key per round-trip)
        foreach (var item in dict)
        {
            foreach (var model in item.Value)
            {
                db.StringSet(model.Key, model.Value);
            }

            Console.WriteLine($"normal {item.Key} 批次執行完畢");
        }

        Console.WriteLine($"normal 耗費的時間:{stopwatch.ElapsedMilliseconds}");


        Console.ReadKey();
    }
}

public class model
{
    public string k { get; set; }

    public string v { get; set; }
}

Timing results (ms). Every approach that batches commands beats the one-round-trip-per-key loop by roughly two orders of magnitude; mset comes out ahead because each batch is a single native command:

...
transaction elapsed: 1060
...
mset elapsed: 511
...
batch elapsed: 819
...
lua elapsed: 1504
...
normal elapsed: 61657

18、【Memory cap】Limiting redis's maximum memory, with code to test it

Some deployments really do run Redis purely as a cache. Because cache usage grows without bound, a maxmemory cap is usually configured to bound the instance.

18.1、Setting the memory cap (maxmemory) and the over-limit policy (maxmemory-policy)

Change the defaults in redis.conf (maxmemory, maxmemory-policy)

############################## MEMORY MANAGEMENT ################################

# Set a memory usage limit to the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU or LFU cache, or to
# set a hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have replicas attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the replicas are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of replicas is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have replicas attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for replica
# output buffers (but this is not needed if the policy is 'noeviction').
#
# maxmemory <bytes>
maxmemory 104857600  //100M = 1024*1024*100: cap the instance at 100M

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
#
# volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key among the ones with an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, Redis will return an error on write
#       operations, when there are no suitable keys for eviction.
#
#       At the date of writing these commands are: set setnx setex append
#       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
#       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
#       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
#       getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction
maxmemory-policy allkeys-lru  //the eviction policy applied once the cap is hit
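Both settings can also be changed on a live instance with CONFIG SET instead of editing redis.conf; a minimal sketch through the SDK (allowAdmin=true is required in the connection string because CONFIG counts as an admin command):

using System;
using StackExchange.Redis;

class MaxMemoryDemo
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("192.168.181.131:6379,allowAdmin=true");
        var server = redis.GetServer("192.168.181.131:6379");

        // CONFIG SET maxmemory 104857600 / CONFIG SET maxmemory-policy allkeys-lru
        server.ConfigSet("maxmemory", 100 * 1024 * 1024);
        server.ConfigSet("maxmemory-policy", "allkeys-lru");

        // read them back: CONFIG GET maxmemory*
        foreach (var kv in server.ConfigGet("maxmemory*"))
            Console.WriteLine($"{kv.Key} = {kv.Value}");
    }
}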

Notes on the eviction policies

  • LRU (Least Recently Used): keys that have gone longest without being accessed are evicted first.

  • LFU (Least Frequently Used): keys with the fewest accesses are evicted first.

    Memory usage can be inspected with the info memory command:

    127.0.0.1:6379> info memory
    # Memory
    used_memory:911136
    used_memory_human:889.78K  //memory currently in use
    used_memory_rss:2678784
    used_memory_rss_human:2.55M
    used_memory_peak:911248
    used_memory_peak_human:889.89K
    used_memory_peak_perc:99.99%
    used_memory_overhead:910574
    used_memory_startup:860880
    used_memory_dataset:562
    used_memory_dataset_perc:1.12%
    allocator_allocated:878296
    allocator_active:2640896
    allocator_resident:2640896
    total_system_memory:1907941376
    total_system_memory_human:1.78G
    used_memory_lua:37888
    used_memory_lua_human:37.00K
    used_memory_scripts:0
    used_memory_scripts_human:0B
    number_of_cached_scripts:0
    maxmemory:104857600  // memory limit
    maxmemory_human:100.00M  // memory limit
    maxmemory_policy:allkeys-lru  // eviction policy in effect (allkeys-lru)
    allocator_frag_ratio:3.01
    allocator_frag_bytes:1762600
    allocator_rss_ratio:1.00
    allocator_rss_bytes:0
    rss_overhead_ratio:1.01
    rss_overhead_bytes:37888
    mem_fragmentation_ratio:3.05
    mem_fragmentation_bytes:1800488
    mem_not_counted_for_evict:0
    mem_replication_backlog:0
    mem_clients_slaves:0
    mem_clients_normal:49694
    mem_aof_buffer:0
    mem_allocator:libc
    active_defrag_running:0
    lazyfree_pending_objects:0
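
These two settings can also be applied at runtime instead of editing redis.conf. A minimal StackExchange.Redis sketch (illustrative: it assumes allowAdmin=true on the connection string, since CONFIG is an admin command, and reuses the test address from the demo below):

using System;
using StackExchange.Redis;

class ConfigDemo
{
    static void Main(string[] args)
    {
        // CONFIG commands are admin operations, so allowAdmin=true is required
        var redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,allowAdmin=true");
        var server = redis.GetServer("192.168.43.62", 6379);

        // equivalent of: CONFIG SET maxmemory 104857600 (100MB)
        server.ConfigSet("maxmemory", (100 * 1024 * 1024).ToString());
        // equivalent of: CONFIG SET maxmemory-policy allkeys-lru
        server.ConfigSet("maxmemory-policy", "allkeys-lru");

        // read back and print the effective values
        foreach (var kv in server.ConfigGet("maxmemory*"))
            Console.WriteLine($"{kv.Key} = {kv.Value}");
    }
}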

     

18.2 SDK demo

Insert data through the SDK and watch the memory usage change: it stays at roughly 100MB throughout.

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379");

        var db = redis.GetDatabase(0);

        for (int i = 0; i < int.MaxValue; i++)
        {
            var val = string.Join(",", Enumerable.Range(0, 10000));

            db.StringSet(i.ToString(), val);

            Console.WriteLine($"inserted {i} successfully!");
        }


        Console.ReadKey();
    }
}

Check it with redis-cli:

127.0.0.1:6379> info memory
# Memory
used_memory:104808424
used_memory_human:99.95M
used_memory_rss:171360256
used_memory_rss_human:163.42M
used_memory_peak:104857232
used_memory_peak_human:100.00M
used_memory_peak_perc:99.95%
used_memory_overhead:1110857
used_memory_startup:860896
used_memory_dataset:103697567
used_memory_dataset_perc:99.76%
allocator_allocated:104775568
allocator_active:171322368
allocator_resident:171322368
total_system_memory:1907941376
total_system_memory_human:1.78G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:104857600
maxmemory_human:100.00M
maxmemory_policy:allkeys-lru
allocator_frag_ratio:1.64
allocator_frag_bytes:66546800
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.00
rss_overhead_bytes:37888
mem_fragmentation_ratio:1.64
mem_fragmentation_bytes:66584688
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:132433
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0

 

18.3 A brief look at the source code

How does Redis know which keys to evict? It is driven by time. So how does an object get a time attached to it? Start with the lru field of redisObject:

#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock, which tracks the
                            * system clock) or LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time); getCommand assigns
                            * lru when the key is read */
    int refcount;
    void *ptr;
} robj;

Next, let's look at the getCommand source:

void getCommand(client *c) {
    getGenericCommand(c);
}

Next, the getGenericCommand source:

int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

Next, the lookupKeyReadOrReply source:

robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

Next, the lookupKeyRead source:

/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
 * common case. */
robj *lookupKeyRead(redisDb *db, robj *key) {
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

Next, the lookupKeyReadWithFlags source:

/* Lookup a key for read operations, or return NULL if the key is not found
 * in the specified DB.
 *
 * As a side effect of calling this function:
 * 1. A key gets expired if it reached its TTL.
 * 2. The key last access time is updated.
 * 3. The global keys hits/misses stats are updated (reported in INFO).
 *
 * This API should not be used when we write to the key after obtaining
 * the object linked to the key, but only for read only operations.
 *
 * Flags change the behavior of this command:
 *
 *  LOOKUP_NONE (or zero): no special flags are passed.
 *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
 *
 * Note: this function also returns NULL if the key is logically expired
 * but still existing, in case this is a slave, since this API is called only
 * for read operations. Even if the key expiry is master-driven, we can
 * correctly report a key is expired on slaves even if the master is lagging
 * expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's safe
         * to return NULL ASAP. */
        if (server.masterhost == NULL) {
            server.stat_keyspace_misses++;
            return NULL;
        }

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
 * to clients accessing expired values in a read-only fashion, that
         * will say the key as non existing.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            server.stat_keyspace_misses++;
            return NULL;
        }
    }
    val = lookupKey(db,key,flags);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}

Next, the lookupKey source:

/* Low level key lookup API, not actually called directly from commands
 * implementations that should instead rely on lookupKeyRead(),
 * lookupKeyWrite() and lookupKeyReadWithFlags(). */
robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {// under LFU, update the frequency data
                updateLFU(val);
            } else {// under LRU, stamp the object with the current LRU clock
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}

 

Next, the createObject source:

robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));
    o->type = type;
    o->encoding = OBJ_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution), or
     * alternatively the LFU counter. */
         // assign the initial lru value
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
        
    } else {
        o->lru = LRU_CLOCK();
    }
    return o;
}
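
To make the bit layout in createObject concrete, here is a small C# sketch (purely illustrative, not Redis source) of how a 24-bit lru field packs a 16-bit minutes clock together with an 8-bit frequency counter under LFU; LFU_INIT_VAL mirrors the Redis default of 5:

using System;

class LfuFieldDemo
{
    const uint LFU_INIT_VAL = 5; // redis starts new keys' counters at 5

    // current time in minutes, truncated to 16 bits like LFUGetTimeInMinutes()
    static uint MinutesNow() =>
        (uint)((DateTimeOffset.UtcNow.ToUnixTimeSeconds() / 60) & 0xFFFF);

    static void Main(string[] args)
    {
        // pack: high 16 bits = access time in minutes, low 8 bits = counter
        uint lru = (MinutesNow() << 8) | LFU_INIT_VAL;

        uint counter = lru & 0xFF; // access frequency
        uint minutes = lru >> 8;   // last access time on the minutes clock
        Console.WriteLine($"counter={counter}, minutes={minutes}");
    }
}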

 

Next, the freeMemoryIfNeeded source:

/* This function is periodically called to see if there is memory to free
 * according to the current "maxmemory" settings. In case we are over the
 * memory limit, the function will try to free some memory to return back
 * under the limit.
 *
 * The function returns C_OK if we are under the memory limit or if we
 * were over the limit, but the attempt to free memory was successful.
 * Otherwise if we are over the memory limit, but not enough memory
 * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) {
    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

    size_t mem_reported, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;

    mem_freed = 0;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    latencyStartMonitor(latency);
    while (mem_freed < mem_tofree) {
        int j, k, i, keys_freed = 0;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {// walk every database
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?// allkeys-*: sample from all keys; otherwise only keys with an expire set
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            latencyRemoveNestedEvent(latency,eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers();

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    mem_freed = mem_tofree;
                }
            }
        }

        if (!keys_freed) {
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("eviction-cycle",latency);
            goto cant_free; /* nothing to free... */
        }
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
}
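
The heart of the loop above is sampling: instead of scanning every key, Redis pulls candidates into a small eviction pool and evicts the best one. A toy C# sketch of approximated LRU by random sampling (a deliberately simplified model; real Redis keeps the pool across eviction cycles and samples maxmemory-samples keys, 5 by default):

using System;
using System.Collections.Generic;
using System.Linq;

class SampledLru
{
    static readonly Random Rng = new Random();

    // keys maps key -> last-access clock (smaller means idle longer)
    static string PickVictim(Dictionary<string, long> keys, int samples = 5)
    {
        // take a random sample of the keyspace...
        var candidates = keys.Keys.OrderBy(_ => Rng.Next()).Take(samples);
        // ...and evict the least recently used key within that sample
        return candidates.OrderBy(k => keys[k]).First();
    }

    static void Main(string[] args)
    {
        var keys = new Dictionary<string, long>
        {
            ["a"] = 100, ["b"] = 5, ["c"] = 42, ["d"] = 7, ["e"] = 99, ["f"] = 1
        };
        Console.WriteLine($"evict: {PickVictim(keys)}");
    }
}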

 

19. [Rate limiting and distributed locks] Scenarios and SDK demos

19.1 Rate limiting with Redis

  • Scenarios
    • Anti-crawler: cap how often a given API may be called (e.g. 10 requests/s).
    • Count and meter API calls, billing per 10,000 requests.
  • Approaches
    • Leaky bucket
      Effectively protects downstream systems.
    • Token bucket
    • The difference
      A leaky bucket drains at a strictly constant rate.
      A token bucket can absorb short bursts of traffic (see the C# sketch after this list).
  • Implementation
    • Using string with incr, or with set/get (as Lua scripts)
      Rate-limit Lua script (based on string incr)
      -- redis key
      -- KEYS[1]: ip
      local rediskey="rate.limit."..KEYS[1];
      local limit= redis.call("incr",rediskey);  -- every incoming request increments the counter by 1

      local isOk=1;

      -- this is the first request in the window
      if limit == 1 then
          redis.call("expire",rediskey,1);  -- the key only lives for 1 second
      else
          if limit >10 then  -- at most 10 requests per second
              isOk=0;
          end
      end

      return isOk;

      Rate-limit Lua script (based on get/set)

      local seconds=redis.call("time")[1];
      local rediskey="rate.limit."..seconds .."." ..KEYS[1];
      local limit= redis.call("get",rediskey);
      local isOk=1;

      -- first request in this second
      if limit==false then
          redis.call("set",rediskey,1,"EX",1);  -- the key expires after 1 second
      else
          if tonumber(limit) >10 then
              isOk=0;
          else
              redis.call("incr",rediskey);
          end
      end

      return isOk;

      Running the script from the SDK

      class Program
      {
          static void Main(string[] args)
          {
              ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
              var db = redis.GetDatabase(0);
      
              var script = File.ReadAllText("api.lua");
      
              while (true)
              {
                  var result= db.ScriptEvaluate(script, new RedisKey[1] { "192.168.1.1" });
      
                  Console.WriteLine(result);
      
                  // roughly 12 requests per second
                  Thread.Sleep(80);
              }
      
              Console.ReadKey();
          }
      }
    • Implementation with list lpush
      Lua script
      -- redis key
      local ip=KEYS[1];
      local rediskey="rate.limit."..ip;
      local limit= redis.call("llen",rediskey);
      local isOk=1;

      if limit > 10 then
          isOk=0;
      else
          redis.call("lpush",rediskey,ip);

          if limit == 0 then
              redis.call("expire",rediskey,1);  -- set the 1 second expiry on the first push
          end
      end

      return isOk;

      Running the script from the SDK

      class Program
      {
          static void Main(string[] args)
          {
              ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
              var db = redis.GetDatabase(0);
      
              var script = File.ReadAllText("api2.lua");
      
              while (true)
              {
                  var result= db.ScriptEvaluate(script, new RedisKey[1] { "192.168.1.1" });
      
                  Console.WriteLine(result);
      
                  // roughly 12 requests per second
                  Thread.Sleep(80);
              }
      
              Console.ReadKey();
          }
      }
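
As promised above, a minimal in-process token bucket sketch in C# (illustrative only, not the Lua scripts above; a real distributed limiter would keep this state in Redis). Capacity bounds the burst size while the refill rate bounds the long-term average, which is exactly where it differs from the leaky bucket's constant drain:

using System;

class TokenBucket
{
    private readonly double _capacity;     // max tokens, i.e. the allowed burst size
    private readonly double _refillPerSec; // steady refill rate
    private double _tokens;
    private DateTime _last = DateTime.UtcNow;

    public TokenBucket(double capacity, double refillPerSec)
    {
        _capacity = capacity;
        _refillPerSec = refillPerSec;
        _tokens = capacity; // start full so an initial burst is allowed
    }

    public bool TryAcquire()
    {
        var now = DateTime.UtcNow;
        // refill proportionally to the elapsed time, capped at capacity
        _tokens = Math.Min(_capacity, _tokens + (now - _last).TotalSeconds * _refillPerSec);
        _last = now;
        if (_tokens < 1) return false; // no token available: reject this request
        _tokens -= 1;
        return true;
    }
}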

19.2 Distributed locks in the SDK

Official reference: https://redis.io/topics/distlock

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);

        var isTake = db.LockTake("lock", "12345", TimeSpan.FromMinutes(10));// acquire the lock
        if (isTake)
        {
            //TODO

            var isRelease = db.LockRelease("lock", "12345");// release the lock; the key and value must match the ones used when acquiring
        }


        Console.ReadKey();
    }
}

What the SDK executes under the hood

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);

        var isTake = db.LockTake("lock", "12345", TimeSpan.FromMinutes(10));// acquire the lock; this executes SET lock 12345 EX 600 NX
        if (isTake)
        {
            //TODO

            var isRelease = db.LockRelease("lock", "12345");
            /*
             * Release the lock; the key and value must match the ones used when acquiring.
             * Under the hood the release runs:
             * watch lock
             * get lock
             * and then decides, based on the value it read, whether to go further: before
             * you DEL, you must be sure this lock is the one you created. Then:
             * multi
             * del lock
             * and finally
             * exec
            */
        }


        Console.ReadKey();
    }
}
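
A usage sketch built on LockTake/LockRelease (illustrative: the unique token per holder and the retry loop are the point, so nobody can release a lock they do not own):

using System;
using System.Threading;
using StackExchange.Redis;

class LockRetryDemo
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);

        // a unique token per holder, so we never release someone else's lock
        var token = Guid.NewGuid().ToString();

        for (int attempt = 0; attempt < 5; attempt++)
        {
            if (db.LockTake("lock", token, TimeSpan.FromSeconds(30)))
            {
                try
                {
                    // TODO: critical section
                }
                finally
                {
                    db.LockRelease("lock", token); // always release, even on exceptions
                }
                return;
            }
            Thread.Sleep(100); // back off briefly before retrying
        }
        Console.WriteLine("could not acquire the lock");
    }
}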

20. [Stream] Understanding stream processing and the common stream commands

20.1 What is stream processing?

  • Scenario
    A lot of data loses its value rapidly as time passes, so it has to be computed in real time.
  • Characteristics of stream processing
    • The computation model is defined up front. [machine learning]
    • Data flows in continuously, and results flow out continuously.
    • Example 1: financial risk control. For the person applying for a loan right now: gender, age, how many properties, household registration, bank statements, any bad records, and so on. The result must come back in real time, since it affects how much can be lent.
    • Example 2: real-time prediction. In a system like ours, push to the user right after the order is placed: based on order amount, contribution score, region, blacklists and so on, send a real-time "guess what you like" to drive a possible repeat purchase.

20.2 Understanding the Stream structure in Redis (5.0)

Official docs: https://redis.io/topics/streams-intro

Command reference: https://redis.io/commands#stream

The Stream is a new data type introduced with Redis 5.0, which models a log data structure in a more abstract way, however the essence of the log is still intact: like a log file, often implemented as a file open in append only mode, Redis streams are primarily an append only data structure. At least conceptually, because being Redis Streams an abstract data type represented in memory, they implement more powerful operations, to overcome the limits of the log file itself.

Understanding the stream model in Redis

As you can see below, writes always append after the last entry, and reads walk forward from a given ID:

127.0.0.1:6379> xadd logs * c1 c1
"1549195730591-0"    // the ID parses as timestamp + sequence number
127.0.0.1:6379> xadd logs * c2 c2
"1549197993824-0"    // timestamp + sequence number
127.0.0.1:6379> xadd logs * c3 c3
"1549198001356-0"    // timestamp + sequence number
127.0.0.1:6379> xread streams logs 1549197993824-0    // read the entry after this ID
1) 1) "logs"
   2) 1) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
127.0.0.1:6379> xread streams logs 0    // read everything after ID 0, i.e. all entries
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
      2) 1) "1549197993824-0"
         2) 1) "c2"
            2) "c2"
      3) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
127.0.0.1:6379> xread streams logs $    // read only entries after the last one, so nothing comes back
(nil)
127.0.0.1:6379> xread count 1 streams logs 0    // read the first entry
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
127.0.0.1:6379> xread count 2 streams logs 0    // read the first two entries
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
      2) 1) "1549197993824-0"
         2) 1) "c2"
            2) "c2"

20.3 Publish/subscribe with data retention via the SDK

  • Use XADD + XREAD to implement publish/subscribe

    redis publish/subscribe: does not store the data.

    stream: stores the data.

  • SDK implementation
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
            var db = redis.GetDatabase(0);
    
            RedisValue nextID = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000;// milliseconds since the Unix epoch in C#
    
            while (true)
            {
                var list = db.StreamRead("logs", nextID, 1);
    
                if (list.Length == 0)
                {
                    Thread.Sleep(10);
                    continue;
                }
    
                var single = list[0];
    
                // works a bit like a cursor
                nextID = single.Id;
    
                Console.WriteLine(single.Values[0]);
            }
    
    
            Console.ReadKey();
        }
    }
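
Incidentally, the epoch-milliseconds expression above has a more direct equivalent on .NET Core (assuming DateTimeOffset.ToUnixTimeMilliseconds, available since .NET Framework 4.6 / .NET Core 1.0); it is a drop-in replacement for the nextID initialization:

// equivalent to (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000
RedisValue nextID = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();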

20.4 Multiple consumer groups with xgroup / xreadgroup via the SDK

127.0.0.1:6379> xrange logs - +
1) 1) "1549195730591-0"
   2) 1) "c1"
      2) "c1"
2) 1) "1549197993824-0"
   2) 1) "c2"
      2) "c2"
3) 1) "1549198001356-0"
   2) 1) "c3"
      2) "c3"
4) 1) "1549199460391-0"
   2) 1) "c4"
      2) "c4"
5) 1) "1549199719371-0"
   2) 1) "c5"
      2) "c5"
127.0.0.1:6379> xgroup create logs ctrip 1549197993824-0    // create the group ctrip, reading from c2 onward
OK
127.0.0.1:6379> xreadgroup group ctrip jack streams logs >    // read the next entries for group ctrip (consumer jack)
1) 1) "logs"
   2) 1) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
      2) 1) "1549199460391-0"
         2) 1) "c4"
            2) "c4"
      3) 1) "1549199719371-0"
         2) 1) "c5"
            2) "c5"
127.0.0.1:6379> xinfo groups logs    // show info for all groups
1) 1) "name"
   2) "ctrip"
   3) "consumers"
   4) (integer) 1
   5) "pending"    // 3 pending entries, because stream processing in redis has an ack mechanism similar to rabbitmq's
   6) (integer) 3
   7) "last-delivered-id"
   8) "1549199719371-0"
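
The ctrip group above was created from redis-cli; the same one-off setup can be done from the SDK before the consumers start (a sketch, assuming the StreamCreateConsumerGroup method of StackExchange.Redis 2.x):

using StackExchange.Redis;

class GroupSetup
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);

        // equivalent of: xgroup create logs ctrip 1549197993824-0
        db.StreamCreateConsumerGroup("logs", "ctrip", "1549197993824-0");
    }
}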

SDK implementation

Simulating consumer 1 (mary)

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);


        while (true)
        {
            var entrylist = db.StreamReadGroup("logs", "ctrip", "mary", ">", count: 1);

            if (entrylist.Length == 0)
            {
                Console.WriteLine("no data yet!");
                Thread.Sleep(1000);
                continue;
            }

            var single = entrylist[0];

            Console.WriteLine(single.Values[0]);

            // acknowledge back to redis (ack)
            db.StreamAcknowledge("logs", "ctrip", single.Id);
        }


        Console.ReadKey();
    }
}

Simulating consumer 2 (jack)

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);


        while (true)
        {
            var entrylist = db.StreamReadGroup("logs", "ctrip", "jack", ">", count: 1);

            if (entrylist.Length == 0)
            {
                Console.WriteLine("no data yet!");
                Thread.Sleep(1000);
                continue;
            }

            var single = entrylist[0];

            Console.WriteLine(single.Values[0]);

            // acknowledge back to redis (ack)
            db.StreamAcknowledge("logs", "ctrip", single.Id);
        }


        Console.ReadKey();
    }
}

Simulating the producer

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);


        for (int i = 0; i < int.MaxValue; i++)
        {
            db.StreamAdd("logs", i, i);

            Thread.Sleep(200);
        }


        Console.ReadKey();
    }
}

Demo result: contention between the two consumers is handled by the group, and each entry is delivered to exactly one of them.

 

21. [Module] The module concept and RediSQL, SQL on Redis

21.1 What is a module?

Since 4.0, Redis supports modules: third parties can develop their own modules for Redis and use them to add new data types (bloom filter, SQL on Redis, and so on).

This is a list of Redis modules, for Redis v4.0 or greater, ordered by Github stars. This list contains two set of modules: modules under an OSI approved license, and modules that are under some proprietary license. Non OSI modules are clearly flagged as not open source. Also to have the source code hosted at Github is currently mandatory. To add your module here please send a pull request for the modules.json file in the Redis-doc repository.

Official docs: https://redis.io/modules

21.2 Implementing a simple module

  1. Include the redismodule.h header and the RedisModule_Init initialization function (they live in the Redis source tree and have to be copied in)
    #ifndef REDISMODULE_H
    #define REDISMODULE_H
    
    #include <sys/types.h>
    #include <stdint.h>
    #include <stdio.h>
    
    /* ---------------- Defines common between core and modules --------------- */
    
    /* Error status return values. */
    #define REDISMODULE_OK 0
    #define REDISMODULE_ERR 1
    
    /* API versions. */
    #define REDISMODULE_APIVER_1 1
    
    /* API flags and constants */
    #define REDISMODULE_READ (1<<0)
    #define REDISMODULE_WRITE (1<<1)
    
    #define REDISMODULE_LIST_HEAD 0
    #define REDISMODULE_LIST_TAIL 1
    
    /* Key types. */
    #define REDISMODULE_KEYTYPE_EMPTY 0
    #define REDISMODULE_KEYTYPE_STRING 1
    #define REDISMODULE_KEYTYPE_LIST 2
    #define REDISMODULE_KEYTYPE_HASH 3
    #define REDISMODULE_KEYTYPE_SET 4
    #define REDISMODULE_KEYTYPE_ZSET 5
    #define REDISMODULE_KEYTYPE_MODULE 6
    
    /* Reply types. */
    #define REDISMODULE_REPLY_UNKNOWN -1
    #define REDISMODULE_REPLY_STRING 0
    #define REDISMODULE_REPLY_ERROR 1
    #define REDISMODULE_REPLY_INTEGER 2
    #define REDISMODULE_REPLY_ARRAY 3
    #define REDISMODULE_REPLY_NULL 4
    
    /* Postponed array length. */
    #define REDISMODULE_POSTPONED_ARRAY_LEN -1
    
    /* Expire */
    #define REDISMODULE_NO_EXPIRE -1
    
    /* Sorted set API flags. */
    #define REDISMODULE_ZADD_XX      (1<<0)
    #define REDISMODULE_ZADD_NX      (1<<1)
    #define REDISMODULE_ZADD_ADDED   (1<<2)
    #define REDISMODULE_ZADD_UPDATED (1<<3)
    #define REDISMODULE_ZADD_NOP     (1<<4)
    
    /* Hash API flags. */
    #define REDISMODULE_HASH_NONE       0
    #define REDISMODULE_HASH_NX         (1<<0)
    #define REDISMODULE_HASH_XX         (1<<1)
    #define REDISMODULE_HASH_CFIELDS    (1<<2)
    #define REDISMODULE_HASH_EXISTS     (1<<3)
    
    /* Context Flags: Info about the current context returned by
     * RM_GetContextFlags(). */
    
    /* The command is running in the context of a Lua script */
    #define REDISMODULE_CTX_FLAGS_LUA (1<<0)
    /* The command is running inside a Redis transaction */
    #define REDISMODULE_CTX_FLAGS_MULTI (1<<1)
    /* The instance is a master */
    #define REDISMODULE_CTX_FLAGS_MASTER (1<<2)
    /* The instance is a slave */
    #define REDISMODULE_CTX_FLAGS_SLAVE (1<<3)
    /* The instance is read-only (usually meaning it's a slave as well) */
    #define REDISMODULE_CTX_FLAGS_READONLY (1<<4)
    /* The instance is running in cluster mode */
    #define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5)
    /* The instance has AOF enabled */
    #define REDISMODULE_CTX_FLAGS_AOF (1<<6)
    /* The instance has RDB enabled */
    #define REDISMODULE_CTX_FLAGS_RDB (1<<7)
    /* The instance has Maxmemory set */
    #define REDISMODULE_CTX_FLAGS_MAXMEMORY (1<<8)
    /* Maxmemory is set and has an eviction policy that may delete keys */
    #define REDISMODULE_CTX_FLAGS_EVICT (1<<9)
    /* Redis is out of memory according to the maxmemory flag. */
    #define REDISMODULE_CTX_FLAGS_OOM (1<<10)
    /* Less than 25% of memory available according to maxmemory. */
    #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
    
    #define REDISMODULE_NOTIFY_GENERIC (1<<2)     /* g */
    #define REDISMODULE_NOTIFY_STRING (1<<3)      /* $ */
    #define REDISMODULE_NOTIFY_LIST (1<<4)        /* l */
    #define REDISMODULE_NOTIFY_SET (1<<5)         /* s */
    #define REDISMODULE_NOTIFY_HASH (1<<6)        /* h */
    #define REDISMODULE_NOTIFY_ZSET (1<<7)        /* z */
    #define REDISMODULE_NOTIFY_EXPIRED (1<<8)     /* x */
    #define REDISMODULE_NOTIFY_EVICTED (1<<9)     /* e */
    #define REDISMODULE_NOTIFY_STREAM (1<<10)     /* t */
    #define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM)      /* A */
    
    
    /* A special pointer that we can use between the core and the module to signal
     * field deletion, and that is impossible to be a valid pointer. */
    #define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1)
    
    /* Error messages. */
    #define REDISMODULE_ERRORMSG_WRONGTYPE "WRONGTYPE Operation against a key holding the wrong kind of value"
    
    #define REDISMODULE_POSITIVE_INFINITE (1.0/0.0)
    #define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0)
    
    /* Cluster API defines. */
    #define REDISMODULE_NODE_ID_LEN 40
    #define REDISMODULE_NODE_MYSELF     (1<<0)
    #define REDISMODULE_NODE_MASTER     (1<<1)
    #define REDISMODULE_NODE_SLAVE      (1<<2)
    #define REDISMODULE_NODE_PFAIL      (1<<3)
    #define REDISMODULE_NODE_FAIL       (1<<4)
    #define REDISMODULE_NODE_NOFAILOVER (1<<5)
    
    #define REDISMODULE_CLUSTER_FLAG_NONE 0
    #define REDISMODULE_CLUSTER_FLAG_NO_FAILOVER (1<<1)
    #define REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION (1<<2)
    
    #define REDISMODULE_NOT_USED(V) ((void) V)
    
    /* This type represents a timer handle, and is returned when a timer is
     * registered and used in order to invalidate a timer. It's just a 64 bit
     * number, because this is how each timer is represented inside the radix tree
     * of timers that are going to expire, sorted by expire time. */
    typedef uint64_t RedisModuleTimerID;
    
    /* ------------------------- End of common defines ------------------------ */
    
    #ifndef REDISMODULE_CORE
    
    typedef long long mstime_t;
    
    /* Incomplete structures for compiler checks but opaque access. */
    typedef struct RedisModuleCtx RedisModuleCtx;
    typedef struct RedisModuleKey RedisModuleKey;
    typedef struct RedisModuleString RedisModuleString;
    typedef struct RedisModuleCallReply RedisModuleCallReply;
    typedef struct RedisModuleIO RedisModuleIO;
    typedef struct RedisModuleType RedisModuleType;
    typedef struct RedisModuleDigest RedisModuleDigest;
    typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
    typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
    typedef struct RedisModuleDict RedisModuleDict;
    typedef struct RedisModuleDictIter RedisModuleDictIter;
    
    typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
    typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
    typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
    typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
    typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
    typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
    typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
    typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
    typedef void (*RedisModuleTypeFreeFunc)(void *value);
    typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
    typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
    
    #define REDISMODULE_TYPE_METHOD_VERSION 1
    typedef struct RedisModuleTypeMethods {
        uint64_t version;
        RedisModuleTypeLoadFunc rdb_load;
        RedisModuleTypeSaveFunc rdb_save;
        RedisModuleTypeRewriteFunc aof_rewrite;
        RedisModuleTypeMemUsageFunc mem_usage;
        RedisModuleTypeDigestFunc digest;
        RedisModuleTypeFreeFunc free;
    } RedisModuleTypeMethods;
    
    #define REDISMODULE_GET_API(name) \
        RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))
    
    #define REDISMODULE_API_FUNC(x) (*x)
    
    
    void *REDISMODULE_API_FUNC(RedisModule_Alloc)(size_t bytes);
    void *REDISMODULE_API_FUNC(RedisModule_Realloc)(void *ptr, size_t bytes);
    void REDISMODULE_API_FUNC(RedisModule_Free)(void *ptr);
    void *REDISMODULE_API_FUNC(RedisModule_Calloc)(size_t nmemb, size_t size);
    char *REDISMODULE_API_FUNC(RedisModule_Strdup)(const char *str);
    int REDISMODULE_API_FUNC(RedisModule_GetApi)(const char *, void *);
    int REDISMODULE_API_FUNC(RedisModule_CreateCommand)(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep);
    void REDISMODULE_API_FUNC(RedisModule_SetModuleAttribs)(RedisModuleCtx *ctx, const char *name, int ver, int apiver);
    int REDISMODULE_API_FUNC(RedisModule_IsModuleNameBusy)(const char *name);
    int REDISMODULE_API_FUNC(RedisModule_WrongArity)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongLong)(RedisModuleCtx *ctx, long long ll);
    int REDISMODULE_API_FUNC(RedisModule_GetSelectedDb)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid);
    void *REDISMODULE_API_FUNC(RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode);
    void REDISMODULE_API_FUNC(RedisModule_CloseKey)(RedisModuleKey *kp);
    int REDISMODULE_API_FUNC(RedisModule_KeyType)(RedisModuleKey *kp);
    size_t REDISMODULE_API_FUNC(RedisModule_ValueLength)(RedisModuleKey *kp);
    int REDISMODULE_API_FUNC(RedisModule_ListPush)(RedisModuleKey *kp, int where, RedisModuleString *ele);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ListPop)(RedisModuleKey *key, int where);
    RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_Call)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
    const char *REDISMODULE_API_FUNC(RedisModule_CallReplyProto)(RedisModuleCallReply *reply, size_t *len);
    void REDISMODULE_API_FUNC(RedisModule_FreeCallReply)(RedisModuleCallReply *reply);
    int REDISMODULE_API_FUNC(RedisModule_CallReplyType)(RedisModuleCallReply *reply);
    long long REDISMODULE_API_FUNC(RedisModule_CallReplyInteger)(RedisModuleCallReply *reply);
    size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *reply);
    RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
    void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str);
    const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx, const char *msg);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len);
    void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
    int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply);
    int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll);
    int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d);
    void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
    int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx);
    const char *REDISMODULE_API_FUNC(RedisModule_CallReplyStringPtr)(RedisModuleCallReply *reply, size_t *len);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromCallReply)(RedisModuleCallReply *reply);
    int REDISMODULE_API_FUNC(RedisModule_DeleteKey)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_UnlinkKey)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_StringSet)(RedisModuleKey *key, RedisModuleString *str);
    char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *len, int mode);
    int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
    mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
    int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
    int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
    int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
    int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted);
    void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex);
    int REDISMODULE_API_FUNC(RedisModule_ZsetLastInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex);
    int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max);
    int REDISMODULE_API_FUNC(RedisModule_ZsetLastInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score);
    int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_ZsetRangePrev)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key);
    int REDISMODULE_API_FUNC(RedisModule_HashSet)(RedisModuleKey *key, int flags, ...);
    int REDISMODULE_API_FUNC(RedisModule_HashGet)(RedisModuleKey *key, int flags, ...);
    int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx);
    void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
    unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
    void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
    RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
    int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
    RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
    void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
    void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value);
    uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io);
    void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value);
    int64_t REDISMODULE_API_FUNC(RedisModule_LoadSigned)(RedisModuleIO *io);
    void REDISMODULE_API_FUNC(RedisModule_EmitAOF)(RedisModuleIO *io, const char *cmdname, const char *fmt, ...);
    void REDISMODULE_API_FUNC(RedisModule_SaveString)(RedisModuleIO *io, RedisModuleString *s);
    void REDISMODULE_API_FUNC(RedisModule_SaveStringBuffer)(RedisModuleIO *io, const char *str, size_t len);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_LoadString)(RedisModuleIO *io);
    char *REDISMODULE_API_FUNC(RedisModule_LoadStringBuffer)(RedisModuleIO *io, size_t *lenptr);
    void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double value);
    double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io);
    void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value);
    float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
    void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
    void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
    int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
    void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
    int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
    RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io);
    long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
    void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len);
    void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele);
    void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
    RedisModuleDict *REDISMODULE_API_FUNC(RedisModule_CreateDict)(RedisModuleCtx *ctx);
    void REDISMODULE_API_FUNC(RedisModule_FreeDict)(RedisModuleCtx *ctx, RedisModuleDict *d);
    uint64_t REDISMODULE_API_FUNC(RedisModule_DictSize)(RedisModuleDict *d);
    int REDISMODULE_API_FUNC(RedisModule_DictSetC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr);
    int REDISMODULE_API_FUNC(RedisModule_DictReplaceC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr);
    int REDISMODULE_API_FUNC(RedisModule_DictSet)(RedisModuleDict *d, RedisModuleString *key, void *ptr);
    int REDISMODULE_API_FUNC(RedisModule_DictReplace)(RedisModuleDict *d, RedisModuleString *key, void *ptr);
    void *REDISMODULE_API_FUNC(RedisModule_DictGetC)(RedisModuleDict *d, void *key, size_t keylen, int *nokey);
    void *REDISMODULE_API_FUNC(RedisModule_DictGet)(RedisModuleDict *d, RedisModuleString *key, int *nokey);
    int REDISMODULE_API_FUNC(RedisModule_DictDelC)(RedisModuleDict *d, void *key, size_t keylen, void *oldval);
    int REDISMODULE_API_FUNC(RedisModule_DictDel)(RedisModuleDict *d, RedisModuleString *key, void *oldval);
    RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStartC)(RedisModuleDict *d, const char *op, void *key, size_t keylen);
    RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStart)(RedisModuleDict *d, const char *op, RedisModuleString *key);
    void REDISMODULE_API_FUNC(RedisModule_DictIteratorStop)(RedisModuleDictIter *di);
    int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseekC)(RedisModuleDictIter *di, const char *op, void *key, size_t keylen);
    int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseek)(RedisModuleDictIter *di, const char *op, RedisModuleString *key);
    void *REDISMODULE_API_FUNC(RedisModule_DictNextC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr);
    void *REDISMODULE_API_FUNC(RedisModule_DictPrevC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictNext)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr);
    RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictPrev)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr);
    int REDISMODULE_API_FUNC(RedisModule_DictCompareC)(RedisModuleDictIter *di, const char *op, void *key, size_t keylen);
    int REDISMODULE_API_FUNC(RedisModule_DictCompare)(RedisModuleDictIter *di, const char *op, RedisModuleString *key);
    
    /* Experimental APIs */
    #ifdef REDISMODULE_EXPERIMENTAL_API
    #define REDISMODULE_EXPERIMENTAL_API_VERSION 3
    RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
    int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
    int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
    void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
    RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc);
    RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc);
    void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
    void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
    void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
    int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
    int REDISMODULE_API_FUNC(RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx);
    void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback);
    int REDISMODULE_API_FUNC(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len);
    int REDISMODULE_API_FUNC(RedisModule_GetClusterNodeInfo)(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags);
    char **REDISMODULE_API_FUNC(RedisModule_GetClusterNodesList)(RedisModuleCtx *ctx, size_t *numnodes);
    void REDISMODULE_API_FUNC(RedisModule_FreeClusterNodesList)(char **ids);
    RedisModuleTimerID REDISMODULE_API_FUNC(RedisModule_CreateTimer)(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data);
    int REDISMODULE_API_FUNC(RedisModule_StopTimer)(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data);
    int REDISMODULE_API_FUNC(RedisModule_GetTimerInfo)(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data);
    const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void);
    size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
    void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
    void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
    void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback);
    void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags);
    #endif
    
    /* This is included inline inside each Redis module. */
    static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));
    static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
        void *getapifuncptr = ((void**)ctx)[0];
        RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
        REDISMODULE_GET_API(Alloc);
        REDISMODULE_GET_API(Calloc);
        REDISMODULE_GET_API(Free);
        REDISMODULE_GET_API(Realloc);
        REDISMODULE_GET_API(Strdup);
        REDISMODULE_GET_API(CreateCommand);
        REDISMODULE_GET_API(SetModuleAttribs);
        REDISMODULE_GET_API(IsModuleNameBusy);
        REDISMODULE_GET_API(WrongArity);
        REDISMODULE_GET_API(ReplyWithLongLong);
        REDISMODULE_GET_API(ReplyWithError);
        REDISMODULE_GET_API(ReplyWithSimpleString);
        REDISMODULE_GET_API(ReplyWithArray);
        REDISMODULE_GET_API(ReplySetArrayLength);
        REDISMODULE_GET_API(ReplyWithStringBuffer);
        REDISMODULE_GET_API(ReplyWithString);
        REDISMODULE_GET_API(ReplyWithNull);
        REDISMODULE_GET_API(ReplyWithCallReply);
        REDISMODULE_GET_API(ReplyWithDouble);
        REDISMODULE_GET_API(ReplySetArrayLength);
        REDISMODULE_GET_API(GetSelectedDb);
        REDISMODULE_GET_API(SelectDb);
        REDISMODULE_GET_API(OpenKey);
        REDISMODULE_GET_API(CloseKey);
        REDISMODULE_GET_API(KeyType);
        REDISMODULE_GET_API(ValueLength);
        REDISMODULE_GET_API(ListPush);
        REDISMODULE_GET_API(ListPop);
        REDISMODULE_GET_API(StringToLongLong);
        REDISMODULE_GET_API(StringToDouble);
        REDISMODULE_GET_API(Call);
        REDISMODULE_GET_API(CallReplyProto);
        REDISMODULE_GET_API(FreeCallReply);
        REDISMODULE_GET_API(CallReplyInteger);
        REDISMODULE_GET_API(CallReplyType);
        REDISMODULE_GET_API(CallReplyLength);
        REDISMODULE_GET_API(CallReplyArrayElement);
        REDISMODULE_GET_API(CallReplyStringPtr);
        REDISMODULE_GET_API(CreateStringFromCallReply);
        REDISMODULE_GET_API(CreateString);
        REDISMODULE_GET_API(CreateStringFromLongLong);
        REDISMODULE_GET_API(CreateStringFromString);
        REDISMODULE_GET_API(CreateStringPrintf);
        REDISMODULE_GET_API(FreeString);
        REDISMODULE_GET_API(StringPtrLen);
        REDISMODULE_GET_API(AutoMemory);
        REDISMODULE_GET_API(Replicate);
        REDISMODULE_GET_API(ReplicateVerbatim);
        REDISMODULE_GET_API(DeleteKey);
        REDISMODULE_GET_API(UnlinkKey);
        REDISMODULE_GET_API(StringSet);
        REDISMODULE_GET_API(StringDMA);
        REDISMODULE_GET_API(StringTruncate);
        REDISMODULE_GET_API(GetExpire);
        REDISMODULE_GET_API(SetExpire);
        REDISMODULE_GET_API(ZsetAdd);
        REDISMODULE_GET_API(ZsetIncrby);
        REDISMODULE_GET_API(ZsetScore);
        REDISMODULE_GET_API(ZsetRem);
        REDISMODULE_GET_API(ZsetRangeStop);
        REDISMODULE_GET_API(ZsetFirstInScoreRange);
        REDISMODULE_GET_API(ZsetLastInScoreRange);
        REDISMODULE_GET_API(ZsetFirstInLexRange);
        REDISMODULE_GET_API(ZsetLastInLexRange);
        REDISMODULE_GET_API(ZsetRangeCurrentElement);
        REDISMODULE_GET_API(ZsetRangeNext);
        REDISMODULE_GET_API(ZsetRangePrev);
        REDISMODULE_GET_API(ZsetRangeEndReached);
        REDISMODULE_GET_API(HashSet);
        REDISMODULE_GET_API(HashGet);
        REDISMODULE_GET_API(IsKeysPositionRequest);
        REDISMODULE_GET_API(KeyAtPos);
        REDISMODULE_GET_API(GetClientId);
        REDISMODULE_GET_API(GetContextFlags);
        REDISMODULE_GET_API(PoolAlloc);
        REDISMODULE_GET_API(CreateDataType);
        REDISMODULE_GET_API(ModuleTypeSetValue);
        REDISMODULE_GET_API(ModuleTypeGetType);
        REDISMODULE_GET_API(ModuleTypeGetValue);
        REDISMODULE_GET_API(SaveUnsigned);
        REDISMODULE_GET_API(LoadUnsigned);
        REDISMODULE_GET_API(SaveSigned);
        REDISMODULE_GET_API(LoadSigned);
        REDISMODULE_GET_API(SaveString);
        REDISMODULE_GET_API(SaveStringBuffer);
        REDISMODULE_GET_API(LoadString);
        REDISMODULE_GET_API(LoadStringBuffer);
        REDISMODULE_GET_API(SaveDouble);
        REDISMODULE_GET_API(LoadDouble);
        REDISMODULE_GET_API(SaveFloat);
        REDISMODULE_GET_API(LoadFloat);
        REDISMODULE_GET_API(EmitAOF);
        REDISMODULE_GET_API(Log);
        REDISMODULE_GET_API(LogIOError);
        REDISMODULE_GET_API(StringAppendBuffer);
        REDISMODULE_GET_API(RetainString);
        REDISMODULE_GET_API(StringCompare);
        REDISMODULE_GET_API(GetContextFromIO);
        REDISMODULE_GET_API(Milliseconds);
        REDISMODULE_GET_API(DigestAddStringBuffer);
        REDISMODULE_GET_API(DigestAddLongLong);
        REDISMODULE_GET_API(DigestEndSequence);
        REDISMODULE_GET_API(CreateDict);
        REDISMODULE_GET_API(FreeDict);
        REDISMODULE_GET_API(DictSize);
        REDISMODULE_GET_API(DictSetC);
        REDISMODULE_GET_API(DictReplaceC);
        REDISMODULE_GET_API(DictSet);
        REDISMODULE_GET_API(DictReplace);
        REDISMODULE_GET_API(DictGetC);
        REDISMODULE_GET_API(DictGet);
        REDISMODULE_GET_API(DictDelC);
        REDISMODULE_GET_API(DictDel);
        REDISMODULE_GET_API(DictIteratorStartC);
        REDISMODULE_GET_API(DictIteratorStart);
        REDISMODULE_GET_API(DictIteratorStop);
        REDISMODULE_GET_API(DictIteratorReseekC);
        REDISMODULE_GET_API(DictIteratorReseek);
        REDISMODULE_GET_API(DictNextC);
        REDISMODULE_GET_API(DictPrevC);
        REDISMODULE_GET_API(DictNext);
        REDISMODULE_GET_API(DictPrev);
        REDISMODULE_GET_API(DictCompare);
        REDISMODULE_GET_API(DictCompareC);
    
    #ifdef REDISMODULE_EXPERIMENTAL_API
        REDISMODULE_GET_API(GetThreadSafeContext);
        REDISMODULE_GET_API(FreeThreadSafeContext);
        REDISMODULE_GET_API(ThreadSafeContextLock);
        REDISMODULE_GET_API(ThreadSafeContextUnlock);
        REDISMODULE_GET_API(BlockClient);
        REDISMODULE_GET_API(UnblockClient);
        REDISMODULE_GET_API(IsBlockedReplyRequest);
        REDISMODULE_GET_API(IsBlockedTimeoutRequest);
        REDISMODULE_GET_API(GetBlockedClientPrivateData);
        REDISMODULE_GET_API(GetBlockedClientHandle);
        REDISMODULE_GET_API(AbortBlock);
        REDISMODULE_GET_API(SetDisconnectCallback);
        REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
        REDISMODULE_GET_API(BlockedClientDisconnected);
        REDISMODULE_GET_API(RegisterClusterMessageReceiver);
        REDISMODULE_GET_API(SendClusterMessage);
        REDISMODULE_GET_API(GetClusterNodeInfo);
        REDISMODULE_GET_API(GetClusterNodesList);
        REDISMODULE_GET_API(FreeClusterNodesList);
        REDISMODULE_GET_API(CreateTimer);
        REDISMODULE_GET_API(StopTimer);
        REDISMODULE_GET_API(GetTimerInfo);
        REDISMODULE_GET_API(GetMyClusterID);
        REDISMODULE_GET_API(GetClusterSize);
        REDISMODULE_GET_API(GetRandomBytes);
        REDISMODULE_GET_API(GetRandomHexChars);
        REDISMODULE_GET_API(SetClusterFlags);
    #endif
    
        if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
        RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
        return REDISMODULE_OK;
    }
    
    #else
    
    /* Things only defined for the modules core, not exported to modules
     * including this file. */
    #define RedisModuleString robj
    
    #endif /* REDISMODULE_CORE */
    #endif /* REDISMOUDLE_H */
  2. Implement the entry function RedisModule_OnLoad and your own custom command function (module.c):
    #include "redismodule.h"
    #include <stdlib.h>
    
    // your business logic: reply with a random integer
    int MyRand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
        RedisModule_ReplyWithLongLong(ctx,rand());
        return REDISMODULE_OK;
    }
    
    // the entry function Redis calls when loading the module
    int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    
    // register your module's name and version
        if (RedisModule_Init(ctx,"ctrip",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
                return REDISMODULE_ERR;
    
    // register the command: name, handler, flags string, firstkey, lastkey, keystep
    if (RedisModule_CreateCommand(ctx,"ctrip.rand",MyRand_RedisCommand,"",1,1,1) == REDISMODULE_ERR)
            return REDISMODULE_ERR;
    
        return REDISMODULE_OK;
    }
  3. Write the Makefile:
    DEBUGFLAGS = -g -ggdb -O2
    ifeq ($(DEBUG), 1)
        DEBUGFLAGS = -g -ggdb -O0
    endif
    
    # find the OS
    uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
    CFLAGS =  -Wall -Wno-unused-function $(DEBUGFLAGS) -fPIC -std=gnu99 -D_GNU_SOURCE
    CC:=$(shell sh -c 'type $(CC) >/dev/null 2>/dev/null && echo $(CC) || echo gcc')
    
    # Compile flags for linux / osx
    ifeq ($(uname_S),Linux)
        SHOBJ_CFLAGS ?=  -fno-common -g -ggdb
        SHOBJ_LDFLAGS ?= -shared -Bsymbolic -Bsymbolic-functions
    else
        CFLAGS += -mmacosx-version-min=10.6
        SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb
        SHOBJ_LDFLAGS ?= -dylib -exported_symbol _RedisModule_OnLoad -macosx_version_min 10.6
    endif
    
    SOURCEDIR=$(shell pwd -P)
    CC_SOURCES = $(wildcard $(SOURCEDIR)/*.c) $(wildcard $(SOURCEDIR)/dep/*.c)
    CC_OBJECTS = $(patsubst $(SOURCEDIR)/%.c, $(SOURCEDIR)/%.o, $(CC_SOURCES))
    
    all: ctrip.so
    
    ctrip.so: $(CC_OBJECTS)
        $(LD) -o $@ $(CC_OBJECTS) $(SHOBJ_LDFLAGS) -lc
    
    clean:
        rm -rvf *.xo *.so *.o *.a
  4. Use the rz command to upload redismodule.h, module.c and Makefile into a new module folder under the redis directory, then run make to build the ctrip.so shared library:
    [root@localhost redis]# mkdir module
    [root@localhost redis]# cd module/
    [root@localhost module]# rz
    
    [root@localhost module]# ls
    Makefile  module.c  redismodule.h
    [root@localhost module]# make
    cc -Wall -Wno-unused-function -g -ggdb -O2 -fPIC -std=gnu99 -D_GNU_SOURCE   -c -o /data/redis/module/module.o /data/redis/module/module.c
    ld -o ctrip.so  /data/redis/module/module.o -shared -Bsymbolic -Bsymbolic-functions -lc
    [root@localhost module]# ls
    ctrip.so  Makefile  module.c  module.o  redismodule.h
    [root@localhost module]# pwd
    /data/redis/module
  5. Load the module when starting Redis:
    ./redis-server ./redis.conf --loadmodule ./module/ctrip.so
  6. Use it from the client:
    [root@localhost redis]# ./redis-cli 
    127.0.0.1:6379> ctrip.rand
    (integer) 1445136292
    127.0.0.1:6379> ctrip.rand
    (integer) 475099848
    127.0.0.1:6379> ctrip.rand
    (integer) 2031241573
  7. Other module commands:
    127.0.0.1:6379> module list    // list the loaded modules
    1) 1) "name"
       2) "ctrip"
       3) "ver"
       4) (integer) 1
    127.0.0.1:6379> module unload ctrip    // unload the module
    OK
    127.0.0.1:6379> module load /data/redis/ctrip.so    // load a module (wrong path here, hence the error)
    (error) ERR Error loading the extension. Please check the server logs.
    127.0.0.1:6379> module load /data/redis/module/ctrip.so
    OK
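
The custom command can also be invoked from .NET: StackExchange.Redis exposes a raw Execute method for exactly this kind of module-defined command. A minimal sketch (the connection string is an assumption, adjust it to your server):

    using System;
    using StackExchange.Redis;
    
    class ModuleDemo
    {
        static void Main()
        {
            var redis = ConnectionMultiplexer.Connect("192.168.132.129:6379");
            var db = redis.GetDatabase(0);
    
            // Execute sends an arbitrary command, so module commands work too
            RedisResult result = db.Execute("ctrip.rand");
            Console.WriteLine((long)result);    // e.g. 1445136292
        }
    }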

21.3 Installing and using rediSQL

Official repository: https://github.com/RedBeardLab/rediSQL

Download v0.7.1: https://github.com/RedBeardLab/rediSQL/releases/download/v0.7.1/rediSQL_0.7.1.so

Copy the file onto the server and load it:

127.0.0.1:6379> module load /data/redis/rediSQL_0.7.1.so
OK

Run SQL statements from the client:

127.0.0.1:6379> REDISQL.CREATE_DB DB    // create a database
OK
127.0.0.1:6379> REDISQL.EXEC DB "CREATE TABLE person(id int, username text);"    //建立person表
1) DONE
2) (integer) 0
127.0.0.1:6379> REDISQL.EXEC DB "INSERT INTO person VALUES(1,'jack');"    //插入person表數據
1) DONE
2) (integer) 1
127.0.0.1:6379> REDISQL.EXEC DB "INSERT INTO person VALUES(1,'mary');"
1) DONE
2) (integer) 1
127.0.0.1:6379> REDISQL.EXEC DB "select * from person;"    //查詢person表數據
1) 1) (integer) 1
   2) "jack"
2) 1) (integer) 1
   2) "mary"

22. [Monitoring] Monitoring Redis with Elasticsearch + Kibana + Metricbeat

22.1 How to monitor Redis

  • The info command: https://redis.io/commands/info
    Inspect server state by running info (a C# sketch for reading these values through the SDK follows this list):
    127.0.0.1:6379> info
    # Server
    redis_version:5.0.3
    redis_git_sha1:00000000
    redis_git_dirty:0
    redis_build_id:7e97aa5c23979213
    redis_mode:standalone
    os:Linux 3.10.0-957.el7.x86_64 x86_64
    arch_bits:64
    multiplexing_api:epoll
    atomicvar_api:atomic-builtin
    gcc_version:4.8.5
    process_id:19834
    run_id:d50dc9ef71d45e3ee56bf5bd20d6a55e539fd476
    tcp_port:6379
    uptime_in_seconds:1546
    uptime_in_days:0
    hz:10
    configured_hz:10
    lru_clock:5845738
    executable:/data/redis/./redis-server
    config_file:/data/redis/./redis.conf
    
    # Clients
    connected_clients:1
    client_recent_max_input_buffer:2
    client_recent_max_output_buffer:0
    blocked_clients:0
    
    # Memory
    used_memory:913320
    used_memory_human:891.91K
    used_memory_rss:3870720
    used_memory_rss_human:3.69M
    used_memory_peak:913320
    used_memory_peak_human:891.91K
    used_memory_peak_perc:100.11%
    used_memory_overhead:910838
    used_memory_startup:861072
    used_memory_dataset:2482
    used_memory_dataset_perc:4.75%
    allocator_allocated:879528
    allocator_active:3832832
    allocator_resident:3832832
    total_system_memory:1907941376
    total_system_memory_human:1.78G
    used_memory_lua:37888
    used_memory_lua_human:37.00K
    used_memory_scripts:0
    used_memory_scripts_human:0B
    number_of_cached_scripts:0
    maxmemory:104857600
    maxmemory_human:100.00M
    maxmemory_policy:allkeys-lru
    allocator_frag_ratio:4.36
    allocator_frag_bytes:2953304
    allocator_rss_ratio:1.00
    allocator_rss_bytes:0
    rss_overhead_ratio:1.01
    rss_overhead_bytes:37888
    mem_fragmentation_ratio:4.40
    mem_fragmentation_bytes:2991192
    mem_not_counted_for_evict:0
    mem_replication_backlog:0
    mem_clients_slaves:0
    mem_clients_normal:49694
    mem_aof_buffer:0
    mem_allocator:libc
    active_defrag_running:0
    lazyfree_pending_objects:0
    
    # Persistence
    loading:0
    rdb_changes_since_last_save:0
    rdb_bgsave_in_progress:0
    rdb_last_save_time:1549349088
    rdb_last_bgsave_status:ok
    rdb_last_bgsave_time_sec:0
    rdb_current_bgsave_time_sec:-1
    rdb_last_cow_size:557056
    aof_enabled:0
    aof_rewrite_in_progress:0
    aof_rewrite_scheduled:0
    aof_last_rewrite_time_sec:-1
    aof_current_rewrite_time_sec:-1
    aof_last_bgrewrite_status:ok
    aof_last_write_status:ok
    aof_last_cow_size:0
    
    # Stats
    total_connections_received:3
    total_commands_processed:17
    instantaneous_ops_per_sec:0
    total_net_input_bytes:772
    total_net_output_bytes:34922
    instantaneous_input_kbps:0.00
    instantaneous_output_kbps:0.00
    rejected_connections:0
    sync_full:0
    sync_partial_ok:0
    sync_partial_err:0
    expired_keys:0
    expired_stale_perc:0.00
    expired_time_cap_reached_count:0
    evicted_keys:0
    keyspace_hits:0
    keyspace_misses:0
    pubsub_channels:0
    pubsub_patterns:0
    latest_fork_usec:172
    migrate_cached_sockets:0
    slave_expires_tracked_keys:0
    active_defrag_hits:0
    active_defrag_misses:0
    active_defrag_key_hits:0
    active_defrag_key_misses:0
    
    # Replication
    role:master
    connected_slaves:0
    master_replid:343fc546128e3dc41ecb5f178fae6d5e02e390ea
    master_replid2:0000000000000000000000000000000000000000
    master_repl_offset:0
    second_repl_offset:-1
    repl_backlog_active:0
    repl_backlog_size:1048576
    repl_backlog_first_byte_offset:0
    repl_backlog_histlen:0
    
    # CPU
    used_cpu_sys:1.949254
    used_cpu_user:1.224226
    used_cpu_sys_children:0.003597
    used_cpu_user_children:0.000000
    
    # Cluster
    cluster_enabled:0
    
    # Keyspace
    db0:keys=1,expires=0,avg_ttl=0
    • server: General information about the Redis server
    • clients: Client connections section
    • memory: Memory consumption related information
    • persistence: RDB and AOF related information
    • stats: General statistics
    • replication: Master/replica replication information
    • cpu: CPU consumption statistics
    • commandstats: Redis command statistics
    • cluster: Redis Cluster section
    • keyspace: Database related statistics

    It can also take the following values:

    • all: Return all sections
    • default: Return only the default set of sections
  • The monitor command: https://redis.io/commands/monitor
    Streams back, in real time, every command the server processes from all connected clients:
    // client 1
    127.0.0.1:6379> monitor
    OK
    // client 2 connects
    [root@localhost redis]# ./redis-cli
    // client 1 sees the new connection
    1549349998.266239 [0 127.0.0.1:46604] "COMMAND"
    // client 2 runs a command
    127.0.0.1:6379> keys *
    1) "DB"
    // client 1 sees it echoed
    1549350008.437912 [0 127.0.0.1:46604] "keys" "*"
  • The ./redis-cli --stat command
    [root@localhost redis]# ./redis-cli --stat
    ------- data ------ --------------------- load -------------------- - child -
    keys       mem      clients blocked requests            connections          
    1          909.06K  2       0       21 (+0)             5           
    1          909.06K  2       0       22 (+1)             5           
    1          909.06K  2       0       23 (+1)             5           
    1          909.06K  2       0       24 (+1)             5           
    1          909.06K  2       0       25 (+1)             5           
    1          909.06K  2       0       26 (+1)             5           
    1          909.06K  2       0       27 (+1)             5           
    1          909.06K  2       0       28 (+1)             5        
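
These statistics are also reachable programmatically: in StackExchange.Redis, IServer.Info wraps the info command and returns each section as a group of key/value pairs. A minimal sketch (the server address is an assumption):

    using System;
    using StackExchange.Redis;
    
    class InfoDemo
    {
        static void Main()
        {
            var redis = ConnectionMultiplexer.Connect("192.168.132.129:6379");
            var server = redis.GetServer("192.168.132.129", 6379);
    
            // pull just the memory section, mirroring `info memory`
            foreach (var section in server.Info("memory"))
            {
                Console.WriteLine($"# {section.Key}");
                foreach (var pair in section)
                    Console.WriteLine($"{pair.Key}: {pair.Value}");
            }
        }
    }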

22.2 Professional monitoring with Elasticsearch + Kibana + Metricbeat

  • The pieces

    Elasticsearch and Kibana are installed via Docker; Metricbeat is downloaded from https://www.elastic.co/downloads

    metricbeat: the collector that ships metrics
    elasticsearch: data analysis and search (built on Lucene)
    kibana: the web viewing tool

 

  • Install es + kibana + redis via Docker
    docker-es (5.6.14): https://hub.docker.com/_/elasticsearch
    docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.14

    docker-kibana (5.6.14): https://hub.docker.com/_/kibana

    docker pull docker.elastic.co/kibana/kibana:5.6.14

    docker-redis:

    docker run --name some-redis -p 6379:6379 -d redis
  • Check the installed Docker images
    [root@localhost ~]# docker images
    REPOSITORY                                      TAG                 IMAGE ID            CREATED             SIZE
    docker.io/redis                                 latest              82629e941a38        13 days ago         95 MB
    docker.elastic.co/kibana/kibana                 5.6.14              b5d65e1bd763        7 weeks ago         659 MB
    docker.elastic.co/elasticsearch/elasticsearch   5.6.14              cbf18c3f8c43        7 weeks ago         663 MB
  • Install Metricbeat: create an elastic folder under the redis directory, then download the Metricbeat package into it
    [root@localhost redis]# mkdir elastic
    [root@localhost redis]# cd elastic/
    [root@localhost elastic]# wget https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-5.6.14-linux-x86_64.tar.gz
  • Write the docker-compose.yml file (or upload one with rz)
    version: '3.0'
    
    services:
    
      elasticsearch:
        image: elasticsearch:5.6.14
        ports:
          - 9200:9200
          - 9300:9300
      kibana:
        image: kibana:5.6.14
        ports:
          - 5601:5601
        links:
          - elasticsearch

     

  • Install the docker-compose tool
    Official releases: https://github.com/docker/compose/releases

    // install
    sudo curl -L https://github.com/docker/compose/releases/download/1.24.0-rc1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
    
    // make it executable
    sudo chmod +x /usr/local/bin/docker-compose
    
    // verify the installation
    docker-compose

     

  • Bring the stack up (docker-compose pulls the images automatically according to docker-compose.yml)
    docker-compose up --build
  • Check the running containers
    [root@localhost ~]# docker ps
    CONTAINER ID        IMAGE                  COMMAND                  CREATED             STATUS              PORTS                                            NAMES
    0ea66ecacfa6        kibana:5.6.14          "/docker-entrypoin..."   8 minutes ago       Up 8 minutes        0.0.0.0:5601->5601/tcp                           elastic_kibana_1
    a6e1ea5adadf        elasticsearch:5.6.14   "/docker-entrypoin..."   8 minutes ago       Up 8 minutes        0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   elastic_elasticsearch_1

    Both Kibana and Elasticsearch are up.

  • Check Kibana on port 5601

     

  • Check Elasticsearch on ports 9200 and 9300

     

  • Install and start Metricbeat
    First unpack it:
    // unpack
    [root@localhost elastic]# tar -xzvf metricbeat-5.6.14-linux-x86_64.tar.gz 
    
    // rename the unpacked folder
    [root@localhost elastic]# mv metricbeat-5.6.14-linux-x86_64 metricbeat
    [root@localhost elastic]# cd metricbeat/
    
    // remove the stock metricbeat.yml and recreate it from the full template
    [root@localhost metricbeat]# rm -rf metricbeat.yml 
    [root@localhost metricbeat]# cp metricbeat.full.yml metricbeat.yml

    Edit the metricbeat.yml configuration file:

    # Redis module section
    - module: redis
      metricsets: ["info", "keyspace"]
      enabled: true
      #period: 10s
    
      # Redis hosts
      hosts: ["0.0.0.0:6379"]
    
    # template section
      # Set to false to disable template loading.
      template.enabled: true
    
      # Template name. By default the template name is metricbeat.
      template.name: "metricbeat"
    
      # Path to template file
      template.path: "${path.config}/metricbeat.template.json"
    
      # Overwrite existing template
      template.overwrite: false
    
    # dashboards section
    dashboards.enabled: true

    Start Metricbeat:

    ./metricbeat -e -c metricbeat.yml

     

  • Configure Kibana
    We need to set up the index pattern in Kibana. The index name is the template.name value (metricbeat) from metricbeat.yml; note that the pattern must end with a *, i.e. metricbeat-*. For the time filter field, choose the timestamp.

    We can then browse the collected documents in Discover.

    The Dashboard page offers a large set of ready-made dashboards.

    The one to focus on here is the Redis dashboard.

    We can also create our own visualizations on top of the collected data.

     

23. [Cluster] Read/write splitting setup and an introduction to twemproxy distributed caching

23.1 A basic cluster (multi-machine deployment)

 

23.1.1 Redis master-replica mode

This relieves the read/write pressure on a single Redis instance.

  • master: handles writes and a small share of reads
  • slave: handles reads

Problems it solves:

  • A measure of high availability
  • Spreading the read/write load

23.1.2 Setup

  •   The slaveof command (reverts after a restart)
    Master (writes): 192.168.43.62
    Replica (reads): 192.168.132.128
    Run slaveof on the replica:
    slaveof 192.168.43.62 6379

    Anything written on the master can now be read from the replica.

     

  •  The replicaof directive (survives restarts; set in the config file)
    Master (writes): 192.168.43.62
    Replica (reads): 192.168.132.128
    Edit the replica's redis.conf:
    # replicaof <masterip> <masterport>
    replicaof 192.168.43.62 6379

     

23.1.3 SDK implementation

  • write
    using System;
    using System.Threading;
    using StackExchange.Redis;
    
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,192.168.132.128:6379");
            var db = redis.GetDatabase(0);
    
            for (int i = 0; i < int.MaxValue; i++)
            {
                try
                {
                    db.StringSet(i.ToString(), "");
                    Console.WriteLine($"{i} processed");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    Thread.Sleep(10);
                }
            }
    
            Console.ReadKey();
        }
    }

    If the master goes down, writes throw an exception.

  • read
    using System;
    using System.Threading;
    using StackExchange.Redis;
    
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,192.168.132.128:6379");
            var db = redis.GetDatabase(0);
    
            for (int i = 0; i < int.MaxValue; i++)
            {
                try
                {
                    db.StringGet(i.ToString());
                    Console.WriteLine($"{i} processed");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    Thread.Sleep(10);
                }
            }
    
            Console.ReadKey();
        }
    }

    Even if the master goes down, reads still succeed.
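
    By default StackExchange.Redis sends reads to the master; to have a read served by the replica you can pass a per-call flag. A small sketch (flag name per StackExchange.Redis 2.x; 1.x calls the same flag PreferSlave):

    using System;
    using StackExchange.Redis;
    
    class ReadFromReplica
    {
        static void Main()
        {
            var redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,192.168.132.128:6379");
            var db = redis.GetDatabase(0);
    
            // prefer the replica for this read, falling back to the master if no replica is available
            Console.WriteLine(db.StringGet("username", CommandFlags.PreferReplica));
        }
    }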

23.2 Redis + twemproxy (load-balancing mode)

twemproxy official repository: https://github.com/twitter/twemproxy (it plays the same role for Redis that nginx plays for web servers)

Download and install it on the proxy server:

// download
[root@localhost data]# wget https://doc-0o-08-docs.googleusercontent.com/docs/securesc/ldrgc4v8l6s4mmt863tsvf16k3mhbph4/uqjpged7j8k0kucos05i7pvj52n3fv21/1549418400000/15183709450983481674/04994642584830366907/0B6pVMMV5F5dfb1YwcThnaVZXbjg?e=download
// unpack
[root@localhost data]# tar -xzvf nutcracker-0.4.1.tar.gz
// rename
[root@localhost data]# mv nutcracker-0.4.1 twemproxy
[root@localhost data]# cd twemproxy/

Configure the build to install into /data/proxy:

[root@localhost twemproxy]# ./configure -prefix=/data/proxy

Then compile and install:

[root@localhost twemproxy]# make && make install

After the build finishes, the nutcracker binary is under /data/proxy/sbin/; take a look at its help output:

[root@localhost sbin]# ./nutcracker --help
This is nutcracker-0.4.1

Usage: nutcracker [-?hVdDt] [-v verbosity level] [-o output file]
                  [-c conf file] [-s stats port] [-a stats addr]
                  [-i stats interval] [-p pid file] [-m mbuf size]

Options:
  -h, --help             : this help
  -V, --version          : show version and exit
  -t, --test-conf        : test configuration for syntax errors and exit
  -d, --daemonize        : run as a daemon
  -D, --describe-stats   : print stats description and exit
  -v, --verbose=N        : set logging level (default: 5, min: 0, max: 11)
  -o, --output=S         : set logging file (default: stderr)
  -c, --conf-file=S      : set configuration file (default: conf/nutcracker.yml)
  -s, --stats-port=N     : set stats monitoring port (default: 22222)
  -a, --stats-addr=S     : set stats monitoring ip (default: 0.0.0.0)
  -i, --stats-interval=N : set stats aggregation interval in msec (default: 30000 msec)
  -p, --pid-file=S       : set pid file (default: off)
  -m, --mbuf-size=N      : set size of mbuf chunk in bytes (default: 16384 bytes)

Next, look at the configuration template, the nutcracker.yml file under /data/twemproxy/conf/:

alpha:
  listen: 127.0.0.1:22121
  hash: fnv1a_64
  distribution: ketama
  auto_eject_hosts: true
  redis: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 127.0.0.1:6379:1

beta:
  listen: 127.0.0.1:22122
  hash: fnv1a_64
  hash_tag: "{}"
  distribution: ketama
  auto_eject_hosts: false
  timeout: 400
  redis: true
  servers:
   - 127.0.0.1:6380:1 server1
   - 127.0.0.1:6381:1 server2
   - 127.0.0.1:6382:1 server3
   - 127.0.0.1:6383:1 server4

gamma:
  listen: 127.0.0.1:22123
  hash: fnv1a_64
  distribution: ketama
  timeout: 400
  backlog: 1024
  preconnect: true
  auto_eject_hosts: true
  server_retry_timeout: 2000
  server_failure_limit: 3
  servers:
   - 127.0.0.1:11212:1
   - 127.0.0.1:11213:1

delta:
  listen: 127.0.0.1:22124
  hash: fnv1a_64
  distribution: ketama
  timeout: 100
  auto_eject_hosts: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 127.0.0.1:11214:1
   - 127.0.0.1:11215:1
   - 127.0.0.1:11216:1
   - 127.0.0.1:11217:1
   - 127.0.0.1:11218:1
   - 127.0.0.1:11219:1
   - 127.0.0.1:11220:1
   - 127.0.0.1:11221:1
   - 127.0.0.1:11222:1
   - 127.0.0.1:11223:1

omega:
  listen: /tmp/gamma
  hash: hsieh
  distribution: ketama
  auto_eject_hosts: false
  servers:
   - 127.0.0.1:11214:100000
   - 127.0.0.1:11215:1

Then, under /data/proxy/sbin/, write a custom configuration file kp.yml modeled on nutcracker.yml (the trailing number on each server entry, 1 and 2 here, is its weight in the key distribution):

alpha:
  listen: 192.168.132.130:22121
  hash: fnv1a_64
  distribution: ketama
  auto_eject_hosts: true
  redis: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 192.168.132.130:6379:1
   - 192.168.132.129:6379:2

Then start it (-d daemonizes, -c selects the config file):

[root@localhost sbin]# ./nutcracker -d -c kp.yml
[root@localhost sbin]# netstat -tlnp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 192.168.132.130:22121   0.0.0.0:*               LISTEN      61912/./nutcracker  
tcp        0      0 0.0.0.0:22222           0.0.0.0:*               LISTEN      61912/./nutcracker  
tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
tcp        0      0 0.0.0.0:6000            0.0.0.0:*               LISTEN      7196/X              
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      7643/dnsmasq        
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      6974/sshd           
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      6976/cupsd          
tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd           
tcp6       0      0 :::6000                 :::*                    LISTEN      7196/X              
tcp6       0      0 :::22                   :::*                    LISTEN      6974/sshd           
tcp6       0      0 ::1:631                 :::*                    LISTEN      6976/cupsd  

Then start the two Redis instances (192.168.132.130 and 192.168.132.129) and test through the proxy:

[root@bogon redis]# ./redis-cli -h 192.168.132.130 -p 22121
192.168.132.130:22121> set username jack
OK
192.168.132.130:22121> set password 12345 
OK
192.168.132.130:22121> set email 786744873@qq.com
OK

Result:

 

23.3 SDK implementation

using System;
using System.Threading;
using StackExchange.Redis;

class Program
{
    static void Main(string[] args)
    {
        var conf = new ConfigurationOptions() { Proxy = Proxy.Twemproxy };

        conf.EndPoints.Add("192.168.132.130:22121");  // the proxy server

        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(conf);

        var db = redis.GetDatabase(0);

        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} processed");

                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }

        Console.ReadKey();
    }
}

Result:

24. [Cluster] High availability with Sentinel: introduction, setup and SDK practice

24.1 Sentinel

Sentinel solves the problem of the master dying in master-slave mode: when the master goes down, we still want the deployment to keep serving reads and writes. So a council of observers is introduced to manage master-slave high availability, and this body votes to promote one of the slaves to be the new master.

 

Flow:

  • The master goes down.
  • Once more than half of the sentinels (the observers) agree that the master is down, they elect a leader sentinel, and the leader picks the most suitable slave, based on slave priority, to become the new master.

  • Sentinel then points the remaining slaves at the new master, and they resynchronize with it.
  • The failed master stays under observation; when it restarts, it rejoins as a slave of the new master.

24.2 Setup

Master: 192.168.132.129:6379
Slave: 192.168.132.130:6379
Sentinels: 192.168.132.131

  • First establish the master-slave relationship with replicaof and start both instances.
  • On the sentinel machine, create a sentinelnel folder under /data/ and three sentinel folders inside it:
    [root@localhost data]# ls
    redis  redis-5.0.3.tar.gz
    [root@localhost data]# mkdir sentinelnel
    [root@localhost data]# cd sentinelnel/
    [root@localhost sentinelnel]# mkdir s1
    [root@localhost sentinelnel]# mkdir s2
    [root@localhost sentinelnel]# mkdir s3
    [root@localhost sentinelnel]# ls
    s1  s2  s3
  • Copy sentinel.conf from the redis folder, and redis-sentinel from the redis/src folder, into /data/sentinelnel/s1/, /data/sentinelnel/s2/ and /data/sentinelnel/s3/:
    [root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s1/
    [root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s2/
    [root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s3/
    [root@localhost redis]# cd src
    [root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s1/
    [root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s2/
    [root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s3/
  • Edit the config files under s1, s2 and s3 (the default port is 26379, so we use s1: 26379, s2: 26380, s3: 26381):
    //s1 => sentinel.conf
    port 26379
    sentinel monitor mymaster 192.168.132.129 6379 2    // fail over only when 2 sentinels agree the master is down
    
    //s2 => sentinel.conf
    port 26380
    sentinel monitor mymaster 192.168.132.129 6379 2
    
    //s3 => sentinel.conf
    port 26381
    sentinel monitor mymaster 192.168.132.129 6379 2
  • Start the three sentinels:
    [root@localhost s1]# ./redis-sentinel ./sentinel.conf 
    [root@localhost s2]# ./redis-sentinel ./sentinel.conf 
    [root@localhost s3]# ./redis-sentinel ./sentinel.conf 
    ...
    72173:X 07 Feb 2019 10:09:26.046 # Sentinel ID is a4b0184b08224fccf8c9eb9d8073a6197bb15fcc
    72173:X 07 Feb 2019 10:09:26.046 # +monitor master mymaster 192.168.132.129 6379 quorum 2
    72173:X 07 Feb 2019 10:09:26.047 * +slave slave 192.168.132.130:6379 192.168.132.130 6379 @ mymaster 192.168.132.129 6379
    72173:X 07 Feb 2019 10:09:26.883 * +sentinel sentinel bee674ea947117df4af66228229a6c565a3e051b 192.168.132.131 26379 @ mymaster 192.168.132.129 6379    // the sentinel on 26379
    72173:X 07 Feb 2019 10:09:26.996 * +sentinel sentinel 14998349ae4f22ea83e22a9f847db2385009ddba 192.168.132.131 26380 @ mymaster 192.168.132.129 6379    // the sentinel on 26380

    Why can the sentinels discover each other?

    [root@localhost redis]# ./redis-cli 
    127.0.0.1:6379> pubsub channels
    1) "__sentinel__:hello"

    As you can see, a __sentinel__:hello channel exists on the master; each sentinel discovers the others by subscribing to this channel.

  • Testing the result
    Use the info command on each Redis to check its current role:
    //192.168.132.129
    # Replication
    role:master
    
    //192.168.132.130
    # Replication
    role:slave

    Now stop the Redis service on 192.168.132.129; within 30 seconds the sentinels notice the change and hold a new election:

    [root@localhost s1]# ./redis-sentinel ./sentinel.conf 
    71856:X 07 Feb 2019 10:00:43.022 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
    71856:X 07 Feb 2019 10:00:43.022 # Redis version=5.0.3, bits=64, commit=00000000, modified=0, pid=71856, just started
    71856:X 07 Feb 2019 10:00:43.022 # Configuration loaded
    71856:X 07 Feb 2019 10:00:43.023 * Increased maximum number of open files to 10032 (it was originally set to 1024).
                    _._                                                  
               _.-``__ ''-._                                             
          _.-``    `.  `_.  ''-._           Redis 5.0.3 (00000000/0) 64 bit
      .-`` .-```.  ```\/    _.,_ ''-._                                   
     (    '      ,       .-`  | `,    )     Running in sentinel mode
     |`-._`-...-` __...-.``-._|'` _.-'|     Port: 26379
     |    `-._   `._    /     _.-'    |     PID: 71856
      `-._    `-._  `-./  _.-'    _.-'                                   
     |`-._`-._    `-.__.-'    _.-'_.-'|                                  
     |    `-._`-._        _.-'_.-'    |           http://redis.io        
      `-._    `-._`-.__.-'_.-'    _.-'                                   
     |`-._`-._    `-.__.-'    _.-'_.-'|                                  
     |    `-._`-._        _.-'_.-'    |                                  
      `-._    `-._`-.__.-'_.-'    _.-'                                   
          `-._    `-.__.-'    _.-'                                       
              `-._        _.-'                                           
                  `-.__.-'                                               
    
    71856:X 07 Feb 2019 10:00:43.025 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
    71856:X 07 Feb 2019 10:00:43.026 # Sentinel ID is bee674ea947117df4af66228229a6c565a3e051b
    71856:X 07 Feb 2019 10:00:43.026 # +monitor master mymaster 192.168.132.129 6379 quorum 2
    71856:X 07 Feb 2019 10:00:43.028 * +slave slave 192.168.132.130:6379 192.168.132.130 6379 @ mymaster 192.168.132.129 6379
    71856:X 07 Feb 2019 10:06:39.686 * +sentinel sentinel 14998349ae4f22ea83e22a9f847db2385009ddba 192.168.132.131 26380 @ mymaster 192.168.132.129 6379
    71856:X 07 Feb 2019 10:09:28.105 * +sentinel sentinel a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 192.168.132.131 26381 @ mymaster 192.168.132.129 6379
    71856:X 07 Feb 2019 12:10:54.893 # +new-epoch 1
    71856:X 07 Feb 2019 12:10:54.895 # +vote-for-leader a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 1
    71856:X 07 Feb 2019 12:10:54.904 # +sdown master mymaster 192.168.132.129 6379
    71856:X 07 Feb 2019 12:10:54.962 # +odown master mymaster 192.168.132.129 6379 #quorum 3/2
    71856:X 07 Feb 2019 12:10:54.962 # Next failover delay: I will not start a failover before Thu Feb  7 12:16:55 2019
    71856:X 07 Feb 2019 12:10:55.704 # +config-update-from sentinel a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 192.168.132.131 26381 @ mymaster 192.168.132.129 6379
    71856:X 07 Feb 2019 12:10:55.704 # +switch-master mymaster 192.168.132.129 6379 192.168.132.130 6379
    71856:X 07 Feb 2019 12:10:55.705 * +slave slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379
    71856:X 07 Feb 2019 12:11:25.779 # +sdown slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379

    Checking the former slave (192.168.132.130) with info, it has been promoted to master.

    If we now restart the previously stopped master, it comes back as a slave:

    72173:X 07 Feb 2019 12:17:36.159 * +convert-to-slave slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379

24.3 SDK demo

using System;
using System.Threading;
using StackExchange.Redis;

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.132.129:6379,192.168.132.130:6379");

        var db = redis.GetDatabase(0);

        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} processed");

                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }

        Console.ReadKey();
    }
}
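
The demo above lists the two nodes directly. StackExchange.Redis (2.1 and later) can instead connect through the sentinels themselves and resolve the current master by service name; a minimal sketch, assuming the mymaster service name configured above:

    using System;
    using StackExchange.Redis;
    
    class SentinelDemo
    {
        static void Main()
        {
            // ServiceName must match the master name in `sentinel monitor mymaster ...`
            var options = new ConfigurationOptions
            {
                ServiceName = "mymaster",
                EndPoints =
                {
                    "192.168.132.131:26379",
                    "192.168.132.131:26380",
                    "192.168.132.131:26381"
                }
            };
    
            // the multiplexer asks the sentinels for the current master
            // and re-resolves it after a failover
            var redis = ConnectionMultiplexer.Connect(options);
            var db = redis.GetDatabase(0);
            db.StringSet("username", "jack");
            Console.WriteLine(db.StringGet("username"));
        }
    }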

25. [Cluster] Building a Redis Cluster and analyzing the flaws of the earlier setups

25.1 The clusters we have so far

  • twemproxy
    Solves spreading the data across nodes, but the proxy itself is a single point that is easily overloaded (a capacity problem).

                       -> redis1
    client -> proxy    -> redis2
                       -> redis3

  • master + slave + sentinel
    Solves high availability by switching the master-slave roles, but still does not spread the data.

25.2 The Cluster model

  • High availability
    The sentinel role is folded into the masters themselves.
  • Data sharding
    16384 hash slots, assigned by CRC16(key) mod 16384 (hash slots rather than classic consistent hashing); with three masters each one holds roughly 5461 slots. A sketch of the slot calculation follows this list.
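
The key-to-slot mapping is easy to reproduce. A minimal sketch, assuming no hash tags ({...}) in the key; the CRC16 variant is XMODEM, the one Redis Cluster uses:

    using System;
    using System.Text;
    
    class HashSlot
    {
        // CRC16/XMODEM: poly 0x1021, initial value 0, no reflection
        static ushort Crc16(byte[] data)
        {
            ushort crc = 0;
            foreach (byte b in data)
            {
                crc ^= (ushort)(b << 8);
                for (int i = 0; i < 8; i++)
                    crc = (crc & 0x8000) != 0
                        ? (ushort)((crc << 1) ^ 0x1021)
                        : (ushort)(crc << 1);
            }
            return crc;
        }
    
        static int SlotFor(string key) => Crc16(Encoding.UTF8.GetBytes(key)) % 16384;
    
        static void Main()
        {
            // should match the redirects in the transcript below: 14315 and 9540
            Console.WriteLine(SlotFor("username"));
            Console.WriteLine(SlotFor("password"));
        }
    }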

     

25.3 Building the cluster

Official docs: https://redis.io/topics/cluster-tutorial

This walkthrough runs everything on one machine across multiple ports, planned as follows:

master: 6379 6380 6381
slave:  6382 6383 6384

 

 

First prepare six Redis instances (copy redis-server and redis.conf into each folder) and edit the config files as follows:

/cluster/6379/redis-server
/cluster/6380/redis-server
/cluster/6381/redis-server
/cluster/6382/redis-server
/cluster/6383/redis-server
/cluster/6384/redis-server

/cluster/6379/redis.conf
bind 0.0.0.0
port 6379
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6379.conf

/cluster/6380/redis.conf
bind 0.0.0.0
port 6380
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6380.conf


/cluster/6381/redis.conf
bind 0.0.0.0
port 6381
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6381.conf

/cluster/6382/redis.conf
bind 0.0.0.0
port 6382
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6382.conf

/cluster/6383/redis.conf
bind 0.0.0.0
port 6383
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6383.conf

/cluster/6384/redis.conf
bind 0.0.0.0
port 6384
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6384.conf

Then copy the cluster folder to /data with FileZilla:

[root@localhost ~]# cd /data/
[root@localhost data]# ls
cluster  redis  redis-5.0.3.tar.gz
[root@localhost data]# cd cluster/
[root@localhost cluster]# ls
6379  6380  6381  6382  6383  6384
[root@localhost cluster]# cd 6379/
[root@localhost 6379]# ls
redis.conf  redis-server

Then make the binaries executable and start each instance:

[root@localhost 6379]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6380]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6381]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6382]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6383]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6384]# chmod 777 ./redis-server && ./redis-server ./redis.conf

Check that they all started:

[root@localhost ~]# netstat -tlnp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 0.0.0.0:6379            0.0.0.0:*               LISTEN      85807/./redis-serve 
tcp        0      0 0.0.0.0:6380            0.0.0.0:*               LISTEN      85846/./redis-serve 
tcp        0      0 0.0.0.0:6381            0.0.0.0:*               LISTEN      85851/./redis-serve 
tcp        0      0 0.0.0.0:6382            0.0.0.0:*               LISTEN      85864/./redis-serve 
tcp        0      0 0.0.0.0:6383            0.0.0.0:*               LISTEN      85869/./redis-serve 
tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
tcp        0      0 0.0.0.0:6384            0.0.0.0:*               LISTEN      85875/./redis-serve 

Create the cluster (--cluster-replicas 1 means each master is given one slave):

./redis-cli --cluster create 192.168.132.129:6379 192.168.132.129:6380 192.168.132.129:6381 192.168.132.129:6382 192.168.132.129:6383 192.168.132.129:6384 --cluster-replicas 1

Output:

[root@localhost redis]# ./redis-cli --cluster create 192.168.132.129:6379 192.168.132.129:6380 192.168.132.129:6381 192.168.132.129:6382 192.168.132.129:6383 192.168.132.129:6384 --cluster-replicas 1
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
Adding replica 192.168.132.129:6382 to 192.168.132.129:6379
Adding replica 192.168.132.129:6383 to 192.168.132.129:6380
Adding replica 192.168.132.129:6384 to 192.168.132.129:6381
>>> Trying to optimize slaves allocation for anti-affinity
[WARNING] Some slaves are in the same host as their master
M: a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379    // master
   slots:[0-5460] (5461 slots) master
M: a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380    // master
   slots:[5461-10922] (5462 slots) master
M: 56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381    // master
   slots:[10923-16383] (5461 slots) master
S: e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382    // slave
   replicates 56f79b7711110f2a6cc9bd52d5a218345ab23a77
S: dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383    // slave
   replicates a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5
S: b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384    // slave
   replicates a13cd2f8005286fd7bed260f1463b8cf5da1b91f
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join
.........
>>> Performing Cluster Check (using node 192.168.132.129:6379)
M: a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379    // master
   slots:[0-5460] (5461 slots) master
   1 additional replica(s)
M: 56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381    // master
   slots:[10923-16383] (5461 slots) master
   1 additional replica(s)
S: b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384    // slave
   slots: (0 slots) slave
   replicates a13cd2f8005286fd7bed260f1463b8cf5da1b91f
S: dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383    // slave
   slots: (0 slots) slave
   replicates a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5
M: a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380    // master
   slots:[5461-10922] (5462 slots) master
   1 additional replica(s)
S: e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382    // slave
   slots: (0 slots) slave
   replicates 56f79b7711110f2a6cc9bd52d5a218345ab23a77
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

Use the cluster nodes command to inspect the cluster state:

[root@localhost redis]# ./redis-cli 
127.0.0.1:6379> cluster nodes
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549521186000 3 connected 10923-16383
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 myself,master - 0 1549521183000 1 connected 0-5460
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549521187004 6 connected
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 slave a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 0 1549521184000 5 connected
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521185000 2 connected 5461-10922
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521185997 4 connected

Demonstrating high availability

Now shut down 6379; all the other nodes detect that 6379 has gone down:

85846:M 07 Feb 2019 14:39:51.059 * Marking node a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 as failing (quorum reached).
85846:M 07 Feb 2019 14:39:51.060 # Cluster state changed: fail
85846:M 07 Feb 2019 14:39:51.968 # Failover auth granted to dc13912412a5c8f38a7ee24234aad47aa269c593 for epoch 7
85846:M 07 Feb 2019 14:39:52.010 # Cluster state changed: ok

Check 6383: that slave has been promoted to master.

Looking at the cluster info, there are now 3 masters and 2 slaves:

127.0.0.1:6383> cluster nodes
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521863000 2 connected 5461-10922
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 myself,master - 0 1549521864000 7 connected 0-5460
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521865027 4 connected
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549521864021 6 connected
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 master,fail - 1549521573625 1549521572519 1 disconnected
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549521863013 3 connected 10923-16383

Now restart 6379 and check the cluster info again; it has rejoined as a slave:

127.0.0.1:6383> cluster nodes
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521999113 2 connected 5461-10922
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 myself,master - 0 1549522000000 7 connected 0-5460
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521999000 4 connected
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549522001129 6 connected
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 slave dc13912412a5c8f38a7ee24234aad47aa269c593 0 1549521997098 7 connected
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549522000121 3 connected 10923-16383

Demonstrating data sharding

Connect in cluster mode with the -c flag (here we connect to the master on 6380, which also makes it convenient to flush the database); the redirects show clearly which server and which slot each key lands in:

[root@localhost redis]# ./redis-cli -c -p 6380
127.0.0.1:6380> flushall
OK
127.0.0.1:6380> set username jack
-> Redirected to slot [14315] located at 192.168.132.129:6381
OK
192.168.132.129:6381> set password 12345
-> Redirected to slot [9540] located at 192.168.132.129:6380
OK
192.168.132.129:6380> set email 786744873@qq.com
OK

25.4 SDK implementation

using System;
using System.Threading;
using StackExchange.Redis;

class Program
{
    static void Main(string[] args)
    {
        // StackExchange.Redis understands the cluster topology and follows MOVED redirects itself
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.132.129:6379,192.168.132.129:6380,192.168.132.129:6381,192.168.132.129:6382,192.168.132.129:6383,192.168.132.129:6384");

        var db = redis.GetDatabase(0);

        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} processed");

                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }

        Console.ReadKey();
    }
}

Result:

 

 

A closing word: seeing this through was genuinely hard work; I was still writing on the first day of Chinese New Year. Thank you all, and happy New Year!
