分佈式緩存技術redis系列(五)——redis實戰(redis與spring整合,分佈式鎖實現)

本文是redis學習系列的第五篇,點擊下面連接可回看系列文章html

《redis簡介以及linux上的安裝》java

《詳細講解redis數據結構(內存模型)以及經常使用命令》linux

《redis高級應用(主從、事務與鎖、持久化)》redis

《redis高級應用(集羣搭建、集羣分區原理、集羣操做》算法

本文咱們繼續學習redis與spring的整合,整合以後就能夠用redisStringTemplate的setNX()和delete()方法實現分佈式鎖了。spring

Redis與spring的整合

相關依賴jar包

spring把專門的數據操做獨立封裝在spring-data系列中,spring-data-redis是對Redis的封裝apache

<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
	<version>1.4.2.RELEASE</version>
</dependency>

<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
	<version>2.6.2</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.4.2</version>
</dependency>

Spring 配置文件applicationContext.xml

    <!--命令空間中加入下面這行-->
    xmlns:p="http://www.springframework.org/schema/p"

    <!-- redis鏈接池配置文件 -->
    <context:property-placeholder location="classpath:redis.properties" />  
  
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">  
        <property name="maxIdle" value="${redis.maxIdle}" />  
        <property name="maxTotal" value="${redis.maxTotal}" />  
        <property name="MaxWaitMillis" value="${redis.MaxWaitMillis}" />  
        <property name="testOnBorrow" value="${redis.testOnBorrow}" />  
    </bean>  
      
    <bean id="connectionFactory" class="org.springframework.data.    redis.connection.jedis.JedisConnectionFactory"  
        p:host-name="${redis.host}" p:port="${redis.port}" 
        p:password="${redis.pass}"  p:pool-config-ref="poolConfig"/>  
      
    <bean id="redisTemplate" class="org.springframework.data.    redis.core.RedisTemplate">  
        <property name="connectionFactory"   ref="connectionFactory" />  
    </bean>

  

注意新版的maxTotal,MaxWaitMillis這兩個字段與舊版的不一樣緩存

redis鏈接池配置文件redis.properties

redis.host=192.168.2.129
redis.port=6379  
redis.pass=redis129  

redis.maxIdle=300  
redis.maxTotal=600  
redis.MaxWaitMillis=1000  
redis.testOnBorrow=true 

好了,配置完成,下面寫上代碼安全

測試代碼

User

@Entity
@Table(name = "t_user")
public class User {
	//主鍵
	private String id;
	//用戶名
	private String userName;
        //...省略get,set...
}

BaseRedisDao

@Repository
public abstract class BaseRedisDao<K,V> {
	
	@Autowired(required=true)  
    protected RedisTemplate<K, V> redisTemplate;

}

IUserDao 

public interface IUserDao {
	
	public boolean save(User user);
	
	public boolean update(User user);

	public boolean delete(String userIds);
	
	public User find(String userId);
	
}

UserDao 

@Repository
public class UserDao extends BaseRedisDao<String, User> implements IUserDao {
	
	@Override
	public boolean save(final User user) {
		boolean res = redisTemplate.execute(new RedisCallback<Boolean>() {
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				byte[] key = serializer.serialize(user.getId());
				byte[] value = serializer.serialize(user.getUserName());
				//set not exits
				return connection.setNX(key, value);
			}
		});
		return res;
	}

	@Override
	public boolean update(final User user) {
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {  
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {  
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                byte[] key  = serializer.serialize(user.getId());  
                byte[] name = serializer.serialize(user.getUserName());  
                //set
                connection.set(key, name);  
                return true;  
            }  
        });  
        return result;
	}

	@Override
	public User find(final String userId) {
		User result = redisTemplate.execute(new RedisCallback<User>() {  
            public User doInRedis(RedisConnection connection) throws DataAccessException {  
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                byte[] key = serializer.serialize(userId);
                //get
                byte[] value = connection.get(key);  
                if (value == null) {  
                    return null;  
                }  
                String name = serializer.deserialize(value); 
                User resUser = new User();
                resUser.setId(userId);
                resUser.setUserName(name);
                return resUser;  
            }  
        });  
        return result;  
	}

	@Override
	public boolean delete(final String userId) {
		boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {  
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {  
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                byte[] key  = serializer.serialize(userId);  
                //delete 
                connection.del(key);
                return true;  
            }  
        });  
		return result;
	}

}

Test

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:applicationContext.xml"})  
public class RedisTest extends AbstractJUnit4SpringContextTests {  
      
    @Autowired  
    private IUserDao userDao;
    
    @Test  
    public void testSaveUser() {  
        User user = new User();  
        user.setId("402891815170e8de015170f6520b0000");  
        user.setUserName("zhangsan");  
        boolean res = userDao.save(user);
        Assert.assertTrue(res);  
    }  
    
    @Test  
    public void testGetUser() {  
        User user = new User();  
        user = userDao.find("402891815170e8de015170f6520b0000");
        System.out.println(user.getId() + "-" + user.getUserName() );  
    }  
    
    @Test  
    public void testUpdateUser() {  
    	User user = new User();  
        user.setId("402891815170e8de015170f6520b0000");  
        user.setUserName("lisi");  
        boolean res = userDao.update(user);
        Assert.assertTrue(res);  
    }  
    
    @Test  
    public void testDeleteUser() {  
    	boolean res = userDao.delete("402891815170e8de015170f6520b0000");
    	Assert.assertTrue(res);  
    }  
   
}

  

String類型的增刪該查已完成,Hash,List,Set數據類型的操做就不舉例了,和使用命令的方式差很少。以下數據結構

 

connection.hSetNX(key, field, value);
connection.hDel(key, fields);
connection.hGet(key, field);

connection.lPop(key);
connection.lPush(key, value);
connection.rPop(key);
connection.rPush(key, values);

connection.sAdd(key, values);
connection.sMembers(key);
connection.sDiff(keys);
connection.sPop(key);

  

整合可能遇到的問題

1.NoSuchMethodError

 java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V

Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

  

相似找不到類,找不到方法的問題,當肯定依賴的jar已經引入以後,此類問題多事spring-data-redis以及jedis版本問題,多換個版本試試,本文上面提到的版本可使用。

1.No qualifying bean

No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency

  

找不到bean,考慮applicationContext.xml中配置redisTemplate bean時實現類是否寫錯。例如,BaseRedisDao注入的是RedisTemplate類型的對象,applicationContext.xml中配置的實現類倒是RedisTemplate的子類StringRedisTemplate,那確定報錯。整合好後,下面咱們着重學習基於redis的分佈式鎖的實現。

基於redis實現的分佈式鎖

咱們知道,在多線程環境中,鎖是實現共享資源互斥訪問的重要機制,以保證任什麼時候刻只有一個線程在訪問共享資源。鎖的基本原理是:用一個狀態值表示鎖,對鎖的佔用和釋放經過狀態值來標識,所以基於redis實現的分佈式鎖主要依賴redis的SETNX命令和DEL命令,SETNX至關於上鎖,DEL至關於釋放鎖,固然,在下面的具體實現中會更復雜些。之因此稱爲分佈式鎖,是由於客戶端能夠在redis集羣環境中向集羣中任一個可用Master節點請求上鎖(即SETNX命令存儲key到redis緩存中是隨機的)。

 

如今相信你已經對在基於redis實現的分佈式鎖的基本概念有了解,須要注意的是,這個和前面文章提到的使用WATCH 命令對key值進行鎖操做沒有直接的關係。java中synchronized和Lock對象都能對共享資源進行加鎖,下面咱們將學習用java實現的redis分佈式鎖。

java中的鎖技術

在分析java實現的redis分佈式鎖以前,咱們先來回顧下java中的鎖技術,爲了直觀的展現,咱們採用「多個線程共享輸出設備」來舉例。

不加鎖共享輸出設備

public class LockTest {
	//不加鎖
	static class Outputer {
		public void output(String name) {
			for(int i=0; i<name.length(); i++) {
				System.out.print(name.charAt(i));
			}
			System.out.println();
		}
	}
	public static void main(String[] args) {
		final Outputer output = new Outputer();
		//線程1打印zhangsan
		new Thread(new Runnable(){
			@Override
			public void run() {
				while(true) {
					 try{
						 Thread.sleep(1000);
					 }catch(InterruptedException e) {
						 e.printStackTrace();
					 }
					 output.output("zhangsan");
				}	
			}
		}).start();
		
		//線程2打印lingsi
		new Thread(new Runnable(){
			@Override
			public void run() {
				while(true) {
					 try{
						 Thread.sleep(1000);
					 }catch(InterruptedException e) {
						 e.printStackTrace();
					 }
					 output.output("lingsi");
				}
			}
		}).start();
		
		//線程3打印wangwu
		new Thread(new Runnable(){
			@Override
			public void run() {
				while(true) {
					 try{
						 Thread.sleep(1000);
					 }catch(InterruptedException e) {
						 e.printStackTrace();
					 }
					 output.output("huangwu");
				}
			}
		}).start();
	}
}

 

上面例子中,三個線程同時共享輸出設備output,線程1須要打印zhangsan,線程2須要打印lingsi,線程3須要打印wangwu。在不加鎖的狀況,這三個線程會不會由於得不到輸出設備output打架呢,咱們來看看運行結果:

 

huangwu
zhangslingsi
an
huangwu
zlingsi
hangsan
huangwu
lzhangsan
ingsi
huangwu
lingsi

  

從運行結果能夠看出,三個線程打架了,線程1沒打印完zhangsan,線程2就來搶輸出設備......可見,這不是咱們想要的,咱們想要的是線程之間能有序的工做,各個線程之間互斥的使用輸出設備output。

使用java5中的Lock對輸出設備加鎖

如今咱們對Outputer進行改進,給它加上鎖,加鎖以後每次只有一個線程能訪問它。

 

//使用java5中的鎖
static class Outputer{
	Lock lock = new ReentrantLock();
	public void output(String name) {
		//傳統java加鎖
		//synchronized (Outputer.class){
		lock.lock();
		try {
			for(int i=0; i<name.length(); i++) {
				System.out.print(name.charAt(i));
			}
			System.out.println();
		}finally{
			//任何狀況下都有釋放鎖
			lock.unlock();
		}	
		//}
	}
}

  

看看加鎖後的輸出結果:

zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
......

  

從運行結果中能夠看出,三個線程之間不打架了,線程之間的打印變得有序。有個這個基礎,下面咱們來學習基於Redis實現的分佈式鎖就更容易了。

Redis分佈式鎖

實現分析

從上面java鎖的使用中能夠看出,鎖對象主要有lock與unlock方法,在lock與unlock方法之間的代碼(臨界區)能保證線程互斥訪問。基於redis實現的Java分佈式鎖主要依賴redis的SETNX命令和DEL命令,SETNX至關於上鎖(lock),DEL至關於釋放鎖(unlock)。咱們只要實現Lock接口重寫lock()和unlock()便可。可是這還不夠,安全可靠的分佈式鎖應該知足知足下面三個條件:

l 互斥,無論任什麼時候候,只有一個客戶端能持有同一個鎖。

l 不會死鎖,最終必定會獲得鎖,即便持有鎖的客戶端對應的master節點宕掉。

l 容錯,只要大多數Redis節點正常工做,客戶端應該都能獲取和釋放鎖。

那麼什麼狀況下回不知足上面三個條件呢。多個線程(客戶端)同時競爭鎖可能會致使多個客戶端同時擁有鎖。好比,

(1)線程1在master節點拿到了鎖(存入key)

(2)master節點在把線程1建立的key寫入slave以前宕機了,此時集羣中的節點已經沒有鎖(key)了,包括master節點的slaver節點

(3)slaver節點升級爲master節點

(4)線程2向新的master節點發起鎖(存入key)請求,很明顯,能請求成功。

可見,線程1和線程2同時得到了鎖。若是在更高併發的狀況,可能會有更多線程(客戶端)獲取鎖,這種狀況就會致使上文所說的線程「打架」問題,線程之間的執行雜亂無章。

 

那什麼狀況下又會發生死鎖的狀況呢。若是擁有鎖的線程(客戶端)長時間的執行或者由於某種緣由形成阻塞,就會致使鎖沒法釋放(unlock沒有調用),其它線程就不能獲取鎖而而產生無限期死鎖的狀況。其它線程在執行lock失敗後即便粗暴的執行unlock刪除key以後也不能正常釋放鎖,由於鎖就只能由得到鎖的線程釋放,鎖不能正常釋放其它線程仍然獲取不到鎖。解決死鎖的最好方式是設置鎖的有效時間(redis的expire命令),不論是什麼緣由致使的死鎖,有效時間事後,鎖將會被自動釋放。

 

爲了保障容錯功能,即只要有Redis節點正常工做,客戶端應該都能獲取和釋放鎖,咱們必須用相同的key不斷循環向Master節點請求鎖,當請求時間超過設定的超時時間則放棄請求鎖,這個能夠防止一個客戶端在某個宕掉的master節點上阻塞過長時間,若是一個master節點不可用了,應該儘快嘗試下一個master節點。釋放鎖比較簡單,由於只須要在全部節點都釋放鎖就行,無論以前有沒有在該節點獲取鎖成功。

Redlock算法

根據上面的分析,官方提出了一種用Redis實現分佈式鎖的算法,這個算法稱爲RedLock。RedLock算法的主要流程以下:

 

RedLock算法主要流程

 

 

Java實現

 

結合上面的流程圖,加上下面的代碼解釋,相信你必定能理解redis分佈式鎖的實現原理

 

public class RedisLock implements Lock{

	protected StringRedisTemplate redisStringTemplate;

	// 存儲到redis中的鎖標誌
	private static final String LOCKED = "LOCKED";

	// 請求鎖的超時時間(ms)
	private static final long TIME_OUT = 30000;

	// 鎖的有效時間(s)
	public static final int EXPIRE = 60;

	// 鎖標誌對應的key;
	private String key;

	// state flag
	private volatile boolean isLocked = false;

	public RedisLock(String key) {
		this.key = key;
		@SuppressWarnings("resource")
		ApplicationContext  ctx =  new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
		redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
	}

	@Override
	public void lock() {
		//系統當前時間,毫秒
		long nowTime = System.nanoTime();
		//請求鎖超時時間,毫秒
		long timeout = TIME_OUT*1000000;
		final Random r = new Random();
		try {
			//不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的超時時間則放棄請求鎖
			//這個能夠防止一個客戶端在某個宕掉的master節點上阻塞過長時間
			//若是一個master節點不可用了,應該儘快嘗試下一個master節點
			while ((System.nanoTime() - nowTime) < timeout) {
				//將鎖做爲key存儲到redis緩存中,存儲成功則得到鎖
				if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
						LOCKED.getBytes())) {
					//設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其餘客戶端能搶佔鎖以前能夠執行任務的時間
					//能夠防止因異常狀況沒法釋放鎖而形成死鎖狀況的發生
					redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
					isLocked = true;
					//上鎖成功結束請求
					break;
				}
				//獲取鎖失敗時,應該在隨機延時後進行重試,避免不一樣客戶端同時重試致使誰都沒法拿到鎖的狀況出現
				//睡眠3毫秒後繼續請求鎖
				Thread.sleep(3, r.nextInt(500));
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void unlock() {
		//釋放鎖
		//無論請求鎖是否成功,只要已經上鎖,客戶端都會進行釋放鎖的操做
		if (isLocked) {
			redisStringTemplate.delete(key);
		}
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub
		
	}

	@Override
	public boolean tryLock() {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}
}

 

好了,RedisLock已經實現,咱們對Outputer使用RedisLock進行修改

 

/使用RedisLock
static class Outputer {
	//建立一個名爲redisLock的RedisLock類型的鎖
	RedisLock redisLock = new RedisLock("redisLock");
	public void output(String name) {
		//上鎖
		redisLock.lock();
		try {
			for(int i=0; i<name.length(); i++) {
				System.out.print(name.charAt(i));
			}
			System.out.println();
		}finally{
			//任何狀況下都要釋放鎖
			redisLock.unlock();
		}	
	}
}

  

看看使用RedisLock加鎖後的的運行結果

 

lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
......

  

可見,使用RedisLock加鎖後線程之間再也不「打架」,三個線程互斥的訪問output。

問題

如今我沒法論證RedLock算法在分佈式、高併發環境下的可靠性,但從本例三個線程的運行結果看,RedLock算法確實保證了三個線程互斥的訪問output(redis.maxIdle=300 redis.maxTotal=600,運行到Timeout waiting for idle object都沒有出現線程「打架」的問題)。我認爲RedLock算法仍有些問題沒說清楚,好比,如何防止宕機時多個線程同時得到鎖;RedLock算法在釋放鎖的處理上,無論線程是否獲取鎖成功,只要上了鎖,就會到每一個master節點上釋放鎖,這就會致使一個線程上的鎖可能會被其餘線程釋放掉,這就和每一個鎖只能被得到鎖的線程釋放相互矛盾。這些有待後續進一步交流學習研究。

 

參考文檔

http://redis.io/topics/distlock

http://ifeve.com/redis-lock/

相關文章
相關標籤/搜索