本文主要研究一下Elasticsearch的ReleasableLockjava
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.javagit
public class ReleasableLock implements Releasable {
private final Lock lock;
// a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
private final ThreadLocal<Integer> holdingThreads;
public ReleasableLock(Lock lock) {
this.lock = lock;
if (Assertions.ENABLED) {
holdingThreads = new ThreadLocal<>();
} else {
holdingThreads = null;
}
}
@Override
public void close() {
lock.unlock();
assert removeCurrentThread();
}
public ReleasableLock acquire() throws EngineException {
lock.lock();
assert addCurrentThread();
return this;
}
private boolean addCurrentThread() {
final Integer current = holdingThreads.get();
holdingThreads.set(current == null ? 1 : current + 1);
return true;
}
private boolean removeCurrentThread() {
final Integer count = holdingThreads.get();
assert count != null && count > 0;
if (count == 1) {
holdingThreads.remove();
} else {
holdingThreads.set(count - 1);
}
return true;
}
public boolean isHeldByCurrentThread() {
if (holdingThreads == null) {
throw new UnsupportedOperationException("asserts must be enabled");
}
final Integer count = holdingThreads.get();
return count != null && count > 0;
}
}
複製代碼
close方法
);它的構造器要求輸入Lock參數,只有在開啓了assertions的條件下才會初始化holdingThreads;isHeldByCurrentThread方法判斷調用線程是否正在使用lockelasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.javagithub
public class ReleasableLockTests extends ESTestCase {
/**
* Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
* lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
* not its second entrance.
*
* @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
* @throws InterruptedException if awaiting on the synchronization barrier is interrupted
*/
public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
final int numberOfThreads = scaledRandomIntBetween(1, 32);
final int iterations = scaledRandomIntBetween(1, 32);
final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
for (int j = 0; j < iterations; j++) {
if (randomBoolean()) {
acquire(readLock, writeLock);
} else {
acquire(writeLock, readLock);
}
}
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
threads.add(thread);
thread.start();
}
barrier.await();
barrier.await();
for (final Thread thread : threads) {
thread.join();
}
}
private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}
// previously there was a bug here and this would return false
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}
assertFalse(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}
}
複製代碼
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.javabash
private static class CacheSegment<K, V> {
// read/write lock protecting mutations to the segment
ReadWriteLock segmentLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());
Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();
SegmentStats segmentStats = new SegmentStats();
/**
* get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
* pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
*
* @param key the key of the entry to get from the cache
* @param now the access time of this entry
* @param isExpired test if the entry is expired
* @param onExpiration a callback if the entry associated to the key is expired
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
CompletableFuture<Entry<K, V>> future;
try (ReleasableLock ignored = readLock.acquire()) {
future = map.get(key);
}
if (future != null) {
Entry<K, V> entry;
try {
entry = future.get();
} catch (ExecutionException e) {
assert future.isCompletedExceptionally();
segmentStats.miss();
return null;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
if (isExpired.test(entry)) {
segmentStats.miss();
onExpiration.accept(entry);
return null;
} else {
segmentStats.hit();
entry.accessTime = now;
return entry;
}
} else {
segmentStats.miss();
return null;
}
}
/**
* put an entry into the segment
*
* @param key the key of the entry to add to the cache
* @param value the value of the entry to add to the cache
* @param now the access time of this entry
* @return a tuple of the new entry and the existing entry, if there was one otherwise null
*/
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
Entry<K, V> entry = new Entry<>(key, value, now);
Entry<K, V> existing = null;
try (ReleasableLock ignored = writeLock.acquire()) {
try {
CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
if (future != null) {
existing = future.handle((ok, ex) -> {
if (ok != null) {
return ok;
} else {
return null;
}
}).get();
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
return Tuple.tuple(entry, existing);
}
/**
* remove an entry from the segment
*
* @param key the key of the entry to remove from the cache
* @param onRemoval a callback for the removed entry
*/
void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
CompletableFuture<Entry<K, V>> future;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.remove(key);
}
if (future != null) {
segmentStats.eviction();
onRemoval.accept(future);
}
}
/**
* remove an entry from the segment iff the future is done and the value is equal to the
* expected value
*
* @param key the key of the entry to remove from the cache
* @param value the value expected to be associated with the key
* @param onRemoval a callback for the removed entry
*/
void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
CompletableFuture<Entry<K, V>> future;
boolean removed = false;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.get(key);
try {
if (future != null) {
if (future.isDone()) {
Entry<K, V> entry = future.get();
if (Objects.equals(value, entry.value)) {
removed = map.remove(key, future);
}
}
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
if (future != null && removed) {
segmentStats.eviction();
onRemoval.accept(future);
}
}
private static class SegmentStats {
private final LongAdder hits = new LongAdder();
private final LongAdder misses = new LongAdder();
private final LongAdder evictions = new LongAdder();
void hit() {
hits.increment();
}
void miss() {
misses.increment();
}
void eviction() {
evictions.increment();
}
}
}
複製代碼
close方法
),而該接口繼承了java.lang.AutoCloseable接口,於是能夠直接利用try with resources語法來自動close,從而釋放鎖close方法
);它的構造器要求輸入Lock參數,只有在開啓了assertions的條件下才會初始化holdingThreads;isHeldByCurrentThread方法判斷調用線程是否正在使用lockReleasableLock實現了Releasable接口(
close方法
),而該接口繼承了java.lang.AutoCloseable接口,於是能夠直接利用try with resources語法來自動close,從而釋放鎖多線程