讀書筆記部份內容來源書出版書,版權歸本書做者,若有錯誤,請指正。java
歡迎star、fork,讀書筆記系列會同步更新git
git
github
https://github.com/xuminwlt/j360-jdk編程
module安全
j360-jdk-thread/me.j360.jdk.concurrent併發
本書前三章分別爲框架
併發編程的挑戰,也就是併發編程的原因所在less
底層的實現原理分佈式
java內存模型工具
分別從cpu x86,x64以及內存模型等概念中描述java對併發編程的實現和控制,概念較爲底層和基礎,讀書筆記略過前三章直接從第四章應用實現及原理基礎開始。
併發編程基礎
java中的鎖
併發容器和框架(重點)
13個操做原子類
java併發工具類
線程池
Execurot框架
Lock接口出現以前,java是經過synchronized關鍵字實現的鎖功能,javase5以後,併發包新增了Lock接口
Lock使用方式,和分佈式鎖的構造很像。
Lock lock = new ReentrantLock lock.lock(); try{ }finally{ lock.unlock(); }
Lock接口提供了Synchronized關鍵字不具有的特性
嘗試非阻塞地獲取鎖 | 當前線程嘗試獲取鎖,沒有其餘線程獲取鎖,則成功 |
能被中斷的獲取鎖 | |
超時獲取鎖 | 在指定的時間內獲取鎖 |
Lock接口的API
void lock() | |
void lockInterruptibly() throws InterruptedException | |
boolean tryLock() | |
boolean tryLock(long time,TimeUtil unit) throws InterruptedException | |
void unlock() | |
Condition newCondition | 獲取等待通知組件,該組件和當前的鎖綁定,當前線程只有獲取了鎖,才能調用該組件的wait()方法,而調用後,當前線程將會釋放鎖 |
鎖的實現基於隊列同步器完成,AbstractQueuedSynchronized(簡稱同步器),使用一個int成員變量表示同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. 在這裏!!! */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } /** * Sync object for non-fair locks 非公平鎖 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync object for fair locks 公平鎖 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
支持重進入的鎖,可以支持一個線程對資源的重複加鎖,代碼見上面。
特性
公平性選擇 | |
重進入 | |
鎖降級 |
接口示例
int getReadLockCount() | 讀鎖被或許的次數 |
int getReadHoldCount() | 當前線程或許讀鎖的次數 |
int getWriteLockCount() | |
int getWriteHoldCount() |
經過Cache來解釋讀寫鎖,HashMap是非線程安全的,經過讀寫鎖實現Cache的線程安全
public class Cache { static Map<String,Object> map = new HashMap<String,Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); public static final Object get(String key){ r.lock(); try { return map.get(key); }finally { r.unlock(); } } public static final Object put(String key,Object value){ w.lock(); try { return map.put(key,value); }finally { w.unlock(); } } public static final void clear() { w.lock(); try { map.clear(); }finally { w.unlock(); } } }
public class ConditionUseCase { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public static void main(String[] args){ } public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); }finally { lock.unlock(); } } public void conditionSignal(){ lock.lock(); try { condition.signal(); }finally { lock.unlock(); } } }
部分方法描述
void await() | 當前線程進入等待狀態,直到被通知或中斷 |
void awaitUninterruptibly() |
當前線程進入等待狀態,對中斷不敏感 |
long awaitNanos(long nanosTimeout) | 當前線程進入等待狀態,直到被通知,中斷或者超時,返回值表示剩餘的時間,返回值若是是0或者負數,那麼能夠認定已經超時了 |
boolean awaitUntil(Date deadline) | 當前線程進入等待狀態,直到被通知、中斷或者到某個時間,若是沒有到指定時間,返回true,不然到了指定時間,返回false |
void signal() | 喚醒一個等待在condition中的線程,該線程從等待方法返回前必須獲取與Condition相關聯的鎖 |
void signlAll() | 喚醒全部等待的condition中的線程,可以從等待方法返回的線程必須得到與condition相關聯的鎖 |
有界隊列BoundedQueue解釋Condition
public class BoundedQueue<T> { private Object[] items; private int addIndex,removeIndex,count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size){ items = new Object[size]; } public void add(T t) throws InterruptedException { lock.lock(); try { while(count == items.length) notFull.await(); items[addIndex] = t; if(++addIndex == items.length) addIndex = 0; ++count; notEmpty.signal(); }finally { lock.unlock(); } } public T remove() throws InterruptedException { lock.lock(); try { while(count == 0) notEmpty.await(); Object x = items[removeIndex]; if(++removeIndex == items.length) removeIndex = 0; --count; notFull.signal(); return (T) x; }finally { lock.unlock(); } } }