Java 8 併發:同步和鎖

原文地址: Java 8 Concurrency Tutorial: Synchronization and Lockshtml

爲了簡單起見,本教程的示例代碼使用了在這裏定義的兩個輔助方法,sleep(seconds)stop(executor)java

Synchronized

當咱們編寫多線程代碼訪問可共享的變量時須要特別注意,下面是一個多線程去改變一個整數的例子。git

定義一個變量 count,定義一個方法 increment() 使 count 增長 1.github

int count = 0;

void increment() {
    count = count + 1;
}
複製代碼

當多個線程同時調用 increment() 時就會出現問題:api

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 9965
複製代碼

上面的代碼執行結果並非10000,緣由是咱們在不一樣的線程上共享一個變量,而沒有給這個變量的訪問設置競爭條件。安全

爲了增長數字,必須執行三個步驟:(i) 讀取當前值;(ii) 將該值增長1;(iii) 將新值寫入變量;若是兩個線程並行執行這些步驟,則兩個線程可能同時執行步驟1,從而讀取相同的當前值。 這致使寫入丟失,因此實際結果較低。 在上面的示例中,35個增量因爲併發非同步訪問計數而丟失,可是當你本身執行代碼時可能會看到不一樣的結果。bash

幸運的是,Java 早期經過 synchronized 關鍵字支持線程同步。增長計數時,咱們能夠利用同步來解決上述競爭條件:多線程

synchronized void incrementSync() {
    count = count + 1;
}
複製代碼

當咱們使用 incrementSync() 方法時,咱們獲得了但願的結果,並且每次執行的結果都是這樣的。併發

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 10000
複製代碼

synchronized 關鍵值也能夠用在一個語句塊中oracle

void incrementSync() {
    synchronized (this) {
        count = count + 1;
    }
}
複製代碼

JVM 的內部使用了一個監視器,也能夠稱爲監視器鎖和內部鎖來管理同步。這個監視器被綁定到一個對象上,當使用同步方法時,每一個方法共享相應對象的監視器。

全部隱式監視器都實現了可重入特性。 可重入意味着鎖被綁定到當前線程,線程能夠安全地屢次獲取相同的鎖,而不會發生死鎖(例如同步方法在同一對象上調用另外一個同步方法)。

Locks

除了使用關鍵字 synchronized 支持的隱式鎖(對象的內置鎖)外,Concurrency API 支持由 Lock 接口指定的各類顯示鎖。顯示鎖能控制更細的粒度,所以也有更好的性能,在邏輯上也比較清晰。

標準 JDK中提供了多種顯示鎖的實現,將在下面的章節中進行介紹。

ReentrantLock

ReentrantLock 類是一個互斥鎖,它和 synchronized 關鍵字訪問的隱式鎖具備相同的功能,但它具備擴展功能。它也實現了可重入的功能。

下面來看看如何使用 ReentrantLock

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}
複製代碼

鎖經過 lock() 獲取,經過 unlock() 釋放,將代碼封裝到 try/finally 塊中是很是重要的,以確保在出現異常的時候也能釋放鎖。這個方法和使用關鍵字 synchronized 修飾的方法是同樣是線程安全的。若是一個線程已經得到了鎖,後續線程調用 lock() 會暫停線程,直到鎖被釋放,永遠只有一個線程能獲取鎖。

lock 支持更細粒度的去控制一個方法的同步,以下面的代碼:

ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();

executor.submit(() -> {
    lock.lock();
    try {
        sleep(1000);
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});

stop(executor);
複製代碼

當第一個任務獲取鎖時,第二個任務獲取鎖的狀態信息:

Locked: true
Held by me: false
Lock acquired: false
複製代碼

做爲 lock() 方法的替代方法 tryLock() 嘗試去獲取鎖而不暫停當前線程,必須使用 bool 結果去判斷是否真的獲取到了鎖。

ReadWriteLock

ReadWriteLock 指定了另外一種類型的鎖,即讀寫鎖。讀寫鎖實現的邏輯是,當沒有線程在寫這個變量時,其餘的線程能夠讀取這個變量,因此就是當沒有線程持有寫鎖時,讀鎖就能夠被全部的線程持有。若是讀取比寫更頻繁,這將增長系統的性能和吞吐量。

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1000);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
});
複製代碼

上面的例子首先獲取一個寫入鎖,在 sleep 1秒後在 map 中寫入值,在這個任務完成以前,還有兩個任務正在提交,試圖從 map 讀取值:

Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
        sleep(1000);
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);
複製代碼

當執行上面的代碼時,你會注意到兩人讀取的任務必須等待直到寫入完成(當在讀取的時候,寫是不能獲取鎖的)。寫入鎖釋放後,兩個任務並行執行,它們沒必要等待對方是否完成,由於只要沒有線程持有寫入鎖,它們就能夠同時持有讀取鎖。

StampedLock

Java 8 提供了一種新類型的鎖 StampedLock,像上面的例子同樣它也支持讀寫鎖,與 ReadWriteLock 不一樣的是,StampedLock 的鎖定方法返回一個 long 值,能夠利用這個值檢查是否釋放鎖和鎖仍然有效。另外 StampedLock 支持另一種稱爲樂觀鎖的模式。

下面使用 StampedLock 來替換 ReadWriteLock

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep(1000);
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep(1000);
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);
複製代碼

經過 readLock()writeLock() 方法來獲取讀寫鎖會返回一個稍後用於在 finally 塊中釋放鎖的值。注意,這裏的鎖不是可重入的。每次鎖定都會返回一個新的值,並在沒有鎖的狀況下阻塞,在使用的時候要注意不要死鎖。

就像前面 ReadWriteLock 中的示例同樣,兩個讀取任務必須等待寫入任務釋放鎖。而後同時並行執行打印結果到控制檯。

下面的例子演示了樂觀鎖

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(1000);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(2000);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep(2000);
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor);
複製代碼

經過調用 tryOptimisticRead() 來獲取樂觀讀寫鎖tryOptimisticRead()老是返回一個值,而不會阻塞當前線程,也不關鎖是否可用。若是有一個寫鎖激活則返回0。能夠經過 lock.validate(stamp) 來檢查返回的標記(long 值)是否有效。

執行上面的代碼輸出:

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
複製代碼

樂觀鎖在得到鎖後當即生效。與普通讀鎖相反,樂觀鎖不會阻止其餘線程當即得到寫鎖。在第一個線程休眠一秒以後,第二個線程得到一個寫鎖,而不用等待樂觀讀鎖解除。樂觀的讀鎖再也不有效,即便寫入鎖定被釋放,樂觀的讀取鎖仍然無效。

所以,在使用樂觀鎖時,必須在每次訪問任何共享的變量後驗證鎖,以確保讀取仍然有效。

有時將讀鎖轉換爲寫鎖並不須要再次解鎖和鎖定是有用的。StampedLock 爲此提供了tryConvertToWriteLock() 方法,以下面的示例所示:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                System.out.println("Could not convert to write lock");
                stamp = lock.writeLock();
            }
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor);
複製代碼

該任務首先得到一個讀鎖,並將當前的變量計數值打印到控制檯。 可是,若是當前值爲 0,咱們要分配一個新的值23。咱們首先必須將讀鎖轉換爲寫鎖,以不打破其餘線程的潛在併發訪問。 調用 tryConvertToWriteLock() 不會阻塞,但可能會返回 0,指示當前沒有寫鎖定可用。 在這種狀況下,咱們調用writeLock()來阻塞當前線程,直到寫鎖可用。

Semaphores

除了鎖以外,併發API還支持計數信號量。 鎖一般授予對變量或資源的獨佔訪問權,而信號量則可以維護整套許可證。 在不一樣的狀況下,必須限制對應用程序某些部分的併發訪問量。

下面是一個如何限制對長時間任務的訪問的例子:

ExecutorService executor = Executors.newFixedThreadPool(10);

Semaphore semaphore = new Semaphore(5);

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep(5000);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
};

IntStream.range(0, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor);
複製代碼

執行程序能夠同時運行10個任務,可是咱們使用5信號量,所以限制併發訪問爲5個。使用try/finally塊,即便在異常的狀況下正確釋放信號量也是很是重要的。

運行上面的代碼輸出:

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
複製代碼

當有 5 個任務獲取型號量後,隨後的任務便不能獲取信號量了。可是若是前面 5 的任務執行完成,finally 塊釋放了型號量,隨後的線程就能夠獲取星號量了,總數不會超過5個。這裏調用 tryAcquire() 獲取型號量設置了超時時間1秒,意味着當線程獲取信號量失敗後能夠阻塞等待1秒再獲取。

相關文章
相關標籤/搜索