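volatile prevents the Java virtual machine from reordering instructions around the variable and makes modifications to shared data visible to other threads. Compared with synchronized, it is a lighter-weight synchronization mechanism, but it provides no mutual exclusion and does not guarantee atomicity, so it cannot replace synchronized.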

    package com.wang.test.juc.test_volatile;

    public class TestVolatile {

        public static void main(String[] args) {
            TestThread testThread = new TestThread();
            new Thread(testThread).start();

            while (true) { // after the thread has started, poll the flag it sets
                if (testThread.isFlag()) {
                    System.out.println(" get Flag success! ");
                    break;
                }
            }
        }
    }

    class TestThread implements Runnable {

        private boolean flag = false;

        public boolean isFlag() {
            return flag;
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }

        public void run() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("flag:" + flag);
        }
    }

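The main thread's read loop looks as if it runs after the worker thread has finished, but the two run concurrently. The main thread reads flag before the worker changes it, and without synchronization nothing guarantees that the later write becomes visible to the main thread, so the loop may never see flag == true.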

    public class TestVolatile {

        public static void main(String[] args) {
            TestThread testThread = new TestThread();
            new Thread(testThread).start();

            while (true) { // after the thread has started, poll the flag it sets
                if (testThread.isFlag()) {
                    System.out.println(" get Flag success! ");
                    break;
                }
            }
        }
    }

    class TestThread implements Runnable {

        private volatile boolean flag = false;

        public boolean isFlag() {
            return flag;
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }

        public void run() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("flag:" + flag);
        }
    }

Adding volatile to the flag field lets the main method read the updated value.

    package com.wang.test.juc.test_volatile;

    public class TestAtomic {

        public static void main(String[] args) {
            Atomic atomic = new Atomic();

            Thread t1 = new Thread(atomic);
            Thread t2 = new Thread(atomic);
            t1.start();
            t2.start();
        }
    }

    class Atomic implements Runnable {

        private int i = 0;

        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ": " + getCountI());
            }
        }

        public int getCountI() {
            return i++;
        }
    }

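Because i++ is not atomic (it is a read, an increment, and a write), the two threads can interleave and print duplicate values, so i does not increase the way it would in a single thread. Since the problem is atomicity rather than visibility, marking i as volatile does not fix it.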
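Compare-And-Swap (CAS) is hardware-level support for concurrent operations on shared data. It takes three operands: the memory value V, the expected value A, and the new value B. V = B is executed only when V == A; otherwise nothing is done.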
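The CAS step described above can be sketched with AtomicInteger.compareAndSet, which exposes the operation at the Java level. The class CasCounterDemo and its method names are hypothetical, chosen only for illustration, and the retry loop shows the general idea rather than the JDK's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounterDemo {
    private final AtomicInteger value = new AtomicInteger(0);

    // Retry until the CAS succeeds: read A, compute B = A + 1, and write B
    // only if the memory value V still equals the expected value A.
    int increment() {
        while (true) {
            int expected = value.get();   // A: what we believe is in memory
            int updated = expected + 1;   // B: what we want to store
            if (value.compareAndSet(expected, updated)) {
                return updated;           // V == A held, so V = B was applied
            }
            // Another thread changed V first; loop and try again.
        }
    }

    int get() {
        return value.get();
    }

    // Starts `threads` threads that each call increment() `perThread` times
    // and returns the final counter value.
    static int countWithCas(int threads, int perThread) {
        CasCounterDemo counter = new CasCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithCas(4, 10_000)); // prints 40000
    }
}
```

No increment is lost because a failed CAS changes nothing and is simply retried with a fresh read.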

    import java.util.concurrent.atomic.AtomicInteger;

    public class TestAtomic {

        public static void main(String[] args) {
            Atomic atomic = new Atomic();

            Thread t1 = new Thread(atomic);
            Thread t2 = new Thread(atomic);
            t1.start();
            t2.start();
        }
    }

    class Atomic implements Runnable {

        private AtomicInteger i = new AtomicInteger(0);

        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ": " + getCountI());
            }
        }

        public int getCountI() {
            return i.addAndGet(1);
        }
    }

Now i is an atomic type, so operations on it are atomic. Every thread performs the increment atomically, which solves the concurrency problem.
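Hashtable is thread-safe: every access locks the whole table, which serializes operations. It does not allow null keys or values, and it is relatively slow.
ConcurrentHashMap is a thread-safe HashMap. It uses lock striping (segment locks): each segment has its own lock, so accesses to different segments can run in parallel, which improves throughput. (This describes the implementation up to Java 7; since Java 8 it relies on CAS and per-bin locking instead of segments.)
ConcurrentSkipListMap: the concurrent counterpart of TreeMap
CopyOnWriteArrayList: the concurrent counterpart of ArrayList (suited to cases where reads and iteration far outnumber updates)
Collections.synchronizedList(new ArrayList<String>())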
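As a minimal sketch of two of the containers listed above: ConcurrentHashMap.merge performs a per-key read-modify-write atomically, and a CopyOnWriteArrayList iterates over a snapshot. The class ConcurrentCollectionsDemo and its methods are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ConcurrentCollectionsDemo {

    // Every thread counts all words; merge() updates one key atomically,
    // so no increments are lost even without external locking.
    static Map<String, Integer> countConcurrently(String[] words, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (String w : words) {
                    counts.merge(w, 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counts;
    }

    // The iterator of a CopyOnWriteArrayList works on a snapshot, so adding
    // during iteration does not throw ConcurrentModificationException.
    static int snapshotIterationCount() {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");
        int seen = 0;
        for (String s : list) {
            list.add(s + "-copy"); // a plain ArrayList would fail fast here
            seen++;
        }
        return seen; // only the two original elements were visited
    }

    public static void main(String[] args) {
        String[] words = {"x", "y", "x"};
        System.out.println(countConcurrently(words, 4).get("x")); // prints 8
        System.out.println(snapshotIterationCount());             // prints 2
    }
}
```

The copy made on every write is what makes CopyOnWriteArrayList a poor fit for write-heavy workloads.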
CountDownLatch is a synchronization aid that lets one or more threads wait until a set of operations being performed in other threads completes.

    import java.util.concurrent.CountDownLatch;

    public class TestCountDownLatch {

        public static void main(String[] args) {
            final CountDownLatch countDownLatch = new CountDownLatch(2); // latch count is 2
            LatchLock latchLock = new LatchLock(countDownLatch);

            long start = System.currentTimeMillis();
            Thread t1 = new Thread(latchLock);
            Thread t2 = new Thread(latchLock);
            t1.start();
            t2.start();
            try {
                countDownLatch.await(); // main thread waits until the count reaches 0
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println("Time = [" + (end - start) + "ms" + "]");
        }
    }

    class LatchLock implements Runnable {

        private CountDownLatch countDownLatch;

        public LatchLock(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public void run() {
            synchronized (this) {
                try {
                    for (int i = 0; i < 1000; i++) {
                        System.out.println(Thread.currentThread().getName() + ": " + i);
                    }
                } finally {
                    countDownLatch.countDown(); // each finished thread decrements the count by one
                }
            }
        }
    }

## The Callable interface
Compared with Runnable, this interface can return a value and throw checked exceptions.

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    public class TestCallable {

        public static void main(String[] args) {
            ThreadCallable testCallable = new ThreadCallable();
            // To run a Callable, wrap it in a FutureTask, which receives the result
            FutureTask<Integer> futureTask = new FutureTask<Integer>(testCallable);

            Thread thread = new Thread(futureTask);
            thread.start();
            Integer result = 0;
            try {
                result = futureTask.get(); // blocks until the task has finished
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("result = [" + result + "]");
        }
    }

    class ThreadCallable implements Callable<Integer> {

        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
                System.out.println("i: " + i);
            }
            return sum;
        }
    }

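Because a Callable may throw, it can help to see how that exception reaches the caller: FutureTask.get() wraps it in an ExecutionException whose cause is the original exception. The sketch below assumes a hypothetical helper class CallableFailureDemo that is not part of the original example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CallableFailureDemo {

    // Runs the callable on a new thread and reports either its result
    // or the message of the exception thrown inside call().
    static String runAndDescribe(Callable<Integer> task) {
        FutureTask<Integer> future = new FutureTask<>(task);
        new Thread(future).start();
        try {
            return "result=" + future.get(); // blocks until call() finishes
        } catch (ExecutionException e) {
            // The exception thrown by call() is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndDescribe(() -> 6 * 7));
        // prints "result=42"
        System.out.println(runAndDescribe(() -> { throw new Exception("boom"); }));
        // prints "failed: boom"
    }
}
```

Catching ExecutionException separately, rather than a blanket Exception, keeps task failures distinct from interruption of the waiting thread.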
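To solve synchronization problems, the synchronized keyword can lock a code block or a method. That locking is implicit: the running thread acquires and releases the lock automatically. When the lock must be acquired and released explicitly, use a Lock.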

    package com.wang.test.juc.cchm;

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class TestLock {

        public static void main(String[] args) {
            TestThread testThread = new TestThread();

            Thread t1 = new Thread(testThread);
            Thread t2 = new Thread(testThread);
            Thread t3 = new Thread(testThread);
            t1.start();
            t2.start();
            t3.start();
        }
    }

    class TestThread implements Runnable {

        private Lock lock = new ReentrantLock(); // the explicit lock
        private int count = 1000;

        public void run() {
            while (true) { // keep looping until the counter reaches zero
                lock.lock();
                try {
                    if (count <= 0) {
                        break; // nothing left; finally still releases the lock
                    }
                    Thread.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " count :" + --count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally { // always release the lock in finally
                    lock.unlock();
                }
            }
        }
    }


    public class TestProductorAndConsumer {

        public static void main(String[] args) {
            Clerk clerk = new Clerk();

            Productor productor = new Productor(clerk);
            Consumer consumer = new Consumer(clerk);

            new Thread(productor, "Producter").start();
            new Thread(consumer, "Customer").start();
        }
    }

    class Clerk {

        private int pruduct = 0;

        public synchronized void income() {
            if (pruduct >= 10) {
                System.out.println("Can not add more");
            } else {
                System.out.println(Thread.currentThread().getName() + ": " + ++pruduct);
            }
        }

        public synchronized void sale() {
            if (pruduct <= 0) {
                System.out.println("Can not sale anything!");
            } else {
                System.out.println(Thread.currentThread().getName() + " :" + --pruduct);
            }
        }
    }

    class Productor implements Runnable {

        private Clerk clerk;

        public Productor(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.income();
            }
        }
    }

    class Consumer implements Runnable {

        private Clerk clerk;

        public Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.sale();
            }
        }
    }

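Both the producer and the consumer keep running regardless of stock, so the consumer keeps trying to sell when there is nothing left and the producer keeps trying to add when the store is full.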

    public class TestProductorAndConsumer {

        public static void main(String[] args) {
            Clerk clerk = new Clerk();

            Productor productor = new Productor(clerk);
            Consumer consumer = new Consumer(clerk);

            new Thread(productor, "Producter").start();
            new Thread(consumer, "Customer").start();
        }
    }

    class Clerk {

        private int pruduct = 0;

        public synchronized void income() {
            if (pruduct >= 10) {
                System.out.println("Can not add more");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println(Thread.currentThread().getName() + ": " + ++pruduct);
                this.notifyAll();
            }
        }

        public synchronized void sale() {
            if (pruduct <= 0) {
                System.out.println("Can not sale anything!");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println(Thread.currentThread().getName() + " :" + --pruduct);
                this.notifyAll();
            }
        }
    }

    class Productor implements Runnable {

        private Clerk clerk;

        public Productor(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.income();
            }
        }
    }

    class Consumer implements Runnable {

        private Clerk clerk;

        public Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.sale();
            }
        }
    }

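Wait/notify: a thread waits when the store is full or empty. The code above may never terminate: because the work sits in the else branch, a thread that is woken returns without producing or consuming, so one side can use up its loop iterations while the other is left in a final wait() that nobody wakes. Adding more producers and consumers also exposes spurious wakeups, so the wait must be placed in a loop that rechecks the condition.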

    package com.wang.test.juc.cchm;

    public class TestProductorAndConsumer {

        public static void main(String[] args) {
            Clerk clerk = new Clerk();

            Productor productor = new Productor(clerk);
            Consumer consumer = new Consumer(clerk);

            new Thread(productor, "Producter").start();
            new Thread(consumer, "Customer").start();
            new Thread(productor, "Producter2").start();
            new Thread(consumer, "Customer2").start();
        }
    }

    class Clerk {

        private int pruduct = 0;

        public synchronized void income() {
            while (pruduct >= 10) { // recheck the condition after every wakeup
                System.out.println("Can not add more");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + ": " + ++pruduct);
            this.notifyAll();
        }

        public synchronized void sale() {
            while (pruduct <= 0) {
                System.out.println("Can not sale anything!");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + " :" + --pruduct);
            this.notifyAll();
        }
    }

    class Productor implements Runnable {

        private Clerk clerk;

        public Productor(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.income();
            }
        }
    }

    class Consumer implements Runnable {

        private Clerk clerk;

        public Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.sale();
            }
        }
    }


    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class TestProductorAndConsumer {

        public static void main(String[] args) {
            Clerk clerk = new Clerk();

            Productor productor = new Productor(clerk);
            Consumer consumer = new Consumer(clerk);

            new Thread(productor, "Producter").start();
            new Thread(consumer, "Customer").start();
            new Thread(productor, "Producter2").start();
            new Thread(consumer, "Customer2").start();
        }
    }

    class Clerk {

        private int pruduct = 0;
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition(); // condition bound to the lock

        public void income() {
            lock.lock();
            try {
                while (pruduct >= 10) {
                    System.out.println("Can not add more");
                    try {
                        condition.await(); // takes the place of wait()
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + ": " + ++pruduct);
                condition.signalAll(); // takes the place of notifyAll()
            } finally {
                lock.unlock();
            }
        }

        public void sale() {
            lock.lock();
            try {
                while (pruduct <= 0) {
                    System.out.println("Can not sale anything!");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " :" + --pruduct);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    class Productor implements Runnable {

        private Clerk clerk;

        public Productor(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.income();
            }
        }
    }

    class Consumer implements Runnable {

        private Clerk clerk;

        public Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                clerk.sale();
            }
        }
    }


    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class TestPrintInOrder {

        public static void main(String[] args) {
            final Alternate alternate = new Alternate();

            new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 20; i++) {
                        alternate.PrintA();
                    }
                }
            }).start();

            new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 20; i++) {
                        alternate.PrintB();
                    }
                }
            }).start();

            new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 20; i++) {
                        alternate.PrintC();
                    }
                }
            }).start();
        }
    }

    class Alternate {

        private int mark = 1;
        private Lock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        private Condition c3 = lock.newCondition();

        public void PrintA() {
            lock.lock();
            try {
                while (mark != 1) {
                    c1.await();
                }
                System.out.println(Thread.currentThread().getName() + ": " + "A");
                mark = 2;
                c2.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void PrintB() {
            lock.lock();
            try {
                while (mark != 2) {
                    c2.await();
                }
                System.out.println(Thread.currentThread().getName() + ": " + "B");
                mark = 3;
                c3.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void PrintC() {
            lock.lock();
            try {
                while (mark != 3) {
                    c3.await();
                }
                System.out.println(Thread.currentThread().getName() + ": " + "C");
                mark = 1;
                c1.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

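While data is being written, reads must be synchronized with the write; that is, reads and writes should be mutually exclusive. The read lock can be shared, while the write lock is exclusive.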

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TestReadWriteLock {

        public static void main(String[] args) {
            final TestLockRW testLockRW = new TestLockRW();

            new Thread(new Runnable() {
                public void run() {
                    testLockRW.write((int) (Math.random() * 1000));
                }
            }).start();

            for (int i = 0; i < 20; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        testLockRW.read();
                    }
                }).start();
            }
        }
    }

    class TestLockRW {

        private int i = 0;
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

        public void read() {
            readWriteLock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": " + i);
            } finally {
                readWriteLock.readLock().unlock();
            }
        }

        public void write(int random) {
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": write " + random);
                i = random;
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
    }


    public class TestThread8Monitor {

        public static void main(String[] args) {
            Number number = new Number();
            Number number2 = new Number();

            new Thread(new Runnable() {
                public void run() {
                    number.getOne();
                }
            }).start();

            new Thread(new Runnable() {
                public void run() {
                    number2.getTwo();
                }
            }).start();

            // new Thread(new Runnable() {
            //     public void run() {
            //         number.getThree();
            //     }
            // }).start();
        }
    }

    class Number {

        public static synchronized void getOne() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("One");
        }

        public static synchronized void getTwo() {
            System.out.println("Two");
        }

        public void getThree() {
            System.out.println("Three");
        }
    }

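By analogy with a database connection pool: creating and destroying threads is expensive, so a thread pool is set up instead. The pool keeps a queue of idle threads; when a thread is needed it is taken from the pool and returned after use, which improves response time.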
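
    java.util.concurrent.Executor
        |-- ExecutorService                      main thread pool interface
            |-- ThreadPoolExecutor               thread pool implementation class
            |-- ScheduledExecutorService         thread scheduling interface
                |-- ScheduledThreadPoolExecutor  extends the thread pool class and implements the scheduling interface

    Usage, through static methods of the utility class Executors:
        Executors.newCachedThreadPool()          pool size is not fixed and changes dynamically
        Executors.newFixedThreadPool(int)        fixed capacity
        Executors.newSingleThreadExecutor()      a pool with a single thread
    The return type is ExecutorService.
        ScheduledThreadPoolExecutor              thread scheduling
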

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TestThreadPool {

        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();

            ThreadPoolimp threadPoolimp = new ThreadPoolimp();

            for (int i = 0; i < 1000; i++) {
                executorService.submit(threadPoolimp); // submit accepts several task types: Runnable, Callable...
            }

            executorService.shutdown();

            // new Thread(new ThreadPoolimp()).start();
        }
    }

    class ThreadPoolimp implements Runnable {

        private int i = 0;

        public void run() {
            // Note: this loop never exits, so the submitted tasks keep running
            // after shutdown() and the program does not terminate on its own.
            while (true) {
                System.out.println(Thread.currentThread().getName() + " :" + ++i);
            }
        }
    }

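For comparison, here is a minimal sketch of a pool that runs Callable tasks which finish, collects their results through Future, and then shuts down cleanly. The class FixedPoolDemo and the sum-of-squares workload are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FixedPoolDemo {

    // Submits one Callable per number 1..tasks to a fixed-size pool
    // and adds up the squares returned through the Futures.
    static int sumOfSquares(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int n = 1; n <= tasks; n++) {
                final int value = n;
                // A value-returning lambda is submitted as a Callable<Integer>.
                futures.add(pool.submit(() -> value * value));
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until that task has completed
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // no new tasks; idle workers are released
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 3)); // prints 385
    }
}
```

Because every task returns, shutdown() lets the worker threads end and the JVM can exit normally.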
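
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class TestScheduledThreadPool {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

            Future<Integer> future = pool.schedule(new Callable<Integer>() {
                public Integer call() throws Exception {
                    int i = 1;
                    System.out.println(Thread.currentThread().getName());
                    return i;
                }
            }, 5, TimeUnit.SECONDS); // delay and its time unit

            System.out.println(future.get());

            pool.shutdown();
        }
    }
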
When necessary, split a large task into several smaller tasks, then combine the results of the small tasks.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    public class TestForkJoinPool {

        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool();

            ForkSun forkSun = new ForkSun(0L, 1000000000L);
            Long sum = pool.invoke(forkSun);
            System.out.println("sum = [" + sum + "]");
        }
    }

    class ForkSun extends RecursiveTask<Long> {

        private static final long serialVersionUID = 7430797084800536110L;

        private long start;
        private long end;
        private static final long max = 10000L;

        public ForkSun(long start, long end) {
            this.start = start;
            this.end = end;
        }

        protected Long compute() {
            long len = end - start;
            if (len <= max) {
                long sum = 0L;
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                long middle = (start + end) / 2;
                ForkSun left = new ForkSun(start, middle);
                left.fork();

                ForkSun right = new ForkSun(middle + 1, end);
                right.fork();

                return left.join() + right.join();
            }
        }
    }

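A common variation on the split-and-combine step is to fork only one half and compute the other half in the current worker thread, which avoids leaving that worker idle while it waits. The sketch below is a hypothetical alternative (the class RangeSumTask, the THRESHOLD value, and the use of the common pool are assumptions), not a replacement for the example above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000L; // assumed cut-off for splitting

    private final long start;
    private final long end; // inclusive

    RangeSumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: sum the range directly.
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        long middle = (start + end) / 2;
        RangeSumTask left = new RangeSumTask(start, middle);
        RangeSumTask right = new RangeSumTask(middle + 1, end);
        left.fork();                      // schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in this thread
        return left.join() + rightSum;    // combine both partial results
    }

    // Sums start..end (inclusive) on the JVM-wide common pool.
    static long sum(long start, long end) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(start, end));
    }

    public static void main(String[] args) {
        System.out.println(sum(0L, 1_000_000L)); // prints 500000500000
    }
}
```

The result can be checked against the closed form n(n+1)/2, which for n = 1,000,000 is 500000500000.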