String
一、概念:string是redis最基本的類型,你能夠理解成與Memcached如出一轍的類型,一個key對應一個value。
string類型是二進制安全的。意思是redis的string能夠包含任何數據。好比jpg圖片或者序列化的對象 。
string類型是Redis最基本的數據類型,一個鍵最大能存儲512MB。html
二、實例:java
redis 127.0.0.1:6379> SET name "runoob" OK redis 127.0.0.1:6379> GET name "runoob"
Hash
一、概念:hash 是一個鍵值(key=>value)對集合。hash 是一個 string 類型的 field 和 value 的映射表,hash 特別適合用於存儲對象。
二、實例:node
redis> HMSET myhash field1 "Hello" field2 "World" "OK" redis> HGET myhash field1 "Hello" redis> HGET myhash field2 "World"
List
一、概念:list是簡單的字符串列表,按照插入順序排序。你能夠添加一個元素到列表的頭部或者尾部。
二、實例:redis
redis 127.0.0.1:6379> lpush runoob redis (integer) 1 redis 127.0.0.1:6379> lpush runoob mongodb (integer) 2 redis 127.0.0.1:6379> lpush runoob rabitmq (integer) 3 redis 127.0.0.1:6379> lrange runoob 0 10 1) "rabitmq" 2) "mongodb" 3) "redis"
Set
一、概念:Redis的Set是string類型的無序集合。集合是經過哈希表實現的,因此添加,刪除,查找的複雜度都是O(1)。
二、實例:算法
redis 127.0.0.1:6379> sadd runoob redis (integer) 1 redis 127.0.0.1:6379> sadd runoob mongodb (integer) 1 redis 127.0.0.1:6379> sadd runoob rabitmq (integer) 1 redis 127.0.0.1:6379> sadd runoob rabitmq (integer) 0 redis 127.0.0.1:6379> smembers runoob 1) "redis" 2) "rabitmq" 3) "mongodb"
zset
一、zset 和 set 同樣也是string類型元素的集合,且不容許重複的成員。不一樣的是每一個元素都會關聯一個double類型的分數。redis正是經過分數來爲集合中的成員進行從小到大的排序。zset的成員是惟一的,但分數(score)卻能夠重複。
二、實例:mongodb
redis 127.0.0.1:6379> zadd runoob 0 redis (integer) 1 redis 127.0.0.1:6379> zadd runoob 0 mongodb (integer) 1 redis 127.0.0.1:6379> zadd runoob 0 rabitmq (integer) 1 redis 127.0.0.1:6379> zadd runoob 0 rabitmq (integer) 0 redis 127.0.0.1:6379> > ZRANGEBYSCORE runoob 0 1000 1) "mongodb" 2) "rabitmq" 3) "redis" ``
用於Key的命令數據庫
SET
: 爲Key設置值,實例:SET runoobkey redisEXISTS
: 查詢Key是否存在,存在返回1,不然返回0。實例:EXISTS runoob-new-keyPPTL
: 以毫秒爲單位返回 key 的剩餘過時時間。實例:PTTL KEY_NAMETTL
: 秒爲單位返回 key 的剩餘過時時間。 實例: TTL KEY_NAME用於字符串的命令json
SET
: 設置指定key的值。實例:SET KEY_NAME valueGET
: 獲取指定key的值。實例:GET KEY_NAMEINCR
: 把key中存儲的數字值增1。實例:INCR KEY_NAMEDECR
: 把key中存儲的數字值減1。實例:DECR KEY_NAME用於Hash表的命令後端
HGET
: 獲取hash表中的指定字段的值。實例:HGET KEY_NAME FIELD_NAMEHGETALL
: 獲取hash表中全部字段和 對應的值。實例:HGETALL KEY_NAMEHKEYS
: 獲取hash表中全部字段。實例:HKEYS KEY_NAME用於List集合的命令瀏覽器
LRANGE
: 獲取集合指定範圍內的元素。實例:LRANGE key start stop 獲取下標從start到stop的元素Scan
一、SCAN命令是增量的循環,每次調用只會返回一小部分的元素。因此不會有KEYS命令的坑。 SCAN命令返回的是一個遊標,從0開始遍歷,到0結束遍歷。scan也有以下一些特設:
(1)查詢複雜度爲O(n),經過遊標分步進行,不會阻塞線程
(2)提供limit參數,控制每次返回結果的最大條數。這裏值得注意的是,limit只是一個提示,返回的結果可多可少
(3)同keys同樣,它也提供模式匹配功能
(4)返回的結果可能會重複,須要客戶端去重
(5)遍歷過程當中,若是有數據修改,改動後的數據不必定能遍歷到
(6)單次返回結果是空的並不意味着遍歷結束,而是看返回的遊標值是否爲0
二、使用實例:
(1)先插入10個key
(2)而後使用Scan進行掃描:
這時候返回key三、key5,返回的遊標爲744
(3)使用744遊標繼續查詢:
返回key一、key4,返回遊標爲268。同理,繼續查詢能夠查詢出匹配key*的全部key,直到遊標爲0爲止,結果以下圖:
一、概念:
HyperLogLog這種數據結構用於解決去重統計問題,它提供了不精確的去重計數方案,標準偏差在0.81%。
二、使用方法:
(1)pfadd:增長計數,pfadd KEY_NAME id
(2)pfcount:獲取計數,pfcount KEY_NAME
(3)pfmerge:用於將多個pf計數值累加在一塊兒造成一個新的pf值
三、原理:較爲複雜,後續補充
一、概念:指的是客戶端容許將多個請求依次發給服務器,過程當中而不須要等待請求的回覆,在最後再一併讀取結果便可。主要做用在於提升吞吐量
上圖中能夠看出,全部的請求合併爲一次IO,除了時延能夠下降以外,還能大幅度提高系統吞吐量。
二、實例:
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.Pipeline; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 管道處理實例 * @author huangy on 2018/8/5 */ public class Piple { private static Logger logger = LoggerFactory.getLogger(Piple.class); private static JedisPool jedisPool; // 總共有多少個併發任務 private static final int taskCount = 50; // 管道大小 private static final int batchSize = 10; // 每一個任務處理的命令數 private static final int cmdCount = 1000; // redis服務地址 private static final String host = "127.0.0.1"; // 端口 private static final int port = 6379; // 是否使用管道 private static final boolean usePipeline = true; // 保存執行結果 private static List<Object> results = new ArrayList<>(cmdCount); private static void init() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(100); poolConfig.setTestOnBorrow(false); poolConfig.setTestOnReturn(false); jedisPool = new JedisPool(poolConfig, host, port); } public static void main(String[] args) throws InterruptedException { // 初始化redis init(); long beginTime = System.currentTimeMillis(); // 使用多線程執行 ExecutorService executor = Executors.newCachedThreadPool(); // 使用CountDownLatch保證全部任務都執行完 CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { executor.submit(new DemoTask(i, latch)); } latch.await(); executor.shutdownNow(); long endTime = System.currentTimeMillis(); // 耗時 logger.info("execution cost time(s)={}", (endTime - beginTime) / 1000.0); // 全部結果 logger.info("results, size={}, all={}", results.size(), results); } private static Jedis get() { return jedisPool.getResource(); } private static class DemoTask implements Runnable { private int id; private CountDownLatch latch; private DemoTask(int id, CountDownLatch latch) { this.id = id; this.latch = latch; } public void run() { logger.info("Task[{}] start.", id); try { if (usePipeline) { runWithPipeline(); } else { runWithNonPipeline(); } } finally { latch.countDown(); } logger.info("Task[{}] end.", id); } /** * 不使用管道處理redis命令 */ private void runWithNonPipeline() { // 總共處理cmdCount個 for (int i = 0; i < cmdCount; i++) { Jedis jedis = get(); try { jedis.set(key(i), UUID.randomUUID().toString()); } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } if (i % batchSize == 0) { logger.info("Task[{}] process -- {}", id, i); } } } /** * 使用管道處理redis命令 * 總共處理cmdCount個redis命令,每次處理batchSize個 */ private void runWithPipeline() { for (int i = 0; i < cmdCount;) { Jedis jedis = get(); try { Pipeline pipeline = jedis.pipelined(); int j; for (j = 0; j < batchSize; j++) { if (i + j < cmdCount) { pipeline.set(key(i + j), UUID.randomUUID().toString()); } else { break; } } // pipeline.sync(); // 查看結果,使用syncAndReturnAll List<Object> tem = pipeline.syncAndReturnAll(); logger.info("Task thread id={} pipeline, cmd end={}, result size={}", id, i + j, tem.size()); results.addAll(tem); i += j; } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } } } private String key(int i) { return i + ""; } } }
上述實例使用50個線程進行處理,每一個線程分別處理了1000個redis命令(set),在不使用管道的狀況下,耗時大約5秒;使用管道的狀況下,耗時大約2秒。
三、缺點:
(1)pipeline機制能夠優化吞吐量,但沒法提供原子性/事務保障,而這個能夠經過Redis-Multi等命令實現。
(2)部分讀寫操做存在相關依賴,沒法使用pipeline實現
一、概念:redis實現異步隊列,通常使用list結構做爲隊列,rpush生產消息,lpop消費消息。當lpop沒有消息的時候,要適當sleep一會再重試。
二、sleep優化:
(1)緣由:客戶端是經過隊列的pop操做來獲取消息,而後進行處理,處理完了再接着得到消息。但是若是隊列空了,客戶端就會不停的pop,這樣的操做不能獲取數據,而且拉高了客戶端的CPU,redis的QPS也會拉高,下降redis的查詢效率。
(2)使用sleep解決這個問題,讓線程睡一會,好比說1s。可讓客戶端的CPU降下來。
三、blpop/brpop 優化:
(1)緣由:sleep的方法能夠優化,可是睡眠會致使消息延遲增大。
(2)使用blpop(阻塞讀):阻塞讀在隊列沒有數據的時候,會當即進入休眠狀態,一旦數據到來,則馬上醒過來。
(3)PS:blpop指隊列左邊出數據。brpop指隊列右邊出數據
(4)注意點:當線程一直sleep,服務器通常會斷開鏈接,這時候blpop會拋出異常,因此在編寫客戶端消費者的時候要當心,注意捕獲異常,還要重試。
四、延時隊列:
(1)延時隊列能夠經過redis的zset(有序列表)來實現。
(2)簡單的代碼實例:
/** * 延時隊列案例 * @author huangy on 2018/8/26 */ @Component public class QueueDemo { private static final Logger LOGGER = LoggerFactory.getLogger(QueueDemo.class); @Resource private Jedis jedis; private static final String QUEUE_KEY = "queueKey"; /** * 將任務扔到延時隊列中 * 使用時間做爲score * @param msg 任務(通常使用json格式的字符串) */ public void delay(String msg) { jedis.zadd(QUEUE_KEY, System.currentTimeMillis() + 5000, msg); } /** * 從延時隊列中獲取任務,利用時間範圍做爲score的範圍 * zrangeByScore :返回有序集 key 中,全部 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員 */ public void loop() { while (!Thread.interrupted()) { Set<String> values = jedis.zrangeByScore(QUEUE_KEY, 0, 577742424, 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); } catch (Exception e) { LOGGER.error("loop sleep fail", e); break; } continue; } String s = values.iterator().next(); // 利用zrem判斷是否拿到任務 if (jedis.zrem(QUEUE_KEY, s) > 0) { // 拿到這個任務了,進行處理 LOGGER.info("loop msg, msg={}", s); } } } }
hash(key) % length
,當緩存服務器變化時,length字段變化,致使全部緩存的數據須要從新進行HASH運算,才能使用。而這段時間若是訪問量上升了,容易引發服務器雪崩。所以,引入了一致性哈希對客戶端來講,整個cluster被看作是一個總體,客戶端能夠鏈接任意一個node進行操做,就像操做單一Redis實例同樣,當客戶端操做的key沒有分配到該node上時,就像操做單一Redis實例同樣,當客戶端操做的key沒有分配到該node上時,Redis會返回轉向指令,指向正確的node,這有點兒像瀏覽器頁面的302 redirect跳轉。
Redis Cluster是Redis 3.0之後才正式推出,時間較晚,目前能證實在大規模生產環境下成功的案例還不是不少,須要時間檢驗。
和MySQL主從複製的緣由同樣,Redis雖然讀取寫入的速度都特別快,可是也會產生讀壓力特別大的狀況。爲了分擔讀壓力,Redis支持主從複製,Redis的主從結構能夠採用一主多從或者級聯結構,下圖爲級聯結構。
Redis主從複製能夠根據是不是全量分爲全量同步和增量同步。
一、全量同步
Redis全量複製通常發生在Slave初始化階段,這時Slave須要將Master上的全部數據都複製一份。具體步驟以下:
1)從服務器鏈接主服務器,發送SYNC命令;
2)主服務器接收到SYNC命名後,開始執行BGSAVE命令生成RDB文件並使用緩衝區記錄此後執行的全部寫命令;
3)主服務器BGSAVE執行完後,向全部從服務器發送快照文件,並在發送期間繼續記錄被執行的寫命令;
4)從服務器收到快照文件後丟棄全部舊數據,載入收到的快照;
5)主服務器快照發送完畢後開始向從服務器發送緩衝區中的寫命令;
6)從服務器完成對快照的載入,開始接收命令請求,並執行來自主服務器緩衝區的寫命令;
Redis全量同步過程
完成上面幾個步驟後就完成了從服務器數據初始化的全部操做,從服務器此時能夠接收來自用戶的讀請求。
二、增量同步
Redis增量複製是指Slave初始化後開始正常工做時主服務器發生的寫操做同步到從服務器的過程。
增量複製的過程主要是主服務器每執行一個寫命令就會向從服務器發送相同的寫命令,從服務器接收並執行收到的寫命令。
三、Redis主從同步策略
主從剛剛鏈接的時候,進行全量同步;全同步結束後,進行增量同步。固然,若是有須要,slave 在任什麼時候候均可以發起全量同步。redis 策略是,不管如何,首先會嘗試進行增量同步,如不成功,要求從機進行全量同步。
四、其餘:Redis 2.8之後提供了PSYNC優化了斷線重連的效率
http://blog.csdn.net/sk199048...
案例情景
若是緩存集中在一段時間內失效,發生大量的緩存穿透,全部的查詢都落在數據庫上,形成了緩存雪崩。而且若是大量的key過時時間設置的過於集中,到過時的那個時間點,redis可能會出現短暫的卡頓現象
解決方法
一、使用互斥鎖(這裏使用redis的setnx實現分佈式鎖):
只有獲取鎖,才能去訪問數據庫,加載數據,保證同一時刻只有一個請求訪問數據庫,避免數據庫崩潰。但這樣子可能形成請求延時比較久的問題。
二、讓緩存過時時間不那麼集中:
好比咱們能夠在原有的失效時間基礎上增長一個隨機值,好比1-5分鐘隨機,這樣每個緩存的過時時間的重複率就會下降,就很難引起集體失效的事件
三、作二級緩存
A1爲原始緩存,A2爲拷貝緩存,A1失效時,能夠訪問A2。A1緩存失效時間設置爲短時間,A2設置爲長期。這個在業務追求數據一致性要求不高的狀況下,可使用。
四、緩存永遠不過時
這裏的「永遠不過時」包含兩層意思:
(1) 從緩存上看,確實沒有設置過時時間,這就保證了,不會出現熱點key過時問題,也就是「物理」不過時。
(2) 從功能上看,若是不過時,那不就成靜態的了嗎?因此咱們把過時時間存在key對應的value裏,若是發現要過時了,經過一個後臺的異步線程進行緩存的構建,也就是「邏輯」過時.
從實戰看,這種方法對於性能很是友好,惟一不足的就是構建緩存時候,其他線程(非構建緩存的線程)可能訪問的是老數據,可是對於通常的互聯網功能來講這個仍是能夠忍受。
案例情景
緩存穿透是指查詢一個必定不存在的數據,因爲緩存是不命中時須要從數據庫查詢,查不到數據則不寫入緩存,這將致使這個不存在的數據每次請求都要到數據庫去查詢,形成緩存穿透
解決方法
使用Bloom filter(布隆過濾器)。當用戶查詢某個row時,能夠先經過內存中的布隆過濾器過濾掉大量不存在的row請求,而後再到數據庫進行查詢。
參考:
http://www.zsythink.net/archi...
http://blog.csdn.net/upxiaofe...
http://www.runoob.com/redis/r...
https://blog.csdn.net/sk19904...
https://www.cnblogs.com/littl...
https://blog.csdn.net/fei3342...