Redis 【常識與進階】

Redis 簡介

Redis 是徹底開源免費的,遵照BSD協議,是一個高性能的key-value數據庫。html

Redis 與其餘 key - value 緩存產品有如下三個特色:java

  • Redis支持數據的持久化,能夠將內存中的數據保存在磁盤中,重啓的時候能夠再次加載進行使用。
  • Redis不只僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。
  • Redis支持數據的備份,即master-slave模式的數據備份。

Redis 優點&缺點

  • 數據類型多,純內存操做,單線程避免上下文切換,非阻塞IO多路複用機制
  • 性能極高 – Redis能讀的速度是110000次/s,寫的速度是81000次/s 。
  • 豐富的數據類型 – Redis支持二進制案例的 Strings, Lists, Hashes, Sets 及 Ordered Sets 數據類型操做。
  • 原子 – Redis的全部操做都是原子性的,意思就是要麼成功執行要麼失敗徹底不執行。單個操做是原子性的。多個操做也支持事務,即原子性,經過MULTI和EXEC指令包起來。
  • 豐富的特性 – Redis還支持 publish/subscribe, 通知, key 過時等等特性。
  • Redis的主要缺點是數據庫容量受到物理內存的限制,不能用做海量數據的高性能讀寫,所以Redis適合的場景主要侷限在較小數據量的高性能操做和運算上。

Redis與其餘key-value存儲有什麼不一樣?node

  • Redis有着更爲複雜的數據結構而且提供對他們的原子性操做,這是一個不一樣於其餘數據庫的進化路徑。Redis的數據類型都是基於基本數據結構的同時對程序員透明,無需進行額外的抽象。python

  • Redis運行在內存中可是能夠持久化到磁盤,因此在對不一樣數據集進行高速讀寫時須要權衡內存,由於數據量不能大於硬件內存。在內存數據庫方面的另外一個優勢是,相比在磁盤上相同的複雜的數據結構,在內存中操做起來很是簡單,這樣Redis能夠作不少內部複雜性很強的事情。同時,在磁盤格式方面他們是緊湊的以追加的方式產生的,由於他們並不須要進行隨機訪問。mysql

 

Redis 持久化方式

  • redis持久化介紹
    • 因爲Redis的數據都存放在內存中,若是沒有配置持久化,redis重啓後數據就全丟失了
    • redis提供兩種方式進行持久化:
      • 第一種:RDB (將Redis中數據定時dump到硬盤)
      • 第二種:AOF (將Reids的操做日誌以追加的方式寫入文件)
  • 兩者原理
    •  RDB持久化原理
      • RDB持久化是指在指定的時間間隔內將內存中的數據集快照寫入磁盤
      • 實際操做過程是fork一個子進程,先將數據集寫入臨時文件,寫入成功後,再替換以前的文件,用二進制壓縮存儲

                              

    •  AOF持久化原理
      • AOF持久化以日誌的形式記錄服務器所處理的每個寫、刪除操做,查詢操做不會記錄,以文本的方式記錄,能夠打開文件看到詳細的操做記錄。

                             

    • RDB優缺點介紹(快照)
      • RDB優勢

        1. 整個Redis數據庫將只包含一個文件,一旦系統出現災難性故障,咱們能夠很是容易的進行恢復。程序員

        2. 性能最大化,它僅須要fork出子進程,由子進程完成持久化工做,極大的避免服務進程執行IO操做了。redis

        3. 相比於AOF機制,若是數據集很大,RDB的啓動效率會更高算法

      • RDB缺點

        1. RDB容易丟數據,由於系統一旦在定時持久化以前出現宕機現象,此前沒有來得及寫入磁盤的數據都將丟失spring

        2. RDB經過fork子進程來完成持久化的若是當數據集較大時,可能會致使整個服務器中止服務幾百毫秒,甚至是1秒鐘。sql

AOF優缺點介紹(鏡像)

  • AOF優勢
    • 數據安全性高,Redis中提供了3中同步策略,即每秒同步、每修改同步和不一樣步
    • 因爲該機制對日誌文件的寫入操做採用的是append模式,所以在寫入過程當中即便出現宕機現象,也不會破壞日誌文件中已經存在的內容
    • 若是日誌過大,Redis能夠自動啓用rewrite機制。即Redis以append模式不斷的將修改數據寫入到老的磁盤文件中
    • AOF包含一個格式清晰、易於理解的日誌文件用於記錄全部的修改操做。事實上,咱們也能夠經過該文件完成數據的重建
  • AOF缺點
    • 對於相同數量的數據集而言,AOF文件一般要大於RDB文件,RDB 在恢復大數據集時的速度比 AOF 的恢復速度要快。
    • AOF在運行效率上每每會慢於RDB

在 /etc/redis.conf 中配置使用RDP

  • RDP配置選項(這3個選項都屏蔽,則rdb禁用))
# save 900 1       // 900內,有1條寫入,則產生快照
# save 300 1000    // 若是300秒內有1000次寫入,則產生快照
# save 60 10000    // 若是60秒內有10000次寫入,則產生快照
  • RDP其餘配置
# stop-writes-on-bgsave-error yes  // 後臺備份進程出錯時,主進程停不中止寫入?  主進程不中止 容易形成數據不一致
# rdbcompression yes               // 導出的rdb文件是否壓縮    若是rdb的大小很大的話建議這麼作
# Rdbchecksum yes                  // 導入rbd恢復時數據時,要不要檢驗rdb的完整性 驗證版本是否是一致
# dbfilename dump.rdb              //導出來的rdb文件名
# dir ./                           //rdb的放置路徑

在 /etc/redis.conf 中配置使用AOF

# appendonly no                  // 是否打開aof日誌功能     aof跟  rdb都打開的狀況下 
# appendfsync always             // 每1個命令,都當即同步到aof. 安全,速度慢
# appendfsync everysec           // 折衷方案,每秒寫1次
# appendfsync no                 // 寫入工做交給操做系統,由操做系統判斷緩衝區大小,統一寫入到aof. 同步頻率低,速度快,

# no-appendfsync-on-rewrite yes:      // 正在導出rdb快照的過程當中,要不要中止同步aof
# auto-aof-rewrite-percentage 100     //aof文件大小比起上次重寫時的大小,增加率100%時,重寫    缺點  剛開始的時候重複重寫屢次
# auto-aof-rewrite-min-size 64mb      //aof文件,至少超過64M時,重寫
  測試使用:  redis-benchmark  -n 10000 表示 執行請求10000次,執行ls   發現出現 rdb 跟 aof文件。appendonly.aof dump.rdb
  • 注意的事項
# 注: 在dump rdb過程當中,aof若是中止同步,會不會丟失?
答: 不會,全部的操做緩存在內存的隊列裏, dump完成後,統一操做.
# 注: aof重寫是指什麼? 答: aof重寫是指把內存中的數據,逆化成命令,寫入到.aof日誌裏,以解決aof日誌過大的問題.
# 問: 若是rdb文件,和aof文件都存在,優先用誰來恢復數據? 答: aof
# 問: 2種是否能夠同時用? 答: 能夠,並且推薦這麼作

# 問: 恢復時rdb和aof哪一個恢復的快 答: rdb快,由於其是數據的內存映射,直接載入到內存,而aof是命令,須要逐條執行

 

Redis 數據類型

  • redis中全部數據結構都以惟一的key字符串做爲名稱,而後經過這個惟一的key來獲取對應的value
  • 不一樣的數據類型數據結構差別就在於value的結構不同

字符串(string)

  • value的數據結構(數組)
    • 字符串value數據結構相似於數組,採用與分配容易空間來減小內存頻繁分配
    • 當字符串長度小於1M時,擴容就是加倍現有空間
    • 若是字符串長度操做1M時,擴容時最多擴容1M空間,字符串最大長度爲 512M
    • 編碼:int 編碼是用來保存整數值,raw編碼是用來保存長字符串,而embstr是用來保存短字符串
  • 字符串的使用場景(緩存)
    • 字符串一個常見的用途是緩存用戶信息,咱們將用戶信息使用JSON序列化成字符串
    • 取用戶信息時會通過一次反序列化的過程

list(列表)

  • value的數據結構(雙向鏈表)
    • 列表的數據結構是雙向鏈表,這意味着插入和刪除的時間複雜度是0(1),索引的時間複雜度位0(n)
    • 它是簡單的字符串列表,按照插入順序排序,你能夠添加一個元素到列表的頭部(左邊)或者尾部(右邊),它的底層其實是個鏈表結構。
    • 編碼: 能夠是 ziplist(壓縮列表) 和 linkedlist(雙端鏈表)
    • 當列表彈出最後一個元素後,該數據結構會被自動刪除,內存被回手
  • 列表的使用場景(隊列、棧)

hash(字典)

  • value的數據結構(HashMap)
    • redis中的字典也是HashMap(數組+列表)的二維結構
    • 不一樣的是redis的字典的值只能是字符串
    • 編碼:哈希對象的編碼能夠是 ziplist 或者 hashtable。
  • hash的使用場景(緩存)
    • hash結構也能夠用來緩存用戶信息,與字符串一次性所有序列化整個對象不一樣,hash能夠對每一個字段進行單獨存儲
    • 這樣能夠部分獲取用戶信息,節約網絡流量
    • hash也有缺點,hash結構的存儲消耗要高於單個字符串

set(集合)

  • value的數據結構(字典)
    • redis中的集合至關於一個特殊的字典,字典的全部value都位null
    • 當集合中的最後一個元素被移除後,數據結構會被自動刪除,內存被回收
  • set使用場景
    • set結構能夠用來存儲某個活動中中獎的用戶ID,由於有去重功能,能夠保證同一用戶不會中間兩次

zset(有序集合)

  • value的數據結構(跳躍列表)
    • zset一方面是一個set,保證了內部的惟一性
    • 另外一方面它能夠給每個value賦予一個score,表明這個value的權重
    • zset內部實現用的是一種叫作「跳躍列表」的數據結構
    • zset最後一個元素被移除後,數據結構就會被自動刪除,內存也會被回收
    • 編碼:有序集合的編碼能夠是 ziplist 或者 skiplist。
  • zset應用場景
    • 粉絲列表:value(粉絲ID),score(關注時間),這樣能夠輕鬆按關注事件排序
    • 學生成績:value(學生ID),score(考試成績),這樣能夠輕鬆對成績排序

 

Redis 對五大數據類型的操做

Redis 對String操做

  • redis中的String在內存存儲樣式
    • 注:String操做,redis中的String在在內存中按照一個name對應一個value字典形式來存儲

                  

  • set(name, value, ex=None, px=None, nx=False, xx=False)
    • ex,過時時間(秒)
    • px,過時時間(毫秒)
    • nx,若是設置爲True,則只有name不存在時,當前set操做才執行
    • xx,若是設置爲True,則只有name存在時,當前set操做才執行
import redis
r = redis.Redis(host='1.1.1.3', port=6379)

#一、打印這個Redis緩存全部key以列表形式返回:[b'name222', b'foo']
print( r.keys() )                      # keys *

#二、清空redis
r.flushall()

#三、設置存在時間:  ex=1指這個變量只會存在1秒,1秒後就不存在了
r.set('name', 'Alex')                 # ssetex name Alex
r.set('name', 'Alex',ex=1)             # ssetex name 1 Alex

#四、獲取對應key的value
print(r.get('name'))                # get name

#五、刪除指定的key
r.delete('name')                    # del 'name'

#六、避免覆蓋已有的值:  nx=True指只有當字典中沒有name這個key纔會執行
r.set('name', 'Tom',nx=True)        # setnx name alex

#七、從新賦值: xx=True只有當字典中已經有key值name纔會執行
r.set('name', 'Fly',xx=True)       # set name alex xx

#八、psetex(name, time_ms, value) time_ms,過時時間(數字毫秒 或 timedelta對象)
r.psetex('name',10,'Tom')          # psetex name 10000 alex

#十、mset 批量設置值;  mget 批量獲取
r.mset(key1='value1', key2='value2')           # mset k1 v1 k2 v2 k3 v3
print(r.mget({'key1', 'key2'}))                # mget k1 k2 k3

#十一、getset(name, value) 設置新值並獲取原來的值
print(r.getset('age','100'))                    # getset name tom

#十二、getrange(key, start, end)    下面例子就是獲取name值abcdef切片的0-2間的字符(b'abc')
r.set('name','abcdef')
print(r.getrange('name',0,2))

#1三、setbit(name, offset, value)  #對name對應值的二進制表示的位進行操做
r.set('name','abcdef')
r.setbit('name',6,1)    #將a(1100001)的第二位值改爲1,就變成了c(1100011)
print(r.get('name'))    #最後打印結果:b'cbcdef'

#1四、bitcount(key, start=None, end=None) 獲取name對應的值的二進制表示中 1 的個數

#1五、incr(self,name,amount=1) 自增 name對應的值,當name不存在時,則建立name=amount,不然自增

#1六、derc 自動減1:利用incr和derc能夠簡單統計用戶在線數量
#若是之前有count就在之前基礎加1,沒有就第一次就是1,之後每運行一次就自動加1
num = r.incr('count')

#1七、num = r.decr('count')    #每運行一次就自動減1
#每運行一次incr('count')num值加1每運行一次decr後num值減1
print(num)            

#1八、append(key, value) 在redis name對應的值後面追加內容
r.set('name','aaaa')
r.append('name','bbbb')
print(r.get('name'))        #運行結果: b'aaaabbbb'
Redis對string操做
  • 使用setbit()和bitcount()實現最高效的統計大數量用戶在線
    • 1. setbit()和bitcount()各自做用
      • setbit()能夠任意指定一個key的第多少位是1或者0(好比:setbit n 1 1 設置n的第一位是1)
      • bitcount() 能夠統計某個key中共有多少個1 (好比: bitcount  n   就會返回n中二進制1的個數)
      • 每一個用戶都會存儲在數據庫中,而且每一個條目都會對應一個id值
    •  2. 原理:(這裏是在Redis命令行中作的測試)
      • 根據上面三條特色能夠高效統計用戶在線數量以及肯定某個用戶是否在線
      • 方法是當用戶登陸後就將其對應的id位設置成1
      • 好比:tom用戶在數據庫中id=100,那麼tom登陸後就能夠設置鍵的第一百位爲1(setbit n 100 1)
      • 統計在線數量:  bitcount  n     (能夠後取到key值n中以的數量)
      • 肯定某用戶是否在線:好比用戶在數據庫中id=100getbit n 100    
      • 就能夠返回n的第一百位是1就是在線,是0就是不在線
import redis
r = redis.Redis(host='10.1.0.51', port=6379)

r.setbit('n',10,1)            #設置n的第十位是二進制的1
print(r.getbit('n',10))        #獲取n的第十位是1仍是0(id=10用戶是否在線)
print(r.bitcount('n'))        #統計那種共有多上個1(用戶在線數量)
使用python的Redis模塊實現統計用戶在線狀況

Redis 對 Hash操做

  • redis中的Hash在內存存儲樣式
    • 注: hash在內存中存儲能夠不像string中那樣必須是字典,能夠一個鍵對應一個字典

               

  • Redis對Hash字典操做舉例
import redis
pool = redis.ConnectionPool(host='1.1.1.3', port=6379)

r = redis.Redis(connection_pool=pool)
#1 hset(name, key, value) name=字典名字,key=字典key,value=對應key的值
r.hset('info','name','tom')       # hset info name tom
r.hset('info','age','100')
print(r.hgetall('info'))           # hgetall info          {b'name': b'tom', b'age': b'100'}
print(r.hget('info','name'))      # hget info name         b'tom'

print(r.hkeys('info'))        #打印出」info」對應的字典中的全部key         [b'name', b'age']
print(r.hvals('info'))        #打印出」info」對應的字典中的全部value       [b'tom', b'100']


#2 hmset(name, mapping) 在name對應的hash中批量設置鍵值對
r.hmset('info2', {'k1':'v1', 'k2': 'v2','k3':'v3'}) #一次性設置多個值
print(r.hgetall('info2'))         #hgetall() 一次性打印出字典中全部內容
print(r.hget('info2','k1'))       #打印出‘info2’對應字典中k1對應的value
print(r.hlen('info2'))            # 獲取name對應的hash中鍵值對的個數
print(r.hexists('info2','k1'))    # 檢查name對應的hash是否存在當前傳入的key
r.hdel('info2','k1')              # 將name對應的hash中指定key的鍵值對刪除
print(r.hgetall('info2'))

#3 hincrby(name, key, amount=1)自增name對應的hash中的指定key的值,不存在則建立key=amount
r.hincrby('info2','k1',amount=10)  #第一次賦值k1=10之後每執行一次值都會自動增長10
print(r.hget('info2','k1'))

#4 hscan(name, cursor=0, match=None, count=None)對於數據大的數據很是有用,hscan能夠實現分片的獲取數據
# name,redis的name
# cursor,遊標(基於遊標分批取獲取數據)
# match,匹配指定key,默認None 表示全部的key
# count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
print(r.hscan('info2',cursor=0,match='k*'))     #打印出全部key中以k開頭的
print(r.hscan('info2',cursor=0,match='*2*'))     #打印出全部key中包含2的

#5 hscan_iter(name, match=None, count=None)
# match,匹配指定key,默認None 表示全部的key
# count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
for item in r.hscan_iter('info2'):
    print(item)
Redis對Hash字典操做

Redis 對List操做

  • redis中的List在在內存中按照一個name對應一個List來存儲

              

  • redis對列表操做舉例
import redis
pool = redis.ConnectionPool(host='10.1.0.51', port=6379)

r = redis.Redis(connection_pool=pool)

#1 lpush:反向存放   rpush正向存放數據
r.lpush('names','alex','tom','jack')         # 從右向左放數據好比:3,2,1(反着放)
print(r.lrange('names',0,-1))                 # 結果:[b'jack', b'tom']
r.rpush('names','zhangsan','lisi')           #從左向右放數據如:1,2,3(正着放)
print(r.lrange('names',0,-1))                 #結果:b'zhangsan', b'lisi']

#2.1  lpushx(name,value) 在name對應的list中添加元素,只有name已經存在時,值添加到列表最左邊
#2.2  rpushx(name, value) 表示從右向左操做

#3 llen(name) name對應的list元素的個數
print(r.llen('names'))

#4 linsert(name, where, refvalue, value)) 在name對應的列表的某一個值前或後插入一個新值
# name,redis的name
# where,BEFORE或AFTER
# refvalue,標杆值,即:在它先後插入數據
# value,要插入的數據
r.rpush('name2','zhangsan','lisi')                           #先建立列表[zhangsan,lisi]
print(r.lrange('name2',0,-1))
r.linsert('name2','before','zhangsan','wangwu')         #在張三前插入值wangwu
r.linsert('name2','after','zhangsan','zhaoliu')         #在張三前插入值zhaoliu
print(r.lrange('name2',0,-1))

#5 r.lset(name, index, value)  對name對應的list中的某一個索引位置從新賦值
r.rpush('name3','zhangsan','lisi')                          #先建立列表[zhangsan,lisi]
r.lset('name3',0,'ZHANGSAN')                            #將索引爲0的位置值改爲'ZHANGSAN'
print(r.lrange('name3',0,-1))                            #最後結果:[b'ZHANGSAN', b'lisi']

#6 r.lrem(name, value, num) 在name對應的list中刪除指定的值
# name,redis的name
# value,要刪除的值
# num,  num=0,刪除列表中全部的指定值;
# num=2,從前到後,刪除2個;
# num=-2,從後向前,刪除2個
r.rpush('name4','zhangsan','zhangsan','zhangsan','lisi')
r.lrem('name4','zhangsan',1)
print(r.lrange('name4',0,-1))

#7 lpop(name) 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
r.rpush('name5','zhangsan','lisi')
r.rpop('name5')
print(r.lrange('name5',0,-1))

#8 lindex(name, index) 在name對應的列表中根據索引獲取列表元素
r.rpush('name6','zhangsan','lisi')
print(r.lindex('name6',1))

#9 lrange(name, start, end) 在name對應的列表分片獲取數據
r.rpush('num',0,1,2,3,4,5,6)
print(r.lrange('num',1,3))

#10 ltrim(name, start, end) 在name對應的列表中移除沒有在start-end索引之間的值
r.rpush('num1',0,1,2,3,4,5,6)
r.ltrim('num1',1,2)
print(r.lrange('num1',0,-1))

#11 rpoplpush(src, dst) 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
r.rpush('num2',0,1,2,3)
r.rpush('num3',100)
r.rpoplpush('num2','num3')
print(r.lrange('num3',0,-1))        #運行結果:[b'3', b'100']

#12 blpop(keys, timeout) 將多個列表排列,按照從左到右去pop對應列表的元素
#timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
r.rpush('num4',0,1,2,3)
r.blpop('num4',10)
print(r.lrange('num4',0,-1))
redis對列表操做舉例

redis對Set集合操做,Set集合就是不容許重複的列表

import redis
r = redis.Redis(host='10.1.0.51', port=6379)

#1 sadd(name,values) name對應的集合中添加元素
#2 scard(name) 獲取name對應的集合中元素個數
r.sadd('name0','alex','tom','jack')
print(r.scard('name0'))

#3 sdiff(keys, *args) 在第一個name對應的集合中且不在其餘name對應的集合的元素集合
r.sadd('num6',1,2,3,4)
r.sadd('num7',3,4,5,6)               #在num6中有且在num7中沒有的元素
print(r.sdiff('num6','num7'))        #運行結果:{b'1', b'2'}

#4 sdiffstore(dest, keys, *args)
#獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中
# 將在num7中不在num8中的元素添加到num9
r.sadd('num7',1,2,3,4)
r.sadd('num8',3,4,5,6)
r.sdiffstore('num9','num7','num8')
print(r.smembers('num9'))            #運行結果: {b'1', b'2'}

#5 sinter(keys, *args) 獲取多一個name對應集合的交集
r.sadd('num10',4,5,6,7,8)
r.sadd('num11',1,2,3,4,5,6)
print(r.sinter('num10','num11'))    #運行結果: {b'4', b'6', b'5'}

#6 sinterstore(dest, keys, *args) 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中
r.sadd('num12',1,2,3,4)
r.sadd('num13',3,4,5,6)
r.sdiffstore('num14','num12','num13')
print(r.smembers('num14'))          #運行結果: {b'1', b'2'}

#7 sismember(name, value) 檢查value是不是name對應的集合的成員
r.sadd('name22','tom','jack')
print(r.sismember('name22','tom'))

#8 smove(src, dst, value) 將某個成員從一個集合中移動到另一個集合
r.sadd('num15',1,2,3,4)
r.sadd('num16',5,6)
r.smove('num15','num16',1)
print(r.smembers('num16'))         #運行結果: {b'1', b'5', b'6'}

#9 spop(name) 從集合的右側(尾部)移除一個成員,並將其返回
r.sadd('num17',4,5,6)
print(r.spop('num17'))

#10 srandmember(name, numbers) 從name對應的集合中隨機獲取 numbers 個元素
r.sadd('num18',4,5,6)
print(r.srandmember('num18',2))

#11 srem(name, values) 在name對應的集合中刪除某些值
r.sadd('num19',4,5,6)
r.srem('num19',4)
print(r.smembers('num19'))           #運行結果: {b'5', b'6'}

#12 sunion(keys, *args) 獲取多一個name對應的集合的並集
r.sadd('num20',3,4,5,6)
r.sadd('num21',5,6,7,8)
print(r.sunion('num20','num21'))    #運行結果: {b'4', b'5', b'7', b'6', b'8', b'3'}

#13 sunionstore(dest,keys, *args)
# 獲取多個name對應的集合的並集,並將結果保存到dest對應的集合中
r.sunionstore('num22','num20','num21')
print(r.smembers('num22'))          #運行結果: {b'5', b'7', b'3', b'8', b'6', b'4'}

#14 sscan(name, cursor=0, match=None, count=None)
#   sscan_iter(name, match=None, count=None)
#同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大
redis對Set集合操做

redis對有序集合操做

  • 對有序集合使用介紹 
    • 有序集合,在集合的基礎上,爲每元素排序
    • 元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序
  • redis操做有序集合舉例
import redis
pool = redis.ConnectionPool(host='10.1.0.51', port=6379)
r = redis.Redis(connection_pool=pool)

#1 zadd(name, *args, **kwargs) 在name對應的有序集合中添加元素
r.zadd('zz', n1=11, n2=22,n3=15)
print(r.zrange('zz',0,-1))                  #[b'n1', b'n3', b'n2']
print(r.zrange('zz',0,-1,withscores=True))  #[(b'n1', 11.0), (b'n3', 15.0), (b'n2', 22.0)]

#2 zcard(name) 獲取name對應的有序集合元素的數量

#3 zcount(name, min, max) 獲取name對應的有序集合中分數 在 [min,max] 之間的個數
r.zadd('name01', tom=11,jack=22,fly=15)
print(r.zcount('name01',1,20))

#4 zincrby(name, value, amount) 自增name對應的有序集合的 name 對應的分數

#5 zrank(name, value) 獲取某個值在 name對應的有序集合中的排行(從 0 開始)
r.zadd('name02', tom=11,jack=22,fly=15)
print(r.zrank('name02','fly'))

#6 zrem(name, values) 刪除name對應的有序集合中值是values的成員
r.zadd('name03', tom=11,jack=22,fly=15)
r.zrem('name03','fly')
print(r.zrange('name03',0,-1))            # [b'tom', b'jack']

#7 zremrangebyrank(name, min, max)根據排行範圍刪除
r.zadd('name04', tom=11,jack=22,fly=15)
r.zremrangebyrank('name04',1,2)
print(r.zrange('name04',0,-1))            # [b'tom']

#8 zremrangebyscore(name, min, max) 根據分數範圍刪除
r.zadd('name05', tom=11,jack=22,fly=15)
r.zremrangebyscore('name05',1,20)
print(r.zrange('name05',0,-1))

#9 zremrangebylex(name, min, max) 根據值返回刪除

#10 zscore(name, value) 獲取name對應有序集合中 value 對應的分數

#11 zinterstore(dest, keys, aggregate=None)                #11測試過代碼報錯,未解決
#獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX
r.zadd('name09', tom=11,jack=22,fly=15)
r.zadd('name10', tom=12,jack=23,fly=15)
r.zinterstore('name11',2,'name09','name10')
print(r.zrange('name11',0,-1))
redis操做有序集合
# 127.0.0.1:6379> zadd name222 11 zhangsan 12 lisi
    (integer) 2

# 127.0.0.1:6379> zrange name222 0 -1
    1) "zhangsan"
    2) "lisi"

# 127.0.0.1:6379> zadd name333 11 zhangsan 12 lisi
    (integer) 2

# 127.0.0.1:6379> zrange name333 0 -1
    1) "zhangsan"
    2) "lisi"

# 127.0.0.1:6379> zinterstore name444 2 name222 name333
    (integer) 2

# 127.0.0.1:6379> zrange name444 0 -1 withscores
    1) "zhangsan"
    2) "22"
    3) "lisi"
    4) "24"
redis操做有序集合在命令行測試

 redis其餘經常使用操做 

  • redis其它命令
import redis
pool = redis.ConnectionPool(host='1.1.1.3', port=6379)
r = redis.Redis(connection_pool=pool)

#1 查看當前Redis全部key
print(r.keys('*'))

#2 delete(*names) 刪除Redis對應的key的值
r.delete('num16')

#3 exists(name) 檢測redis的name是否存在
print(r.exists('name09'))

#4 keys(pattern='*') 根據模型獲取redis的name
# KEYS * 匹配數據庫中全部 key 。
# KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
# KEYS h*llo 匹配 hllo 和 heeeeello 等。
# KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
print(r.keys(pattern='name*'))        #打印出Redis中全部以name開通的key

#5 expire(name ,time) 爲某個redis的某個name設置超時時間
r.expire('name09',1)            # 1秒後就會刪除這個key值name09

#6 rename(src, dst) 對redis的name重命名爲
r.rename('num13','num13new')
redis其餘命令
  • redis中切換數據庫操做
# redis 127.0.0.1:6379> SET db_number 0        # 默認使用 0 號數據庫
 
# redis 127.0.0.1:6379> SELECT 1               # 使用 1 號數據庫
 
# redis 127.0.0.1:6379[1]> GET db_number       # 已經切換到 1 號數據庫,注意 Redis 如今的命令提符多了個 [1]
 
# redis 127.0.0.1:6379[1]> SET db_number 1     # 設置默認使用 1 號數據庫
 
# redis 127.0.0.1:6379[1]> GET db_number       # 獲取當前默認使用的數據庫號

#1 move(name, db)) 將redis的某個值移動到指定的db下(對方庫中有就不移動)
127.0.0.1:6379> move name0 4


#2 type(name) 獲取name對應值的類型
127.0.0.1:6379[4]> type name0
redis中切換數據庫操做

redis的管道使用(經過管道向指定db傳送數據)

  • 管道做用
    • redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做
    • 若是想要在一次請求中指定多個命令,則能夠使用pipline實現一次請求指定多個命令
  • 經過管道向指定db傳送數據
import redis,time
pool = redis.ConnectionPool(host='10.1.0.51', port=6379,db=5)
r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)

pipe.set('name', 'alex')
time.sleep(4)
pipe.set('role', 'sb')

pipe.execute()        #只有執行這裏上面兩條纔會一塊兒執行,才能到db5中看到這兩個值


# 127.0.0.1:6379[5]> select 5
# OK
# 127.0.0.1:6379[5]> keys *
# 1) "name"
# 2) "role"
經過管道向指定db傳送數據

 

發佈訂閱(一對多的廣播)

  • 做用圖解
    • 做用:發佈訂閱的做用就是在發佈者(publish)中發送數據,能夠在全部接收者(sub)中均可以接收到相同數據

               

  •  發佈訂閱實例各文件講解
    • 這裏的實例發佈訂閱包含如下三個文件:
      • redisHelper.py :  定義了一個類,在類例規定了如何發佈,如何訂閱,和頻道是多少
      • RedisSub.py    :  Redis接收端,在這裏直接導入redisHelper.py中定義的類,調用類中的接收數據的方法
      • RedisPub.py    :  Redis發送端,在這裏直接導入redisHelper.py中定義的類,調用類中的發送數據的方法
    • 實驗效果時,直接運行RedisSub.py,會卡在接收數據的地方,等待發送方發送數據
    • 而後運行RedisPub.py進行發送數據,能夠看到全部在運行的接收者RedisSub.py均可以接收到這個消息
import redis
class RedisHelper:
    def __init__(self):
        self.__conn = redis.Redis(host='10.1.0.51')    #鏈接Redis服務器
        self.chan_sub = 'fm104.5'       #發佈頻道'fm104.5'
        self.chan_pub = 'fm104.5'       #接收頻道也是'fm104.5'
    #發消息
    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)   #直接調用Redis的chan_pub方法發消息
        print('pub')
        return True
    #收消息
    def subscribe(self):
        print('sub')
        pub = self.__conn.pubsub()      #開始訂閱,僅僅至關於打開收音機
        pub.subscribe(self.chan_sub)    #調頻道
        pub.parse_response()            #準備接收
        return pub                      #再調用一次pub.parse_response()纔會接收
一、redisHelper.py : 定義如何發佈接收的類
from redisHelper import RedisHelper

#這裏的RedisHelper()就是redisHelper中定義的類
obj = RedisHelper()     #實例化一個對象RedisHelper
redis_sub = obj.subscribe()
while True:
    msg= redis_sub.parse_response() #若是Public發送有數據就打印,沒有就卡住
    print(msg)
二、RedisSub.py : Redis接收端
from redisHelper import RedisHelper
obj = RedisHelper()
obj.public('hello')
三、RedisPub.py : Redis發送端

 

Redis 主從同步

  • CPA原理
    • CPA原理是分佈式存儲理論的基石: C(一致性);   A(可用性);  P(分區容忍性);
    • 當主從網絡沒法連通時,修改操做沒法同步到節點,因此「一致性」沒法知足
    • 除非咱們犧牲「可用性」,也就是暫停分佈式節點服務,再也不提供修改數據功能,知道網絡恢復
    • 一句話歸納CAP: 當網絡分區發生時,一致性 和 可用性 兩難全
  • redis主從同步介紹
    • 和MySQL主從複製的緣由同樣,Redis雖然讀取寫入的速度都特別快,可是也會產生讀壓力特別大的狀況。
    • 爲了分擔讀壓力,Redis支持主從複製,Redis的主從結構能夠採用一主多從或者級聯結構。
    • Redis主從複製能夠根據是不是全量分爲全量同步和增量同步。
    • 注:redis主節點Master掛掉時,運維讓從節點Slave接管(redis主從默認沒法自動切換,須要運維手動切換)

      

  • 全量同步(快照同步)
    • 注:Redis全量複製通常發生在Slave初始化階段,這時Slave須要將Master上的全部數據都複製一份。具體步驟以下:
  1. 從服務器鏈接主服務器,發送SYNC命令;
  2. 主服務器接收到SYNC命名後,開始執行BGSAVE命令生成RDB文件並使用緩衝區記錄此後執行的全部寫命令;
  3. 主服務器BGSAVE執行完後,向全部從服務器發送快照文件,並在發送期間繼續記錄被執行的寫命令;
  4. 從服務器收到快照文件後丟棄全部舊數據,載入收到的快照;
  5. 主服務器快照發送完畢後開始向從服務器發送緩衝區中的寫命令;
  6. 從服務器完成對快照的載入,開始接收命令請求,並執行來自主服務器緩衝區的寫命令;
  7. 完成上面幾個步驟後就完成了從服務器數據初始化的全部操做,從服務器此時能夠接收來自用戶的讀請求。

      

  • 增量同步
    • 主節點會將那些對本身狀態產生修改性影響的指令記錄在本地內存buffer中,而後異步將buffer中指令同步到從節點
    • 從節點一邊執行同步指令達到主節點狀態,一邊向主節點反饋本身同步到哪裏(偏移量)
    • 當網絡狀態很差時,從節點沒法和主節點進行同步,當網絡恢復時須要進行快照同步

  • Redis主從同步策略
    • 主從剛剛鏈接的時候,進行全量同步;全同步結束後,進行增量同步。
    • 固然,若是有須要,slave 在任什麼時候候均可以發起全量同步。
    • redis 策略是,不管如何,首先會嘗試進行增量同步,如不成功,要求從機進行全量同步。
  • 注意點
    • 若是多個Slave斷線了,須要重啓的時候,由於只要Slave啓動,就會發送sync請求和主機全量同步,當多個同時出現的時候,可能會致使Master IO劇增宕機。
  • 部分複製
    • 當從節點正在複製主節點時,若是出現網絡閃斷和其餘異常,從節點會讓主節點補發丟失的命令數據,主節點只須要將複製緩衝區的數據發送到從節點就可以保證數據的一致性,相比較全量複製,成本小不少。

    • 當從節點出現網絡中斷,超過了 repl-timeout 時間,主節點就會中斷複製鏈接。
    • 主節點會將請求的數據寫入到「複製積壓緩衝區」,默認 1MB。
    • 當從節點恢復,從新鏈接上主節點,從節點會將 offset 和主節點 id 發送到主節點。     
    • 主節點校驗後,若是偏移量的數後的數據在緩衝區中,就發送 cuntinue 響應 —— 表示能夠進行部分複製。
    • 主節點將緩衝區的數據發送到從節點,保證主從複製進行正常狀態。
  • 心跳
    • 主從節點在創建複製後,他們之間維護着長鏈接並彼此發送心跳命令。
    • 心跳的關鍵機制以下:
      • 中從都有心跳檢測機制,各自模擬成對方的客戶端進行通訊,經過 client list 命令查看複製相關客戶端信息,主節點的鏈接狀態爲 flags = M,從節點的鏈接狀態是 flags = S。 
      • 主節點默認每隔 10 秒對從節點發送 ping 命令,可修改配置 repl-ping-slave-period 控制發送頻率。
      • 從節點在主線程每隔一秒發送 replconf ack{offset} 命令,給主節點上報自身當前的複製偏移量。
      • 主節點收到 replconf 信息後,判斷從節點超時時間,若是超過 repl-timeout 60 秒,則判斷節點下線。

    • 注意:
      • 爲了下降主從延遲,通常把 redis 主從節點部署在相同的機房/同城機房,避免網絡延遲帶來的網絡分區形成的心跳中斷等狀況。
  • 異步複製

    • 主節點不但負責數據讀寫,還負責把寫命令同步給從節點,寫命令的發送過程是異步完成,也就是說主節點處理完寫命令後當即返回客戶度,並不等待從節點複製完成。
    • 異步複製的步驟很簡單,以下:
    • 主節點接受處理命令。
      • 主節點處理完後返回響應結果 。
      • 對於修改命令,異步發送給從節點,從節點在主線程中執行復制的命令。

 

Redis 哨兵(sentinel)模式

  • 哨兵模式介紹
    • Sentinel(哨兵)進程是用於監控redis集羣中Master主服務器工做的狀態
    • 在Master主服務器發生故障的時候,能夠實現Master和Slave服務器的切換,保證系統的高可用(HA)
    • 其已經被集成在redis2.6+的版本中,Redis的哨兵模式到了2.8版本以後就穩定了下來

  • sentinel做用
    • 當用Redis作主從方案時,假如master宕機,Redis自己沒法自動進行主備切換
    • 而Redis-sentinel自己也是一個獨立運行的進程,它能監控多個master-slave集羣,發現master宕機後能進行自動切換。
    • 哨兵進程的做用
      • 監控(Monitoring): 哨兵(sentinel) 會不斷地檢查你的Master和Slave是否運做正常。
      • 提醒(Notification):當被監控的某個Redis節點出現問題時哨兵(sentinel) 能夠經過 API向管理員或者其餘應用程序發送知。
      • 自動故障遷移(Automatic failover):當一個Master不能正常工做時,哨兵(sentinel) 會開始一次自動故障遷移操做。
      • 它會將失效Master的其中一個Slave升級爲新的Master, 並讓失效Master的其餘Slave改成複製新的Master;
      • 當客戶端試圖鏈接失效的Master時,集羣也會向客戶端返回新Master的地址,使得集羣能夠使用如今的Master替換失效Master。
      • Master和Slave服務器切換後,Master的redis.conf、Slave的redis.conf和sentinel.conf的配置文件的內容都會發生相應的改變,即,Master主服務器的redis.conf配置文件中會多一行slaveof的配置,sentinel.conf的監控目標會隨之調換。
    • 哨兵進程的工做方式
      • 每一個Sentinel(哨兵)進程以每秒鐘一次的頻率向整個集羣中的Master主服務器,Slave從服務器以及其餘Sentinel(哨兵)進程發送一個 PING 命令。
      • 若是一個實例(instance)距離最後一次有效回覆 PING 命令的時間超過 down-after-milliseconds 選項所指定的值,則這個實例會被 Sentinel(哨兵)進程標記爲主觀下線(SDOWN)。
      • 若是一個Master主服務器被標記爲主觀下線(SDOWN),則正在監視這個Master主服務器的全部
      • Sentinel(哨兵)進程要以每秒一次的頻率確認Master主服務器的確進入了主觀下線狀態。
      • 當有足夠數量的 Sentinel(哨兵)進程(大於等於配置文件指定的值)在指定的時間範圍內確認Master主服務器進入了主觀下線狀態(SDOWN), 則Master主服務器會被標記爲客觀下線(ODOWN)。
      • 在通常狀況下, 每一個Sentinel(哨兵)進程會以每 10 秒一次的頻率向集羣中的全部Master主服務器、Slave從服務器發送 INFO 命令。
      • 當Master主服務器被 Sentinel(哨兵)進程標記爲客觀下線(ODOWN)時,Sentinel(哨兵)進程向下線的 Master主服務器的全部 Slave從服務器發送 INFO 命令的頻率會從 10 秒一次改成每秒一次。
      • 若沒有足夠數量的 Sentinel(哨兵)進程贊成 Master主服務器下線, Master主服務器的客觀下線狀態就會被移除。若 Master主服務器從新向 Sentinel(哨兵)進程發送 PING 命令返回有效回覆,Master主服務器的主觀下線狀態就會被移除。
  • sentinel原理
    • sentinel負責持續監控主節點的健康,當主節掛掉時,自動選擇一個最優的從節點切換成主節點
    • 從節點來鏈接集羣時會首先鏈接sentinel,經過sentinel來查詢主節點的地址
    • 當主節點發生故障時,sentinel會將最新的主節點地址告訴客戶端,能夠實現無需重啓自動切換redis
  • Sentinel支持集羣
    • 只使用單個sentinel進程來監控redis集羣是不可靠的,當sentinel進程宕掉後sentinel自己也有單點問題
    • 若是有多個sentinel,redis的客戶端能夠隨意地鏈接任意一個sentinel來得到關於redis集羣中的信息。
  • Sentinel版本
    • Sentinel當前穩定版本稱爲Sentinel 2,Redis2.8和Redis3.0附帶穩定的哨兵版本
    • 安裝完redis-3.2.8後,redis-3.2.8/src/redis-sentinel啓動程序 redis-3.2.8/sentinel.conf是配置文件。
  • 運行sentinel兩種方式(效果相同)
    • 法1:redis-sentinel /path/to/sentinel.conf
    • 法2:redis-server /path/to/sentinel.conf --sentinel
    • 以上兩種方式,都必須指定一個sentinel的配置文件sentinel.conf,若是不指定,將沒法啓動sentinel。
    • sentinel默認監聽26379端口,因此運行前必須肯定該端口沒有被別的進程佔用。
  • sentinel.conf配置文件說明
    • 配置文件只須要配置master的信息就好啦,不用配置slave的信息,由於slave可以被自動檢測到
    • 須要注意的是,配置文件在sentinel運行期間是會被動態修改的,例如當發生主備切換時候,配置文件中的master會被修改成另一個slave。
    • 這樣,以後sentinel若是重啓時,就能夠根據這個配置來恢復其以前所監控的redis集羣的狀態。
# sentinel.conf 配置說明
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 60000
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1

'''一、sentinel monitor mymaster 127.0.0.1 6379 2'''
#1)sentinel監控的master的名字叫作mymaster,地址爲127.0.0.1:6379
#2)當集羣中有2個sentinel認爲master死了時,才能真正認爲該master已經不可用了

'''二、sentinel down-after-milliseconds mymaster 60000'''
#1)sentinel會向master發送心跳PING來確認master是否存活,若是master在60000毫秒內不迴應PONG 
#2)那麼這個sentinel會單方面地認爲這個master已經不可用了

'''三、sentinel failover-timeout mymaster 180000'''
#1)若是sentinel A推薦sentinel B去執行failover,B會等待一段時間後,自行再次去對同一個master執行failover,
#2)這個等待的時間是經過failover-timeout配置項去配置的。
#3)從這個規則能夠看出,sentinel集羣中的sentinel不會再同一時刻併發去failover同一個master,
#4)第一個進行failover的sentinel若是失敗了,另一個將會在必定時間內進行從新進行failover,以此類推。

'''四、sentinel parallel-syncs mymaster 1'''
#1)在發生failover主備切換時,這個選項指定了最多能夠有多少個slave同時對新的master進行同步
#2)若是這個數字越大,就意味着越多的slave由於replication而不可用,這個數字越小,完成failover所需的時間就越長。
#3)能夠經過將這個值設爲 1 來保證每次只有一個slave處於不能處理命令請求的狀態。
sentinel.conf配置文件註釋
  • 配置傳播
    • 一旦一個sentinel成功地對一個master進行了failover,它將會把關於master的最新配置經過廣播形式通知其它sentinel,其它的sentinel則更新對應master的配置。
    • 一個faiover要想被成功實行,sentinel必須可以向選爲master的slave發送SLAVE OF NO ONE命令,而後可以經過INFO命令看到新master的配置信息。
    • 當將一個slave選舉爲master併發送SLAVE OF NO ONE`後,即便其它的slave還沒針對新master從新配置本身,failover也被認爲是成功了的。
    • 由於每個配置都有一個版本號,因此以版本號最大的那個爲標準:
      • 假設有一個名爲mymaster的地址爲192.168.1.50:6379。
      • 一開始,集羣中全部的sentinel都知道這個地址,因而爲mymaster的配置打上版本號1。
      • 一段時候後mymaster死了,有一個sentinel被受權用版本號2對其進行failover。
      • 若是failover成功了,假設地址改成了192.168.1.50:9000,此時配置的版本號爲2
      • 進行failover的sentinel會將新配置廣播給其餘的sentinel,發現新配置的版本號爲2時,版本號變大了,
      • 說明配置更新了,因而就會採用最新的版本號爲2的配置。
  • 安裝和部署
    • 部署拓撲結構

  • 啓動主節點
    • 配置
# redis-6379.conf主要修改參數
port 6379
daemonize yes
logfile "6379.log"
dbfilename "dump-6379.rdb"
    • 啓動
# ./redis-server redis-6379.conf
    • 確認是否啓動成功
#方式1:
# [root@localhost bin]# ./redis-cli -h 127.0.0.1 -p 6379 ping
PONG

# 方式2:
# [root@localhost bin]# ./redis-cli -h 127.0.0.1 -p 6379 
127.0.0.1:6379> keys *
(empty list or set)
  • 啓動從節點
    • 配置
# 從節點1 redis-6380.conf 主要修改參數
port 6380
daemonize yes
logfile "6380.log"
dbfilename "dump-6380.rdb"
slaveof 127.0.0.1 6379
# 從節點2 redis-6381.conf 主要修改參數
port 6381
daemonize yes
logfile "6381.log"
dbfilename "dump-6381.rdb"
slaveof 127.0.0.1 6379
    • 啓動
./redis-server redis-6380.conf
  • 啓動打印日誌:

# ./redis-server redis-6381.conf
  • 啓動打印日誌:

  • 確認主從關係
# [root@localhost bin]# ./redis-cli -p 6379
127.0.0.1:6379> info replication
# Replication
role:master  #當前節點角色
connected_slaves:2   #從節點鏈接個數
slave0:ip=127.0.0.1,port=6380,state=online,offset=392,lag=1   #從節點鏈接信息
slave1:ip=127.0.0.1,port=6381,state=online,offset=392,lag=2   #從節點鏈接信息
master_replid:6bc06103642acba6430e01ec78ef18ada4736649
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:392
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:392
  • 此時拓撲:

  • 部署Sentinel節點
    • 3個Sentinel節點的部署方法是徹底一致的(端口不一樣)
    • 配置
      • 主要修改參數 修改端口 ,修改主節點鏈接信息,其餘使用默認就好了,具體參數後面會介紹
# port 26379
# sentinel monitor mymaster 127.0.0.1 6379 1
    • Sentinel節點的默認端口是26379
    • 啓動
# ./redis-sentinel sentinel-26379.conf

# 方法二, 使用redis-server命令加–sentinel參數:
redis-server sentinel-26379.conf --sentinel
  • 日誌:

  • 確認
    • Sentinel節點本質上是一個特殊的Redis節點, 因此也能夠經過info命令 來查詢它的相關信息 。
# [root@localhost bin]# redis-cli -h 127.0.0.1 -p 26379 info Sentinel
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
master0:name=mymaster,status=ok,address=127.0.0.1:6379,slaves=2,sentinels=1
    • 其餘兩個配置是同樣的
  • 最終拓撲

  • 宕機測試
    • 如今在master節點上執行,以下操做,演示經過redis sentinel 進行故障轉移和新master的選出
# [root@localhost bin]# ./redis-cli shutdown
    • 執行完上述操做後,三個哨兵打印的日誌以下:
# 14549:X 24 Jul 15:44:44.568 # +vote-for-leader e31085285266ff86372eeeb4970c9a8de0471025 1
# 14549:X 24 Jul 15:44:44.604 # +sdown master mymaster 127.0.0.1 6379
# 14549:X 24 Jul 15:44:44.604 # +odown master mymaster 127.0.0.1 6379 #quorum 1/1
# 14549:X 24 Jul 15:44:44.604 # Next failover delay: I will not start a failover before Wed Jul 24 15:50:45 2019
# 14549:X 24 Jul 15:44:45.093 # +config-update-from sentinel e31085285266ff86372eeeb4970c9a8de0471025 127.0.0.1 
  26381 @ mymaster 127.0.0.1 6379
# 14549:X 24 Jul 15:44:45.093 # +switch-master mymaster 127.0.0.1 6379 127.0.0.1 6381 # 14549:X 24 Jul 15:44:45.093 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6381 # 14549:X 24 Jul 15:44:45.093 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6381 # 14549:X 24 Jul 15:45:15.127 # +sdown slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6381
  • 意思就是選擇6381爲新的master
  • 以下日誌是在,6379執行shutdown先後在6381節點上執行的操做:
127.0.0.1:6381> info replication
# Replication
role:slave    ###6379節點正常是,6381爲從節點
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:0
master_sync_in_progress:0
slave_repl_offset:166451
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:6bc06103642acba6430e01ec78ef18ada4736649
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:166451
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:253
repl_backlog_histlen:166199
127.0.0.1:6381> info replication
# Replication
role:master   #執行shutdwon後成爲新的master節點
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=217098,lag=1
master_replid:e39de2323e3ab0ff0eff1347ad1c65e2bd3fd917
master_replid2:6bc06103642acba6430e01ec78ef18ada4736649
master_repl_offset:217098
second_repl_offset:172878
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:253
repl_backlog_histlen:216846
# Example sentinel.conf
# 哨兵sentinel實例運行的端口 默認26379
port 26379
# 哨兵sentinel的工做目錄
dir /tmp
# 哨兵sentinel監控的redis主節點的 ip port 
# master-name  能夠本身命名的主節點名字 只能由字母A-z、數字0-9 、這三個字符".-_"組成。
# quorum 配置多少個sentinel哨兵統一認爲master主節點失聯 那麼這時客觀上認爲主節點失聯了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2

# 當在Redis實例中開啓了requirepass foobared 受權密碼 這樣全部鏈接Redis實例的客戶端都要提供密碼
# 設置哨兵sentinel 鏈接主從的密碼 注意必須爲主從設置同樣的驗證密碼
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd

# 指定多少毫秒以後 主節點沒有應答哨兵sentinel 此時 哨兵主觀上認爲主節點下線 默認30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000

# 這個配置項指定了在發生failover主備切換時最多能夠有多少個slave同時對新的master進行 同步,
這個數字越小,完成failover所需的時間就越長,
可是若是這個數字越大,就意味着越 多的slave由於replication而不可用。
能夠經過將這個值設爲 1 來保證每次只有一個slave 處於不能處理命令請求的狀態。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1

# 故障轉移的超時時間 failover-timeout 能夠用在如下這些方面: 
#1. 同一個sentinel對同一個master兩次failover之間的間隔時間。
#2. 當一個slave從一個錯誤的master那裏同步數據開始計算時間。直到slave被糾正爲向正確的master那裏同步數據時。
#3.當想要取消一個正在進行的failover所須要的時間。  
#4.當進行failover時,配置全部slaves指向新的master所需的最大時間。不過,即便過了這個超時,slaves依然會被正確配置爲指向master,可是就不按parallel-syncs所配置的規則來了

# 默認三分鐘
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION

#配置當某一事件發生時所須要執行的腳本,能夠經過腳原本通知管理員,例如當系統運行不正常時發郵件通知相關人員。
#對於腳本的運行結果有如下規則:
#若腳本執行後返回1,那麼該腳本稍後將會被再次執行,重複次數目前默認爲10
#若腳本執行後返回2,或者比2更高的一個返回值,腳本將不會重複執行。
#若是腳本在執行過程當中因爲收到系統中斷信號被終止了,則同返回值爲1時的行爲相同。
#一個腳本的最大執行時間爲60s,若是超過這個時間,腳本將會被一個SIGKILL信號終止,以後從新執行。
 
#通知型腳本:當sentinel有任何警告級別的事件發生時(好比說redis實例的主觀失效和客觀失效等等),將會去調用這個腳本,
這時這個腳本應該經過郵件,SMS等方式去通知系統管理員關於系統不正常運行的信息。調用該腳本時,將傳給腳本兩個參數,
一個是事件的類型,
一個是事件的描述。
若是sentinel.conf配置文件中配置了這個腳本路徑,那麼必須保證這個腳本存在於這個路徑,而且是可執行的,不然sentinel沒法正常啓動成功。

#通知腳本
# sentinel notification-script <master-name> <script-path>
  sentinel notification-script mymaster /var/redis/notify.sh

# 客戶端從新配置主節點參數腳本
# 當一個master因爲failover而發生改變時,這個腳本將會被調用,通知相關的客戶端關於master地址已經發生改變的信息。
# 如下參數將會在調用腳本時傳給腳本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>老是「failover」,
# <role>是「leader」或者「observer」中的一個。 
# 參數 from-ip, from-port, to-ip, to-port是用來和舊的master和新的master(即舊的slave)通訊的
# 這個腳本應該是通用的,能被屢次調用,不是針對性的。

# sentinel client-reconfig-script <master-name> <script-path>

 sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
Sentinel配置說明

 

codis

  • 爲何會出現codis
    • 在大數據高併發場景下,單個redis實例每每會沒法應對
    • 首先redis內存不易過大,內存太大會致使rdb文件過大,致使主從同步時間過長
    • 其次在CPU利用率中上,單個redis實例只能利用單核,數據量太大,壓力就會特別大
  • 什麼是codis
    • codis是redis集羣解決方案之一,codis是GO語言開發的代理中間件
    • 當客戶端向codis發送指令時,codis負責將指令轉發給後面的redis實例來執行,並將返回結果轉發給客戶端
  • codis部署方案
    • 單個codis代理支撐的QPS比較有限,經過啓動多個codis代理能夠顯著增長總體QPS
    • 多codis還能起到容災功能,掛掉一個codis代理還有不少codis代理能夠繼續服務

      

  • codis分片的原理
    • codis負責將特定key轉發到特定redis實例,codis默認將全部key劃分爲1024個槽位
    • 首先會對客戶端傳來的key進行crc32計算hash值,而後將hash後的整數值對1024進行取模,這個餘數就是對應的key槽位
    • 每一個槽位都會惟一映射到後面的多個redis實例之一,codis會在內存中維護槽位和redis實例的映射關係
    • 這樣有了上面key對應的槽位,那麼它應該轉發到那個redis實例就很明確了
    • 槽位數量默認是1024,若是集羣中節點較多,建議將這個數值大一些,好比2048,4096
  • 不一樣codis槽位如何同步 
    • 若是codis槽位值存在內存中,那麼不一樣的codis實例間的槽位關係得不到同步
    • 因此codis還須要一個分佈式配置存儲的數據庫專門來持久化槽位關係
    • codis將槽位關係存儲在zookeeper中,而且提供一個dashboard能夠來觀察和修改槽位關係

 

布隆過濾器

  • 布隆過濾器是什麼?(判斷某個key必定不存在)
    • 本質上布隆過濾器是一種數據結構,比較巧妙的機率型數據結構
    • 特色是高效地插入和查詢,能夠用來告訴你 「某樣東西必定不存在或者可能存在」。
    • 相比於傳統的 List、Set、Map 等數據結構,它更高效、佔用空間更少,可是缺點是其返回的結果是機率性的,而不是確切的。
  • 使用:
    • 布隆過濾器在NoSQL數據庫領域中應用的很是普遍
    • 當用戶來查詢某一個row時,能夠先經過內存中的布隆過濾器過濾掉大量不存在的row請求,而後去再磁盤進行查詢
    • 布隆過濾器說某個值不存在時,那確定就是不存在,能夠顯著下降數據庫IO請求數量
  • 應用場景
    • 場景1(給用戶推薦新聞)
      • 當用戶看過的新聞,確定會被過濾掉,對於沒有看多的新聞,可能會過濾極少的一部分(誤判)。
      • 這樣能夠徹底保證推送給用戶的新聞都是無重複的。
    • 場景2(爬蟲url去重)
      • 在爬蟲系統中,咱們須要對url去重,已經爬取的頁面再也不爬取
      • 當url高達幾千萬時,若是一個集合去裝下這些URL地址很是浪費空間
      • 使用布隆過濾器能夠大幅下降去重存儲消耗,只不過也會使爬蟲系統錯過少許頁面
  • 布隆過濾器原理
    • 每一個布隆過濾器對應到Redis的數據結構是一個大型的數組和幾個不同的無偏hash函數
    • 以下圖:f、g、h就是這樣的hash函數(無誤差指讓hash映射到數組的位置比較隨機)

      添加:值到布隆過濾器

        1)向布隆過濾器添加key,會使用 f、g、h hash函數對key算出一個整數索引,而後對長度取餘

        2)每一個hash函數都會算出一個不一樣的位置,把算出的位置都設置成1就完成了布隆過濾器添加過程

      查詢:布隆過濾器值

        1)當查詢某個key時,先用hash函數算出一個整數索引,而後對長度取餘

        2)當你有一個不爲1時確定不存在這個key,當所有都爲1時可能有這個key

        3)這樣內存中的布隆過濾器過濾掉大量不存在的row請求,而後去再磁盤進行查詢,減小IO操做

      刪除:不支持

        1)目前咱們知道布隆過濾器能夠支持 add 和 isExist 操做

        2)如何解決這個問題,答案是計數刪除,可是計數刪除須要存儲一個數值,而不是原先的 bit 位,會增大佔用的內存大小。

        3)增長一個值就是將對應索引槽上存儲的值加一,刪除則是減一,判斷是否存在則是看值是否大於0。

 

redis事物介紹

  • redis事物是能夠一次執行多個命令,本質是一組命令的集合。
  • 一個事務中的全部命令都會序列化,按順序串行化的執行而不會被其餘命令插入
  • 做用:一個隊列中,一次性、順序性、排他性的執行一系列命令 
  • redis事物基本使用
    • 下面指令演示了一個完整的事物過程,全部指令在exec前不執行,而是緩存在服務器的一個事物隊列中
    • 服務器一旦收到exec指令纔開始執行事物隊列,執行完畢後一次性返回全部結果
    • 由於redis是單線程的,因此沒必要擔憂本身在執行隊列是被打斷,能夠保證這樣的「原子性」
    • 注:redis事物在遇到指令失敗後,後面的指令會繼續執行
  • mysql的rollback與redis的discard的區別:
    • mysql回滾爲sql所有成功才執行,一條sql失敗則所有失敗,執行rollback後全部語句形成的影響消失
    • redis的discard只是結束本次事務,正確命令形成的影響仍然還在.
# Multi 命令用於標記一個事務塊的開始事務塊內的多條命令會按照前後順序被放進一個隊列當中,最後由 EXEC 命令原子性( atomic )地執行
> multi(開始一個redis事物)
incr books
incr books
> exec (執行事物)
> discard (丟棄事物)
[root@redis ~]# redis-cli
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set test 123
QUEUED
127.0.0.1:6379> exec
1) OK
127.0.0.1:6379> get test
"123"
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set test 456
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> get test
"123"
127.0.0.1:6379> 
在命令行測試redis事物
#定義ip
host = 'localhost'

#創建服務鏈接

r = redis.Redis(host=host)
pipe = r.pipeline()

#開啓事務
pipe.multi()
#存儲子命令
pipe.set('key2', 4)
#執行事務
pipe.execute()

print(r.get('key2'))
使用python測試redis事物
  • Redis事務相關命令:
    • watch key1 key2 ... : 監視一或多個key,若是在事務執行以前,被監視的key被其餘命令改動,則事務被打斷 ( 相似樂觀鎖 )

    • multi : 標記一個事務塊的開始( queued ) 事務塊內的多條命令會按照前後順序被放進一個隊列當中,最後由 EXEC 命令原子性( atomic )地執行

    • exec : 執行全部事務塊的命令 ( 一旦執行exec後,以前加的監控鎖都會被取消掉 ) 

    • discard : 取消事務,放棄事務塊中的全部命令

    • unwatch : 取消watch對全部key的監控

    • setnx:佔坑
  • watch指令
    • 實質:WATCH 只會在數據被其餘客戶端搶先修改了的狀況下通知執行命令的這個客戶端(經過 WatchError 異常)但不會阻止其餘客戶端對數據的修改
    • watch其實就是redis提供的一種樂觀鎖,能夠解決併發修改問題
    • watch會在事物開始前盯住一個或多個關鍵變量,當服務器收到exec指令要順序執行緩存中的事物隊列時
    • redis會檢查關鍵變量自watch後是否被修改(包括當前事物所在的客戶端)
    • 若是關鍵變量被人改動過,exec指令就會返回null回覆告知客戶端事物執行失敗,這個時候客戶端會選擇重試
    • 注:redis禁用在multi和exec之間執行watch指令,必須在multi以前盯住關鍵變量,不然會出錯

Redis事務的三個階段:

  • 開始事務

    • Redis事務的開始是經過執行MULTI 命令來實現,它的做用是將執行該命令的客戶端從非事務狀態切換至事務狀態

  • 命令入隊

    • 當一個客戶端出於事務狀態時, 若是客戶端發送的命令是 EXEC(執行全部事務塊內的命令) 、DISCARD(取消事務,放棄執行事務塊內的全部命令。) 、 WATCH(監視任意數量的key ,提一下,在事務中執行這個命令會報錯:ERR WATCH inside MULTI is not allowed) 、 MULTI(標記一個事務塊的開始) 四個命令之外的其餘命令,那麼服務器並不當即執行這個命令,而是將這個命令放入一個事務隊列裏面, 而後向客戶端返回 QUEUED 回覆。

  • 執行事務

    • 當一個處於事務狀態的客戶端向服務器發送 EXEC 命令時, 這個 EXEC 命令將當即被服務器執行: 服務器會遍歷這個客戶端的事務隊列,執行隊列中保存的全部命令,最後將執行命令所得的結果所有返回給客戶端。(這裏須要說明的一點是,Redis在處理網絡請求的是單線程的,因此隊列中的命令在執行期間是不會被其餘客戶端命令插進來的。這一點對理解Redis事務很關鍵)

  • WATCH

    • 用於事務開啓以前對任意數量的Key進行監視,若是這個被監視的key被改動(這裏提一下,這個改動,無論是刪除、添加、修改,或者A -> B -> A改回原值,都會被認爲發生了改動),那麼相應事務就被取消,不然事務正常執行。因此咱們能夠認爲 WATCH 是一個樂觀鎖。若是想讓key取消被監控,能夠用 UNWATCH 命令(這裏又要提一下,UNWATCH 若是在事務中執行,也是會被放到隊列裏的)。

 

redis事物與分佈式鎖 

  • redis事物
    • 嚴格意義來說,Redis的事務和咱們理解的傳統數據庫(如mysql)的事務是不同的;
    • Redis的事務實質上是命令的集合,在一個事務中要麼全部命令都被執行,要麼全部命令都不執行。
      • 須要注意的是:
        • Redis的事務沒有關係數據庫事務提供的回滾(rollback),因此開發者必須在事務執行失敗後進行後續的處理;
        • 若是在一個事務中的命令出現錯誤,那麼全部的命令都不會執行;
        • 若是在一個事務中出現運行錯誤,那麼正確的命令會被執行。
  • redis原子操做
    • 原子操做是指不會被線程調度機制打斷的操做
    • 這種操做一旦開始,就會一直運行到結束,中間不會切換任何進程
  • 分佈式鎖
    • 分佈式鎖本質是佔一個坑,當別的進程也要來佔坑時發現已經被佔,就會放棄或者稍後重試
    • 佔坑通常使用 setnx(set if not exists)指令,只容許一個客戶端佔坑
    • 先來先佔,用完了在調用del指令釋放坑
# > setnx lock:codehole true
# .... do something critical ....
# > del lock:codehole
    • 可是這樣有一個問題,若是邏輯執行到中間出現異常,可能致使del指令沒有被調用,這樣就會陷入死鎖,鎖永遠沒法釋放
    • 爲了解決死鎖問題,咱們拿到鎖時能夠加上一個expire過時時間,這樣即便出現異常,當到達過時時間也會自動釋放鎖
# > setnx lock:codehole true
# > expire lock:codehole 5
# .... do something critical ....
# > del lock:codehole
    • 這樣又有一個問題,setnx和expire是兩條指令而不是原子指令,若是兩條指令之間進程掛掉依然會出現死鎖
    • 爲了治理上面亂象,在redis 2.8中加入了set指令的擴展參數,使setnx和expire指令能夠一塊兒執行
# > set lock:codehole true ex 5 nx
# ''' do something '''
# > del lock:codehole 
  • 分佈式鎖舉例

    分佈式鎖,是一種思想,它的實現方式有不少。好比,咱們將沙灘當作分佈式鎖的組件,那麼它看起來應該是這樣的:

    • 加鎖

      • 加鎖實際上就是在redis中,給Key鍵設置一個值,爲避免死鎖,並給定一個過時

      • 在沙灘上踩一腳,留下本身的腳印,就對應了加鎖操做。其餘進程或者線程,看到沙灘上已經有腳印,證實鎖已被別人持有,則等待。

    • 解鎖

      • 解鎖的過程就是將Key鍵刪除。但也不能亂刪

      • 把腳印從沙灘上抹去,就是解鎖的過程。

    • 鎖超時

      • 爲了不死鎖,咱們能夠設置一陣風,在單位時間後颳起,將腳印自動抹去。

  • 對於分佈式鎖,注意的

    • 能夠保證在分佈式部署的應用集羣中,同一個方法在同一時間只能被一臺機器上的一個線程執行這把鎖要是一把可重入鎖(避免死鎖)這把鎖最好是一把阻塞鎖有高可用的獲取鎖和釋放鎖功能獲取鎖和釋放鎖的性能要好

 

 

redis雪崩&穿透&擊穿

  • 把redis做爲緩存使用已是司空見慣可是使用redis後也可能會碰到一系列的問題,尤爲是數據量很大的時候,經典的幾個問題以下:
  • 緩存和數據庫間數據一致性問題
    • 分佈式環境下(單機就不用說了)很是容易出現緩存和數據庫間的數據一致性問題,針對這一點的話,只能說,若是你的項目對緩存的要求是強一致性的,那麼請不要使用緩存。咱們只能採起合適的策略來下降緩存和數據庫間數據不一致的機率,而沒法保證二者間的強一致性。合適的策略包括 合適的緩存更新策略,更新數據庫後要及時更新緩存、緩存失敗時增長重試機制,例如MQ模式的消息隊列。
  • 緩存穿透
    • 定義
      • 緩存穿透是指查詢一個必定不存在的數據,因爲緩存不命中,接着查詢數據庫也沒法查詢出結果,
      • 雖然也不會寫入到緩存中,可是這將會致使每一個查詢都會去請求數據庫,形成緩存穿透;
    • 解決方法 :布隆過濾
    • 對全部可能查詢的參數以hash形式存儲,在控制層先進行校驗,不符合則丟棄,從而避免了對底層存儲系統的查詢壓力;
    • 若是查詢數據庫也爲空,直接設置一個默認值存放到緩存,這樣第二次到緩衝中獲取就有值了,而不會繼續訪問數據庫。設置一個過時時間或者當有值的時候將緩存中的值替換掉便可。能夠給key設置一些格式規則,而後查詢以前先過濾掉不符合規則的Key。

  • 緩存雪崩
    • 定義      
      • 緩存雪崩是指,因爲緩存層承載着大量請求,有效的保護了存儲層,可是若是緩存層因爲某些緣由總體不能提供服務
      • 因而全部的請求都會達到存儲層,存儲層的調用量會暴增,形成存儲層也會掛掉的狀況。
    • 解決方法
      • 保證緩存層服務高可用性:好比 Redis Sentinel 和 Redis Cluster 都實現了高可用
      • 依賴隔離組件爲後端限流並降級:好比對某個key只容許一個線程查詢數據和寫緩存,其餘線程等待。
      • 方案1、也是像解決緩存穿透同樣加鎖排隊,實現同上;
      • 方案2、創建備份緩存,緩存A和緩存B,A設置超時時間,B不設值超時時間,先從A讀緩存,A沒有讀B,而且更新A緩存和B緩存;
      • 方案3、設置緩存超時時間的時候加上一個隨機的時間長度,好比這個緩存key的超時時間是固定的5分鐘加上隨機的2分鐘,醬紫可從必定程度上避免雪崩問題;
public String getByKey(String keyA,String keyB) {
    String value = redisService.get(keyA);
    if (StringUtil.isEmpty(value)) {
        value = redisService.get(keyB);
        String newValue = getFromDbById();
        redisService.set(keyA,newValue,31, TimeUnit.DAYS);
        redisService.set(keyB,newValue);
    }
    return value;
}
  • 緩存擊穿
    • 定義:
      • 緩存擊穿,就是說某個 key 很是熱點,訪問很是頻繁,處於集中式高併發訪問的狀況
      • 當這個 key 在失效的瞬間,大量的請求就擊穿了緩存,直接請求數據庫,就像是在一道屏障上鑿開了一個洞。
    • 解決方法
      • 解決方式也很簡單,能夠將熱點數據設置爲永遠不過時;
      • 或者基於 redis or zookeeper 實現互斥鎖,等待第一個請求構建完緩存以後,再釋放鎖,進而其它請求才能經過該 key 訪問數據。
      • 方案一、使用互斥鎖排隊
        • 業界比價廣泛的一種作法,即根據key獲取value值爲空時,鎖上,從數據庫中load數據後再釋放鎖。若其它線程獲取鎖失敗,則等待一段時間後重試。這裏要注意,分佈式環境中要使用分佈式鎖,單機的話用普通的鎖(synchronized、Lock)就夠了。
public String getWithLock(String key, Jedis jedis, String lockKey, String uniqueId, long expireTime) {

    // 經過key獲取value

    String value = redisService.get(key);

    if (StringUtil.isEmpty(value)) {

        // 分佈式鎖,詳細能夠參考https://blog.csdn.net/fanrenxiang/article/details/79803037

        //封裝的tryDistributedLock包括setnx和expire兩個功能,在低版本的redis中不支持

        try {

            boolean locked = redisService.tryDistributedLock(jedis, lockKey, uniqueId, expireTime);

            if (locked) {

                value = userService.getById(key);

                redisService.set(key, value);

                redisService.del(lockKey);

                return value;

            } else {

                // 其它線程進來了沒獲取到鎖便等待50ms後重試
                Thread.sleep(50);
                getWithLock(key, jedis, lockKey, uniqueId, expireTime);
            }
        } catch (Exception e) {
            log.error("getWithLock exception=" + e);
            return value;
        } finally {
            redisService.releaseDistributedLock(jedis, lockKey, uniqueId);
        }
    }
    return value;
}
  • 這樣作思路比較清晰,也從必定程度上減輕數據庫壓力,可是鎖機制使得邏輯的複雜度增長,吞吐量也下降了,有點治標不治本。
  • 方案二、接口限流與熔斷、降級
    • 重要的接口必定要作好限流策略,防止用戶惡意刷接口,同時要降級準備,當接口中的某些服務不可用時候,進行熔斷,失敗快速返回機制。
  • 方案三、布隆過濾器
    • bloomfilter就相似於一個hash set,用於快速判某個元素是否存在於集合中,其典型的應用場景就是快速判斷一個key是否存在於某容器,不存在就直接返回。布隆過濾器的關鍵就在於hash算法和容器大小,下面先來簡單的實現下看看效果,我這裏用guava實現的布隆過濾器:
<dependencies>  
     <dependency>  
         <groupId>com.google.guava</groupId>  
         <artifactId>guava</artifactId>  
         <version>23.0</version>  
     </dependency>  
</dependencies>  
public class BloomFilterTest {


    private static final int capacity = 1000000;

    private static final int key = 999998;


    private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), capacity);


    static {

        for (int i = 0; i < capacity; i++) {

            bloomFilter.put(i);

        }

    }

    public static void main(String[] args) {

        /*返回計算機最精確的時間,單位微妙*/

        long start = System.nanoTime();


        if (bloomFilter.mightContain(key)) {

            System.out.println("成功過濾到" + key);

        }

        long end = System.nanoTime();

        System.out.println("布隆過濾器消耗時間:" + (end - start));

        int sum = 0;

        for (int i = capacity + 20000; i < capacity + 30000; i++) {

            if (bloomFilter.mightContain(i)) {

               sum = sum + 1;

           }

        }
        System.out.println("錯判率爲:" + sum);
    }
}

# 成功過濾到999998
# 布隆過濾器消耗時間:215518
# 錯判率爲:318
  • 能夠看到,100w個數據中只消耗了約0.2毫秒就匹配到了key,速度足夠快。而後模擬了1w個不存在於布隆過濾器中的key,匹配錯誤率爲318/10000,也就是說,出錯率大概爲3%,跟蹤下BloomFilter的源碼發現默認的容錯率就是0.03:
public static <T> BloomFilter<T> create(Funnel<T> funnel, int expectedInsertions /* n */) {

  return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions

}
#
http://www.javashuo.com/article/p-cmxicfey-du.html:詳細操做
  • 緩存併發
    • 這裏的併發指的是多個redis的client同時set key引發的併發問題。其實redis自身就是單線程操做,多個client併發操做,按照先到先執行的原則,先到的先執行,其他的阻塞。固然,另外的解決方案是把redis.set操做放在隊列中使其串行化,必須的一個一個執行。
  • 緩存預熱
    • 緩存預熱就是系統上線後,將相關的緩存數據直接加載到緩存系統。
    • 這樣就能夠避免在用戶請求的時候,先查詢數據庫,而後再將數據緩存的問題!用戶直接查詢事先被預熱的緩存數據!
  • 解決思路:
    • 一、直接寫個緩存刷新頁面,上線時手工操做下;
    • 二、數據量不大,能夠在項目啓動的時候自動進行加載;
    • 目的就是在系統上線前,將數據加載到緩存中。

 

Redis 項目緩存實現

  • 關於redis爲何能做爲緩存這個問題咱們就不說了,直接來講一下redis緩存到底如何在項目中使用吧:
  • 1.redis緩存如何在項目中配置?
    •  1.1redis緩存單機版和集羣版配置?(redis的客戶端jedis經常使用)
複製代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
    <!-- 鏈接池配置 -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <!-- 最大鏈接數 -->
        <property name="maxTotal" value="30" />
        <!-- 最大空閒鏈接數 -->
        <property name="maxIdle" value="10" />
        <!-- 每次釋放鏈接的最大數目 -->
        <property name="numTestsPerEvictionRun" value="1024" />
        <!-- 釋放鏈接的掃描間隔(毫秒) -->
        <property name="timeBetweenEvictionRunsMillis" value="30000" />
        <!-- 鏈接最小空閒時間 -->
        <property name="minEvictableIdleTimeMillis" value="1800000" />
        <!-- 鏈接空閒多久後釋放, 當空閒時間>該值 且 空閒鏈接>最大空閒鏈接數 時直接釋放 -->
        <property name="softMinEvictableIdleTimeMillis" value="10000" />
        <!-- 獲取鏈接時的最大等待毫秒數,小於零:阻塞不肯定的時間,默認-1 -->
        <property name="maxWaitMillis" value="1500" />
        <!-- 在獲取鏈接的時候檢查有效性, 默認false -->
        <property name="testOnBorrow" value="true" />
        <!-- 在空閒時檢查有效性, 默認false -->
        <property name="testWhileIdle" value="true" />
        <!-- 鏈接耗盡時是否阻塞, false報異常,ture阻塞直到超時, 默認true -->
        <property name="blockWhenExhausted" value="false" />
    </bean>  
    <!-- jedis客戶端單機版 -->
    <bean id="redisClient" class="redis.clients.jedis.JedisPool">
        <constructor-arg name="host" value="192.168.146.131"></constructor-arg>
        <constructor-arg name="port" value="6379"></constructor-arg>
        <constructor-arg name="poolConfig" ref="jedisPoolConfig"></constructor-arg>
    </bean>
    <bean id="jedisClient" class="com.taotao.rest.dao.impl.JedisClientSingle"/>
     
     
    <!-- jedis集羣版配置 -->
    <!-- <bean id="redisClient" class="redis.clients.jedis.JedisCluster">
        <constructor-arg name="nodes">
            <set>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7001"></constructor-arg>
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7002"></constructor-arg>
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7003"></constructor-arg>
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7004"></constructor-arg>
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7005"></constructor-arg>
                </bean>
                <bean class="redis.clients.jedis.HostAndPort">
                    <constructor-arg name="host" value="192.168.25.153"></constructor-arg>
                    <constructor-arg name="port" value="7006"></constructor-arg>
                </bean>
            </set>
        </constructor-arg>
        <constructor-arg name="poolConfig" ref="jedisPoolConfig"></constructor-arg>
    </bean>
    <bean id="jedisClientCluster" class="com.taotao.rest.dao.impl.JedisClientCluster"></bean> -->
</beans>
複製代碼
  •  1.2redis的方法定義?
    • 接口:

  • 實現:分集羣和單機版

  • 單機版實現方法:
複製代碼
package com.taotao.rest.dao.impl;
 
import org.springframework.beans.factory.annotation.Autowired;
 
import com.taotao.rest.dao.JedisClient;
 
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
 
public class JedisClientSingle implements JedisClient{
     
    @Autowired
    private JedisPool jedisPool;
     
    @Override
    public String get(String key) {
        Jedis jedis = jedisPool.getResource();
        String string = jedis.get(key);
        jedis.close();
        return string;
    }
 
    @Override
    public String set(String key, String value) {
        Jedis jedis = jedisPool.getResource();
        String string = jedis.set(key, value);
        jedis.close();
        return string;
    }
 
    @Override
    public String hget(String hkey, String key) {
        Jedis jedis = jedisPool.getResource();
        String string = jedis.hget(hkey, key);
        jedis.close();
        return string;
    }
 
    @Override
    public long hset(String hkey, String key, String value) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.hset(hkey, key, value);
        jedis.close();
        return result;
    }
 
    @Override
    public long incr(String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.incr(key);
        jedis.close();
        return result;
    }
 
    @Override
    public long expire(String key, int second) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.expire(key, second);
        jedis.close();
        return result;
    }
 
    @Override
    public long ttl(String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.ttl(key);
        jedis.close();
        return result;
    }
 
    @Override
    public long del(String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.del(key);
        jedis.close();
        return result;
    }
 
    @Override
    public long hdel(String hkey, String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.hdel(hkey, key);
        jedis.close();
        return result;
    }
 
}
複製代碼
  • 集羣版的實現方法
複製代碼
package com.taotao.rest.dao.impl;
 
import org.springframework.beans.factory.annotation.Autowired;
 
import com.taotao.rest.dao.JedisClient;
 
import redis.clients.jedis.JedisCluster;
 
public class JedisClientCluster implements JedisClient {
 
    @Autowired
    private JedisCluster jedisCluster;
     
    @Override
    public String get(String key) {
        return jedisCluster.get(key);
    }
 
    @Override
    public String set(String key, String value) {
        return jedisCluster.set(key, value);
    }
 
    @Override
    public String hget(String hkey, String key) {
        return jedisCluster.hget(hkey, key);
    }
 
    @Override
    public long hset(String hkey, String key, String value) {
        return jedisCluster.hset(hkey, key, value);
    }
 
    @Override
    public long incr(String key) {
        return jedisCluster.incr(key);
    }
 
    @Override
    public long expire(String key, int second) {
        return jedisCluster.expire(key, second);
    }
 
    @Override
    public long ttl(String key) {
        return jedisCluster.ttl(key);
    }
 
    @Override
    public long del(String key) {
         
        return jedisCluster.del(key);
    }
 
    @Override
    public long hdel(String hkey, String key) {
         
        return jedisCluster.hdel(hkey, key);
    }
 
}
複製代碼
  • 配置好後,如何添加到代碼中呢?
  • 2.redis緩存如何添加到業務邏輯代碼中?
    • redis做爲緩存的做用就是減小對數據庫的訪問壓力,當咱們訪問一個數據的時候,首先咱們從redis中查看是否有該數據,若是沒有,則從數據庫中讀取,將從數據庫中讀取的數據存放到緩存中,下次再訪問一樣的數據的是,仍是先判斷redis中是否存在該數據,若是有,則從緩存中讀取,不訪問數據庫了。
    • 舉個例子:根據內容分類id訪問內容:
複製代碼
package com.taotao.rest.service.impl;
 
import java.util.ArrayList;
import java.util.List;
 
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import com.taotao.commonEntity.JsonUtils;
import com.taotao.commonEntity.TaotaoResult;
import com.taotao.mapper.TbContentMapper;
import com.taotao.pojo.TbContent;
import com.taotao.pojo.TbContentExample;
import com.taotao.pojo.TbContentExample.Criteria;
import com.taotao.rest.dao.JedisClient;
import com.taotao.rest.service.ContentService;
 
import redis.clients.jedis.Jedis;
//首頁大廣告位的獲取服務層信息
@Service
public class ContentServiceImpl implements ContentService {
     
    @Value("${CONTENTCATEGORYID}")
    private String CONTENTCATEGORYID;
    @Autowired
    private TbContentMapper contentMapper;
    @Autowired
    private JedisClient jedisClient;
     
    @Override
    public List<TbContent> getContentList(Long categoryId) {
        /*通常第一次訪問的時候先從數據庫讀取數據,而後將數據寫入到緩存,再次訪問同一內容的時候就從緩存中讀取,若是緩存中沒有則從數據庫中讀取
        因此咱們添加緩存邏輯的時候,從數據庫中將內容讀取出來以後,先set入緩存,而後再從緩存中添加讀取行爲,若是緩存爲空則從數據庫中進行讀取
        */
        //從緩存中獲取值
        String getData = jedisClient.hget(CONTENTCATEGORYID, categoryId+"");
        if (!StringUtils.isBlank(getData)) {
            List<TbContent> resultList= JsonUtils.jsonToList(getData, TbContent.class);
            return resultList; 
        }
        TbContentExample example=new TbContentExample();
        Criteria criteria = example.createCriteria();
        criteria.andCategoryIdEqualTo(categoryId);
       List<TbContent> list = contentMapper.selectByExample(example);
       //向緩存中放入值
       String jsonData = JsonUtils.objectToJson(list);
       jedisClient.hset(CONTENTCATEGORYID, categoryId+"",jsonData);
        return list;
    }
 
}
複製代碼
  • 因此這裏就是寫邏輯代碼的時候,在業務功能處,從緩存中讀取-----從db中讀取----將數據寫入緩存。
  • 3.針對上面出現的問題:
    • 當咱們後臺數據庫中內容修改以後,由於緩存中的內容沒有修改,咱們訪問的時候都是先訪問緩存,因此即便數據庫中的內容修改了,可是頁面的顯示仍是不會改變的。由於緩存沒有更新,因此這就涉及到緩存同步的問題:即數據庫修改了內容與緩存中對應的內容同步。
    • 緩存同步的原理:就是將redis中的key進行刪除,下次訪問的時候,redis中沒有改數據,則從DB進行查詢,再次更新到redis中。
    • 咱們能夠寫一個緩存同步的服務:

  • 緩存同步除了查詢是沒有涉及到同步問題,增長刪除修改都會涉及到同步問題。
  • 只須要在後臺進行CRUD的地方添加調用該緩存同步的服務便可:

  • 5.redis客戶端jedis的使用:

 

效果圖終於結束了,但願能幫助你們,多多支持,關注不迷路哦!!!

相關文章
相關標籤/搜索