【java併發核心一】Semaphore 的使用思路

最近在看一本書《Java併發編程 核心方法與框架》,打算一邊學習一邊把學習的經驗記下來,所粘貼的代碼都是我運行過的,你們一塊兒學習,歡迎吐槽。java

估計也沒多少人看個人博客,哈哈,那麼我仍是會記下來,天空未曾留下個人痕跡,但我已飛過,而在博客園留下了個人痕跡~編程

 一、Semaphore的初步使用多線程

  Semaphore是什麼,能作什麼?併發

    Semaphore 是 synchronized 的增強版,做用是控制線程的併發數量。就這一點而言,單純的synchronized 關鍵字是實現不了的。框架

  直接看例子吧,這個例子包含3個類,一個是線程類,一個是 Semaphore 關鍵代碼類,一個類是主main方法類:jvm

package com.cd.concurrent.semaphore;

public class MyThread extends Thread {
    private SemaphoreService service;

    public MyThread(String name, SemaphoreService service) {
        super();
        this.setName(name);
        this.service = service;
    }

    @Override
    public void run() {
        this.service.doSomething();
    }
}
package com.cd.concurrent.semaphore;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore;

public class SemaphoreService {

    private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    private Semaphore semaphore = new Semaphore(1);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼

    public void doSomething() {
        try {
            /**
             * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只容許制定個數的線程進入,
             * 由於semaphore的構造方法是1,則同一時刻只容許一個線程進入,其餘線程只能等待。
             * */
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static String getFormatTimeStr() {
        return sf.format(new Date());
    }
}
package com.cd.concurrent.semaphore;

public class SemaphoreTest {
    public static void main(String args[]) {
        SemaphoreService service = new SemaphoreService();
        for (int i = 0; i < 10; i++) {
            MyThread t = new MyThread("thread" + (i + 1), service);
            t.start();// 這裏使用 t.run() 也能夠運行,可是不是併發執行了 
        }
    }
}

運行結果:ide

 

 

 實踐證實,確實是同一個時刻只有一個線程能訪問,那若是把 Semaphore 的構造方法入參改爲 2 呢,修改 SemaphoreService.java 文件:工具

package com.cd.concurrent.semaphore;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore;

public class SemaphoreService {

    private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼

    public void doSomething() {
        try {
            /**
             * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只容許制定個數的線程進入,
             * 由於semaphore的構造方法是2,則同一時刻只容許2個線程進入,其餘線程等待。
             * */
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static String getFormatTimeStr() {
        return sf.format(new Date());
    }
}

運行SemaphoreTest,結果以下:性能

 

 

 

驗證OK學習

 二、方法 acquire( int permits ) 參數做用,及動態添加 permits 許可數量  

  acquire( int permits ) 中的參數是什麼意思呢?能夠這麼理解, new Semaphore(6) 表示初始化了 6個通路, semaphore.acquire(2) 表示每次線程進入將會佔用2個通路,semaphore.release(2) 運行時表示歸還2個通路。沒有通路,則線程就沒法進入代碼塊。

  而上面的代碼中,semaphore.acquire() +  semaphore.release()  在運行的時候,其實和 semaphore.acquire(1) + semaphore.release(1)  效果是同樣的。  

  上代碼:

  仍是3個代碼,線程類沒有變,用的是上面的線程類,從新寫了另外兩個類:

package com.cd.concurrent.semaphore;

import java.util.concurrent.Semaphore;

public class SemaphoreService2 extends SemaphoreService { // 之因此繼承 SemaphoreService,僅僅是爲了使用父類的打印時間的方法 0.0

    private Semaphore semaphore = new Semaphore(6);// 6表示總共有6個通路

    public void doSomething() {
        try {
            semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6箇中扣除
            System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
            semaphore.release(2); // 釋放佔用的 2 個通路
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int availablePermits() {    // 查看可用通路數
        return semaphore.availablePermits();
    }
}
package com.cd.concurrent.semaphore;

public class SemaphoreTest2 {
    public static void main(String args[]) {
        SemaphoreService2 service = new SemaphoreService2(); // 使用總 6 通路,每一個線程佔用2通路
        for (int i = 0; i < 10; i++) {
            MyThread t = new MyThread("thread" + (i + 1), service);
            t.start();// 這裏使用 t.run() 也能夠運行,可是不是併發執行了 
            System.out.println("可用通路數:" + service.availablePermits());
        }
    }
}

運行結果:

 

 

 

若是 acquire 的數量大於 release 的數量,則 通路早晚會被使用完,若是線程比較多,得不到後續運行,出現線程堆積內存,最終java進程崩掉;若是 acquire 的數量小於 release 的數量,就會出現併發執行的線程愈來愈多(換句話說,處理愈來愈快),最終也有可能出現問題。

  好比,象上面的代碼,SemaphoreService2.java 中 semaphore.release(2) 若是改爲 semaphore.release(1) 則 就會出現有5個線程得不到運行堆積的狀況,能夠算一下:6-2-2-2+1+1+1=3,運行完一個回合後,還剩3個通路,3-2+1,第二回合,還剩2個通路,2-2+1=1,第3個回合,還剩一個通路,不足以運行任何一個線程。

  把上面說的用代碼實現一下,修改 SemaphoreService2.java 以下: 

package com.cd.concurrent.semaphore;

import java.util.concurrent.Semaphore;

public class SemaphoreService2 extends SemaphoreService { // 之因此繼承 SemaphoreService,僅僅是爲了使用父類的打印時間的方法 0.0

    private Semaphore semaphore = new Semaphore(6);// 6表示總共有6個通路

    public void doSomething() {
        try {
            semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6箇中扣除
            System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
            semaphore.release(1); // 釋放佔用的 1 個通路
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int availablePermits() {
        return semaphore.availablePermits();
    }
}

運行 SemaphoreTest2 結果:

 

 

 

三、acquire 的不可中斷實現

  仔細看一下上面的代碼,semaphore.acquire() 和 semaphore.acquire(int permits) 是會拋出異常 InterruptedException 的,若是在 acquire 和 release 之間的代碼是一個比較慢和複製的運算,如內存佔用過多,或者棧深度很深等,jvm會中斷這塊代碼。

  如何才能不讓 jvm 中斷 代碼執行呢?

  答案是:使用 acquireUninterruptibly() 替換acquire()、使用 acquireUninterruptibly(int permits) 替換 acquire(int permits) 。

  acquireUninterruptibly 不會拋出 InterruptedException ,一個代碼塊一時執行不完,還會繼續等待執行。

  我的以爲,不要隨便使用 acquireUninterruptibly ,由於 jvm 中斷執行,是自身的一種自我保護機制,保證 java 進程的正常,除了特殊狀況必須用 acquireUninterruptibly 外,都應該 使用 acquire ,同時,改進一下 SemaphoreService2 的 doSomething 方法,將 release 放到 finally 塊 中,以下。  

public void doSomething() {
        try {
            semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6箇中扣除
            System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(2); // release 放到 finally 中
        }
    }

四、其餘一些常有工具方法

  availablePermits()  方法在前面用過,表示返回 Semaphore 對象中的當前可用許可數,此方法一般用於調試,由於許可數量(通路)多是實時在改變的。

  drainPermits() 方法可獲取並返回當即可用的全部許可(通路)個數,並將可用許可置爲0。

  getQueueLength() 獲取等待許可的線程個數。

  hasQueuedThreads() 判斷有沒有線程在等待這個許可。

  getQueueLength() 和 hasQueuedThreads() 都是在判斷當前有沒有等待許可的線程信息時使用。

  這裏就不寫代碼校驗了,大家能夠在 SemaphoreService 或者 SemaphoreService2 中加入這個信息試一下。

五、線程公平性

  上面用的 Semaphore  構造方法是 Semaphore semaphore = new Semaphore(int permits)

  其實,還有一個構造方法: Semaphore semaphore = new Semaphore(int permits , boolean isFair)

  isFair 的意思就是,是否公平,得到鎖的順序與線程啓動順序有關,就是公平,先啓動的線程,先得到鎖。isFair 不能100% 保證公平,只能是大機率公平。

  isFair 爲 true,則表示公平,先啓動的線程先得到鎖。

六、方法 tryAcquire() 、 tryAcquire(int permits)、 tryAcquire(int permits , long timeout , TimeUint unit) 的使用:

  tryAcquire 方法,是 acquire 的擴展版,tryAcquire 做用是嘗試得獲取通路,若是未傳參數,就是嘗試獲取一個通路,若是傳了參數,就是嘗試獲取 permits 個 通路 、在指定時間 timeout  內 嘗試 獲取 permits 個通路。

  上代碼試試看:

  3個類,線程類未變,如下是修改了的兩個類:

package com.cd.concurrent.semaphore;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreService3 extends SemaphoreService { // 之因此繼承 SemaphoreService,僅僅是爲了使用父類的打印時間的方法 0.0

    private Semaphore semaphore = new Semaphore(6, true);// 6表示總共有6個通路,true 表示公平

    public void doSomething() {
        try {
            if (semaphore.tryAcquire(2, 3, TimeUnit.SECONDS)) { // 在 3秒 內 嘗試獲取 2 個通路

                System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()
                        + ",當前是否有進程等待:" + semaphore.hasQueuedThreads() + ",等待進程數:" + semaphore.getQueueLength());
                semaphore.release(2); // 釋放佔用的 2 個通路
            } else {
                System.out.println(Thread.currentThread().getName() + ":doSomething 沒有獲取到鎖-準備退出-" + getFormatTimeStr());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int availablePermits() {
        return semaphore.availablePermits();
    }
}
package com.cd.concurrent.semaphore;

public class SemaphoreTest3 {
    public static void main(String args[]) {
        SemaphoreService3 service = new SemaphoreService3(); // 使用總 6 通路,每一個線程佔用2通路,嘗試獲取鎖
        for (int i = 0; i < 10; i++) {
            MyThread t = new MyThread("thread" + (i + 1), service);
            t.start();
        }
    }
}

SemaphoreTest3 運行結果:

 

 

 

七、多進路-多處理 vs 多進路-單處理

  在上面的代碼中,咱們之因此能夠實現單處理,是由於在上面的全部線程都共有了同一個 Semaphore 來進行進程處理,那麼若是 Semaphore 自己就是進程的一部分呢,會怎麼樣呢?

  好比,修改 第一個例子中的 SemaphoreTest  以下: 

package com.cd.concurrent.semaphore;

public class SemaphoreTest {
    public static void main(String args[]) {
        for (int i = 0; i < 10; i++) {
            SemaphoreService service = new SemaphoreService();
            MyThread t = new MyThread("thread" + (i + 1), service);
            t.start();// 這裏使用 t.run() 也能夠運行,可是不是併發執行了 
        }
    }
}

運行 SemaphoreTest 結果:

 

 

全部線程同時執行了。

  若是 SemaphoreTest  類不進行修改,如何實現第一個例子 中的 單處理呢?

  也簡單,修改 SemaphoreService ,代碼以下:

package com.cd.concurrent.semaphore;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore;

public class SemaphoreService {

    private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼

    public void doSomething() {
        try {
            /**
             * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只容許制定個數的線程進入,
             * 由於semaphore的構造方法是1,則同一時刻只容許一個線程進入,其餘線程只能等待。
             * */
            semaphore.acquire();

            doSomethingMain(); // 將主要處理部分封裝成一個方法

            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static synchronized void doSomethingMain() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
    }

    public static String getFormatTimeStr() {
        return sf.format(new Date());
    }
}
注意:doSomethingMain() 方法必須是 static synchronized 的才行,由於 多線程調用的話,static 方法是類方法,這樣 synchronized 同步 才能針對整個類同步,不然 就只能針對單線程多個地方調用同步。

   修改 SemaphoreService ,運行 SemaphoreTest 結果:

 

 運行達到想要的效果。

  這裏,拋出一個問題,上面的代碼,不用 synchronized 實現,而使用 ReentrantLock 來實現,按理說會更好的,緣由以下:

    synchronized 是 jvm 層面的實現,ReentrantLock 是 jdk 層面的實現,synchronized 的缺點以下:

    1)不能響應中斷;

    2)同一時刻不論是讀仍是寫都只能有一個線程對共享資源操做,其餘線程只能等待

    3)鎖的釋放由虛擬機來完成,不用人工干預,不過此即便缺點也是優勢,優勢是不用擔憂會形成死鎖,缺點是由可能獲取到鎖的線程阻塞以後其餘線程會一直等待,性能不高。

  而lock接口的提出就是爲了完善synchronized的不完美的,首先lock是基於jdk層面實現的接口,和虛擬機層面不是一個概念;其次對於lock對象中的多個方法的調用,能夠靈活控制對共享資源變量的操做,不論是讀操做仍是寫操做

  那麼上面的代碼若是使用 ReentrantLock 來實現,豈不是更好嗎?好,修改 SemaphoreService:

package com.cd.concurrent.semaphore;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class SemaphoreService {

    private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼

    private ReentrantLock lock = new ReentrantLock();

    public void doSomething() {
        try {
            /**
             * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只容許制定個數的線程進入,
             * 由於semaphore的構造方法是1,則同一時刻只容許一個線程進入,其餘線程只能等待。
             * */
            semaphore.acquire();

            lock.lock();
            doSomethingMain(); // 將主要處理部分封裝成一個方法            

            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void doSomethingMain() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr());
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr());
    }

    public static String getFormatTimeStr() {
        return sf.format(new Date());
    }
}

運行 SemaphoreTest 結果:

 

 和預期的不同呀,10個線程基本是同時執行了,那麼問題出在哪裏呢?

由於使用的不是同一個 SemaphoreService 對象實例,全部是多個鎖分別加在了多個 SemaphoreService 實例中,就至關於沒有鎖

相關文章
相關標籤/搜索