分佈式緩存技術redis學習系列(五)——redis實戰(redis與spring整合,分佈式鎖實現)

本文是redis學習系列的第五篇,點擊下面鏈接可回看系列文章

《redis簡介以及linux上的安裝》

《詳細講解redis數據結構(內存模型)以及常用命令》

《redis高級應用(主從、事務與鎖、持久化)》

《redis高級應用(集羣搭建、集羣分區原理、集羣操作》

本文我們繼續學習redis與spring的整合,整合之後就可以用redisStringTemplate的setNX()和delete()方法實現分佈式鎖了。

Redis與spring的整合

相關依賴jar包

spring把專門的數據操作獨立封裝在spring-data系列中,spring-data-redis是對Redis的封裝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
     <groupId>org.springframework.data</groupId>
     <artifactId>spring-data-redis</artifactId>
     <version> 1.4 . 2 .RELEASE</version>
</dependency>
 
<dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
     <version> 2.6 . 2 </version>
</dependency>
 
<dependency>
     <groupId>org.apache.commons</groupId>
     <artifactId>commons-pool2</artifactId>
     <version> 2.4 . 2 </version>
</dependency>

Spring 配置文件applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!--命令空間中加入下面這行-->
xmlns:p= "http://www.springframework.org/schema/p"
 
<!-- redis連接池配置文件 -->
<context:property-placeholder location= "classpath:redis.properties"  /> 
 
<bean id= "poolConfig"  class = "redis.clients.jedis.JedisPoolConfig"
     <property name= "maxIdle"  value= "${redis.maxIdle}"  /> 
     <property name= "maxTotal"  value= "${redis.maxTotal}"  /> 
     <property name= "MaxWaitMillis"  value= "${redis.MaxWaitMillis}"  /> 
     <property name= "testOnBorrow"  value= "${redis.testOnBorrow}"  /> 
</bean> 
   
<bean id= "connectionFactory"  class = "org.springframework.data.    redis.connection.jedis.JedisConnectionFactory" 
     p:host-name= "${redis.host}"  p:port= "${redis.port}"
     p:password= "${redis.pass}"   p:pool-config-ref= "poolConfig" /> 
   
<bean id= "redisTemplate"  class = "org.springframework.data.    redis.core.RedisTemplate"
     <property name= "connectionFactory"    ref= "connectionFactory"  /> 
</bean>

  

注意新版的maxTotal,MaxWaitMillis這兩個字段與舊版的不同

redis連接池配置文件redis.properties

1
2
3
4
5
6
7
8
redis.host= 192.168 . 2.129
redis.port= 6379 
redis.pass=redis129 
 
redis.maxIdle= 300 
redis.maxTotal= 600 
redis.MaxWaitMillis= 1000 
redis.testOnBorrow= true

好了,配置完成,下面寫上代碼

測試代碼

User

1
2
3
4
5
6
7
8
9
@Entity
@Table (name =  "t_user" )
public  class  User {
     //主鍵
     private  String id;
     //用戶名
     private  String userName;
         //...省略get,set...
}

BaseRedisDao

1
2
3
4
5
6
7
@Repository
public  abstract  class  BaseRedisDao<K,V> {
     
     @Autowired (required= true
     protected  RedisTemplate<K, V> redisTemplate;
 
}

IUserDao 

1
2
3
4
5
6
7
8
9
10
11
public  interface  IUserDao {
     
     public  boolean  save(User user);
     
     public  boolean  update(User user);
 
     public  boolean  delete(String userIds);
     
     public  User find(String userId);
     
}

UserDao 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Repository
public  class  UserDao  extends  BaseRedisDao<String, User>  implements  IUserDao {
     
     @Override
     public  boolean  save( final  User user) {
         boolean  res = redisTemplate.execute( new  RedisCallback<Boolean>() {
             public  Boolean doInRedis(RedisConnection connection)  throws  DataAccessException {
                 RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                 byte [] key = serializer.serialize(user.getId());
                 byte [] value = serializer.serialize(user.getUserName());
                 //set not exits
                 return  connection.setNX(key, value);
             }
         });
         return  res;
     }
 
     @Override
     public  boolean  update( final  User user) {
         boolean  result = redisTemplate.execute( new  RedisCallback<Boolean>() { 
             public  Boolean doInRedis(RedisConnection connection)  throws  DataAccessException { 
                 RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                 byte [] key  = serializer.serialize(user.getId()); 
                 byte [] name = serializer.serialize(user.getUserName()); 
                 //set
                 connection.set(key, name); 
                 return  true
            
         }); 
         return  result;
     }
 
     @Override
     public  User find( final  String userId) {
         User result = redisTemplate.execute( new  RedisCallback<User>() { 
             public  User doInRedis(RedisConnection connection)  throws  DataAccessException { 
                 RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                 byte [] key = serializer.serialize(userId);
                 //get
                 byte [] value = connection.get(key); 
                 if  (value ==  null ) { 
                     return  null
                
                 String name = serializer.deserialize(value);
                 User resUser =  new  User();
                 resUser.setId(userId);
                 resUser.setUserName(name);
                 return  resUser; 
            
         }); 
         return  result; 
     }
 
     @Override
     public  boolean  delete( final  String userId) {
         boolean  result = redisTemplate.execute( new  RedisCallback<Boolean>() { 
             public  Boolean doInRedis(RedisConnection connection)  throws  DataAccessException { 
                 RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                 byte [] key  = serializer.serialize(userId); 
                 //delete
                 connection.del(key);
                 return  true
            
         }); 
         return  result;
     }
 
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@RunWith (SpringJUnit4ClassRunner. class )
@ContextConfiguration (locations = { "classpath*:applicationContext.xml" }) 
public  class  RedisTest  extends  AbstractJUnit4SpringContextTests { 
       
     @Autowired 
     private  IUserDao userDao;
     
     @Test 
     public  void  testSaveUser() { 
         User user =  new  User(); 
         user.setId( "402891815170e8de015170f6520b0000" ); 
         user.setUserName( "zhangsan" ); 
         boolean  res = userDao.save(user);
         Assert.assertTrue(res); 
    
     
     @Test 
     public  void  testGetUser() { 
         User user =  new  User(); 
         user = userDao.find( "402891815170e8de015170f6520b0000" );
         System.out.println(user.getId() +  "-"  + user.getUserName() ); 
    
     
     @Test 
     public  void  testUpdateUser() { 
         User user =  new  User(); 
         user.setId( "402891815170e8de015170f6520b0000" ); 
         user.setUserName( "lisi" ); 
         boolean  res = userDao.update(user);
         Assert.assertTrue(res); 
    
     
     @Test 
     public  void  testDeleteUser() { 
         boolean  res = userDao.delete( "402891815170e8de015170f6520b0000" );
         Assert.assertTrue(res); 
    
    
}

  

String類型的增刪該查已完成,Hash,List,Set數據類型的操作就不舉例了,和使用命令的方式差不多。如下

 

1
2
3
4
5
6
7
8
9
10
11
12
13
connection.hSetNX(key, field, value);
connection.hDel(key, fields);
connection.hGet(key, field);
 
connection.lPop(key);
connection.lPush(key, value);
connection.rPop(key);
connection.rPush(key, values);
 
connection.sAdd(key, values);
connection.sMembers(key);
connection.sDiff(keys);
connection.sPop(key);

  

整合可能遇到的問題

1.NoSuchMethodError

1
2
3
java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
 
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

  

類似找不到類,找不到方法的問題,當確定依賴的jar已經引入之後,此類問題多事spring-data-redis以及jedis版本問題,多換個版本試試,本文上面提到的版本可以使用。

1.No qualifying bean

1
No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found  for  dependency

  

找不到bean,考慮applicationContext.xml中配置redisTemplate bean時實現類是否寫錯。例如,BaseRedisDao注入的是RedisTemplate類型的對象,applicationContext.xml中配置的實現類卻是RedisTemplate的子類StringRedisTemplate,那肯定報錯。整合好後,下面我們着重學習基於redis的分佈式鎖的實現。

基於redis實現的分佈式鎖

我們知道,在多線程環境中,鎖是實現共享資源互斥訪問的重要機制,以保證任何時刻只有一個線程在訪問共享資源。鎖的基本原理是:用一個狀態值表示鎖,對鎖的佔用和釋放通過狀態值來標識,因此基於redis實現的分佈式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖,DEL相當於釋放鎖,當然,在下面的具體實現中會更復雜些。之所以稱爲分佈式鎖,是因爲客戶端可以在redis集羣環境中向集羣中任一個可用Master節點請求上鎖(即SETNX命令存儲key到redis緩存中是隨機的)。

 

現在相信你已經對在基於redis實現的分佈式鎖的基本概念有了解,需要注意的是,這個和前面文章提到的使用WATCH 命令對key值進行鎖操作沒有直接的關係。java中synchronized和Lock對象都能對共享資源進行加鎖,下面我們將學習用java實現的redis分佈式鎖。

java中的鎖技術

在分析java實現的redis分佈式鎖之前,我們先來回顧下java中的鎖技術,爲了直觀的展示,我們採用「多個線程共享輸出設備」來舉例。

不加鎖共享輸出設備

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public  class  LockTest {
     //不加鎖
     static  class  Outputer {
         public  void  output(String name) {
             for ( int  i= 0 ; i<name.length(); i++) {
                 System.out.print(name.charAt(i));
             }
             System.out.println();
         }
     }
     public  static  void  main(String[] args) {
         final  Outputer output =  new  Outputer();
         //線程1打印zhangsan
         new  Thread( new  Runnable(){
             @Override
             public  void  run() {
                 while ( true ) {
                      try {
                          Thread.sleep( 1000 );
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      output.output( "zhangsan" );
                 }  
             }
         }).start();
         
         //線程2打印lingsi
         new  Thread( new  Runnable(){
             @Override
             public  void  run() {
                 while ( true ) {
                      try {
                          Thread.sleep( 1000 );
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      output.output( "lingsi" );
                 }
             }
         }).start();
         
         //線程3打印wangwu
         new  Thread( new  Runnable(){
             @Override
             public  void  run() {
                 while ( true ) {
                      try {
                          Thread.sleep( 1000 );
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      output.output( "huangwu" );
                 }
             }
         }).start();
     }
}

 

上面例子中,三個線程同時共享輸出設備output,線程1需要打印zhangsan,線程2需要打印lingsi,線程3需要打印wangwu。在不加鎖的情況,這三個線程會不會因爲得不到輸出設備output打架呢,我們來看看運行結果:

 

1
2
3
4
5
6
7
8
9
10
11
huangwu
zhangslingsi
an
huangwu
zlingsi
hangsan
huangwu
lzhangsan
ingsi
huangwu
lingsi

  

從運行結果可以看出,三個線程打架了,線程1沒打印完zhangsan,線程2就來搶輸出設備......可見,這不是我們想要的,我們想要的是線程之間能有序的工作,各個線程之間互斥的使用輸出設備output。

使用java5中的Lock對輸出設備加鎖

現在我們對Outputer進行改進,給它加上鎖,加鎖之後每次只有一個線程能訪問它。

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//使用java5中的鎖
static  class  Outputer{
     Lock lock =  new  ReentrantLock();
     public  void  output(String name) {
         //傳統java加鎖
         //synchronized (Outputer.class){
         lock.lock();
         try  {
             for ( int  i= 0 ; i<name.length(); i++) {
                 System.out.print(name.charAt(i));
             }
             System.out.println();
         } finally {
             //任何情況下都有釋放鎖
             lock.unlock();
         }  
         //}
     }
}

  

看看加鎖後的輸出結果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
......

  

從運行結果中可以看出,三個線程之間不打架了,線程之間的打印變得有序。有個這個基礎,下面我們來學習基於Redis實現的分佈式鎖就更容易了。

Redis分佈式鎖

實現分析

從上面java鎖的使用中可以看出,鎖對象主要有lock與unlock方法,在lock與unlock方法之間的代碼(臨界區)能保證線程互斥訪問。基於redis實現的Java分佈式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖(lock),DEL相當於釋放鎖(unlock)。我們只要實現Lock接口重寫lock()和unlock()即可。但是這還不夠,安全可靠的分佈式鎖應該滿足滿足下面三個條件:

l 互斥,不管任何時候,只有一個客戶端能持有同一個鎖。

l 不會死鎖,最終一定會得到鎖,即使持有鎖的客戶端對應的master節點宕掉。

l 容錯,只要大多數Redis節點正常工作,客戶端應該都能獲取和釋放鎖。

那麼什麼情況下回不滿足上面三個條件呢。多個線程(客戶端)同時競爭鎖可能會導致多個客戶端同時擁有鎖。比如,

(1)線程1在master節點拿到了鎖(存入key)

(2)master節點在把線程1創建的key寫入slave之前宕機了,此時集羣中的節點已經沒有鎖(key)了,包括master節點的slaver節點

(3)slaver節點升級爲master節點

(4)線程2向新的master節點發起鎖(存入key)請求,很明顯,能請求成功。

可見,線程1和線程2同時獲得了鎖。如果在更高併發的情況,可能會有更多線程(客戶端)獲取鎖,這種情況就會導致上文所說的線程「打架」問題,線程之間的執行雜亂無章。

 

那什麼情況下又會發生死鎖的情況呢。如果擁有鎖的線程(客戶端)長時間的執行或者因爲某種原因造成阻塞,就會導致鎖無法釋放(unlock沒有調用),其它線程就不能獲取鎖而而產生無限期死鎖的情況。其它線程在執行lock失敗後即使粗暴的執行unlock刪除key之後也不能正常釋放鎖,因爲鎖就只能由獲得鎖的線程釋放,鎖不能正常釋放其它線程仍然獲取不到鎖。解決死鎖的最好方式是設置鎖的有效時間(redis的expire命令),不管是什麼原因導致的死鎖,有效時間過後,鎖將會被自動釋放。

 

爲了保障容錯功能,即只要有Redis節點正常工作,客戶端應該都能獲取和釋放鎖,我們必須用相同的key不斷循環向Master節點請求鎖,當請求時間超過設定的超時時間則放棄請求鎖,這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間,如果一個master節點不可用了,應該儘快嘗試下一個master節點。釋放鎖比較簡單,因爲只需要在所有節點都釋放鎖就行,不管之前有沒有在該節點獲取鎖成功。

Redlock算法

根據上面的分析,官方提出了一種用Redis實現分佈式鎖的算法,這個算法稱爲RedLock。RedLock算法的主要流程如下:

 

RedLock算法主要流程

 

 

Java實現

 

結合上面的流程圖,加上下面的代碼解釋,相信你一定能理解redis分佈式鎖的實現原理

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public  class  RedisLock  implements  Lock{
 
     protected  StringRedisTemplate redisStringTemplate;
 
     // 存儲到redis中的鎖標誌
     private  static  final  String LOCKED =  "LOCKED" ;
 
     // 請求鎖的超時時間(ms)
     private  static  final  long  TIME_OUT =  30000 ;
 
     // 鎖的有效時間(s)
     public  static  final  int  EXPIRE =  60 ;
 
     // 鎖標誌對應的key;
     private  String key;
 
     // state flag
     private  volatile  boolean  isLocked =  false ;
 
     public  RedisLock(String key) {
         this .key = key;
         @SuppressWarnings ( "resource" )
         ApplicationContext  ctx =   new  ClassPathXmlApplicationContext( "classpath*:applicationContext.xml" );
         redisStringTemplate = (StringRedisTemplate)ctx.getBean( "redisStringTemplate" );
     }
 
     @Override
     public  void  lock() {
         //系統當前時間,毫秒
         long  nowTime = System.nanoTime();
         //請求鎖超時時間,毫秒
         long  timeout = TIME_OUT* 1000000 ;
         final  Random r =  new  Random();
         try  {
             //不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的超時時間則放棄請求鎖
             //這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間
             //如果一個master節點不可用了,應該儘快嘗試下一個master節點
             while  ((System.nanoTime() - nowTime) < timeout) {
                 //將鎖作爲key存儲到redis緩存中,存儲成功則獲得鎖
                 if  (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
                         LOCKED.getBytes())) {
                     //設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶佔鎖之前可以執行任務的時間
                     //可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                     redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
                     isLocked =  true ;
                     //上鎖成功結束請求
                     break ;
                 }
                 //獲取鎖失敗時,應該在隨機延時後進行重試,避免不同客戶端同時重試導致誰都無法拿到鎖的情況出現
                 //睡眠3毫秒後繼續請求鎖
                 Thread.sleep( 3 , r.nextInt( 500 ));
             }
         catch  (Exception e) {
             e.printStackTrace();
         }
     }
 
     @Override
     public  void  unlock() {
         //釋放鎖
         //不管請求鎖是否成功,只要已經上鎖,客戶端都會進行釋放鎖的操作
         if  (isLocked) {
             redisStringTemplate.delete(key);
         }
     }
 
     @Override
     public  void  lockInterruptibly()  throws  InterruptedException {
         // TODO Auto-generated method stub
         
     }
相關文章
相關標籤/搜索