多線程(二)

原子操做:

原子(atomic)本意是「不能被進一步分割的最小粒子」,而原子操做(atomic operation)意爲「不可被中斷的一個或一系列操做」。java

CAS( Compare(比較) And Swap(交換) )  爲何要有CAS?

count++,並非一個原子操做。node

Compare And Swap就是比較而且交換的一個原子操做,由Cpu在指令級別上進行保證。數據庫

CAS包含三個參數:一、變量所在內存地址V;二、變量應該的值A;三、咱們將要修改的值B。若是說V上的變量的值時A的話,就用B從新賦值,若是不是A,那就什麼事也不作,操做的返回結果原值是多少。數組

循環CAS:在一個(死)循環【for(;;)】裏不斷進行CAS操做,直到成功爲止(自旋操做)。安全

CAS實現原子操做的三大問題

一、 ABA問題:其餘的線程把值改爲了C,很快改爲了A。解決ABA,引入版本號:1A-》2C-》3Adom

二、 循環時間很長的話,cpu的負荷比較大ide

三、 對一個變量進行操做能夠,同時操做多個共享變量有點麻煩工具

CAS線程安全

經過硬件層面的阻塞實現原子操做的安全性能

原子更新基本類型類

AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference。測試

AtomicInteger的經常使用方法以下

·int addAndGet(int delta): 和delta

·boolean compareAndSet(int expect,int update): 

·int getAndIncrement(): 原子遞增,可是返回的是自增之前的值

int incrementAndGet原子遞增,可是返回的是自增之後的值

·int getAndSet(int newValue): 

public class AtomicIntTest {
	static AtomicInteger ai = new AtomicInteger(1);

	public static void main(String[] args) {
		System.out.println(ai.getAndIncrement());// ai的值獲取後+1
		System.out.println(ai.incrementAndGet());// ai的值 +1後獲取
		System.out.println(ai.get());// 獲取ai的值
		System.out.println(ai.addAndGet(2));// ai的值和2相加後返回
		System.out.println(ai.compareAndSet(5, 2));// 比較ai是否等於2 若是相等則set ai=2
		System.out.println(ai.getAndSet(3));//獲取ai後設置ai=3
		System.out.println(ai.get());
	}
}
運行結果
1
3
3
5
true
2
3

原子更新數組類

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

AtomicIntegerArray類主要是提供原子的方式更新數組裏的整型,

其經常使用方法以下。

·int addAndGet(int i,int delta): 

·boolean compareAndSet(int i,int expect,int update): 

數組經過構造方法傳入,類會將數組複製一份,原數組不會發生變化。

public class AtomicArray {
	static int[] value = new int[] { 1, 2 };
	static AtomicIntegerArray ai = new AtomicIntegerArray(value);

	public static void main(String[] args) {
		ai.getAndSet(0, 3);
		System.out.println(ai.get(0));// 操做的是構成參數數組的複製
		System.out.println(value[0]);// 原構造參數的數組不會被操做
	}

}
運行結果
3
1

原子更新引用類型提供的類。

·AtomicReference: 能夠解決更新多個變量的問題

·AtomicStampedReference:解決ABA問題

·AtomicMarkableReference:解決ABA問題

public class AtomicRef {

    static AtomicReference<User> userAtomicReference = new AtomicReference<>();

    public static void main(String[] args) {
        User user = new User("Mark",25);
        userAtomicReference.set(user);
        User updateUser = new User("Mike",26);
        userAtomicReference.compareAndSet(user,updateUser);
        System.out.println(userAtomicReference.get().getName());
        System.out.println(userAtomicReference.get().getOld());
    }

    static class User{ 
        private String name;
        private int old;

        public User(String name, int old) {
            this.name = name;
            this.old = old;
        }

        public String getName() {
            return name;
        }

        public int getOld() {
            return old;
        }
    }

}
運行結果
Mike
26

 

原子更新字段類,

Atomic包提供瞭如下3個類進行原子字段更新。

·AtomicReferenceFieldUpdater: 

·AtomicIntegerFieldUpdater: 

·AtomicLongFieldUpdater:

輸出結果

有了synchronized爲何還要Lock?

一、 嘗試非阻塞地獲取鎖

二、 獲取鎖的過程能夠被中斷

三、 超時獲取鎖

Lock的標準用法

public class LockTemplete {

	public static void main(String[] args) {
		Lock lock = new ReentrantLock(); // 可重入
		lock.lock();// 加鎖
		try {
			// do my work.....
		} finally {
			lock.unlock();// 釋放鎖
		}
	}

}

Lock的經常使用API

Lock()

lockInterruptibly:可中斷

tryLock嘗試非阻塞地獲取鎖

unlock()

鎖的可重入

遞歸的時候發生鎖的重入,沒有鎖的可重入,就會死鎖

公平和非公平鎖

公平鎖,先對鎖發出獲取請求的必定先被知足。公平鎖的效率比非公平鎖效率要低。

非公平鎖,可插隊

讀寫鎖ReentrantReadWriteLock

容許多個讀線程同時進行,可是隻容許一個寫線程(不容許其餘讀線程和寫線程),支持讀多寫少場景,性能會有提高。

/**
 * 讀寫鎖的使用
 */
public class RwLockTemplete {

	static final Map<String, String> map = new HashMap<>();
	static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
	static Lock r = reentrantReadWriteLock.readLock();
	static Lock w = reentrantReadWriteLock.writeLock();

	public void put() {
		w.lock();
		try {
			// do my work.....
		} finally {
			w.unlock();
		}
	}

	public void get() {
		r.lock();
		try {
			// do my work.....
		} finally {
			r.unlock();
		}
	}

}

synchronized和lock比較

public class GoodsVo {

    private final String id;
    private int totalSaleNumber;//總銷售數
    private int depotNumber;//當前庫存數

    public GoodsVo(String id, int totalSaleNumber, int depotNumber) {
        this.id = id;
        this.totalSaleNumber = totalSaleNumber;
        this.depotNumber = depotNumber;
    }

    public int getTotalSaleNumber() {
        return totalSaleNumber;
    }

    public int getDepotNumber() {
        return depotNumber;
    }

    public void setGoodsVoNumber(int changeNumber){
        this.totalSaleNumber += changeNumber;
        this.depotNumber -= changeNumber;
    }
}
public interface IGoodsNum {

    public GoodsVo getGoodsNumber();
    public void setGoodsNumber(int changeNumber);
}
/**
 * synchronized操做
 */
public class NumSyn implements IGoodsNum {

	private GoodsVo goods;

	public NumSyn(GoodsVo goods) {
		this.goods = goods;
	}

	@Override
	public synchronized GoodsVo getGoodsNumber() {
		try {
			Thread.sleep(5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return this.goods;
	}

	@Override
	public synchronized void setGoodsNumber(int changeNumber) {

		try {
			Thread.sleep(50);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		this.goods.setGoodsVoNumber(changeNumber);

	}
}
/**
 * lock操做
 */
public class RwNumImpl implements IGoodsNum {

	private GoodsVo goods;

	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
	private final Lock r = lock.readLock();
	private final Lock w = lock.writeLock();

	public RwNumImpl(GoodsVo goods) {
		this.goods = goods;
	}

	@Override
	public GoodsVo getGoodsNumber() {
		r.lock();
		try {
			try {
				Thread.sleep(5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return this.goods;
		} finally {
			r.unlock();
		}
	}

	@Override
	public void setGoodsNumber(int changeNumber) {
		w.lock();
		try {
			try {
				Thread.sleep(50);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			this.goods.setGoodsVoNumber(changeNumber);
		} finally {
			w.unlock();
		}
	}
}
/**
 * 測試
 */
public class Test {

	static final int threadRatio = 10;
	static final int threadBaseCount = 3;
	static CountDownLatch countDownLatch = new CountDownLatch(1);

	// 模擬實際的數據庫讀操做
	private static class ReadThread implements Runnable {

		private IGoodsNum goodsNum;

		public ReadThread(IGoodsNum goodsNum) {
			this.goodsNum = goodsNum;
		}

		@Override
		public void run() {
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			long start = System.currentTimeMillis();
			for (int i = 0; i < 100; i++) {
				goodsNum.getGoodsNumber();
			}
			long duration = System.currentTimeMillis() - start;
			System.out.println(Thread.currentThread().getName() + "讀取庫存數據耗時:"
					+ duration + "ms");

		}
	}

	// 模擬實際的數據庫寫操做
	private static class WriteThread implements Runnable {

		private IGoodsNum goodsNum;

		public WriteThread(IGoodsNum goodsNum) {
			this.goodsNum = goodsNum;
		}

		@Override
		public void run() {
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			long start = System.currentTimeMillis();
			Random r = new Random();
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(50);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				goodsNum.setGoodsNumber(r.nextInt(10));
			}
			long duration = System.currentTimeMillis() - start;
			System.out.println(Thread.currentThread().getName() + "寫庫存數據耗時:"
					+ duration + "ms");

		}
	}

	public static void main(String[] args) throws InterruptedException {
		GoodsVo goodsVo = new GoodsVo("goods001", 100000, 10000);
		IGoodsNum goodsNum = new NumSyn(goodsVo);
		// IGoodsNum goodsNum = new RwNumImpl(goodsVo);
		for (int i = 0; i < threadBaseCount * threadRatio; i++) {
			Thread readT = new Thread(new ReadThread(goodsNum));
			readT.start();
		}
		for (int i = 0; i < threadBaseCount; i++) {
			Thread writeT = new Thread(new WriteThread(goodsNum));
			writeT.start();
		}
		countDownLatch.countDown();

	}
}
synchronized 運行結果

Thread-2讀取庫存數據耗時:500ms
Thread-26讀取庫存數據耗時:1155ms
Thread-24讀取庫存數據耗時:1761ms
Thread-21讀取庫存數據耗時:2606ms
Thread-20讀取庫存數據耗時:3106ms
Thread-18讀取庫存數據耗時:3981ms
Thread-17讀取庫存數據耗時:4481ms
Thread-16讀取庫存數據耗時:4981ms
Thread-14讀取庫存數據耗時:5551ms
Thread-9讀取庫存數據耗時:6469ms
Thread-0讀取庫存數據耗時:7077ms
Thread-11讀取庫存數據耗時:9042ms
Thread-28讀取庫存數據耗時:9962ms
Thread-25讀取庫存數據耗時:10893ms
Thread-19讀取庫存數據耗時:11379ms
Thread-10讀取庫存數據耗時:11759ms
Thread-12讀取庫存數據耗時:11814ms
Thread-6讀取庫存數據耗時:12294ms
Thread-4讀取庫存數據耗時:12954ms
Thread-1讀取庫存數據耗時:13079ms
Thread-7讀取庫存數據耗時:13399ms
Thread-5讀取庫存數據耗時:13769ms
Thread-13讀取庫存數據耗時:14054ms
Thread-15讀取庫存數據耗時:14369ms
Thread-23讀取庫存數據耗時:14479ms
Thread-27讀取庫存數據耗時:14829ms
Thread-29讀取庫存數據耗時:15294ms
Thread-22讀取庫存數據耗時:15424ms
Thread-8讀取庫存數據耗時:15499ms
Thread-3讀取庫存數據耗時:16259ms
Thread-31寫庫存數據耗時:16409ms
Thread-32寫庫存數據耗時:16459ms
Thread-30寫庫存數據耗時:16509ms
lock運行結果

Thread-31寫庫存數據耗時:1398ms
Thread-30寫庫存數據耗時:1503ms
Thread-32寫庫存數據耗時:1663ms
Thread-13讀取庫存數據耗時:2003ms
Thread-8讀取庫存數據耗時:2003ms
Thread-29讀取庫存數據耗時:2003ms
Thread-21讀取庫存數據耗時:2003ms
Thread-20讀取庫存數據耗時:2003ms
Thread-26讀取庫存數據耗時:2003ms
Thread-1讀取庫存數據耗時:2003ms
Thread-4讀取庫存數據耗時:2003ms
Thread-18讀取庫存數據耗時:2003ms
Thread-3讀取庫存數據耗時:2003ms
Thread-7讀取庫存數據耗時:2003ms
Thread-25讀取庫存數據耗時:2003ms
Thread-11讀取庫存數據耗時:2004ms
Thread-6讀取庫存數據耗時:2003ms
Thread-19讀取庫存數據耗時:2004ms
Thread-22讀取庫存數據耗時:2003ms
Thread-23讀取庫存數據耗時:2003ms
Thread-28讀取庫存數據耗時:2003ms
Thread-10讀取庫存數據耗時:2003ms
Thread-16讀取庫存數據耗時:2003ms
Thread-12讀取庫存數據耗時:2003ms
Thread-14讀取庫存數據耗時:2003ms
Thread-27讀取庫存數據耗時:2004ms
Thread-24讀取庫存數據耗時:2004ms
Thread-0讀取庫存數據耗時:2004ms
Thread-15讀取庫存數據耗時:2004ms
Thread-17讀取庫存數據耗時:2004ms
Thread-5讀取庫存數據耗時:2003ms
Thread-2讀取庫存數據耗時:2003ms
Thread-9讀取庫存數據耗時:2003ms

Condition接口有何用處?

Object wait,notify/all  Condition接口和Lock配合來實現等待通知機制

Condition經常使用方法和使用範式

public class ConditionTemplete {

	Lock lock = new ReentrantLock();
	Condition condition = lock.newCondition();

	public void waitc() throws InterruptedException {
		lock.lock();
		try {
			condition.await();// 等待
		} finally {
			lock.unlock();
		}
	}

	public void waitnotify() throws InterruptedException {
		lock.lock();
		try {
			condition.signal();// 喚醒
			// condition.signalAll();儘可能少使用 由於condition和lock已經綁定 不須要喚醒其餘鎖
		} finally {
			lock.unlock();
		}
	}

}

結合ReentrantLock和Condition實現線程安全的有界隊列

public class BlockingQueueLC<T> {
    private List queue = new LinkedList<>();
    private final int limit;
    Lock lock = new ReentrantLock();
    private Condition needNotEmpty = lock.newCondition();
    private Condition needNotFull = lock.newCondition();


    public BlockingQueueLC(int limit) {
        this.limit = limit;
    }

    public void enqueue(T item) throws InterruptedException {
        lock.lock();
        try{
            while(this.queue.size()==this.limit){
                needNotFull.await();
            }
            this.queue.add(item);
            needNotEmpty.signal();
        }finally{
            lock.unlock();
        }
    }

    public  T dequeue() throws InterruptedException {
        lock.lock();
        try{
            while(this.queue.size()==0){
                needNotEmpty.await();
            }
            needNotFull.signal();
            return (T) this.queue.remove(0);
        }finally{
            lock.unlock();
        }
    }
}
public class BqTest {
    public static void main(String[] args) {
        BlockingQueueLC<Integer> bq = new BlockingQueueLC(10);
        Thread threadA = new ThreadPush(bq);
        threadA.setName("Push");
        Thread threadB = new ThreadPop(bq);
        threadB.setName("Pop");
        threadB.start();
        threadA.start();
    }

    private static class ThreadPush extends Thread{
        BlockingQueueLC<Integer> bq;

        public ThreadPush(BlockingQueueLC<Integer> bq) {
            this.bq = bq;
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            int i = 20;
            while(i>0){
                try {
                    Thread.sleep(500);
                    System.out.println(" i="+i+" will push");
                    bq.enqueue(i--);
                } catch (InterruptedException e) {
                    //e.printStackTrace();
                }

            }
        }
    }

    private static class ThreadPop extends Thread{
        BlockingQueueLC<Integer> bq;

        public ThreadPop(BlockingQueueLC<Integer> bq) {
            this.bq = bq;
        }
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()+" will pop.....");
                    Integer i = bq.dequeue();
                    System.out.println(" i="+i.intValue()+" alread pop");
                } catch (InterruptedException e) {
                    //e.printStackTrace();
                }
            }

        }
    }
}
運行結果
 i=20 will push
Pop will pop.....
 i=20 alread pop
 i=19 will push
 i=18 will push
Pop will pop.....
 i=19 alread pop
 i=17 will push
 i=16 will push
Pop will pop.....
 i=18 alread pop
 i=15 will push
 i=14 will push
Pop will pop.....
 i=17 alread pop
 i=13 will push
 i=12 will push
Pop will pop.....
 i=16 alread pop
 i=11 will push
 i=10 will push
Pop will pop.....
 i=15 alread pop
 i=9 will push
 i=8 will push
Pop will pop.....
 i=14 alread pop
 i=7 will push
 i=6 will push
Pop will pop.....
 i=13 alread pop
 i=5 will push
 i=4 will push
Pop will pop.....
 i=12 alread pop
 i=3 will push
 i=2 will push
Pop will pop.....
 i=11 alread pop
 i=1 will push
Pop will pop.....
 i=10 alread pop
Pop will pop.....
 i=9 alread pop
Pop will pop.....
 i=8 alread pop
Pop will pop.....
 i=7 alread pop
Pop will pop.....
 i=6 alread pop
Pop will pop.....
 i=5 alread pop
Pop will pop.....
 i=4 alread pop
Pop will pop.....
 i=3 alread pop
Pop will pop.....
 i=2 alread pop
Pop will pop.....
 i=1 alread pop
Pop will pop.....

 

什麼是AbstractQueuedSynchronizer?爲何咱們要分析它?

抽象隊列同步器,提供JDK中許多同步需求

 

AQS的基本使用方法

同步器的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。可重寫的方法有:

tryAcquire  獨佔鎖獲取

tryRelease   獨佔鎖釋放

tryAcquireShared  共享鎖獲取

tryReleaseShared  共享鎖釋放

isHeldExclusively 快速判斷被線程獨佔

同步器的設計是基於模板方法模式, 使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。

對同步狀態進行更改,這時就須要使用同步器提供的3個方法來進行操做。

    getState() 獲取同步狀態

    setState 設置同步狀態

    compareAndSetState 原子的設置同步狀態

 

public class TestMyLock {
	public void test() {
		final Lock lock = new ReentrantLock();
        //final Lock lock = new SingleLock();
		class Worker extends Thread {
			public void run() {
				while (true) {
					lock.lock();
					try {
						SleepUtils.second(1);
						System.out.println(Thread.currentThread().getName());
						SleepUtils.second(1);
					} finally {
						lock.unlock();
					}
					SleepUtils.second(2);
				}
			}
		}
		// 啓動10個子線程
		for (int i = 0; i < 10; i++) {
			Worker w = new Worker();
			// w.setDaemon(true);
			w.start();
		}
		// 主線程每隔1秒換行
		for (int i = 0; i < 10; i++) {
			SleepUtils.second(1);
			System.out.println();
		}
	}

	public static void main(String[] args) {
		TestMyLock testMyLock = new TestMyLock();
		testMyLock.test();
	}
}
運行結果
Thread-0


Thread-1


Thread-2


Thread-1


Thread-2


Thread-1
Thread-3
Thread-1
Thread-4
Thread-5
Thread-6
Thread-5
Thread-6
Thread-5
Thread-7
Thread-8
Thread-9
Thread-8
Thread-0
Thread-8
/**
 * AQS獨佔鎖實現
 */
public class SingleLock implements Lock {

    static class Sync extends AbstractQueuedSynchronizer{

        //  獨佔鎖獲取
        public boolean tryAcquire(int arg){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //獨佔鎖釋放
        public boolean tryRelease(int arg){
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //是否處於佔用狀態
        public boolean isHeldExclusively(){
            return getState()  == 1;
        }

        Condition newCondition(){
            return new ConditionObject();
        }

    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

瞭解LockSupport

LockSupport定義了一組的公共靜態方法,這些方法提供了最基本的線程阻塞和喚醒功能,而LockSupport也成爲構建同步組件的基礎工具。LockSupport定義了一組以park開頭的方法用來阻塞當前線程,以及unpark(Thread thread)方法來喚醒一個被阻塞的線程。

同步隊列

同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構形成爲一個節點(Node)並將其加入同步隊列。同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的線程將會成爲節點加入該隊列的尾部。

同步隊列的基本結構

節點加入到同步隊列

首節點的設置

獨佔式同步狀態獲取與釋放

經過調用同步器的acquire(int arg)方法能夠獲取同步狀態,其主要邏輯是:首先調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)並經過addWaiter(Node node)方法將該節點加入到同步隊列的尾部,最後調用acquireQueued(Node node,int arg)方法,使得該節點以「死循環」的方式獲取同步狀態。若是獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。

 

共享式同步狀態獲取與釋放

共享式獲取與獨佔式獲取最主要的區別在於同一時刻可否有多個線程同時獲取到同步狀態。在acquireShared(int arg)方法中,同步器調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值爲int類型,當返回值大於等於0時,表示可以獲取到同步狀態。所以,在共享式獲取的自旋過程當中,成功獲取到同步狀態並退出自旋的條件就是tryAcquireShared(int arg)方法返回值大於等於0。

來實現一個共享式的同步工具類

/**
 * 共享鎖
 */
public class TwoLock implements Lock {

    static class Sync extends AbstractQueuedSynchronizer {
        Sync(int count){
            setState(count);
        }

        //  共享鎖獲取
        public int tryAcquireShared  (int arg){
            for(;;){
                int current = getState();
                int newCount = current - arg;
                if(newCount<0||compareAndSetState(current,newCount)){
                    return newCount;
                }
            }
        }

        //共享鎖鎖釋放
        public boolean tryReleaseShared  (int arg){
            for(;;){
                int current = getState();
                int newCount = current + arg;
                if(compareAndSetState(current,newCount)){
                    return true;
                }
            }
        }

        Condition newCondition(){
            return new ConditionObject();
        }

    }

    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1)>=0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
public class TestMyLock {
	public void test() {
		final Lock lock = new TwoLock();
		class Worker extends Thread {
			public void run() {
				while (true) {
					lock.lock();
					try {
						SleepUtils.second(1);
						System.out.println(Thread.currentThread().getName());
						SleepUtils.second(1);
					} finally {
						lock.unlock();
					}
					SleepUtils.second(2);
				}
			}
		}
		// 啓動10個子線程
		for (int i = 0; i < 10; i++) {
			Worker w = new Worker();
			// w.setDaemon(true);
			w.start();
		}
		// 主線程每隔1秒換行
		for (int i = 0; i < 10; i++) {
			SleepUtils.second(1);
			System.out.println();
		}
	}

	public static void main(String[] args) {
		TestMyLock testMyLock = new TestMyLock();
		testMyLock.test();
	}
}
運行結果
Thread-0
Thread-1



Thread-2
Thread-3


Thread-5
Thread-4

Thread-6

Thread-3


Thread-8
Thread-7

Thread-3
Thread-9
Thread-8
Thread-0
Thread-3

瞭解ReentrantLock的實現

重進入是指任意線程在獲取到鎖以後可以再次獲取該鎖而不會被鎖所阻塞,該特性的實現須要解決如下兩個問題。

1)線程再次獲取鎖。鎖須要去識別獲取鎖的線程是否爲當前佔據鎖的線程,若是是,則再次成功獲取。

2)鎖的最終釋放。

nonfairTryAcquire方法增長了再次獲取同步狀態的處理邏輯:經過判斷當前線程是否爲獲取鎖的線程來決定獲取操做是否成功,若是是獲取鎖的線程再次請求,則將同步狀態值進行增長並返回true,表示獲取同步狀態成功。同步狀態表示鎖被一個線程重複獲取的次數。

若是該鎖被獲取了n次,那麼前(n-1)次tryRelease(int releases)方法必須返回false,而只有同

步狀態徹底釋放了,才能返回true。能夠看到,該方法將同步狀態是否爲0做爲最終釋放的條件,當同步狀態爲0時,將佔有線程設置爲null,並返回true,表示釋放成功。

 

瞭解ReentrantReadWriteLock的實現

讀寫鎖的自定義同步器須要在同步狀態(一個整型變量)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成爲讀寫鎖實現的關鍵。若是在一個整型變量上維護多種狀態,就必定須要「按位切割使用」這個變量,讀寫鎖將變量切分紅了兩個部分,高16位表示讀,低16位表示寫。

讀狀態是全部線程獲取讀鎖次數的總和,而每一個線程各自獲取讀鎖的次數只能選擇保存在ThreadLocal中,由線程自身維護。

瞭解Condition的實現

等待隊列是一個FIFO的隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,若是一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構形成節點加入等待隊列並進入等待狀態。

一個Condition包含一個等待隊列,新增節點只須要將原有的尾節點nextWaiter指向它,而且更新尾節點便可。

調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點以前,會將節點移到同步隊列中。

調用該方法的前置條件是當前線程必須獲取了鎖,能夠看到signal()方法進行了isHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程。接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。

經過調用同步器的enq(Node node)方法,等待隊列中的頭節點線程安全地移動到同步隊列。

Condition的signalAll()方法,至關於對等待隊列中的每一個節點均執行一次signal()方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程。

相關文章
相關標籤/搜索