源碼java
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不只提供了一系列的分佈式的Java經常使用對象,還提供了許多分佈式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者可以將精力更集中地放在處理業務邏輯上。git
Redisson底層採用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。github
關於Redisson更多詳細介紹,可參考Redssion概述web
Redisson的分佈式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過時解鎖。下面是RLock的基本使用方法:redis
RLock lock = redisson.getLock("anyLock"); // 最多見的使用方法 lock.lock(); // 支持過時解鎖功能 // 10秒鐘之後自動解鎖 // 無需調用unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
Redisson同時還爲分佈式鎖提供了異步執行的相關方法:spring
RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redisson分佈式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過時解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。數組
RLock fairLock = redisson.getFairLock("anyLock"); // 最多見的使用方法 fairLock.lock(); // 支持過時解鎖功能 // 10秒鐘之後自動解鎖 // 無需調用unlock方法手動解鎖 fairLock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();
Redisson還提供了其餘機制的鎖,如聯鎖(MultiLock)、紅鎖(RedLock)等。詳細可參考:分佈式鎖和同步器app
/** * 分佈式鎖回調接口 */ public interface DistributedLockCallback<T> { /** * 調用者必須在此方法中實現須要加分佈式鎖的業務邏輯 * * @return */ public T process(); /** * 獲得分佈式鎖名稱 * * @return */ public String getLockName(); }
/** * 分佈式鎖操做模板 */ public interface DistributedLockTemplate { long DEFAULT_WAIT_TIME = 30; long DEFAULT_TIMEOUT = 5; TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; /** * 使用分佈式鎖,使用鎖默認超時時間。 * @param callback * @param fairLock 是否使用公平鎖 * @return */ <T> T lock(DistributedLockCallback<T> callback, boolean fairLock); /** * 使用分佈式鎖。自定義鎖的超時時間 * * @param callback * @param leaseTime 鎖超時時間。超時後自動釋放鎖。 * @param timeUnit * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock); /** * 嘗試分佈式鎖,使用鎖默認等待時間、超時時間。 * @param callback * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock); /** * 嘗試分佈式鎖,自定義等待時間、超時時間。 * @param callback * @param waitTime 獲取鎖最長等待時間 * @param leaseTime 鎖超時時間。超時後自動釋放鎖。 * @param timeUnit * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock); }
public class SingleDistributedLockTemplate implements DistributedLockTemplate { private RedissonClient redisson; public SingleDistributedLockTemplate() { } public SingleDistributedLockTemplate(RedissonClient redisson) { this.redisson = redisson; } @Override public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) { return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(callback.getLockName(), fairLock); try { lock.lock(leaseTime, timeUnit); return callback.process(); } finally { if (lock != null && lock.isLocked()) { lock.unlock(); } } } @Override public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) { return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(callback.getLockName(), fairLock); try { if (lock.tryLock(waitTime, leaseTime, timeUnit)) { return callback.process(); } } catch (InterruptedException e) { } finally { if (lock != null && lock.isLocked()) { lock.unlock(); } } return null; } private RLock getLock(String lockName, boolean fairLock) { RLock lock; if (fairLock) { lock = redisson.getFairLock(lockName); } else { lock = redisson.getLock(lockName); } return lock; } public void setRedisson(RedissonClient redisson) { this.redisson = redisson; } }
DistributedLockTemplate lockTemplate = ...; final String lockName = ...; lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { //do some business return null; } @Override public String getLockName() { return lockName; } }, false);
可是每次使用分佈式鎖都要寫相似上面的重複代碼,有沒有什麼方法能夠只關注核心業務邏輯代碼的編寫,即上面的"do some business"。下面介紹如何使用Spring AOP來實現這一目標。框架
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { /** * 鎖的名稱。 * 若是lockName能夠肯定,直接設置該屬性。 */ String lockName() default ""; /** * lockName後綴 */ String lockNamePre() default ""; /** * lockName後綴 */ String lockNamePost() default "lock"; /** * 得到鎖名時拼接先後綴用到的分隔符 * @return */ String separator() default "."; /** * <pre> * 獲取註解的方法參數列表的某個參數對象的某個屬性值來做爲lockName。由於有時候lockName是不固定的。 * 當param不爲空時,能夠經過argNum參數來設置具體是參數列表的第幾個參數,不設置則默認取第一個。 * </pre> */ String param() default ""; /** * 將方法第argNum個參數做爲鎖 */ int argNum() default 0; /** * 是否使用公平鎖。 * 公平鎖即先來先得。 */ boolean fairLock() default false; /** * 是否使用嘗試鎖。 */ boolean tryLock() default false; /** * 最長等待時間。 * 該字段只有當tryLock()返回true纔有效。 */ long waitTime() default 30L; /** * 鎖超時時間。 * 超時時間事後,鎖自動釋放。 * 建議: * 儘可能縮簡須要加鎖的邏輯。 */ long leaseTime() default 5L; /** * 時間單位。默認爲秒。 */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
@Aspect @Component public class DistributedLockAspect { @Autowired private DistributedLockTemplate lockTemplate; @Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)") public void DistributedLockAspect() {} @Around(value = "DistributedLockAspect()") public Object doAround(ProceedingJoinPoint pjp) throws Throwable { //切點所在的類 Class targetClass = pjp.getTarget().getClass(); //使用了註解的方法 String methodName = pjp.getSignature().getName(); Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes(); Method method = targetClass.getMethod(methodName, parameterTypes); Object[] arguments = pjp.getArgs(); final String lockName = getLockName(method, arguments); return lock(pjp, method, lockName); } @AfterThrowing(value = "DistributedLockAspect()", throwing="ex") public void afterThrowing(Throwable ex) { throw new RuntimeException(ex); } public String getLockName(Method method, Object[] args) { Objects.requireNonNull(method); DistributedLock annotation = method.getAnnotation(DistributedLock.class); String lockName = annotation.lockName(), param = annotation.param(); if (isEmpty(lockName)) { if (args.length > 0) { if (isNotEmpty(param)) { Object arg; if (annotation.argNum() > 0) { arg = args[annotation.argNum() - 1]; } else { arg = args[0]; } lockName = String.valueOf(getParam(arg, param)); } else if (annotation.argNum() > 0) { lockName = args[annotation.argNum() - 1].toString(); } } } if (isNotEmpty(lockName)) { String preLockName = annotation.lockNamePre(), postLockName = annotation.lockNamePost(), separator = annotation.separator(); StringBuilder lName = new StringBuilder(); if (isNotEmpty(preLockName)) { lName.append(preLockName).append(separator); } lName.append(lockName); if (isNotEmpty(postLockName)) { lName.append(separator).append(postLockName); } lockName = lName.toString(); return lockName; } throw new IllegalArgumentException("Can't get or generate lockName accurately!"); } /** * 從方法參數獲取數據 * * @param param * @param arg 方法的參數數組 * @return */ public Object getParam(Object arg, String param) { if (isNotEmpty(param) && arg != null) { try { Object result = PropertyUtils.getProperty(arg, param); return result; } catch (NoSuchMethodException e) { throw new IllegalArgumentException(arg + "沒有屬性" + param + "或未實現get方法。", e); } catch (Exception e) { throw new RuntimeException("", e); } } return null; } public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) { DistributedLock annotation = method.getAnnotation(DistributedLock.class); boolean fairLock = annotation.fairLock(); boolean tryLock = annotation.tryLock(); if (tryLock) { return tryLock(pjp, annotation, lockName, fairLock); } else { return lock(pjp,lockName, fairLock); } } public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) { return lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, fairLock); } public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) { long waitTime = annotation.waitTime(), leaseTime = annotation.leaseTime(); TimeUnit timeUnit = annotation.timeUnit(); return lockTemplate.tryLock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, waitTime, leaseTime, timeUnit, fairLock); } public Object proceed(ProceedingJoinPoint pjp) { try { return pjp.proceed(); } catch (Throwable throwable) { throw new RuntimeException(throwable); } } private boolean isEmpty(Object str) { return str == null || "".equals(str); } private boolean isNotEmpty(Object str) { return !isEmpty(str); } }
有了上面兩段代碼,之後須要用到分佈式鎖,只需在覈心業務邏輯方法添加註解@DistributedLock,並設置LockName、fairLock等便可。下面的DistributionService演示了多種使用情景。異步
@Service public class DistributionService { @Autowired private RedissonClient redissonClient; @DistributedLock(param = "id", lockNamePost = ".lock") public Integer aspect(Person person) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } @DistributedLock(argNum = 1, lockNamePost = ".lock") public Integer aspect(String i) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } @DistributedLock(lockName = "lock", lockNamePost = ".lock") public int aspect(Action<Integer> action) { return action.action(); } }
定義一個Worker類:
public class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private final DistributionService service; private RedissonClient redissonClient; public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service, RedissonClient redissonClient) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.service = service; this.redissonClient = redissonClient; } @Override public void run() { try { startSignal.await(); System.out.println(Thread.currentThread().getName() + " start"); // Integer count = service.aspect(new Person(1, "張三")); // Integer count = service.aspect("1"); Integer count = service.aspect(() -> { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count1 = map.get("count"); if (count1 > 0) { count1 = count1 - 1; map.put("count", count1); } return count1; }); System.out.println(Thread.currentThread().getName() + ": count = " + count); doneSignal.countDown(); } catch (InterruptedException ex) { System.out.println(ex); } } }
定義Controller類:
@RestController @RequestMapping("/distributedLockTest") public class DistributedLockTestController { private int count = 10; @Autowired private RedissonClient redissonClient; @Autowired private DistributionService service; @RequestMapping(method = RequestMethod.GET) public String distributedLockTest() throws Exception { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); map.put("count", 8); CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(count); for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal, service)).start(); } startSignal.countDown(); // let all threads proceed doneSignal.await(); System.out.println("All processors done. Shutdown connection"); return "finish"; } }
Redisson基本配置:
singleServerConfig: idleConnectionTimeout: 10000 pingTimeout: 1000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 reconnectionTimeout: 3000 failedAttempts: 3 password: subscriptionsPerConnection: 5 clientName: null address: "redis://127.0.0.1:6379" subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 connectionMinimumIdleSize: 10 connectionPoolSize: 64 database: 0 dnsMonitoring: false dnsMonitoringInterval: 5000 threads: 0 nettyThreads: 0 codec: !<org.redisson.codec.JsonJacksonCodec> {} useLinuxNativeEpoll: false
工程中須要注入的對象:
@Value("classpath:/redisson-conf.yml") Resource configFile; @Bean(destroyMethod = "shutdown") RedissonClient redisson() throws IOException { Config config = Config.fromYAML(configFile.getInputStream()); return Redisson.create(config); } @Bean DistributedLockTemplate distributedLockTemplate(RedissonClient redissonClient) { return new SingleDistributedLockTemplate(redissonClient); }
須要引入的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.3</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency>
最後啓動工程,而後訪問localhost:8080/distributedLockTest,能夠看到以下結果:
觀察結果,能夠看出,10個線程中只有8個線程能執行count減1操做,並且多個線程是依次執行的。也就是說分佈式鎖起做用了。
該註解還能夠配合lambda使用。在介紹以前,先科普一下使用spring註解時須要注意的地方,有兩點。
第一,在使用spring提供的方法註解時,比較經常使用的是@Transactional
註解。如果Service層不帶註解的方法A調用同一個Service類帶@Transactional
註解的方法B,那麼方法B的事務註解將不起做用。好比:
... public void methodA() { methodB(); } @Transactional public void methodB() { // 操做表A // 操做表B } ...
上面的代碼中,假設有一次調用方法A,方法A又調用方法B,可是這次調用在操做表B時出錯了。咱們的意願是這樣的:以前對錶A的操做回滾。但實際上卻不會回滾,由於此時的@Transactional
註解並不會生效。緣由是調用方法B是同一個Service的方法A,而如果在其餘類中調用方法B註解才生效。這也就不難解釋爲何註解加在private方法上是不起做用的了。由於private方法只能在同一個方法中調用。
上面所說的調用同一個類的帶註解的方法,該註解將不生效,感興趣的能夠本身找找緣由,這裏就不細說了。
第二,註解(包括spring提供的、自定義的)加在普通類的方法上,spring是掃描不到的。普通類指類簽名上沒有諸如@Service
等Spring提供的註解(由於此分佈式鎖集成使用的是spring aop,因此介紹的都是與spring相關的)。好比,若是把上面貼出的DistributionService
中的各個方法放到Worker
中,那麼這些註解將不起做用,由於Worker
類簽名並無加任何註解,因此spring在掃描的時候直接跳過該類,所以定義在Worker
中的帶@DistributedLock
註解的方法(若是有的話)也就沒法被掃描到。
在上面貼出的代碼中,Worker中須要使用分佈式鎖的業務邏輯比較簡單,因此都寫到DistributionService
中,但在實際開發中,咱們一般有把業務邏輯直接寫在Worker
中的需求,畢竟是與Worker
相關的,放到哪個Service都感受很彆扭。因此,咱們能夠定義一個分佈式鎖管理器,如DistributedLockManager
,而後在初始化Worker時引入便可。接下來改造Worker
和定義DistributedLockManager
:
Worker1:
public class Worker1 implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private final DistributedLockManager distributedLockManager; private RedissonClient redissonClient; public Worker1(CountDownLatch startSignal, CountDownLatch doneSignal, DistributedLockManager distributedLockManager, RedissonClient redissonClient) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.distributedLockManager = distributedLockManager; this.redissonClient = redissonClient; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " start"); startSignal.await(); Integer count = aspect("lock"); System.out.println(Thread.currentThread().getName() + ": count = " + count); doneSignal.countDown(); } catch (Exception e) { e.printStackTrace(); } } public int aspect(String lockName) { return distributedLockManager.aspect(lockName, this); } public int aspectBusiness(String lockName) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } }
DistributedLockManager:
@Component public class DistributedLockManager { @DistributedLock(argNum = 1, lockNamePost = ".lock") public Integer aspect(String lockName, Worker1 worker1) { return worker1.aspectBusiness(lockName); } }
這樣作,雖然能夠將業務從Service層抽離出來,放到分佈式鎖管理器DistributedLockManager
統一管理,但每次都須要將Worker
一塊兒傳過去,一樣感受很彆扭。那麼有沒有更好的辦法呢?有,使用lambda。(上面鋪墊了那麼多,終於進入正題了!o(╥﹏╥)o)
lambda是java 8的新特性之一,若未了解過的建議先去惡補一番。由於java 8支持lambda,因此也新加了不少函數式接口,這裏簡單列幾個:
函數式接口 | 參數類型 | 返回類型 | 描述 |
---|---|---|---|
Supplier<T> | 無 | T | 提供一個T類型的值 |
Consumer<T> | T | void | 處理一個T類型的值 |
BiConsumer<T, U> | T, U | void | 處理T類型和U類型的值 |
Predicate<T> | T | boolean | 一個 計算Boolean值的函數 |
Function<T, R> | T | R | 一個參數類型爲T的函數 |
ToIntFunction<T> ToLongFunction<T> ToDoubleFunction<T> |
T | int long double |
分別計算int、long、double值的函數 |
IntFunction<R> LongFunction<R> DoubleFunction<R> |
int long double |
R | 參數分別爲int、long、double類型的函數 |
BiFunction<T, U, R> | T, U | R | 一個參數類型爲T和U的函數 |
UnaryOperator<T> | T | T | 對類型T進行的一元操做 |
BinaryOperator<T> | T, T | T | 對類型T進行的二元操做 |
觀察Worker1
中方法aspect(Person)
的邏輯,最後須要返回一個int值,因此咱們可使用Supplier<T>
來做爲參數的類型,在分佈式鎖管理器中添加一個方法,以下:
@DistributedLock(lockName = "lock", lockNamePost = ".lock") public int aspect(Supplier<Integer> supplier) { return supplier.get(); }
而後,在Worker1
中也定義一個方法:
private int aspect() { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count1 = map.get("count"); if (count1 > 0) { count1 = count1 - 1; map.put("count", count1); } return count1; }
最後在Worker1
的run方法中使用,把Integer count = aspect("lock");
替換成以下:
Integer count = distributedLockManager.aspect(() -> { return aspect(); });
其實也能夠簡寫:
Integer count = distributedLockManager.aspect(() -> aspect());
經過這樣改造,是否是發現優雅多了。
在DistributedLockTestController
中,幫下面的代碼替換成另外一段代碼:
for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal, service, redissonClient)).start(); }
替換成:
for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker1(startSignal, doneSignal, distributedLockManager, redissonClient)).start(); }
最後啓動工程,訪問http://localhost:8080/distributedLockTest,能夠看到相似以下的結果:
另外,由於暫時沒有找到合適的參數類型爲「無」、返回類型也爲「無」的函數式接口(找到一個——Runnable,但若是用了,怕產生歧義,因此就算了),既然如此,咱們不妨本身定義一個,以下:
@FunctionalInterface public interface Action { void action(); }
使用也很簡單,在DistributedLockManager
定義相似以下的方法:
@DistributedLock(lockName = "lock", lockNamePost = ".lock") public void doSomething(Action action) { action.action(); }
而後,在須要的地方這樣用:
distributedLockManager.doSomething(() -> { // do something });
至此,使用Redisson實現分佈式鎖,而後使用Spring AOP簡化分佈式鎖介紹完畢。
如有什麼地方有錯誤的或須要改進的,歡迎留言一塊兒討論交流。