在使用多線程併發編程的時,常常會遇到對共享變量修改操做。此時咱們能夠選擇ConcurrentHashMap,ConcurrentLinkedQueue來進行安全地存儲數據。但若是單單是涉及狀態的修改,線程執行順序問題,使用Atomic開頭的原子組件或者ReentrantLock、CyclicBarrier之類的同步組件,會是更好的選擇,下面將一一介紹它們的原理和用法java
AtomicBoolean //布爾類型 AtomicInteger //正整型數類型 AtomicLong //長整型類型 複製代碼
public static void main(String[] args) throws Exception { AtomicBoolean atomicBoolean = new AtomicBoolean(false); //異步線程修改atomicBoolean CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{ try { Thread.sleep(1000); //保證異步線程是在主線程以後修改atomicBoolean爲false atomicBoolean.set(false); }catch (Exception e){ throw new RuntimeException(e); } }); atomicBoolean.set(true); future.join(); System.out.println("boolean value is:"+atomicBoolean.get()); } ---------------輸出結果------------------ boolean value is:false 複製代碼
AtomicReference //加時間戳版本的引用類原子類 AtomicStampedReference //至關於AtomicStampedReference,AtomicMarkableReference關心的是 //變量是否仍是原來變量,中間被修改過也無所謂 AtomicMarkableReference 複製代碼
volatile V value
,並藉助VarHandle(具體子類是FieldInstanceReadWrite)實現原子操做,MethodHandles會幫忙計算value在類的偏移位置,最後在VarHandle調用Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)
方法原子修改對象的屬性public class AtomicReference<V> implements java.io.Serializable { private static final long serialVersionUID = -1848883965231344442L; private static final VarHandle VALUE; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } private volatile V value; .... 複製代碼
volatile Pair<V> pair;
,Pair是其內部類。AtomicStampedReference能夠用來解決ABA問題public class AtomicStampedReference<V> { private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair; 複製代碼
public class Main { public static void main(String[] args) throws Exception { Test old = new Test("hello"), newTest = new Test("world"); AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1); reference.compareAndSet(old, newTest,1,2); System.out.println("對象:"+reference.getReference().name+";版本號:"+reference.getStamp()); } } class Test{ Test(String name){ this.name = name; } public String name; } ---------------輸出結果------------------ 對象:world;版本號:2 複製代碼
AtomicIntegerArray //整型數組 AtomicLongArray //長整型數組 AtomicReferenceArray //引用類型數組 複製代碼
public native Object getReferenceVolatile(Object o, long offset)
方法來獲取實時的元素值//元素默認初始化爲0 AtomicIntegerArray array = new AtomicIntegerArray(2); // 下標爲0的元素,期待值是0,更新值是1 array.compareAndSet(0,0,1); System.out.println(array.get(0)); ---------------輸出結果------------------ 1 複製代碼
AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater 複製代碼
public class Main { public static void main(String[] args) { AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name"); Test test = new Test("hello world"); fieldUpdater.compareAndSet(test,"hello world","siting"); System.out.println(fieldUpdater.get(test)); System.out.println(test.name); } } class Test{ Test(String name){ this.name = name; } public volatile String name; } ---------------輸出結果------------------ siting siting 複製代碼
Striped64 LongAccumulator LongAdder //accumulatorFunction:運算規則,identity:初始值 public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) 複製代碼
public static void main(String[] args) throws Exception { LongAccumulator accumulator = new LongAccumulator(Long::sum, 0); for(int i=0;i<100000;i++){ CompletableFuture.runAsync(() -> accumulator.accumulate(1)); } Thread.sleep(1000); //等待所有CompletableFuture線程執行完成,再獲取 System.out.println(accumulator.get()); } ---------------輸出結果------------------ 100000 複製代碼
ReentrantLock lock = new ReentrantLock(); if(lock.tryLock()){ //業務邏輯 lock.unlock(); } 複製代碼
public static void main(String[] args) throws Exception { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); if(lock.readLock().tryLock()){ //讀鎖 //業務邏輯 lock.readLock().unlock(); } if(lock.writeLock().tryLock()){ //寫鎖 //業務邏輯 lock.writeLock().unlock(); } } 複製代碼
public static void main(String[] args) throws Exception { Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 3; i++) CompletableFuture.runAsync(() -> { try { System.out.println(Thread.currentThread().toString() + " start "); if(semaphore.tryAcquire(1)){ Thread.sleep(1000); semaphore.release(1); System.out.println(Thread.currentThread().toString() + " 無阻塞結束 "); }else { System.out.println(Thread.currentThread().toString() + " 被阻塞結束 "); } } catch (Exception e) { throw new RuntimeException(e); } }); //保證CompletableFuture 線程被執行,主線程再結束 Thread.sleep(2000); } ---------------輸出結果------------------ Thread[ForkJoinPool.commonPool-worker-19,5,main] start Thread[ForkJoinPool.commonPool-worker-5,5,main] start Thread[ForkJoinPool.commonPool-worker-23,5,main] start Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞結束 Thread[ForkJoinPool.commonPool-worker-5,5,main] 無阻塞結束 Thread[ForkJoinPool.commonPool-worker-19,5,main] 無阻塞結束 複製代碼
public static void main(String[] args) throws Exception { CountDownLatch count = new CountDownLatch(2); for (int i = 0; i < 2; i++) CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); System.out.println(" CompletableFuture over "); count.countDown(); } catch (Exception e) { throw new RuntimeException(e); } }); //等待CompletableFuture線程的完成 count.await(); System.out.println(" main over "); } ---------------輸出結果------------------ CompletableFuture over CompletableFuture over main over 複製代碼
ReentrantLock lock
和Condition trip
屬性來實現同步public static void main(String[] args) throws Exception { CyclicBarrier barrier = new CyclicBarrier(2); CompletableFuture.runAsync(()->{ try { System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis()); barrier.await(); //須要等待main線程也執行到await狀態才能繼續執行 System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis()); }catch (Exception e){ throw new RuntimeException(e); } }); Thread.sleep(1000); //和CompletableFuture線程相互等待 barrier.await(); System.out.println("main run over!"); } ---------------輸出結果------------------ CompletableFuture run start-1609822588881 main run over! CompletableFuture run over-1609822589880 複製代碼
//獲取讀鎖,自旋獲取,返回一個戳值 public long readLock() //嘗試加讀鎖,不成功返回0 public long tryReadLock() //解鎖 public void unlockRead(long stamp) //獲取寫鎖,自旋獲取,返回一個戳值 public long writeLock() //嘗試加寫鎖,不成功返回0 public long tryWriteLock() //解鎖 public void unlockWrite(long stamp) //嘗試樂觀讀讀取一個時間戳,並配合validate方法校驗時間戳的有效性 public long tryOptimisticRead() //驗證stamp是否有效 public boolean validate(long stamp) 複製代碼
public static void main(String[] args) throws Exception { StampedLock stampedLock = new StampedLock(); long stamp = stampedLock.tryOptimisticRead(); //判斷版本號是否生效 if (!stampedLock.validate(stamp)) { //獲取讀鎖,會空轉 stamp = stampedLock.readLock(); long writeStamp = stampedLock.tryConvertToWriteLock(stamp); if (writeStamp != 0) { //成功轉爲寫鎖 //fixme 業務操做 stampedLock.unlockWrite(writeStamp); } else { stampedLock.unlockRead(stamp); //嘗試獲取寫讀 stamp = stampedLock.tryWriteLock(); if (stamp != 0) { //fixme 業務操做 stampedLock.unlockWrite(writeStamp); } } } } 複製代碼