JAVA 併發包

Java.Utril.Concurrent

Volatile關鍵字

避免java虛擬機指令重排序,保證共享數據修改同步,數據可見性。volatile相較於synchronized是一種比較輕量級地同步策略,但不具有互斥性,不能成爲synchronized的替代,不能保證原子性。java

示例

package com.wang.test.juc.test_volatile;

public class TestVolatile {
    public static void main(String[] args) {
        TestThread testThread = new TestThread();

        new Thread(testThread).start();

        while (true)//在線程運行後,讀取線程中的flag值
        {
            if(testThread.isFlag()){
                System.out.println(" get Flag success! ");
                break;
            }
        }
    }
}

class TestThread implements Runnable{

    private boolean flag = false;

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public void run(){

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag = true;
        System.out.println("flag:" + flag);

    }

}

分析

主線程中的讀取操做看似在線程執行後,但併發地執行,在線程更改數據以前,主線程已經讀取了數據,共享地數據flag不一樣步。算法

修改

public class TestVolatile {
    public static void main(String[] args) {
        TestThread testThread = new TestThread();

        new Thread(testThread).start();

        while (true)//在線程運行後,讀取線程中的flag值
        {
            if(testThread.isFlag()){
                System.out.println(" get Flag success! ");
                break;
            }
        }
    }
}

class TestThread implements Runnable{

    private volatile boolean flag = false;

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public void run(){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag = true;
        System.out.println("flag:" + flag);
    }

}

在Flag屬性上添加volatile主方法便可讀取正確值。數據庫


原子性

示例

package com.wang.test.juc.test_volatile;

public class TestAtomic {
    public static void main(String[] args) {
        Atomic atomic = new Atomic();
        Thread t1 = new Thread(atomic);
        Thread t2 = new Thread(atomic);
        t1.start();
        t2.start();
    }

}

class Atomic implements Runnable{

    private int i =0;

    public void run() {
     while(true){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
System.out.println(Thread.currentThread().getName()+": "+getCountI());
        }
    }
  
    public int getCountI()
    {
        return i++;
    }
}

分析

因爲線程中操做i++不具有原子性,線程執行中,數據i的遞增會出現重複問題,即i的值不會正常遞增,會出現線程作出一樣操做的狀況。此時由於這個操做不是原子的,使用volitale修飾不能解決同步問題。
安全

原子類

CAS算法

Compare-And-Swap算法時硬件對於併發操做共享數據的支持,包含三個操做數,內存值V、預估值A、更新值B,只有 V == A 時,V = B執行,不然不進行任何操做。併發

修改程序

import java.util.concurrent.atomic.AtomicInteger;

public class TestAtomic {
    public static void main(String[] args) {
        Atomic atomic = new Atomic();
        Thread t1 = new Thread(atomic);
        Thread t2 = new Thread(atomic);
        t1.start();
        t2.start();
    }

}

class Atomic implements Runnable{


    private AtomicInteger i = new AtomicInteger(0);

    public void run() {
        while(true){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+": "+getCountI());
    }
    }
    public int getCountI()
    {
        return i.addAndGet(1);
    }
}

此時線程中i已是一個原子類型,那麼在對數據進行操做的時候是具有原子性的,全部線程在執行i自增時具備源自性,解決了併發問題。框架


ConcurrentHashMap

HashTable是線程安全的,在訪問HashTable時會加上表鎖,將操做轉爲串行,不容許有空值,效率比較低。dom

CuncurrentHashMap是線程安全的HashMap,採用鎖分段機制,每一個數據段都是獨立的鎖,在訪問時,能夠並行執行,提升效率工具

img

其餘

ConcurrentSkipListMap:同步的TreeMapthis

CopyOnWriteArrayList:同步的ArrayList (讀取和遍歷大於更新)atom

Collections.synchronizedList(new ArrayList(String))

閉鎖

CountDownLatch爲同步輔助類,在完成一組正在其餘線程中執行的操做以前,容許一個或多個線程一直等待。

示例

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {
    public static void main(String[] args) {

        final CountDownLatch countDownLatch = new CountDownLatch(2);
        //鎖值爲2
      
        LatchLock latchLock = new LatchLock(countDownLatch);

        long start = System.currentTimeMillis();

        Thread t1 = new Thread(latchLock);
        Thread t2 = new Thread(latchLock);
        t1.start();
        t2.start();

        try {
            countDownLatch.await();//鎖不爲0 主線程等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();

        System.out.println("Time = [" + (end - start) + "mms"+"]");
    }
}

class LatchLock implements Runnable{

    private CountDownLatch countDownLatch;
    public LatchLock(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
    }
    public void run() {
       synchronized (this){
        try {
            for(int i=0;i<1000;i++)
            {
                System.out.println(Thread.currentThread().getName()+": "+i);
            }
        }finally {
            countDownLatch.countDown();//線程執行一次鎖減一
        }
       }
    }
}

## Callable接口

此接口相較於Runnable接口,能夠返回值和拋異常。

示例

public class TestCallable {

    public static void main(String[] args) {
        ThreadCallable testCallable = new ThreadCallable();

        //執行Callable,須要使用FutureTask 實現類用於接收結果
        FutureTask<Integer> futureTask = new FutureTask(testCallable);

        Thread thread = new Thread(futureTask);
        thread.start();
      
        Integer result = 0;
        try {
            result = futureTask.get();
            //此方法將在線程執行結束後纔會執行
        } catch (Exception e) {
            e.printStackTrace();
        }
      
        System.out.println("result = [" + result + "]");

    }

}

class ThreadCallable implements Callable<Integer>{
    public Integer call() throws Exception {
        int sum = 0;

        for (int i=0;i<100;i++) {
            sum+=i;
            System.out.println("i: "+i);
        }
        return sum;
    }
}

Lock鎖

在解決同步問題時,採用synchronized關鍵字給代碼塊加鎖或者給方法加鎖,關鍵字加鎖方式時隱式的,所的獲取和釋放由執行過程當中的線程自行完成,須要顯式地完成加鎖和鎖釋放時,可使用lock加鎖方式。

示例

package com.wang.test.juc.cchm;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLock {
    public static void main(String[] args) {

        TestThread testThread = new TestThread();

        Thread t1 = new Thread(testThread);
        Thread t2 = new Thread(testThread);
        Thread t3 = new Thread(testThread);

        t1.start();
        t2.start();
        t3.start();
    }
}

class TestThread implements Runnable{

    private Lock lock = new ReentrantLock();//鎖

    private int count = 1000;

    public void run() {
        while (true){   //  自旋等待!
            lock.lock();
            try {
                Thread.sleep(1);
                if (count > 0)
        System.out.println(Thread.currentThread().getName()+" count :"+ --count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {  //  finally釋放鎖!
                lock.unlock();
            }

        }
    }
}

等待喚醒

示例-生產者消費者

public class TestProductorAndConsumer {
    public static void main(String[] args) {

        Clerk clerk = new Clerk();
        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Producter").start();
        new Thread(consumer,"Customer").start();

    }

}

class Clerk{

    private int pruduct = 0;

    public synchronized void income(){

        if (pruduct >= 10){
            System.out.println("Can not add more");
        }else{
            System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
        }
    }

    public synchronized void sale(){

        if (pruduct <= 0) {System.out.println("Can not sale anything!");}
        else {
            System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
        }
    }
}

class Productor implements Runnable{

    private Clerk clerk;

    public Productor(Clerk clerk){
        this.clerk = clerk;
    }

    public void run(){
        for (int i=0;i<20;i++){
            clerk.income();
        }
    }
}

class Consumer implements Runnable{
    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    public void run() {
        for (int i=0;i<20;i++){
            clerk.sale();
        }
    }
}

分析

生產者消費者都會一直進行,會出現沒有產品繼續消費和庫存已滿繼續生產,即沒有貨物依舊被屢次消費,沒法庫存仍舊屢次生產。

改進

public class TestProductorAndConsumer {
    public static void main(String[] args) {

        Clerk clerk = new Clerk();
        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Producter").start();
        new Thread(consumer,"Customer").start();

    }

}

class Clerk{

    private int pruduct = 0;

    public synchronized void income(){

        if (pruduct >= 10){
            System.out.println("Can not add more");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
            this.notifyAll();
        }
    }

    public synchronized void sale(){

        if (pruduct <= 0) {System.out.println("Can not sale anything!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        else {
            System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
            this.notifyAll();
        }
    }
}

class Productor implements Runnable{

    private Clerk clerk;

    public Productor(Clerk clerk){
        this.clerk = clerk;
    }

    public void run(){
        for (int i=0;i<20;i++){
            clerk.income();
        }
    }
}

class Consumer implements Runnable{
    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    public void run() {
        for (int i=0;i<20;i++){
            clerk.sale();
        }
    }
}

等待喚醒,當發生滿貨或是銷空時,進行等待。以上代碼沒法結束,最後一次地等待,沒法被喚醒,由else引起,繼續增長生產者和消費者,將會出現虛假喚醒,必須讓它自旋等待。

改進 2.0

package com.wang.test.juc.cchm;

public class TestProductorAndConsumer {
    public static void main(String[] args) {

        Clerk clerk = new Clerk();
        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Producter").start();
        new Thread(consumer,"Customer").start();
        new Thread(productor,"Producter2").start();
        new Thread(consumer,"Customer2").start();

    }

}

class Clerk{

    private int pruduct = 0;

    public synchronized void income(){

        while (pruduct >= 10){//自旋
            System.out.println("Can not add more");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
            System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
            this.notifyAll();

    }

    public synchronized void sale(){

        while (pruduct <= 0) {System.out.println("Can not sale anything!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

            System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
            this.notifyAll();

    }
}

class Productor implements Runnable{

    private Clerk clerk;

    public Productor(Clerk clerk){
        this.clerk = clerk;
    }

    public void run(){
        for (int i=0;i<20;i++){
            clerk.income();
        }
    }
}

class Consumer implements Runnable{
    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    public void run() {
        for (int i=0;i<20;i++){
            clerk.sale();
        }
    }
}

同步鎖

生產消費模型

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestProductorAndConsumer {
    public static void main(String[] args) {

        Clerk clerk = new Clerk();
        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Producter").start();
        new Thread(consumer,"Customer").start();
        new Thread(productor,"Producter2").start();
        new Thread(consumer,"Customer2").start();

    }

}

class Clerk{

    private int pruduct = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();
    //!!!!

    public  void income(){

        lock.lock();
        try {
                    while (pruduct >= 10){
                        System.out.println("Can not add more");
                        try {
                            condition.await();
                            //!!!!!
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
                    condition.signalAll();
                            //!!!!

        }finally {
            lock.unlock();
        }
    }

    public  void sale(){

        lock.lock();
        try {
                while (pruduct <= 0) {System.out.println("Can not sale anything!");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
                condition.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

class Productor implements Runnable{

    private Clerk clerk;

    public Productor(Clerk clerk){
        this.clerk = clerk;
    }

    public void run(){
        for (int i=0;i<20;i++){
            clerk.income();
        }
    }
}

class Consumer implements Runnable{
    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    public void run() {
        for (int i=0;i<20;i++){
            clerk.sale();
        }
    }
}

示例交替打印

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestPrintInOrder {
    public static void main(String[] args) {
        final Alternate alternate = new Alternate();
        new Thread(new Runnable() {
            public void run() {

                for (int i=0;i<20;i++){
                    alternate.PrintA();
                }
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {

                for (int i=0;i<20;i++){
                    alternate.PrintB();
                }
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {

                for (int i=0;i<20;i++){
                    alternate.PrintC();
                }
            }
        }).start();
    }
}

class Alternate{
    private int mark = 1;

    private Lock lock = new ReentrantLock();
    private Condition c1 =lock.newCondition();
    private Condition c2 =lock.newCondition();
    private Condition c3 =lock.newCondition();

    public void PrintA(){
        lock.lock();
        try{
            while (mark != 1){
                c1.await();
            }

            System.out.println(Thread.currentThread().getName()+": "+"A");

            mark = 2;
            c2.signal();
    } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            lock.unlock();
        }
    }
  
    public void PrintB(){
        lock.lock();
        try{
            while (mark != 2){
                c2.await();
            }

            System.out.println(Thread.currentThread().getName()+": "+"B");

            mark = 3;
            c3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            lock.unlock();
        }
    }    
  
  public void PrintC(){
        lock.lock();
        try{
            while (mark != 3){
                c3.await();
            }

            System.out.println(Thread.currentThread().getName()+": "+"C");

            mark = 1;
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            lock.unlock();
        }
    }
}

讀寫鎖

當數據在進行寫入時,讀取操做須要保持同步,即讀寫應當時互斥的,讀取鎖能夠共享,寫入鎖獨佔。

示例

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestReadWriteLock {
    public static void main(String[] args) {
        final TestLockRW testLockRW = new TestLockRW();

        new Thread(new Runnable() {
            public void run() {
               testLockRW.write((int)(Math.random()*1000));
            }
        }).start();

        for (int i=0;i<20;i++){
            new Thread(new Runnable() {
                public void run() {
                    testLockRW.read();
                }
            }).start();
        }
    }
}

class TestLockRW{

    private int i = 0;

    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void read(){

        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+": "+i);
        }finally {
            readWriteLock.readLock().unlock();
        }

    }

    public void write(int random){

        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+": write "+random);
            i = random;
        }finally {
            readWriteLock.writeLock().unlock();
        }
    }
}

線程八鎖

public class TestThread8Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        Number number2 = new Number();

        new Thread(new Runnable() {
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {
                number2.getTwo();
            }
        }).start();

//        new Thread(new Runnable() {
//            public void run() {
//                number.getThree();
//            }
//        }).start();
    }

}

class Number{
    public static synchronized void getOne(){
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("One");
    }
    public static synchronized void getTwo(){
        System.out.println("Two");
    }

    public void getThree(){
        System.out.println("Three");
    }
}

  1. 非靜態方法的鎖默認爲this
  2. 靜態方法的鎖爲Class實例
  3. 在某時刻內,只有一個線程拿到鎖。

線程池

類比數據庫鏈接池,建立線程和銷燬線程比較浪費資源,創建一個線程池,線程池提供一個線程隊列,隊列中保存着全部等待狀態的線程,在須要使用線程時直接在線程池中獲取,使用完畢後,歸還給線程池,提升相應速度。

體系結構

java.util.concurrent.Executor
    |--ExecutorService                      線程池主要接口
        |--ThreadPoolExecutor                           線程池實現類
        |--ScheduleExecutorService              線程調度接口
            |--ScheduledThreadPoolExecutor      繼承線程池實現調度接口
            
使用方法:
        工具類:Executors
        Executors.ewCachedThreadPool()              數量不固定,動態更改數量
        Executors.newFixedThreadPool(int)           固定容量
    Executors.newSingleThreadExecutor()     單個線程線程池
    返回值類型爲ExecurotService
    
    ScheduledThreadPoolExecutor                     線程調度
    
  
  靜態方法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newCachedThreadPool();

        ThreadPoolimp threadPoolimp = new ThreadPoolimp();


        for ( int i = 0;i<1000;i++){
          
            executorService.submit(threadPoolimp);
            //支持多種線程初始化參數 Runnable、Callable...
        }
      
        executorService.shutdown();

        //new Thread(new ThreadPoolimp()).start();
    }

}
class ThreadPoolimp implements Runnable{
    private int i = 0;

    public void run() {
        while(true){
            System.out.println(Thread.currentThread().getName()+" :"+ ++i);
        }
    }
}
public class TestScheduledThreadPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

        Future<Integer> future = pool.schedule(new Callable<Integer>() {
            public Integer call() throws Exception {
                int i =1;
                System.out.println(Thread.currentThread().getName());
                return i;
            }
        },5, TimeUnit.SECONDS);//延遲時間  時間單位

        System.out.println(future.get());
        pool.shutdown();
    }
}

分支合併框架

在必要的狀況下,將一個大人物,進行拆分,拆分紅若干的小人物,再將一個個小任務的運算結果進行彙總。

img

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {
    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        ForkSun forkSun = new ForkSun(0L, 1000000000L);
        Long sum = pool.invoke(forkSun);
        System.out.println("sum = [" + sum + "]");

    }
}
class ForkSun extends RecursiveTask<Long>{

    private static final long serialVersionUID = 7430797084800536110L;
    private long start;
    private long end;

    private static final long max = 10000l;

    public ForkSun(long start, long end) {
        this.start = start;
        this.end = end;
    }

    protected Long compute() {
        long len = end - start;
        if(len <= max){
            long sum = 0L;
            for(long i = start;i<=end;i++)
            {
                sum+=i;
            }
            return sum;
        }else {
            long middle = (start + end)/2;
            ForkSun left = new ForkSun(start,middle);
            left.fork();
            ForkSun right = new ForkSun(middle+1,end);
            right.fork();

            return left.join() + right.join();
        }

    }
}
相關文章
相關標籤/搜索