5.10.1. @Transactional Support
Transaction Support is disabled by default and has to be explicitly enabled for each RedisTemplate in use by setting setEnableTransactionSupport(true).
This will force binding the RedisConnection in use to the current Thread triggering MULTI.
If the transaction finishes without errors, EXEC is called, otherwise DISCARD.
Once in MULTI, RedisConnection would queue write operations, all readonly operations, such as KEYS are piped to a fresh (non thread bound) RedisConnection.css
/** Sample Configuration **/ @Configuration public class RedisTxContextConfiguration { @Bean public StringRedisTemplate redisTemplate() { StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory()); // explicitly enable transaction support template.setEnableTransactionSupport(true); return template; } @Bean public PlatformTransactionManager transactionManager() throws SQLException { return new DataSourceTransactionManager(dataSource()); } @Bean public RedisConnectionFactory redisConnectionFactory( // jedis, lettuce, srp,... ); @Bean public DataSource dataSource() throws SQLException { // ... } } /** Usage Constrainsts **/ // executed on thread bound connection template.opsForValue().set("foo", "bar"); // read operation executed on a free (not tx-aware) connection template.keys("*"); // returns null as values set within transaction are not visible template.opsForValue().get("foo");
public Long leftPush(V value) { return this.ops.leftPush(this.getKey(), value); }
public Long leftPush(K key, V value) { final byte[] rawKey = this.rawKey(key); final byte[] rawValue = this.rawValue(value); return (Long)this.execute(new RedisCallback() { public Long doInRedis(RedisConnection connection) { return connection.lPush(rawKey, new byte[][]{rawValue}); } }, true); }
<T> T execute(RedisCallback<T> callback, boolean b) { return this.template.execute(callback, b); }
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) { return this.execute(action, exposeConnection, false); }
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = this.getConnectionFactory(); RedisConnection conn = null; Object var11; try { if(this.enableTransactionSupport) { conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = this.preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if(pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse); Object result = action.doInRedis(connToExpose); if(pipeline && !pipelineStatus) { connToUse.closePipeline(); } var11 = this.postProcessResult(result, connToUse, existingConnection); } finally { if(!this.enableTransactionSupport) { RedisConnectionUtils.releaseConnection(conn, factory); } } return var11; }
本文主要講述如何在java裏頭使用redis進行cas操做。其實呢,redis不像memcached那樣顯示地支持cas操做,不過它有事務的概念。html
redis的docker搭建java
Redis經過使用WATCH, MULTI, and EXEC組成的事務來實現樂觀鎖(注意沒有用DISCARD
),Redis事務沒有回滾操做。在SpringDataRedis當中經過RedisTemplate的SessionCallback中來支持(不然事務不生效
)。discard的話不須要本身代碼處理,callback返回null,成的話,返回非null,依據這個來判斷事務是否成功(沒有拋異常
)。github
@Test public void cas() throws InterruptedException, ExecutionException { String key = "test-cas-1"; ValueOperations<String, String> strOps = redisTemplate.opsForValue(); strOps.set(key, "hello"); ExecutorService pool = Executors.newCachedThreadPool(); List<Callable<Object>> tasks = new ArrayList<>(); for(int i=0;i<5;i++){ final int idx = i; tasks.add(new Callable() { @Override public Object call() throws Exception { return redisTemplate.execute(new SessionCallback() { @Override public Object execute(RedisOperations operations) throws DataAccessException { operations.watch(key); String origin = (String) operations.opsForValue().get(key); operations.multi(); operations.opsForValue().set(key, origin + idx); Object rs = operations.exec(); System.out.println("set:"+origin+idx+" rs:"+rs); return rs; } }); } }); } List<Future<Object>> futures = pool.invokeAll(tasks); for(Future<Object> f:futures){ System.out.println(f.get()); } pool.shutdown(); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); }
輸出web
set:hello2 rs:null set:hello3 rs:[] set:hello1 rs:null set:hello4 rs:null set:hello0 rs:null
查看該值redis
127.0.0.1:6379> get test-cas-1 "\"hello3\""
沒有在SessionCallback裏頭執行watch、multi、exec,而是本身單獨寫算法
template.setEnableTransactionSupport(true);
這個應該是支持數據庫的事務成功才執行的意思。由於Spring默認的事務,都是基於DB事務的
/** * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current * thread, for example when using a transaction manager. Will create a new Connection otherwise, if * {@code allowCreate} is <tt>true</tt>. * * @param factory connection factory for creating the connection * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the * current thread * @param bind binds the connection to the thread, in case one was created * @param enableTransactionSupport * @return an active Redis connection */ public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); if (connHolder != null) { if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } if (!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } if (log.isDebugEnabled()) { log.debug("Opening RedisConnection"); } RedisConnection conn = factory.getConnection(); if (bind) { RedisConnection connectionToBind = conn; if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) { connectionToBind = createConnectionProxy(conn, factory); } connHolder = new RedisConnectionHolder(connectionToBind); TransactionSynchronizationManager.bindResource(factory, connHolder); if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } return conn; }
不要跟本文的樂觀鎖說的事務混淆在一塊兒。spring
https://segmentfault.com/a/1190000004393573
org.springframework.data.redis.core.RedisTemplate
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = this.getConnectionFactory(); RedisConnection conn = null; Object var11; try { if(this.enableTransactionSupport) { conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
//這個是否支持的開關能夠在@Configuration中配置: } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = this.preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if(pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse); Object result = action.doInRedis(connToExpose); if(pipeline && !pipelineStatus) { connToUse.closePipeline(); } var11 = this.postProcessResult(result, connToUse, existingConnection); } finally { if(!this.enableTransactionSupport) { RedisConnectionUtils.releaseConnection(conn, factory); } } return var11; }
@Bean public RedisTemplate redisTemplate(){ RedisTemplate<StringRedisSerializer, Serializable> rt = new RedisTemplate<>(); rt.setConnectionFactory(jedisConnectionFactory()); StringRedisSerializer stringSerializer = new StringRedisSerializer(); JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer(); RedisHashKeySerializer redisHashKeySerializer = new RedisHashKeySerializer(); rt.setKeySerializer(stringSerializer); rt.setValueSerializer(jdkSerializationRedisSerializer); rt.setHashKeySerializer(redisHashKeySerializer); rt.setHashValueSerializer(jdkSerializationRedisSerializer); rt.afterPropertiesSet(); rt.setEnableTransactionSupport(true); return rt; }
org.springframework.data.redis.core.RedisConnectionUtils
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionUtils.RedisConnectionHolder connHolder = (RedisConnectionUtils.RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory); if(connHolder != null) { if(enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } else if(!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } else { if(log.isDebugEnabled()) { log.debug("Opening RedisConnection"); } RedisConnection conn = factory.getConnection(); if(bind) { RedisConnection connectionToBind = conn; if(enableTransactionSupport && isActualNonReadonlyTransactionActive()) { connectionToBind = createConnectionProxy(conn, factory); } connHolder = new RedisConnectionUtils.RedisConnectionHolder(connectionToBind); TransactionSynchronizationManager.bindResource(factory, connHolder); if(enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } else { return conn; } } }
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionUtils.RedisConnectionHolder connHolder, RedisConnectionFactory factory) { if(isActualNonReadonlyTransactionActive() && !connHolder.isTransactionSyncronisationActive()) { connHolder.setTransactionSyncronisationActive(true); RedisConnection conn = connHolder.getConnection(); conn.multi(); TransactionSynchronizationManager.registerSynchronization(new RedisConnectionUtils.RedisTransactionSynchronizer(connHolder, conn, factory)); } }
RedisTemplate api詳解
private boolean enableTransactionSupport = false; private boolean exposeConnection = false; private boolean initialized = false; private boolean enableDefaultSerializer = true; private RedisSerializer<?> defaultSerializer = new JdkSerializationRedisSerializer(); private RedisSerializer keySerializer = null; private RedisSerializer valueSerializer = null; private RedisSerializer hashKeySerializer = null; private RedisSerializer hashValueSerializer = null; private RedisSerializer<String> stringSerializer = new StringRedisSerializer(); private ScriptExecutor<K> scriptExecutor; // cache singleton objects (where possible) private ValueOperations<K, V> valueOps; private ListOperations<K, V> listOps; private SetOperations<K, V> setOps; private ZSetOperations<K, V> zSetOps;
enableTransactionSupport:是否啓用事務支持。
在代碼中搜索下用到這個變量的地方,會看到,在調用RedisCallback以前,有一行代碼是若是啓用事務支持,那麼conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport),
也就是說,系統自動幫咱們拿到了事務中綁定的鏈接。
能夠在一個方法的屢次對Redis增刪該查中,始終使用同一個鏈接。可是,即便使用了一樣的鏈接,沒有進行connection.multi()和connection.exec(),依然是沒法啓用事務的。
我沒有仔細的查閱代碼,可是能夠知道的是,Spring已經對這個,給了咱們一個更好的支持:@Transactional
在調用RedisTempalte中的execute()方法的地方,加入這個註解(是spring包下面提供的,不要引用成rt包下的註解),能讓這個方法中的全部execute,自動加入multi()以及異常的回滾或者是正常運行時候的提交!
用過jedis操做的都知道,全部connection的操做方法,都是傳入字節數組。那麼,將一個對象和字節相互轉換,就須要經過序列化和反序列化。
模版方法中,Spring提供了默認的StringSerializer和JdkSerializer,第一個很簡單,就是經過String.getBytes()來實現的。並且在Redis中,全部存儲的值都是字符串類型的。因此這種方法保存後,經過Redis-cli控制檯,是能夠清楚的查看到咱們保存了什麼key,value是什麼。可是對於JdkSerializationRedisSerializer來講,這個序列化方法就是Jdk提供的了。首先要求咱們要被序列化的類繼承自Serializeable接口,而後經過,而後經過Jdk對象序列化的方法保存。(注:這個序列化保存的對象,即便是個String類型的,在redis控制檯,也是看不出來的,由於它保存了一些對象的類型什麼的額外信息,)
這麼一長串,其實就是一個int類型的123。
keySerializer:這個是對key的默認序列化器。默認值是StringSerializer。
valueSerializer:這個是對value的默認序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
hashKeySerializer:對hash結構數據的hashkey序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
hashValueSerializer:對hash結構數據的hashvalue序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
除此以外,咱們在該類中,還發現了valueOps和hashOps等操做類,這是spring給咱們提供的能夠直接使用來操做Redis的類,很是方便。下一篇咱們將講解這些類。
http://www.cnblogs.com/luochengqiuse/p/4640932.html?utm_source=tuicool&utm_medium=referral
把 Redis 看成數據庫的用例
如今咱們來看看在服務器端 Java 企業版系統中把 Redis 看成數據庫的各類用法吧。不管用例的簡繁,Redis 都能幫助用戶優化性能、處理能力和延遲,讓常規 Java 企業版技術棧望而卻步。
1. 全局惟一增量計數器
咱們先從一個相對簡單的用例開始吧:一個增量計數器,可顯示某網站受到多少次點擊。Spring Data Redis 有兩個適用於這一實用程序的類:RedisAtomicInteger 和 RedisAtomicLong。和 Java 併發包中的 AtomicInteger 和 AtomicLong 不一樣的是,這些 Spring 類能在多個 JVM 中發揮做用。
列表 1:全局惟一增量計數器
RedisAtomicLong counter = new RedisAtomicLong("UNIQUE_COUNTER_NAME", redisTemplate.getConnectionFactory()); Long myCounter = counter.incrementAndGet();// return the incremented value
請注意整型溢出並謹記,在這兩個類上進行操做須要付出相對較高的代價。
2. 全局悲觀鎖
時不時的,用戶就得應對服務器集羣的爭用。假設你從一個服務器集羣運行一個預約做業。在沒有全局鎖的狀況下,集羣中的節點會發起冗餘做業實例。假設某個聊天室分區可容納 50 人。若是聊天室已滿,就須要建立新的聊天室實例來容納另外 50 人。
若是檢測到聊天室已滿但沒有全局鎖,集羣中的各個節點就會建立自有的聊天室實例,爲整個系統帶來不可預知的因素。列表 2 介紹了應當如何充分利用 SETNX(SET if **N**ot e**X**ists:若是不存在,則設置)這一 Redis 命令來執行全局悲觀鎖。
列表2:全局悲觀鎖
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @Service public class DistributedLockRedis { public static final Logger LOGGER = LoggerFactory.getLogger(DistributedLockRedis.class); public static final String DISTRIBUTED_LOCK_REDIS_KEY = "distributed:lock:redis:"; @Autowired private RedisTemplate<String, Long> redisTemplate; /** * 因爲Redis是單線程模型,命令操做原子性,因此利用這個特性能夠很容易的實現分佈式鎖。 * 得到一個鎖 * * @param bizName 業務名 * @param lockTimeout 線程佔用鎖的時間 * @param unit 單位 * @throws InterruptedException 鎖能夠被中斷 */ public void lock(String bizName, int lockTimeout, TimeUnit unit) throws InterruptedException { // redisTemplate.getConnectionFactory().getConnection().setNX() String redisKey; if (StringUtils.isBlank(bizName)) { LOGGER.warn("is not recommended!"); redisKey = DISTRIBUTED_LOCK_REDIS_KEY; } else { redisKey = DISTRIBUTED_LOCK_REDIS_KEY + bizName.trim(); } BoundValueOperations<String, Long> valueOps = redisTemplate.boundValueOps(redisKey); while (true) { // https://redis.io/commands/setnx long currentTimeMillis = System.currentTimeMillis(); long releaseLockTime = currentTimeMillis + unit.toMillis(lockTimeout) + 1; //這兩個if else不能混寫,由於多個相同類型的線程競爭鎖時,在鎖超時時,設置的超時時間是同樣的 if (valueOps.setIfAbsent(releaseLockTime)) {//第一次獲取鎖 redisTemplate.expire(redisKey, lockTimeout, unit); return; } else if (currentTimeMillis > valueOps.get()) {//鎖已經超時 //若是其它線程佔用鎖,再從新設置的時間和原來時間的時間差,能夠忽略 Long lockCurrentValue = valueOps.getAndSet(releaseLockTime); //若是當前時間小於LockKey存放的時間,說明已經有其它線程加鎖 if (currentTimeMillis > lockCurrentValue) { redisTemplate.expire(redisKey, lockTimeout, unit); return; } } else { TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 1000)); } } } }
Set key to hold string value if key does not exist. In that case, it is equal to SET. When key already holds a value, no operation is performed. SETNX is short for "SET if Not eXists".
Return value
Integer reply, specifically:
1 if the key was set
0 if the key was not set
Examples
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>
How to do set nx option using RedisTemplate?
The method name is setIfAbsent
setIfAbsent
Boolean setIfAbsent(V value)
Set the bound key to hold the string value if the bound key is absent.
Parameters:
value -
See Also:
Redis Documentation: SETNX
http://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/BoundValueOperations.html#setIfAbsent-V-
用如下Python代碼來實現上述的使用 SETNX 命令做分佈式鎖的算法。
LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'lock.foo' # 獲取鎖 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001) # 已得到鎖 do_job() # 釋放鎖 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key)
http://blog.csdn.net/lihao21/article/details/49104695
若是使用關係數據庫,一旦最早生成鎖的程序意外退出,鎖就可能永遠得不到釋放。Redis 的 EXPIRE 設置可確保在任何狀況下釋放鎖。
3. 位屏蔽(Bit Mask)
假設 web 客戶端須要輪詢一臺 web 服務器,針對某個數據庫中的多個表查詢客戶指定更新內容。若是盲目地查詢全部相應的表以尋找潛在更新,成本較高。爲了不這一作法,能夠嘗試在 Redis 中給每一個客戶端保存一個整型做爲髒指標,整型的每一個數位表示一個表。該表中存在客戶所需更新時,設置數位。輪詢期間,不會觸發對錶的查詢,除非設置了相應數位。就獲取並將這樣的位屏蔽設置爲 STRING 而言,Redis 很是高效。
4. 排行榜(Leaderboard)
Redis 的 ZSET 數據結構爲遊戲玩家排行榜提供了簡潔的解決方案。ZSET 的工做方式有些相似於 Java 中的 PriorityQueue,各個對象均爲通過排序的數據結構,層次分明。能夠按照分數排出遊戲玩家在排行榜上的位置。Redis 的 ZSET 定義了一分內容豐富的命令列表,支持靈活有效的查詢。例如,ZRANGE(包括 ZREVRANGE)可返回有序集內的指定範圍要素。
你可使用這一命令列出排行榜前 100 名玩家。ZRANGEBYSCORE 返回指定分數範圍內的要素(例如列出得分爲 1000 至 2000 之間的玩家),ZRNK 則返回有序集內的要素的排名,諸如此類。
5. 布隆(Bloom)過濾器
布隆過濾器 (Bloom filter) 是一種空間利用率較高的機率數據結構,用來測試某元素是否某個集的一員。可能會出現誤報匹配,但不會漏報。查詢可返回「可能在集內」或「確定不在集內」。
就在線服務和離線服務包括大數據分析等方面,布隆過濾器數據結構都能派上不少用場。Facebook 利用布隆過濾器進行輸入提示搜索,爲用戶輸入的查詢提取朋友和朋友的朋友。Apache HBase 則利用布隆過濾器過濾掉不包含特殊行或列的 HFile 塊磁盤讀取,使讀取速度獲得明顯提高。Bitly 用布隆過濾器來避免將用戶重定向到惡意網站,而 Quara 則在訂閱後端執行了一個切分的布隆過濾器,用來過濾掉以前查看過的內容。在我本身的項目裏,我用布隆過濾器追蹤用戶對各個主題的投票狀況。
藉助出色的速度和處理能力,Redis 極好地融合了布隆過濾器。搜索 GitHub,就能發現不少 Redis 布隆過濾器項目,其中一些還支持可調諧精度。
6. 高效的全局通知:發佈/訂閱渠道
Redis 發佈/訂閱渠道的工做方式相似於一個扇出消息傳遞系統,或 JMS 語義中的一個主題。JMS 主題和 Redis 發佈/訂閱渠道的一個區別是,經過 Redis 發佈的消息並不持久。消息被推送給全部相連的客戶端後,Redis 上就會刪除這一消息。換句話說,訂閱者必須一直在線才能接收新消息。Redis 發佈/訂閱渠道的典型用例包括實時配置分佈、簡單的聊天服務器等。
在 web 服務器集羣中,每一個節點均可以是 Redis 發佈/訂閱渠道的一個訂閱者。發佈到渠道上的消息也會被即時推送到全部相連節點。這一消息能夠是某種配置更改,也能夠是針對全部在線用戶的全局通知。和恆定輪詢相比,這種推送溝通模式顯然極爲高效。
Redis 性能優化
Redis 很是強大,但也能夠從總體上和根據特定編程場景作出進一步優化。能夠考慮如下技巧。
存活時間
全部 Redis 數據結構都具有存活時間 (TTL) 屬性。當你設置這一屬性時,數據結構會在過時後自動刪除。充分利用這一功能,可讓 Redis 保持較低的內存損耗。
管道技術
在一條請求中向 Redis 發送多個命令,這種方法叫作管道技術。這一技術節省了網絡往返的成本,這一點很是重要,由於網絡延遲可能比 Redis 延遲要高上好幾個量級。但這裏存在一個陷阱:管道中的 Redis 命令列表必須預先肯定,而且應當彼此獨立。若是一個命令的參數是由先前命令的結果計算得出,管道技術就不起做用。列表 3 給出了 Redis 管道技術的一個示例。
列表 3:管道技術
@Override public List<LeaderboardEntry> fetchLeaderboard(String key, String... playerIds) { final List<LeaderboardEntry> entries = new ArrayList<>(); redisTemplate.executePipelined(new RedisCallback<Object>() { // enable Redis Pipeline @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { for(String playerId : playerIds) { Long rank = connection.zRevRank(key.getBytes(), playerId.getBytes()); Double score = connection.zScore(key.getBytes(), playerId.getBytes()); LeaderboardEntry entry = new LeaderboardEntry(playerId, score!=null?score.intValue():-1, rank!=null?rank.intValue():-1); entries.add(entry); } return null; } }); return entries; }
副本集和切分
Redis 支持主從副本配置。和 MongoDB 同樣,副本集也是不對稱的,由於從節點是隻讀的,以便共享讀取工做量。
我在文章開頭提到過,也能夠執行切分來橫向擴展 Redis 的處理能力和存儲容量。
事實上,Redis 很是強大,據亞馬遜公司的內部基準顯示,類型 r3.4xlarge 的一個 EC2 實例每秒可輕鬆處理 100000 次請求。傳說還有把每秒 700000 次請求做爲基準的。
對於中小型應用程序,一般無需考慮 Redis 切分。
(請參見這篇很是出色的文章《運行中的 Redis》https://www.manning.com/books/redis-in-action,進一步瞭解 Redis 的性能優化和切分http://www.oneapm.com/brand/apm.html。)
Redis 並不像關係數據庫管理系統那樣能支持全面的 ACID 事務,但其自有的事務也很是有效。從本質上來講,Redis 事務是管道、樂觀鎖、肯定提交和回滾的結合。其思想是執行一個管道中的一個命令列表,而後觀察某一關鍵記錄的潛在更新(樂觀鎖)。根據所觀察的記錄是否會被另外一個進程更新,該命令列表或總體肯定提交,或徹底回滾。
下面以某個拍賣網站上的賣方庫存爲例。買方試圖從賣方處購買某件商品時,你負責觀察 Redis 事務內的賣方庫存變化。同時,你要從同一個庫存中刪除此商品。事務關閉前,若是庫存被一個以上進程觸及(例如,若是兩個買方同時購買了同一件商品),事務將回滾,不然事務會肯定提交。回滾後可開始重試。
我在 Spring 的 RedisTemplate
類 redisTemplate.setEnableTransactionSupport(true);
中啓用 Redis 事務時獲得一個慘痛的教訓:Redis 會在運行幾天後開始返回垃圾數據,致使數據嚴重損壞。StackOverflow上也報道了相似狀況。
在運行一個 monitor
命令後,個人團隊發現,在進行 Redis 操做或 RedisCallback
後,Spring 並無自動關閉 Redis 鏈接,而事實上它是應該關閉的。若是再次使用未關閉的鏈接,可能會從意想不到的 Redis 密鑰返回垃圾數據。有意思的是,若是在 RedisTemplate
中把事務支持設爲 false,這一問題就不會出現了。
咱們發現,咱們能夠先在 Spring 語境裏配置一個 PlatformTransactionManager
(例如 DataSourceTransactionManager
),而後再用 @Transactional
註釋來聲明 Redis 事務的範圍,讓 Spring 自動關閉 Redis 鏈接。
根據這一經驗,咱們相信,在 Spring 語境裏配置兩個單獨的 RedisTemplate
是很好的作法:其中一個 RedisTemplates 的事務設爲 false,用於大多數 Redis 操做,另外一個 RedisTemplates 的事務已激活,僅用於 Redis 事務。固然必需要聲明 PlatformTransactionManager
和 @Transactional
,以防返回垃圾數值。
另外,咱們還發現了 Redis 事務和關係數據庫事務(在本例中,即 JDBC)相結合的不利之處。混合型事務的表現和預想的不太同樣。
I learned a hard lesson when enabling Redis transactions in the Spring RedisTemplate class redisTemplate.setEnableTransactionSupport(true);: Redis started returning junk data after running for a few days, causing serious data corruption. A similar case was reported on StackOverflow. By running a monitor command, my team discovered that after a Redis operation or RedisCallback, Spring doesn't close the Redis connection automatically, as it should do. Reusing an unclosed connection may return junk data from an unexpected key in Redis. Interestingly, this issue doesn't show up when transaction support is set to false in RedisTemplate. We discovered that we could make Spring close Redis connections automatically by configuring a PlatformTransactionManager (such as DataSourceTransactionManager) in the Spring context, then using the @Transactional annotation to declare the scope of Redis transactions. Based on this experience, we believe it's good practice to configure two separate RedisTemplates in the Spring context: One with transaction set to false is used on most Redis operations; the other with transaction enabled is only applied to Redis transactions. Of course PlatformTransactionManager and @Transactional must be declared to prevent junk values from being returned. Moreover, we learned the downside of mixing a Redis transaction with a relational database transaction, in this case JDBC. Mixed transactions do not behave as you would expect.
結論
我但願經過這篇文章向其餘 Java 企業開發師介紹 Redis 的強大之處,尤爲是將 Redis 用做遠程數據緩存和用於易揮發數據時。
在這裏我介紹了 Redis 的六個有效用例,分享了一些性能優化技巧,還說明了個人 Glu Mobile 團隊怎樣解決了 Spring Data Redis 事務配置不當形成的垃圾數據問題。
我但願這篇文章可以激發你對 Redis NoSQL 的好奇心,讓你可以受到啓發,在本身的 Java 企業版系統裏創造出一番天地。
http://blog.oneapm.com/apm-tech/778.html
http://www.javaworld.com/article/3062899/big-data/lightning-fast-nosql-with-spring-data-redis.html?page=2
https://docs.spring.io/spring-data/redis/docs/1.8.6.RELEASE/reference/html/#tx.spring
若是是使用編程的方式(一般是基於 Spring Boot 項目)配置 RedisTemplate 的話只需在你的配置類(被@Configuration
註解修飾的類)中顯式建立 RedisTemplate
Bean,設置 Serializer
便可。
@Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); redisTemplate.setDefaultSerializer(fastJsonRedisSerializer);//設置默認的Serialize,包含 keySerializer & valueSerializer //redisTemplate.setKeySerializer(fastJsonRedisSerializer);//單獨設置keySerializer //redisTemplate.setValueSerializer(fastJsonRedisSerializer);//單獨設置valueSerializer return redisTemplate; }
https://github.com/alibaba/fastjson/wiki/%E5%9C%A8-Spring-%E4%B8%AD%E9%9B%86%E6%88%90-Fastjson