java併發編程(四): 基礎構建模塊

基礎構建模塊:

  • 委託是建立線程安全類的一個最有效的策略:只需讓現有的線程安全類管理全部的狀態便可。

同步容器類:

  • 同步容器類包括:Vector, HashtableCollections.synchronizedXxx()方法產生的實例。
  • 同步容器類是線程安全的,但在某些狀況下須要客戶端加鎖保護來實現一些複合操做
  • 常見覆合操做:迭代跳轉條件運算,如"若沒有則添加"。

以下面的複合操做就有可能不安全: java

/**
 * getLast, rmLast沒有同步,可能致使lastIndex錯亂
 */
@NotThreadSafe
public class UnsafeVector<E> {
	private final Vector<E> v = new Vector<>();
	
	public E getLast(){
		int lastIndex = v.size()-1;
		return v.get(lastIndex);
	}
	
	public E rmLast(){
		int lastIndex = v.size()-1;
		return v.remove(lastIndex);
	}
}
  • 因爲同步容器類要遵照同步策略,即支持客戶端加鎖,上面代碼能夠經過客戶端加鎖實現線程安全:
/**
 * 經過客戶端加鎖實現線程安全
 */
@ThreadSafe
public class SafeVector<E> {
	private final Vector<E> v = new Vector<>();
	
	public E getLast(){
		synchronized (v) {
			int lastIndex = v.size()-1;
			return v.get(lastIndex);
		}
	}
	
	public E rmLast(){
		synchronized(v){
			int lastIndex = v.size()-1;
			return v.remove(lastIndex);
		}
	}
}

迭代器與ConcurrentModificationException:

  • 容器在迭代過程當中被修改時 ,就會拋出一個ConcurrentModificationException異常。
/**
 * 下面將會拋出:ConcurrentModificationException
 * 可經過在迭代前鎖住vector, 但這樣會損失併發性能
 */
@NotThreadSafe
public class ModificationExceptionVector {
	public static void main(String[] args) {
		Vector<Person> vector = new Vector<>();
		for (int i=0; i<10; i++){
			vector.add(new Person(i, "person" + i));
		}
		new Thread(new IterateThread(vector)).start();
		new Thread(new RemoveThread(vector)).start();
	}
	
	private static class RemoveThread implements Runnable{
		private Vector<Person> v;
		private Random ran = new Random();
		public RemoveThread(Vector<Person> v) {
			this.v = v;
		}
		
		@Override
		public void run() {
			try {
				// do 100 times' remove
				for (int i=0 ;i<5; i++){
					v.remove(ran.nextInt(v.size()));
					Thread.sleep(500);
				}
			} catch (InterruptedException e) {
			}
		}
	}
	
	private static class IterateThread implements Runnable{
		private Vector<Person> v;
		
		public IterateThread(Vector<Person> v) {
			this.v = v;
		}
		
		@Override
		public void run() {
			try {
				Iterator<Person> it = v.iterator();
				while (it.hasNext()){
					System.out.println(it.next());
					Thread.sleep(500);
				}
			} catch (InterruptedException e) {
			}
		}
	}
}

隱藏迭代器:

  • 正如封裝對象的狀態有助於維持不變性條件同樣,封裝對象的同步機制一樣有助於確保實施同步策略。
  • 一些隱藏的迭代操做:hashCode, equals, containsAll, removeAll, retainAll等。

併發容器:

  • 經過併發容器來代替同步容器,能夠極大地提升伸縮性下降風險

ConrrentHashMap:

以前有一篇文章介紹過ConcurrentHashMap: http://my.oschina.net/indestiny/blog/209458 編程

  • ConcurrentHashMap使用一種粒度更細的加鎖機制來實現大程度的共享,這種機制稱爲分段鎖(Lock Striping);
  • ConcurrentHashMap的迭代器不會拋出ConcurrentModificationException,所以不須要在迭代過程當中加鎖,由於其返回的迭代器具備弱一致性,而非"及時失敗"
  • ConcurrentHashMap對一些操做進行了弱化,如size(計算的是近似值,而不是精確值), isEmpty等。

額外的原子Map操做:

  • ConcurrentMap聲明瞭一些原子操做接口:
public interface ConcurrentMap<K, V> extends Map<K, V> {    
    V putIfAbsent(K key, V value);
    boolean remove(Object key, Object value);
    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);
}

CopyOnWriteArrayList:

  • CopyOnWriteArrayList比同步List具備更高的併發性能,並且在迭代時不須要加鎖或複製
  • 其安全性在於:只要發佈一個事實不可變的對象,那麼在訪問該對象時就不須要進一步同步;在每次修改都會建立一個新的容器副本,從而實現可變性
  • 僅當迭代操做遠遠多於修改操做時,才應該使用"寫入時複製"容器。好比事件通知系統,對監聽器列表中的每一個監聽器進行通知。

阻塞隊列和生產者--消費者模式:

  • 在構建高可靠的應用程序時,有界隊列是一種強大的資源管理工具;它們可以意志或防止產生過多的工做項,使應用程序在負荷過載的狀況下變得更加健壯。
  • BlockingQueue實現:LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue;

串行線程封閉:

  • 對於可變對象生產者--消費者這種設計與阻塞隊列一塊兒,促進了串行線程封閉,從而將對象全部權從生產者交付給消費者。

雙端隊列與工做密取:

  • java6提供了雙端隊列:ArrayDeque, LinkedBlockingDeque;
  • 雙端隊列適用於另外一種模式:工做密取,每一個消費者有各自的雙端隊列,這種模式很是適合既是消費者又是生產者問題。
  • 當消費者本身的雙端隊列爲空時,它會從其餘消費者隊列末尾中密取任務。

阻塞方法與中斷方法:

  • 阻塞的緣由:等待I/O操做結束等待得到一個鎖等待從Thread.sleep方法中醒來,或是等待另外一個線程的計算結果等。
  • 傳遞InterreuptedException: 拋出異常給方法調用者,或捕獲異常,作一些清理工做再拋出拋出異常。
  • 恢復中斷:有時不能拋出InterruptedException, 好比在Runnable中,則能夠恢復中斷
/**
 * 恢復中斷狀態以免屏蔽中斷
 */
public class TaskRunnable implements Runnable {
	private final BlockingQueue<Task> queue;
	public TaskRunnable(BlockingQueue<Task> queue) {
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try {
			doTask(queue.take());
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
       ...
}

同步工具類:

  • 任何一個對象均可以是同步工具類,java平臺提供的一些同步工具類有:Semaphore(信號量), Barrier(柵欄), Latch(閉鎖);

閉鎖:

  • 閉鎖能夠用來確保某些活動直到其餘活動都完成後才繼續執行;

一個計算多個線程啓動到結束耗時的例子: 緩存

/**
 * 在計時測試中使用CountDownLatch來啓動和中止線程
 */
public class TestHarness {
	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
		final CountDownLatch startGate = new CountDownLatch(1); //全部線程同時開始執行task的閥門
		final CountDownLatch endGate = new CountDownLatch(nThreads); //全部線程結束的閥門
		
		for (int i=0; i<nThreads; i++){
			Thread t = new Thread(){
				@Override
				public void run() {
					try {
						startGate.await(); //等待startGate值減爲0
						try {
							task.run();
						} finally{
							endGate.countDown(); //一個線程運行結束,值減1
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			};
			t.start();
		}
		long start = System.nanoTime();
		startGate.countDown(); //全部線程開始執行task
		endGate.await(); //等待全部線程執行結束
		long end = System.nanoTime();
		return end - start;
	}
}

FutureTask:

  • FutureTask也可用作閉鎖,表示一種抽象的可生成結果的計算。
/**
 * 使用FutureTask來提早加載稍後須要的數據
 */
public class Preloader {
	private final FutureTask<ProductInfo> future = new FutureTask<>(
			new Callable<ProductInfo>() {
				@Override
				public ProductInfo call() throws Exception {
					return loadProductInfo();
				}
			});
	private final Thread thread = new Thread(future);

	public void start() {
		thread.start();
	}

	private ProductInfo loadProductInfo() {
		// TODO Auto-generated method stub
		return null;
	}

	public ProductInfo get() throws InterruptedException {
		try {
			return future.get();
		} catch (ExecutionException e) {
			// exception handle
			return null;
		}
	}
}

信號量:

  • 計數信號量用來控制同時訪問某個特定資源的操做數量,或者同時執行某個制定操做的數量,也能夠用來實現某種資源池,或者對容器施加邊界
/**
 * 使用Semaphore爲容器設置邊界
 */
public class BoundedHashSet<T> {
	private final Set<T> set;
	private final Semaphore sem;
	
	public BoundedHashSet(int bound){
		this.set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound); //非公平
	}
	
	public boolean add(T t) throws InterruptedException{
		sem.acquire(); //請求semaphore, permits-1或阻塞到permits > 0
		boolean wasAdded = false;
		
		try {
			wasAdded = set.add(t);
			return wasAdded;
		} finally{
			if (!wasAdded) //未添加成功則釋放semaphore
				sem.release();
		}
	}
	
	public boolean remove(T t){
		boolean wasRemoved = set.remove(t);
		if (wasRemoved) //刪除成功permits+1;
			sem.release();
		return wasRemoved;
	}
}

柵欄:

  • 柵欄(Barrier)相似於閉鎖,它能阻塞一組線程直到某個事件發生。柵欄與閉鎖的關鍵區別在於,全部線程必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件(CutDownLatch值減爲0)柵欄用於等待其餘線程
/**
 * CyclicBarrier測試
 */
public class CyclicBarrierTest {
	
	public static void main(String[] args) {
		int threadCount = 3;
		CyclicBarrier barrier =
				 new CyclicBarrier(threadCount, new Runnable() {
					@Override
					public void run() { //最後一個線程到達柵欄時觸發
						System.out.println("all have finished.");
					}
		});
		
		for (int i=0 ;i<threadCount; i++){
			new Thread(new WorkThread(barrier)).start();
		}
	}
	
	private static class WorkThread implements Runnable{
		private CyclicBarrier barrier;
		
		public WorkThread(CyclicBarrier barrier) {
			this.barrier = barrier;
		}
		
		@Override
		public void run() {
			System.out.println(
					Thread.currentThread().getId() + " Working...");
			try {
				barrier.await(); //當前線程阻塞直到最後一個線程到達
				System.out.println(Thread.currentThread().getId() + " awaiting finished.");
			} catch (InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}
  • 除Barrier柵欄外,還有Exchanger柵欄,它是一種兩方柵欄, 能夠實現兩個線程之間交換數據
/**
 * 經過Exchanger交換2個線程數據
 */
public class ExchangerTest {
	public static void main(String[] args) {
		Exchanger<String> exchanger = new Exchanger<>();

		ExchangerRunnable exchangerRunnable1 =
		        new ExchangerRunnable(exchanger, "A");

		ExchangerRunnable exchangerRunnable2 =
		        new ExchangerRunnable(exchanger, "B");

		new Thread(exchangerRunnable1).start();
		new Thread(exchangerRunnable2).start();
	}
	
	private static class ExchangerRunnable implements Runnable{
		private Exchanger<String> exchanger;
		private String data;
		public ExchangerRunnable(Exchanger<String> exchanger, String data){
			this.exchanger = exchanger;
			this.data = data;
		}

		@Override
		public void run() {
			try {
	            String beforeData = this.data;
	            //阻塞直到另外一個線程調用exchanger.exchange(), 交換數據
	            this.data = this.exchanger.exchange(this.data); 
	            System.out.println(
	                    Thread.currentThread().getName() +
	                    " exchanged " + beforeData + " for " + this.data
	            );
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
		}
	}
}

構建高效且可伸縮的結果緩存:

  • 一個簡單安全,性能低下的緩存設計:
/**
 * 計算緩存器
 * 內部使用HashMap實現計算結果的緩存
 * 經過外部接口同步操做實現線程安全
 * 但有可能因爲計算時間過長致使性能低下
 */
public class Memoizer1<A, V> implements Computable<A, V> {
	private final Map<A, V> cache = new HashMap<A, V>();
	private final Computable<A, V> c;
	
	public Memoizer1(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public synchronized V compute(A key) throws InterruptedException {
		V result = cache.get(key);
		if (result == null){
			result = c.compute(key); //計算
			cache.put(key, result);
		}
		return result;
	}
}
  • 經過併發容器ConcurrentHashMap代替HashMap,提高併發性能:
/**
 * 計算緩存器
 * 經過ConcurrentHashMap代替HashMap, 提高併發性能
 * 但這樣有可能多個線程同時調用compute方法,
 * 因爲計算過程當中尚未結果,有可能致使多個線程計算一樣的值
 */
public class Memoizer2<A, V> implements Computable<A, V> {
	private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
	private final Computable<A, V> c;
	
	public Memoizer2(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(A key) throws InterruptedException {
		V result = cache.get(key);
		if (result == null){
			result = c.compute(key); //計算
			cache.put(key, result);
		}
		return result;
	}
}
  • 經過FutureTask來彌補重複結果計算問題:
/**
 * 計算緩存器
 * 經過FutureTask代替map中的Value
 * 這樣能夠在計算結果計算完成,就當即返回,
 * 但仍然有可能重複計算,由於存在非原子的複合操做"若沒有則添加": if (f == null){...}
 */
public class Memoizer3<A, V> implements Computable<A, V> {
	private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;
	
	public Memoizer3(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(final A key) throws InterruptedException {
		Future<V> f = cache.get(key);
		if (f == null){
			Callable<V> computeTask = new Callable<V>() {
				@Override
				public V call() throws Exception {
					return c.compute(key);
				}
			};
			FutureTask<V> ft = new FutureTask<>(computeTask);
			f = ft;
			cache.put(key, ft);
			ft.run(); //執行計算
		}
		try {
			return f.get(); //獲取計算結果
		} catch (ExecutionException e) {
			//do exception handle
		}
		return null;
	}
}
  • 經過對CocurrentHashMap.putIfAbsent()對上面的問題進行修復:
/**
 * 計算緩存器
 * 經過ConcurrentHashMap.putIfAbsent避免重複任務
 */
public class Memoizer<A, V> implements Computable<A, V> {
	private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;
	
	public Memoizer(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(final A key) throws InterruptedException {
		while(true){
			Future<V> f = cache.get(key);
			if (f == null){
				Callable<V> computeTask = new Callable<V>() {
					@Override
					public V call() throws Exception {
						return c.compute(key);
					}
				};
				FutureTask<V> ft = new FutureTask<>(computeTask);
				f = cache.putIfAbsent(key, ft); //該方法不會對相同key的值進行覆蓋,這樣避免了相同key的任務被計算
				if (f == null) ft.run(); //執行計算
			}
			try {
				return f.get(); //獲取計算結果
			} catch (CancellationException e){ 
				cache.remove(key); //計算取消則移除對應的計算任務key
			} catch (ExecutionException e) {
				//do exception handle
			}
		}
	}
}
一,二,三,四就講述了java併發編程的基礎知識。

併發技巧清單:

  • 可變狀態相當重要的。
       全部併發訪問均可以歸結爲如何協調對併發狀態的訪問,可變狀態越少,越容易確保線程安全性。
  • 儘可能將域聲明爲final類型,除非須要它們是可變的。
  • 不可變對象必定是線程安全的。

       不可變對象能極大地下降併發編程的複雜性。它們更爲簡單且安全,能夠任意共享而無須使用加鎖或保護性複製等機制。 安全

  • 封裝有助於管理複雜性。
      將數據封裝在對象中,更易於維護不變性;將同步機制封裝在對象中,更易於遵循同步策略。
  • 用鎖保護每一個可變變量
  • 當保護同一個不變性條件中的全部變量時,要使用同一個鎖。
  • 在執行複合操做期間,要持有鎖。
  • 若是從多個線程中訪問同一個可變變量時沒有同步機制,那麼程序會可能出問題。
  • 不要自行推斷不須要使用同步。
  • 設計過程當中考慮線程安全,不要在上線出問題後再作。
  • 同步策略文檔化

不吝指正。 併發

相關文章
相關標籤/搜索