gogogo 咱們正式進入主題吧,java
Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs and geospatial indexes with radius queries. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster。git
推介看《Redis In Action》很是很是很是不錯,尤爲前三章。使用Redis的關鍵點不是設計數據庫,而是如何選擇合適場景的數據結構。就像咱們以前操做容器時,選擇不一樣容易裝不一樣對象。每一個都有各自的各點,具體的內容過兩天看源碼的時候分享給你們。今天的主題是如下五種數據結構。github
主要5種數據結構:redis
String數據結構是簡單的Key-Value類型,Value不只能夠是String,也能夠是數字。使用String類型,能夠徹底實現目前Memcached
(只有一種String)的功能,而且效率更高,還能夠享受Redis的定時持久化(RDB模式或AOF模式),提供日誌及Replication等功能。除了提供與Memcached的get、set、incr、decr等操做外,Redis還提供了下面一些操做:數據庫
//一、計數器的使用 String articleId = String.valueOf(conn.incr("article:")); //String.valueOf(int i) : 將 int 變量 i 轉換成字符串
放一些經常使用命令,方便本身複習瀏覽器
set key value [ex 秒數] / [px 毫秒數] [nx] /[xx] //返回1表示成功,0失敗 incr key //對key作加加操做, decr key //對key作減減操做 setnx key value //僅當key不存在時才set,nx表示not exist。(重點) mset key1 value1 key2 value2 .。。 //一次設置多個key, -------------------------------------------------------------------------------- get key //若是key不存在返回null mget key1 key2 ... keyN //一次獲取多個key的值,若是對於的key不存在,則返回null getset key value //設置新值返回舊值。
分佈式鎖的思路:緩存
僞代碼:cookie
# get lock lock = 0 while lock != 1: timestamp = current Unix time + lock timeout + 1 //時間戳 lock = SETNX lock.foo timestamp //嘗試獲取鎖,返回0,則下面檢查是否超時,GET。 if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)): break; //關鍵點上面這條代碼 else: sleep(10ms) # do your job do_job() # release if now() < GET lock.foo: DEL lock.foo
gogogosession
incr key //對key作加加操做, decr key //對key作減減操做 incrby key interge //對key加指定的數 incrbyfloat key floatnumber //針對浮點數 append key value //返回新字符的長度 substr key start end //並不會修改字符串,返回截取的字符串 getrange key start end //返回子串 strlen key //取指定key的Value
在Memcached中,常常就愛那個一些結構化的信息打包成hashMap,在客戶端序列化後存儲爲一個字符串的值,(一般爲JSON格式),好比用戶的暱稱、年齡、性別、積分。這時候在須要修改其中某一項時,通用須要將字符串(JSON)取出來,而後進行反序列化,修改某一項的值,在序列化成字符串(JSON)存儲回去。而Redis和Hash結構可使你像在數據庫中Update一個屬性同樣只修改某一項屬性值。數據結構
底層實現是hash table,通常操做複雜度是O(1),要同時操做多個field時就是O(N),N是field的數量。應用場景:土法建索引。好比User對象,除了id有時還要按name來查詢。
經常使用命令:
hset key field value //設置hash field爲指定值,若是key不存在,則先建立 hsetnx //同時,若是存在返回0,nx是not exist的意思 hmset key filed1 value1 ... filedN valueN //設置多個值 hget key field //獲取指定的hash field hmget key field1 field2 //獲取所有指定的field ------------------------------------------------------------------------------- hincrby key field integer //將指定的hash field加上給定值 hexists key field //測試指定field是否存在 hdel key field //刪除指定的field hlen key //返回會指定hash的field數量 hgetall //返回hash全部field和value
場景:
/** * 使用Redis從新實現登陸cookie,取代目前由關係型數據庫實現的登陸cookie功能 * 一、將使用一個散列來存儲登陸cookie令牌與與登陸用戶之間的映射。 * 二、須要根據給定的令牌來查找與之對應的用戶,並在已經登陸的狀況下,返回該用戶id。 */ //一、嘗試獲取並返回令牌對應的用戶 注意「login:」 通常使用冒號作分割符,這是不成文的規矩 conn.hget("login:", token); //二、維持令牌與已登陸用戶之間的映射。 conn.hset("login:", token, user); //六、移除被刪除令牌對應的用戶信息 conn.hdel("login:", tokens); ---------------------------注意中間還有其餘步驟---------------------------------- /** * 使用cookie實現購物車——就是將整個購物車都存儲到cookie裏面, * 優勢:無需對數據庫進行寫入就能夠實現購物車功能, * 缺點:怎是程序須要從新解析和驗證cookie,確保cookie的格式正確。而且包含商品能夠正常購買 * 還有一缺點:由於瀏覽器每次發送請求都會連cookie一塊兒發送,因此若是購物車的體積較大, * 那麼請求發送和處理的速度可能下降。 * ----------------------------------------------------------------- * 一、每一個用戶的購物車都是一個散列,存儲了商品ID與商品訂單數量之間的映射。 * 二、若是用戶訂購某件商品的數量大於0,那麼程序會將這件商品的ID以及用戶訂購該商品的數量添加到散列裏。 * 三、若是用戶購買的商品已經存在於散列裏面,那麼新的訂單數量會覆蓋已有的。 * 四、相反,若是某用戶訂購某件商品數量不大於0,那麼程序將從散列裏移除該條目 * 五、須要對以前的會話清理函數進行更新,讓它在清理會話的同時,將舊會話對應的用戶購物車也一併刪除。 */ //一、從購物車裏面移除指定的商品 注意"cart:" 能夠在數據遷移、轉換 、和刪除時輕鬆識別 conn.hdel("cart:" + session, item); //二、將指定的商品添加到購物車 conn.hset("cart:" + session, item, String.valueOf(count)); //六、移除被刪除令牌對應的用戶信息 conn.hdel("login:", tokens); -------------------------------------------------------------------------------- //五、將文章信息存儲到一個散列裏面。 //HMSET key field value [field value ...] //同時將多個 field-value (域-值)對設置到哈希表 key 中。 //此命令會覆蓋哈希表中已存在的域。 conn.hmset(article, articleData); //爲哈希表 key 中的域 field 的值加上增量 increment 。 //增量也能夠爲負數,至關於對給定域進行減法操做。 //HINCRBY counter page_view 200 conn.hincrBy(article, "votes", 1L); //若是返回1表示尚未這個數據,注意-1後面的L conn.hincrBy(article, "votes", -1L); //三、根據文章ID獲取文章的詳細信息 Map<String,String> articleData = conn.hgetAll(id); --------------------------測試------------------------------------------------- //二、測試文章的投票過程 articleVote(conn, "other_user", "article:" + articleId); String votes = conn.hget("article:" + articleId, "votes"); System.out.println("咱們爲該文章投票,目前該文章的票數 " + votes); assert Integer.parseInt(votes) > 1;
其實應用場景還有不少。這裏只是摘出程序片斷中的一部分,具體能夠點這裏。須要注意的是組合使用,取其精華、去其糟粕。
List說白了就是鏈表(Redis使用雙端鏈表實現的List),使用List結構,咱們能夠輕鬆的實現最新消息的排行等功能(好比Twitter的TimeLine),List的另外一個應用就是消息隊列,能夠利用List的PUSH操做,將任務存在List中,而後工做線POP操做取出任務執行。Redis還提供了操做List中某一段元素的API,能夠直接查詢,刪除List中某一段元素。
命令:
lpush key string //在key對應的list的頭部添加元素 rpush key string //在list的尾部添加元素 lpushx key value //若是key不存在,什麼都不作 rpushx key value //同上 linsert key BEFORE|AFTER pivot value //在list對應的位置以前或以後 --------------------------------------------------------------------------- llen key //查看列表對應的長度 lindex key index //指定索引的位置,0第一個 lrange key start end //查看一段列表 lrange key 0 -1 // -1表示返回全部數據 ltrim key start end //保留指定區間的元素 lset key index value //idnex表示指定索引的位置 ldel //刪除元素 blpop key [key ...] timeout //阻塞隊列 brpop key [key ...] timeout
基於redis構建消息隊列點這裏,寫的很是不錯。爲了本身複習,就拿來主義了。
通常來講,消息隊列有兩種場景:一種是發佈者訂閱者模式;一種是生產者消費者模式。利用redis這兩種場景的消息隊列都可以實現。
一、redis做爲消息中間件:
該方式是藉助redis的list結構實現的。Producer調用redis的lpush往特定key裏塞入消息,Consumer調用brpop
(阻塞方法)去不斷監聽該key。
// producer code String key = "demo:mq:test"; String msg = "hello world"; redisDao.lpush(key, msg); // consumer code String key = "demo:mq:test"; while (true) { // block invoke List<String> msgs = redisDao.brpop(BLOCK_TIMEOUT, listKey); //注意brpop if (msgs == null) continue; String jobMsg = msgs.get(1); processMsg(jobMsg); }
一、使用redis怎麼作消息隊列
二、訂閱-發佈系統
Pub/Sub 從字面上理解就是發佈(Publish)與訂閱(Subscribe),在 Redis 中,你能夠設定對某一個 key 值進行消息發佈及消息訂閱,當一個 key 值上進行了消息發佈後,全部訂閱它的客戶端都會收到相應的消息。這一功能最明顯的用法就是用做實時消息系統,好比普通的即時聊天,羣聊等功能。
代碼實現點這裏和上面同一我的,已徵求:
要使用Jedis的Publish/Subscribe功能,必須編寫對JedisPubSub的本身的實現。
public class MyListener extends JedisPubSub { // 取得訂閱的消息後的處理 @Override public void onMessage(String channel, String message) { // TODO Auto-generated method stub System.out.println(channel + "=" + message); } // 取得按表達式的方式訂閱的消息後的處理 @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub System.out.println(pattern + ":" + channel + "=" + message); } // 初始化訂閱時候的處理 @Override public void onSubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub System.out.println("初始化 【頻道訂閱】 時候的處理 "); } // 取消訂閱時候的處理 @Override public void onUnsubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub System.out.println("// 取消 【頻道訂閱】 時候的處理 "); } // 初始化按表達式的方式訂閱時候的處理 @Override public void onPSubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub System.out.println("初始化 【模式訂閱】 時候的處理 "); } // 取消按表達式的方式訂閱時候的處理 @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub System.out.println("取消 【模式訂閱】 時候的處理 "); } }
二、Sub
public class Sub { public static void main(String[] args) { try { Jedis jedis = getJedis(); MyListener ml = new MyListener(); //能夠訂閱多個頻道 //jedis.subscribe(ml, "liuxiao","hello","hello_liuxiao","hello_redis"); //jedis.subscribe(ml, new String[]{"hello_foo","hello_test"}); //這裏啓動了訂閱監聽,線程將在這裏被阻塞 //訂閱獲得信息在lister的onPMessage(...)方法中進行處理 //使用模式匹配的方式設置頻道 jedis.psubscribe(ml, new String[]{"hello_*"}); } catch (Exception e) { e.printStackTrace(); } } }
須要注意的是:
調用subscribe()或psubscribe() 時,當前線程都會阻塞。
三、Pub
public class Pub { public static void main(String[] args) { try { Jedis jedis = getJedis(); jedis.publish("hello_redis","hello_redis"); } catch (Exception e) { e.printStackTrace(); } } }
Set 就是一個集合,集合的概念就是一堆不重複值的組合。利用 Redis 提供的 Set 數據結構,能夠存儲一些集合性的數據。好比在Twitter應用中,能夠將一個用戶全部的關注人存在一個集合中,將其全部粉絲存在一個集合。由於 Redis 很是人性化的爲集合提供了求交集、並集、差集等操做,那麼就能夠很是方便的實現如共同關注、共同喜愛、二度好友等功能,對上面的全部集合操做,你還可使用不一樣的命令選擇將結果返回給客戶端仍是存集到一個新的集合中。
命令:
sadd key member //添加元素,成功返回1, srem key member //移除元素,成功返回1 spop key //刪除並返回,若是set是空或者不存在則返回null srandmember key //同spop,隨機取set中一個元素,可是不刪除 smove srckey dstkey member //集合間移動元素 scard key //查看集合的大小,若是set是空或者key不存在則返回0 sismember key member //判斷member是否在Set中,存在返回1,0表示不存在或key不存在 smembers key //獲取全部元素,返回key對應的全部元素,結果是無序的哦 -------------------------------------------------------------------------------- //集合交集 sinter key1 key2 //返回全部給定key的交集 sinterstore dstkey key1 key2 //同sinter,並同時保存並集到dstkey下 //集合並集 sunion key1 key2 //返回全部給定key的並集 sunionstore dstkey key1 key2 //同sunion,並同時保存並集到dstkey下 //集合差集 sdiff key1 key2 //返回給定key的差集 sdiffstore dstkey key1 key2 //同sdiff,並同時保存並集到dstkey下
爲了防止用戶對同一篇文章進行屢次投票,須要爲每篇文章記錄一個已投票用戶名單。使用集合來存儲已投票的用戶ID。因爲集合是不能存儲多個相同的元素的,因此不會出現同個用戶對同一篇文章屢次投票的狀況。
代碼實現:
二、程序須要使用SADD將文章發佈者的ID添加到記錄文章已投票用戶名單的集合中 並使用EXPIRE命令爲這個集合設置一個過時時間,讓Redis在文章發佈期滿一週以後自動刪除這個集合。 //二、添加到記錄文章已投用戶名單中, conn.sadd(voted, user); //三、設置一週爲過時時間 conn.expire(voted, ONE_WEEK_IN_SECONDS); -------------------------------------------------------------------------------- 四、檢查用戶是否第一次爲這篇文章投票,若是是第一次,則在增長這篇文章的投票數量和評分。 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。 if (conn.sadd("voted:" + articleId, user) == 1) { //ZINCRBY salary 2000 tom # tom 加薪啦! conn.zincrby("score:", VOTE_SCORE, article); //HINCRBY counter page_view 200 conn.hincrBy(article, "votes", 1L); } --------------------------------------------------------------------------------- /** * 記錄文章屬於哪一個羣組 * 將所屬一個羣組的文章ID記錄到那個集合中 * Redis不只能夠對多個集合執行操做,甚至在一些狀況下,還能夠在集合和有序集合之間執行操做 */ //一、構建存儲文章信息的鍵名 String article = "article:" + articleId; for (String group : toAdd) { //二、將文章添加到它所屬的羣組裏面 conn.sadd("group:" + group, article); }
Sorted Set的實現是hash table(element->score, 用於實現ZScore及判斷element是否在集合內),和skip list(score->element,按score排序)的混合體。 skip list有點像平衡二叉樹那樣,不一樣範圍的score被分紅一層一層,每層是一個按score排序的鏈表。 ZAdd/ZRem是O(log(N)),ZRangeByScore/ZRemRangeByScore是O(log(N)+M),N是Set大小,M是結果/操做元素的個數。可見,本來可能很大的N被很關鍵的Log了一下,1000萬大小的Set,複雜度也只是幾十不到。固然,若是一次命中不少元素M很大那誰也沒辦法了。
經常使用命令 :
zadd key score member //添加元素到集合,元素在集合中存在則更新對應的score zrem key member //1表示成功,若是元素不存在則返回0 zremrangebyrank min max //刪除集合中排名在給定的區間 zincrvy key member //增長對於member的score的值。 zcard key //返回集合中元素的個數 //獲取排名 zrank key member //返回指定元素在集合中的排名, zrebrank key member //同時,可是集合中的元素是按score從大到小排序的 //獲取排行榜 zrange key start end //相似lrange操做從集合中去指定區間的元素,返回時有序的。 //給給定分數區間的元素 zrangebyscore key min max //評分的聚合 ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]
代碼實現:
/** * 一、每次用戶瀏覽頁面的時候,程序需都會對用戶存儲在登陸散列裏面的信息進行更新, * 二、並將用戶的令牌和當前時間戳添加到記錄最近登陸用戶的集合裏。 * 三、若是用戶正在瀏覽的是一個商品,程序還會將商品添加到記錄這個用戶最近瀏覽過的商品有序集合裏面, * 四、若是記錄商品的數量超過25個時,對這個有序集合進行修剪。 */ //一、獲取當前時間戳 long timestamp = System.currentTimeMillis() / 1000; //二、維持令牌與已登陸用戶之間的映射。 conn.hset("login:", token, user); //三、記錄令牌最後一次出現的時間 conn.zadd("recent:", timestamp, token); if (item != null) { //四、記錄用戶瀏覽過的商品 conn.zadd("viewed:" + token, timestamp, item); //五、移除舊記錄,只保留用戶最近瀏覽過的25個商品 conn.zremrangeByRank("viewed:" + token, 0, -26); //六、爲有序集key的成員member的score值加上增量increment。經過傳遞一個負數值increment 讓 score 減去相應的值, conn.zincrby("viewed:", -1, item); } --------------------------------------------------------------------------------- /** *存儲會話數據所需的內存會隨着時間的推移而不斷增長,全部咱們須要按期清理舊的會話數據。 * 一、清理會話的程序由一個循環構成,這個循環每次執行的時候,都會檢查存儲在最近登陸令牌的有序集合的大小。 * 二、若是有序集合的大小超過了限制,那麼程序會從有序集合中移除最多100個最舊的令牌, * 三、並從記錄用戶登陸信息的散列裏移除被刪除令牌對應的用戶信息, * 四、並對存儲了這些用戶最近瀏覽商品記錄的有序集合中進行清理。 * 五、於此相反,若是令牌的數量沒有超過限制,那麼程序會先休眠一秒,以後在從新進行檢查。 */ public void run() { while (!quit) { //一、找出目前已有令牌的數量。 long size = conn.zcard("recent:"); //二、令牌數量未超過限制,休眠1秒,並在以後從新檢查 if (size <= limit) { try { sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } continue; } long endIndex = Math.min(size - limit, 100); //三、獲取須要移除的令牌ID Set<String> tokenSet = conn.zrange("recent:", 0, endIndex - 1); String[] tokens = tokenSet.toArray(new String[tokenSet.size()]); ArrayList<String> sessionKeys = new ArrayList<String>(); for (String token : tokens) { //四、爲那些將要被刪除的令牌構建鍵名 sessionKeys.add("viewed:" + token); } //五、移除最舊的令牌 conn.del(sessionKeys.toArray(new String[sessionKeys.size()])); //六、移除被刪除令牌對應的用戶信息 conn.hdel("login:", tokens); //七、移除用戶最近瀏覽商品記錄。 conn.zrem("recent:", tokens); } } } //七、移除用戶最近瀏覽商品記錄。 conn.zrem("recent:", tokens);
另一個案例:
/** * 爲了應對促銷活動帶來的大量負載,須要對數據行進行緩存,具體作法是: * 一、編寫一個持續運行的守護進程,讓這個函數指定的數據行緩存到redis裏面,並不按期的更新。 * 二、緩存函數會將數據行編碼爲JSON字典並存儲在Redis字典裏。其中數據列的名字會被映射爲JSON的字典, * 而數據行的值則被映射爲JSON字典的值。 * ----------------------------------------------------------------------------------------- * 程序使用兩個有序集合來記錄應該在什麼時候對緩存進行更新: * 一、第一個爲調用有序集合,他的成員爲數據行的ID,而分支則是一個時間戳, * 這個時間戳記錄了應該在什麼時候將指定的數據行緩存到Redis裏面 * 二、第二個有序集合爲延時有序集合,他的成員也是數據行的ID, * 而分值則記錄了指定數據行的緩存須要每隔多少秒更新一次。 * ---------------------------------------------------------------------------------------- * 爲了讓緩存函數按期的緩存數據行,程序首先須要將hangID和給定的延遲值添加到延遲有序集合裏面, * 而後再將行ID和當前指定的時間戳添加到調度有序集合裏面。 */ public void scheduleRowCache(Jedis conn, String rowId, int delay) { //一、先設置數據行的延遲值 conn.zadd("delay:", delay, rowId); //二、當即對須要行村的數據進行調度 conn.zadd("schedule:", System.currentTimeMillis() / 1000, rowId); } -------------------------------------------------------------------------------------- /** * 一、經過組合使用調度函數和持續運行緩存函數,實現類一種重讀進行調度的自動緩存機制, * 而且能夠爲所欲爲的控制數據行緩存的更新頻率: * 二、若是數據行記錄的是特價促銷商品的剩餘數量,而且參與促銷活動的用戶特別多的話,那麼最好每隔幾秒更新一次數據行緩存: * 另外一方面,若是數據並不常常改變,或者商品缺貨是能夠接受的,那麼能夠每隔幾分鐘更新一次緩存。 */ public class CacheRowsThread extends Thread { private Jedis conn; private boolean quit; public CacheRowsThread() { this.conn = new Jedis("localhost"); this.conn.select(14); } public void quit() { quit = true; } public void run() { Gson gson = new Gson(); while (!quit) { //一、嘗試獲取下一個須要被緩存的數據行以及該行的調度時間戳,返回一個包含0個或一個元組列表 Set<Tuple> range = conn.zrangeWithScores("schedule:", 0, 0); Tuple next = range.size() > 0 ? range.iterator().next() : null; long now = System.currentTimeMillis() / 1000; //二、暫時沒有行須要被緩存,休眠50毫秒。 if (next == null || next.getScore() > now) { try { sleep(50); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } continue; } //三、提早獲取下一次調度的延遲時間, String rowId = next.getElement(); double delay = conn.zscore("delay:", rowId); if (delay <= 0) { //四、沒必要在緩存這個行,將它從緩存中移除 conn.zrem("delay:", rowId); conn.zrem("schedule:", rowId); conn.del("inv:" + rowId); continue; } //五、繼續讀取數據行 Inventory row = Inventory.get(rowId); //六、更新調度時間,並設置緩存值。 conn.zadd("schedule:", now + delay, rowId); conn.set("inv:" + rowId, gson.toJson(row)); } } }
具體的內容看我以前寫的筆記購物網站的redis相關實現(Java)和文章投票網站的redis相關實現(Java).
這個Redis主題就算記錄完了,gogogo。