高併發之Phaser、ReadWriteLock、StampedLock

本系列研究總結高併發下的幾種同步鎖的使用以及之間的區別,分別是:ReentrantLock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、StampedLock、Semaphore、Exchanger、LockSupport。因爲博客園對博客字數的要求限制,會分爲三個篇幅:javascript

高併發之ReentrantLock、CountDownLatch、CyclicBarrierhtml

高併發之Phaser、ReadWriteLock、StampedLockjava

高併發之Semaphore、Exchanger、LockSupportshell

Phaser

Phaser是JDK7開始引入的一個同步工具類,適用於一些須要分階段的任務的處理。它的功能與 CyclicBarrierCountDownLatch有些相似,功能上與 CountDownLatch 和 CyclicBarrier相似但支持的場景更加靈活相似於一個多階段的柵欄,而且功能更強大,咱們來比較下這三者的功能:多線程

同步器 做用
CountDownLatch 倒數計數器,初始時設定計數器值,線程能夠在計數器上等待,當計數器值歸0後,全部等待的線程繼續執行
CyclicBarrier 循環柵欄,初始時設定參與線程數,當線程到達柵欄後,會等待其它線程的到達,當到達柵欄的總數知足指定數後,全部等待的線程繼續執行
Phaser 多階段柵欄,能夠在初始時設定參與線程數,也能夠中途註冊/註銷參與者,當到達的參與者數量知足柵欄設定的數量後,會進行階段升級(advance)

使用場景

相對於前面的CyclicBarrier和CountDownLatch而言,這個稍微有一些難以理解,這兒引入一個場景:結婚併發

一場婚禮中勢必分紅不少個階段,例如賓客到齊、舉行婚禮、新郎新娘拜天地、入洞房、吃宴席、賓客離開等,若是把不一樣的人當作是不一樣的線程的話,那麼不一樣的線程所要到的階段是不同的,例如新郎新娘可能要走徹底流程,而賓客可能只是其中的幾步而已。dom

代碼示例:ide

Person高併發

static class Person {
        String name;

        public Person(String name) {
            this.name = name;
        }

        public void arrive() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到達現場!\n", name);
        }

        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
        }

        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 離開!\n", name);
        }

    }
}

MarriagePhaser工具

static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            switch (phase) {
                case 0:
                    System.out.println("全部人到齊了!");
                    return false;
                case 1:
                    System.out.println("全部人吃完了!");
                    return false;
                case 2:
                    System.out.println("全部人離開了!");
                    System.out.println("婚禮結束!");
                    return true;
                default:
                    return true;
            }
        }
    }

TestPhaser

public class TestPhaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();

    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        phaser.bulkRegister(5);

        for(int i=0; i<5; i++) {
            final int nameIndex = i;
            new Thread(()->{

                Person p = new Person("person " + nameIndex);
                p.arrive();
                phaser.arriveAndAwaitAdvance();

                p.eat();
                phaser.arriveAndAwaitAdvance();

                p.leave();
                phaser.arriveAndAwaitAdvance();
            }).start();
        }

    }

打印結果

person 0 到達現場!
person 2 到達現場!
person 4 到達現場!
person 1 到達現場!
person 3 到達現場!
全部人到齊了!
person 2 吃完!
person 0 吃完!
person 4 吃完!
person 3 吃完!
person 1 吃完!
全部人吃完了!
person 3 離開!
person 1 離開!
person 0 離開!
person 4 離開!
person 2 離開!
全部人離開了!
婚禮結束!

Phaser常見的方法

Phaser() //默認的構造方法,初始化註冊的線程數量爲0
Phaser(int parties)//一個指定線程數量的構造方法

此外Phaser還支持Tiering類型具備父子關係的構造方法,主要是爲了減小在註冊者數量龐大的時候,經過分組的形式複用Phaser從而減小競爭,提升吞吐,這種形式通常不常見,因此這裏再也不說起,有興趣的能夠參考官網文檔。

其餘幾個常見方法:

register()//添加一個新的註冊者
bulkRegister(int parties)//添加指定數量的多個註冊者
arrive()// 到達柵欄點直接執行,無須等待其餘的線程
arriveAndAwaitAdvance()//到達柵欄點,必須等待其餘全部註冊者到達
arriveAndDeregister()//到達柵欄點,註銷本身無須等待其餘的註冊者到達
onAdvance(int phase, int registeredParties)//多個線程達到註冊點以後,會調用該方法。

  • arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。若是當前線程是該階段最後一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),同時其它線程的該方法也返回下一個階段的序號。
  • arriveAndDeregister() 該方法當即返回下一階段的序號,而且其它線程須要等待的個數減一,而且把當前線程從以後須要等待的成員中移除。若是該Phaser是另一個Phaser的子Phaser(層次化Phaser會在後文中講到),而且該操做致使當前Phaser的成員數爲0,則該操做也會將當前Phaser從其父Phaser中移除。
  • arrive()該方法不做任何等待,直接返回下一階段的序號。
  • awaitAdvance(int phase) 該方法等待某一階段執行完畢。若是當前階段不等於指定的階段或者該Phaser已經被終止,則當即返回。該階段數通常由arrive()方法或者arriveAndDeregister()方法返回。返回下一階段的序號,或者返回參數指定的值(若是該參數爲負數),或者直接返回當前階段序號(若是當前Phaser已經被終止)。
  • awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)至關,惟一的不一樣在於若該線程在該方法等待時被中斷,則該方法拋出InterruptedException
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)至關,區別在於若是超時則拋出TimeoutException
  • bulkRegister(int parties) 註冊多個party。若是當前phaser已經被終止,則該方法無效,並返回負數。若是調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。若是該Phaser有父Phaser則指定的party數大於0,且以前該Phaser的party數爲0,那麼該Phaser會被註冊到其父Phaser中。
  • forceTermination() 強制讓該Phaser進入終止狀態。已經註冊的party數不受影響。若是該Phaser有子Phaser,則其全部的子Phaser均進入終止狀態。若是該Phaser已經處於終止狀態,該方法調用不形成任何影響。

ReadWriteLock

根據翻譯,讀寫鎖,顧名思義,在讀的時候上讀鎖,在寫的時候上寫鎖,這樣就很巧妙的解決synchronized的一個性能問題:讀與讀之間互斥。

ReadWriteLock也是一個接口,原型以下:

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

該接口只有兩個方法,讀鎖和寫鎖。也就是說,咱們在寫文件的時候,能夠將讀和寫分開,分紅2個鎖來分配給線程,從而能夠作到讀和讀互不影響,讀和寫互斥,寫和寫互斥,提升讀寫文件的效率。該接口也有一個實現類ReentrantReadWriteLock,下面咱們就來學習下這個類。

咱們先看一下,多線程同時讀取文件時,用synchronized實現的效果,代碼以下:

public class ReadAndWriteLock {

    public synchronized void get(Thread thread) {
        long start = System.currentTimeMillis();
        for(int i=0; i<5; i++){
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(thread.getName() + ":正在進行讀操做……");
        }
        System.out.println(thread.getName() + ":讀操做完畢!");
        long end = System.currentTimeMillis();
        System.out.println("用時:"+(end-start)+"ms");
    }

    public static void main(String[] args) {
        final ReadAndWriteLock lock = new ReadAndWriteLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.get(Thread.currentThread());
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.get(Thread.currentThread());
            }
        }).start();
    }
}

測試結果以下:

Thread-1:正在進行讀操做……
Thread-1:正在進行讀操做……
Thread-1:正在進行讀操做……
Thread-1:正在進行讀操做……
Thread-1:正在進行讀操做……
Thread-1:讀操做完畢!
用時:112ms
Thread-0:正在進行讀操做……
Thread-0:正在進行讀操做……
Thread-0:正在進行讀操做……
Thread-0:正在進行讀操做……
Thread-0:正在進行讀操做……
Thread-0:讀操做完畢!
用時:107ms

咱們能夠看到,即便是在讀取文件,在加了synchronized關鍵字以後,讀與讀之間,也是互斥的,也就是說,必須等待Thread-0讀完以後,纔會輪到Thread-1線程讀,而沒法作到同時讀文件,這種狀況在大量線程同時都須要讀文件的時候,讀寫鎖的效率,明顯要高於synchronized關鍵字的實現。下面咱們來測試一下,代碼以下:

public class ReadAndWriteLock {
	ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
	public void get(Thread thread) {
		lock.readLock().lock();
		try{
			System.out.println("start time:"+System.currentTimeMillis());
			for(int i=0; i<5; i++){
				try {
					Thread.sleep(20);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(thread.getName() + ":正在進行讀操做……");
			}
			System.out.println(thread.getName() + ":讀操做完畢!");
			System.out.println("end time:"+System.currentTimeMillis());
		}finally{
			lock.readLock().unlock();
		}
	}
	
	public static void main(String[] args) {
		final ReadAndWriteLock lock = new ReadAndWriteLock();
		new Thread(new Runnable() {
			@Override
			public void run() {
				lock.get(Thread.currentThread());
			}
		}).start();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				lock.get(Thread.currentThread());
			}
		}).start();
	}
}

注意的是,若是有一個線程已經佔用了讀鎖,則此時其餘線程若是要申請寫鎖,則申請寫鎖的線程會一直等待釋放讀鎖。若是有一個線程已經佔用了寫鎖,則此時其餘線程若是申請寫鎖或者讀鎖,則申請的線程會一直等待釋放寫鎖。讀鎖和寫鎖是互斥的。

下面咱們來驗證下讀寫鎖的互斥關係,代碼以下:

public class ReadAndWriteLock {
   ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
	public static void main(String[] args) {
		final ReadAndWriteLock lock = new ReadAndWriteLock();
    // 建N個線程,同時讀
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(new Runnable() {
			@Override
			public void run() {
				lock.readFile(Thread.currentThread());
			}
		});
		// 建N個線程,同時寫
		ExecutorService service1 = Executors.newCachedThreadPool();
		service1.execute(new Runnable() {
			@Override
			public void run() {
				lock.writeFile(Thread.currentThread());
			}
		});
	}
	// 讀操做
	public void readFile(Thread thread){
		lock.readLock().lock();
		boolean readLock = lock.isWriteLocked();
		if(!readLock){
			System.out.println("當前爲讀鎖!");
		}
		try{
			for(int i=0; i<5; i++){
				try {
					Thread.sleep(20);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(thread.getName() + ":正在進行讀操做……");
			}
			System.out.println(thread.getName() + ":讀操做完畢!");
		}finally{
         System.out.println("釋放讀鎖!");
			lock.readLock().unlock();
		}
	}
	// 寫操做
	public void writeFile(Thread thread){
		lock.writeLock().lock();
		boolean writeLock = lock.isWriteLocked();
		if(writeLock){
			System.out.println("當前爲寫鎖!");
		}
		try{
			for(int i=0; i<5; i++){
				try {
					Thread.sleep(20);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(thread.getName() + ":正在進行寫操做……");
			}
			System.out.println(thread.getName() + ":寫操做完畢!");
		}finally{
         System.out.println("釋放寫鎖!");
			lock.writeLock().unlock();
		}
	}
}

測試結果以下:

// 讀鎖和讀鎖測試結果:
當前爲讀鎖!
當前爲讀鎖!
pool-2-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-2-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-2-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-2-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-2-thread-1:正在進行讀操做……
pool-1-thread-1:讀操做完畢!
pool-2-thread-1:讀操做完畢!
釋放讀鎖!
釋放讀鎖!
// 測試結果不互斥
 
// 讀鎖和寫鎖,測試結果以下:
當前爲讀鎖!
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:正在進行讀操做……
pool-1-thread-1:讀操做完畢!
釋放讀鎖!
當前爲寫鎖!
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:寫操做完畢!
釋放寫鎖!
// 測試結果互斥
 
// 寫鎖和寫鎖,測試結果以下:
當前爲寫鎖!
pool-1-thread-1:正在進行寫操做……
pool-1-thread-1:正在進行寫操做……
pool-1-thread-1:正在進行寫操做……
pool-1-thread-1:正在進行寫操做……
pool-1-thread-1:正在進行寫操做……
pool-1-thread-1:寫操做完畢!
釋放寫鎖!
當前爲寫鎖!
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:正在進行寫操做……
pool-2-thread-1:寫操做完畢!
釋放寫鎖!
// 測試結果互斥

ReadWriteLock小結

使用ReadWriteLock能夠提升讀取效率:

  • ReadWriteLock只容許一個線程寫入;
  • ReadWriteLock容許多個線程在沒有寫入時同時讀取;
  • ReadWriteLock適合讀多寫少的場景。

StampedLock

前面介紹的ReadWriteLock能夠解決多線程同時讀,但只有一個線程能寫的問題。

若是咱們深刻分析ReadWriteLock,會發現它有個潛在的問題:若是有線程正在讀,寫線程須要等待讀線程釋放鎖後才能獲取寫鎖,即讀的過程當中不容許寫,這是一種悲觀的讀鎖。

要進一步提高併發執行效率,Java 8引入了新的讀寫鎖:StampedLock

StampedLockReadWriteLock相比,改進之處在於:讀的過程當中也容許獲取寫鎖後寫入!這樣一來,咱們讀的數據就可能不一致,因此,須要一點額外的代碼來判斷讀的過程當中是否有寫入,這種讀鎖是一種樂觀鎖。

樂觀鎖的意思就是樂觀地估計讀的過程當中大機率不會有寫入,所以被稱爲樂觀鎖。反過來,悲觀鎖則是讀的過程當中拒絕有寫入,也就是寫入必須等待。顯然樂觀鎖的併發效率更高,但一旦有小几率的寫入致使讀取的數據不一致,須要能檢測出來,再讀一遍就行。

咱們來看例子:

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 獲取寫鎖
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 釋放寫鎖
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 得到一個樂觀讀鎖
        // 注意下面兩行代碼不是原子操做
        // 假設x,y = (100,200)
        double currentX = x;
        // 此處已讀取到x=100,但x,y可能被寫線程修改成(300,400)
        double currentY = y;
        // 此處已讀取到y,若是沒有寫入,讀取是正確的(100,200)
        // 若是有寫入,讀取是錯誤的(100,400)
        if (!stampedLock.validate(stamp)) { // 檢查樂觀讀鎖後是否有其餘寫鎖發生
            stamp = stampedLock.readLock(); // 獲取一個悲觀讀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

ReadWriteLock相比,寫入的加鎖是徹底同樣的,不一樣的是讀取。注意到首先咱們經過tryOptimisticRead()獲取一個樂觀讀鎖,並返回版本號。接着進行讀取,讀取完成後,咱們經過validate()去驗證版本號,若是在讀取過程當中沒有寫入,版本號不變,驗證成功,咱們就能夠放心地繼續後續操做。若是在讀取過程當中有寫入,版本號會發生變化,驗證將失敗。在失敗的時候,咱們再經過獲取悲觀讀鎖再次讀取。因爲寫入的機率不高,程序在絕大部分狀況下能夠經過樂觀讀鎖獲取數據,極少數狀況下使用悲觀讀鎖獲取數據。

可見,StampedLock把讀鎖細分爲樂觀讀和悲觀讀,能進一步提高併發效率。但這也是有代價的:

一是代碼更加複雜

二是StampedLock是不可重入鎖,不能在一個線程中反覆獲取同一個鎖。

StampedLock還提供了更復雜的將悲觀讀鎖升級爲寫鎖的功能,它主要使用在if-then-update的場景:即先讀,若是讀的數據知足條件,就返回,若是讀的數據不知足條件,再嘗試寫。

StampedLock小結

StampedLock提供了樂觀讀鎖,可取代ReadWriteLock以進一步提高併發性能;

StampedLock是不可重入鎖。

相關文章
相關標籤/搜索