java高併發之從零到放棄(三)

前言

今天講的多線程的同步控制
直接進入正題java

ReentrantLock重入鎖

重入鎖能夠徹底代替synchronized,它須要java.util.concurrent.locks.ReentrantLock類來實現
下面用一個簡單的例子來實現重入鎖:數據庫

public class ReentrantLockThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
    @Override
    public void run() {
        for (int j=0;j<10000;j++){
            lock.lock();
            try {
                i++;
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReentrantLockThread thread = new ReentrantLockThread();
        Thread t1 = new Thread(thread);
        Thread t2 = new Thread(thread);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);
    }
}

以上代碼打印出來的i是20000,能夠說明ReentrantLock也實現了線程同步
它相比synchronized更加靈活,由於重入鎖實現了用戶本身加鎖.lock(),本身釋放鎖.unlock()(記得必定要釋放,否則其餘線程沒法進入)
固然重入鎖同一個對象能夠加兩個鎖,但也要記得釋放兩個鎖(多釋放了會拋出異常,少釋放了那其它線程就進不來)安全

重入鎖的中斷功能也是它的高級功能之一:
在run()代碼塊中寫入lock.lockInterruptibly()方法,當線程實例使用t1.interrupt()時,外部通知便會就會中斷t1線程
下面來一個簡單示例代碼Demo:數據結構

public class interruptTest {

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(){
            public void run(){
                while (true){
                    System.out.println("go");
                    if (Thread.currentThread().isInterrupted()){
                        break;
                    }
                }
            }
        };
        t1.start();
        Thread.sleep(10001);
        t1.interrupt();
    }
}

發現t1線程實現interrupt()方法時,線程實現代碼中的.isInterrupted()執行了,而且中斷了t1線程
中斷功能能夠很好的防止兩個線程間互相等待,出現死鎖的現象。多線程

除了.interrupt()通知,要避免死鎖的另外一種方法,就是限時等待:lock.tryLock()
咱們來看下代碼:併發

public class MyThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
    @Override
    public void run() {
        try {
            if (lock.tryLock(5, TimeUnit.SECONDS)){
                Thread.sleep(6000);
            }else {
                System.out.println("結束線程");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            if (lock.isHeldByCurrentThread()){
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyThread thread = new MyThread();
        Thread t1 = new Thread(thread);
        Thread t2 = new Thread(thread);
        t1.start();
        t2.start();
    }
}

lock.tryLock(5, TimeUnit.SECONDS)解釋下:
若是超過5秒尚未獲得鎖,就返回false,執行else語句
若是成功得到鎖,就返回true,執行sleep語句
因此5秒後打印結束線程,結束的就是等待5秒後沒有拿到鎖的線程框架

固然也能夠不帶等待時間,直接if(lock.tryLock())ide

下面是對ReentrantLock的整理函數

lock():得到鎖,若是鎖被佔用,則等待
lockInterruptibly():得到鎖,但優先響應中斷
tryLock():得到鎖,若是成功返回true,若是失敗返回false
unlock():釋放鎖

Condition條件

Condition是和ReentrantLock關聯的
利用綁定的Condition咱們可讓線程在合適的時間等待,或者在某一特定時刻得到通知繼續執行
下面演示一段簡單的Condition代碼工具

public class MyThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();
    @Override
    public void run() {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("t1拿到鎖,接着進入等待,而且釋放鎖");
            condition.await();
            Thread.sleep(4000);
            System.out.println("t1又拿到鎖");
        } catch (InterruptedException e) {
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyThread thread = new MyThread();
        Thread t1 = new Thread(thread);
        t1.start();
        Thread.sleep(5000);
        lock.lock();
        System.out.println("主線程佔用鎖");
        condition.signal();
        System.out.println("喚醒成功,主線程釋放鎖");
        lock.unlock();
    }
}

await():會讓當前線程進入等待而且釋放鎖
singal();會喚醒一個在等待中的線程,固然執行方法的線程必須釋放鎖,被喚醒的線程才能獲得鎖

Semaphore信號量

怎樣才能規定進入一段代碼的線程數
這裏咱們使用信號量,在外面定義Semaphore semp = new Semaphore(10);,這樣簡單的設定了5個線程
在run()方法中semp.acquire();表示得到了10箇中的其中一個許可證
當你的工做代碼完成時,依舊在run()方法的後面寫上semp.release();,許可證被釋放(跟鎖一個道理)

ReadWriteLock讀寫鎖

咱們知道讀不會響應數據,寫會影響數據
因此咱們真正操做的時候要求只讀的那些線程能夠一塊兒執行,不用同步操做
而與寫有關的線程所有要同步,以保護數據的安全
那麼咱們怎樣才能作到只讀線程不用同步呢
這裏須要用到讀寫鎖,下面演示一段讀寫鎖的簡單例子:

public class ReadWriteThread {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    //讀
    public int handleRead(Lock lock) throws InterruptedException{
        try {
            lock.lock();
            Thread.sleep(5000); //模擬讀線程
            System.out.println("讀完成");
            return value;
        } finally {
            lock.unlock();
        }
    }
    //寫
    public void handleWrite(Lock lock,int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(5000);
            value=index;
            System.out.println("寫完成");
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteThread demo = new ReadWriteThread();
        Thread t1 = new Thread(){
            public void run(){
                try {
                    demo.handleRead(readLock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t2 = new Thread(){
            public void run(){
                try {
                    demo.handleRead(readLock);
                    demo.handleWrite(writeLock,2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        t1.start();
        t2.start();
    }
}

執行結果能夠發現,對於只讀的方法,咱們給予readLock鎖,而寫的方法給予writeLock鎖
如此這般,只讀的方法能夠並行,而讀寫只能串行

CountDownLatch倒計時器

倒計時器用來控制線程等待,它可讓一個線程等待直到倒計時結束,再開始執行
仍是經過實例來讓你們知道什麼是倒計時器,而且它能有什麼做用
這個例子的需求是:要在主線程以前完成之個類線程才能繼續主線程:

public class CountDownLatchThread implements Runnable{
    private static CountDownLatch countDownLatch = new CountDownLatch(10);
    private static CountDownLatchThread countDownLatchThread = new CountDownLatchThread();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println("此線程工做完成");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);    //建立一個固定大小爲10的線程池
        for (int i=0;i<10;i++){
            executorService.submit(countDownLatchThread);
        }
        countDownLatch.await();
        System.out.println("10個任務完成");
        executorService.shutdown();
    }
}

countDownLatch.await();:讓主線程進入等待,等待全部10個線程完成
countDownLatch.countDown();:說明這個線程已完成,並進入統計

線程池

線程池和數據庫鏈接池同樣,事先準備好線程,當程序須要線程時,調用線程池中空閒的線程,當工做線程工做完畢時,從新放回線程池
JDK提供了一套Executor框架,本質是線程池。
框架中的成員變量在java.util.concurrent包中,是JDK併發包的核心類
其中ThreadPoolExecutor實現了Executor接口,任何Runnable均可以被ThreadPoolExecutor線程池調度

Executor提供了各類類型的線程池,主要有如下工廠方法建立不一樣的線程池:
newFixedThreadPool()方法:返回一個固定線程數量的線程池,當一個新任務提交時,若是有空閒線程,當即執行,若是沒有空閒線程,新任務會被放在一個等待隊列中去等待空閒線程
newCachedThreadPool()方法:返回一個根據實際狀況調整線程數量的線程池,
newSingleThreadScheduledExecutor()方法:返回一個對象,可放線程數量爲1,可是這個線程池拓展了計劃任務功能,能夠規定執行時間、週期性等等

上面這些線程池的源碼其實都是用ThreadPoolExecutor實現:
咱們來看下ThreadPoolExecutor的構造函數:

public ThreadPoolExecutor(int corePoolSize,    //指定線程池中線程的數量
                          int maximumPoolSize,//指定線程池中最大線程數量        
                          long keepAliveTime,    //當線程池中線程數量超過corePoolSize時,多餘線程的存活時間
                          TimeUnit unit,    //keepAliveTime的單位
                          BlockingQueue<Runnable> workQueue,    //等待任務隊列,被提交都是還沒有執行的任務
                          ThreadFactory threadFactory,    //線程工廠,用於建立線程,通常默認
                          RejectedExecutionHandler handler    //拒絕策略,當任務太多時,如何拒絕任務
    }

下面我來說講BlockingQueue:
在ThreadPoolExecutor構造器中,有如下幾種BlockingQueue:
1.直接提交隊列:有SynchronousQueue對象提供,提交的任務若是沒有空閒線程嘗試新建線程,若是線程數量已達到最大,則執行拒絕策略
2.有界的任務隊列:使用ArrayBlockingQueue對象實現,構造器帶一個任務的容量參數,若等待隊列已滿,總線程不大於maximumPoolSize的前提下,建立新的線程執行任務,若大於,則執行拒絕策略
3.無界的任務隊列:使用LinkedBlockingQueue來實現,和有界相比,除非系統資源耗盡,不然無界的任務隊列不存在任務入隊失敗的狀況
4.有限任務隊列:經過PriorityBlockingQueue實現,能夠控制任務的執行前後順序

再來看newFixedThreadPool(),它使用了無界任務隊列,而且corePoolSize和maximumPoolSize同樣大,由於對於固定大小的線程池,不存在線程數量的動態變化,當任務提交很是頻繁時,可能會耗盡系統資源

而newCachedThreadPool()方法返回corePoolSize爲0,maximumPoolSize無限大的線程池,使用了SynchronousQueue隊列,當任務執行完畢後,因爲corePoolSize爲0,空閒線程會在指定時間(60s)回收

講完了BlockingQueue咱們來說下RejectedExecutionHandler拒絕策略
JDK內置了四種拒絕策略:
1.AbortPolicy:直接拋出異常,阻止系統正常工做
2.CallerRunsPolicy:只要線程池沒有關閉,該策略直接調用工做線程運行當前被丟棄的任務
3.DiscardOledestPolicy:丟棄最老的一個等待任務,也就是即將被執行的一個任務,並嘗試再次提交當前任務
4.DiscardPolicy:默默地丟棄沒法處理的任務,若是運行任務丟失,這是最好的一個策略

固然咱們也能夠本身寫拒絕策略
下面我來寫一個打印出被拒絕的策略(而不是選擇拋異常,由於拋異常咱們還要去捕捉異常,若是沒有捕捉到會致使系統奔潰)

public class ThreadPoolTest {
    public static class TestThread implements Runnable{
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+"線程ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TestThread testThread = new TestThread();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.SECONDS,
                                                    new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
                                                    new RejectedExecutionHandler(){
                                                        @Override
                                                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                                            System.out.println("等待線程被拒絕");
                                                        }
                                                    });
        for (int i=0;i<Integer.MAX_VALUE;i++){
            es.submit(testThread);
            Thread.sleep(10);
        }
    }
}

來看下運行結果:

1511833277724線程ID:13
1511833277771線程ID:10
1511833277771線程ID:14
1511833277771線程ID:12
等待線程被拒絕
等待線程被拒絕
1511833277833線程ID:13
1511833277833線程ID:11

能夠發現,咱們自定義的線程池和拒絕策略完美的執行了

併發高效容器

下面我介紹給你們幾個很是好用的工具類(固然都是線程安全的)
1.ConcurrentHashMap:這是一個高效的hashMap
2.CopyOnwriteArrayList:在讀多寫少的場合這個list很是好用,遠勝與Vector
3.ConCurrentLinkedQueue:高效的併發隊列,使用鏈表實現,能夠看作是一個線程安全的LinkedList
4.BlockingQueue:這個接口上面說過,表示阻塞隊列,很是適合用於做爲數據共享的通道
5.ConcurrentSkipListMap:這是一個Map,使用跳錶的數據結構進行快速的查找

若是並不追求高效,咱們可使用Collections類幫助把線程不安全的容器轉換爲線程安全
如將HashMap轉換爲線程安全:

Map map = Collections.synchronizedMap(new HashMap<String,Object>());

固然可使用CAS操做來替代synchronized

今天就先到這裏,你們能夠看看這些內容的拓展
記得點關注看更新,謝謝閱讀

相關文章
相關標籤/搜索