所以下面的複合操作就有可能不安全:
/**
 * Wrapper around a {@link Vector} whose compound operations are intentionally left
 * unsynchronized.
 *
 * <p>{@code getLast} and {@code rmLast} first read the size and then use the derived index in a
 * second call. Nothing prevents another thread from changing the vector between those two calls,
 * so the computed last index may be stale by the time it is used.
 */
@NotThreadSafe
public class UnsafeVector<E> {

    private final Vector<E> v = new Vector<>();

    /** Returns the element at index {@code size() - 1}; the size read and the get are not atomic. */
    public E getLast() {
        return v.get(v.size() - 1);
    }

    /** Removes and returns the element at index {@code size() - 1}; not atomic either. */
    public E rmLast() {
        return v.remove(v.size() - 1);
    }
}
/**
 * Thread-safe variant that uses client-side locking: every compound operation holds the
 * vector's own monitor for its whole duration.
 */
@ThreadSafe
public class SafeVector<E> {

    private final Vector<E> v = new Vector<>();

    /** Returns the last element; the size read and the get happen under one lock on {@code v}. */
    public E getLast() {
        synchronized (v) {
            final int tail = v.size() - 1;
            return v.get(tail);
        }
    }

    /** Removes and returns the last element; the size read and the removal share one lock on {@code v}. */
    public E rmLast() {
        synchronized (v) {
            final int tail = v.size() - 1;
            return v.remove(tail);
        }
    }
}
/** * 下面將會拋出:ConcurrentModificationException * 可經過在迭代前鎖住vector, 但這樣會損失併發性能 */ @NotThreadSafe public class ModificationExceptionVector { public static void main(String[] args) { Vector<Person> vector = new Vector<>(); for (int i=0; i<10; i++){ vector.add(new Person(i, "person" + i)); } new Thread(new IterateThread(vector)).start(); new Thread(new RemoveThread(vector)).start(); } private static class RemoveThread implements Runnable{ private Vector<Person> v; private Random ran = new Random(); public RemoveThread(Vector<Person> v) { this.v = v; } @Override public void run() { try { // do 100 times' remove for (int i=0 ;i<5; i++){ v.remove(ran.nextInt(v.size())); Thread.sleep(500); } } catch (InterruptedException e) { } } } private static class IterateThread implements Runnable{ private Vector<Person> v; public IterateThread(Vector<Person> v) { this.v = v; } @Override public void run() { try { Iterator<Person> it = v.iterator(); while (it.hasNext()){ System.out.println(it.next()); Thread.sleep(500); } } catch (InterruptedException e) { } } } }
之前有一篇文章介紹過 ConcurrentHashMap:http://my.oschina.net/indestiny/blog/209458
/**
 * A {@link Map} extended with compound operations that implementations are expected to perform
 * atomically. The signatures mirror the JDK's {@code java.util.concurrent.ConcurrentMap};
 * the per-method notes below follow that interface's documented contract.
 */
public interface ConcurrentMap<K, V> extends Map<K, V> {

    /**
     * Associates {@code key} with {@code value} only if the key has no mapping yet.
     *
     * @return the value previously associated with the key, or {@code null} if there was none
     */
    V putIfAbsent(K key, V value);

    /**
     * Removes the entry for {@code key} only if it is currently mapped to {@code value}.
     *
     * @return {@code true} if the entry was removed
     */
    boolean remove(Object key, Object value);

    /**
     * Replaces the entry for {@code key} only if the key currently has a mapping.
     *
     * @return the value previously associated with the key, or {@code null} if there was none
     */
    V replace(K key, V value);

    /**
     * Replaces the entry for {@code key} only if it is currently mapped to {@code oldValue}.
     *
     * @return {@code true} if the value was replaced
     */
    boolean replace(K key, V oldValue, V newValue);
}
/**
 * Runnable that takes one task from a blocking queue and processes it, restoring the thread's
 * interrupt status when interrupted so that the interruption is not swallowed.
 */
public class TaskRunnable implements Runnable {
    private final BlockingQueue<Task> queue;

    public TaskRunnable(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // take() may throw InterruptedException while waiting for an element.
            doTask(queue.take());
        } catch (InterruptedException e) {
            // run() cannot rethrow the checked exception, so re-assert the interrupt flag
            // to let code further up the call stack observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
    ... // remainder of the class (e.g. doTask) is elided in the article
}
一個計算多個線程從啓動到結束耗時的例子:
/**
 * Timing harness that uses two {@link CountDownLatch} gates: one to release all worker threads
 * at the same moment and one to wait until every worker has finished.
 */
public class TestHarness {

    /**
     * Runs {@code task} once on each of {@code nThreads} freshly started threads and measures how
     * long it takes from opening the start gate until the last worker is done.
     *
     * @param nThreads number of worker threads to start
     * @param task the work each worker runs once
     * @return elapsed time in nanoseconds between opening the start gate and the end gate opening
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     workers to finish
     */
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        // Opened once by the caller so that all workers begin the task together.
        final CountDownLatch startGate = new CountDownLatch(1);
        // Counted down once per worker; reaches zero when all workers are done.
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        task.run();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible instead of printing a stack trace.
                        Thread.currentThread().interrupt();
                    } finally {
                        // Always release the end gate, even when interrupted at the start gate
                        // or when the task throws; otherwise the caller would wait forever.
                        endGate.countDown();
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}
/** * 使用FutureTask來提早加載稍後須要的數據 */ public class Preloader { private final FutureTask<ProductInfo> future = new FutureTask<>( new Callable<ProductInfo>() { @Override public ProductInfo call() throws Exception { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } private ProductInfo loadProductInfo() { // TODO Auto-generated method stub return null; } public ProductInfo get() throws InterruptedException { try { return future.get(); } catch (ExecutionException e) { // exception handle return null; } } }
/**
 * A set with an upper bound on its size, enforced with a {@link Semaphore}: each element held
 * in the set owns one permit.
 */
public class BoundedHashSet<T> {

    private final Set<T> set;
    private final Semaphore sem;

    /**
     * @param bound maximum number of elements; used as the semaphore's initial permit count
     */
    public BoundedHashSet(int bound) {
        // Single-argument constructor: non-fair semaphore.
        this.sem = new Semaphore(bound);
        this.set = Collections.synchronizedSet(new HashSet<T>());
    }

    /**
     * Adds {@code t}, first acquiring a permit (blocking while none is available).
     *
     * @return {@code true} if the element was not already present
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public boolean add(T t) throws InterruptedException {
        sem.acquire();
        boolean inserted = false;
        try {
            inserted = set.add(t);
        } finally {
            // Nothing was stored (duplicate or failure): hand the permit back.
            if (!inserted) {
                sem.release();
            }
        }
        return inserted;
    }

    /**
     * Removes {@code t} and, if it was present, returns its permit to the semaphore.
     *
     * @return {@code true} if the element was present and has been removed
     */
    public boolean remove(T t) {
        if (!set.remove(t)) {
            return false;
        }
        sem.release();
        return true;
    }
}
/**
 * Small demo of {@link CyclicBarrier}: three worker threads each wait at a shared barrier, and
 * a barrier action prints a message once the last of them arrives.
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        int threadCount = 3;

        // Barrier action: triggered when the last thread reaches the barrier.
        Runnable onAllArrived = new Runnable() {
            @Override
            public void run() {
                System.out.println("all have finished.");
            }
        };
        CyclicBarrier barrier = new CyclicBarrier(threadCount, onAllArrived);

        int started = 0;
        while (started < threadCount) {
            new Thread(new WorkThread(barrier)).start();
            started++;
        }
    }

    /** Worker that announces itself, waits at the barrier, then reports that the wait is over. */
    private static class WorkThread implements Runnable {
        private CyclicBarrier barrier;

        public WorkThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getId() + " Working...");
            try {
                // Blocks the current thread until the last participant has arrived.
                barrier.await();
                System.out.println(Thread.currentThread().getId() + " awaiting finished.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Small demo of {@link Exchanger}: two threads swap their strings with each other.
 */
public class ExchangerTest {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExchangerRunnable first = new ExchangerRunnable(exchanger, "A");
        ExchangerRunnable second = new ExchangerRunnable(exchanger, "B");

        new Thread(first).start();
        new Thread(second).start();
    }

    /** Offers its own data to the exchanger and prints what it received in return. */
    private static class ExchangerRunnable implements Runnable {
        private Exchanger<String> exchanger;
        private String data;

        public ExchangerRunnable(Exchanger<String> exchanger, String data) {
            this.exchanger = exchanger;
            this.data = data;
        }

        @Override
        public void run() {
            try {
                final String offered = this.data;
                // Blocks until the other thread also calls exchange(), then the values are swapped.
                this.data = this.exchanger.exchange(offered);
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " exchanged " + offered + " for " + this.data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
/** * 計算緩存器 * 內部使用HashMap實現計算結果的緩存 * 經過外部接口同步操做實現線程安全 * 但有可能因爲計算時間過長致使性能低下 */ public class Memoizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private final Computable<A, V> c; public Memoizer1(Computable<A, V> c) { this.c = c; } @Override public synchronized V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //計算 cache.put(key, result); } return result; } }
/** * 計算緩存器 * 經過ConcurrentHashMap代替HashMap, 提高併發性能 * 但這樣有可能多個線程同時調用compute方法, * 因爲計算過程當中尚未結果,有可能致使多個線程計算一樣的值 */ public class Memoizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private final Computable<A, V> c; public Memoizer2(Computable<A, V> c) { this.c = c; } @Override public V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //計算 cache.put(key, result); } return result; } }
/** * 計算緩存器 * 經過FutureTask代替map中的Value * 這樣能夠在計算結果計算完成,就當即返回, * 但仍然有可能重複計算,由於存在非原子的複合操做"若沒有則添加": if (f == null){...} */ public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { Future<V> f = cache.get(key); if (f == null){ Callable<V> computeTask = new Callable<V>() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(computeTask); f = ft; cache.put(key, ft); ft.run(); //執行計算 } try { return f.get(); //獲取計算結果 } catch (ExecutionException e) { //do exception handle } return null; } }
/** * 計算緩存器 * 經過ConcurrentHashMap.putIfAbsent避免重複任務 */ public class Memoizer<A, V> implements Computable<A, V> { private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { while(true){ Future<V> f = cache.get(key); if (f == null){ Callable<V> computeTask = new Callable<V>() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(computeTask); f = cache.putIfAbsent(key, ft); //該方法不會對相同key的值進行覆蓋,這樣避免了相同key的任務被計算 if (f == null) ft.run(); //執行計算 } try { return f.get(); //獲取計算結果 } catch (CancellationException e){ cache.remove(key); //計算取消則移除對應的計算任務key } catch (ExecutionException e) { //do exception handle } } } }一,二,三,四就講述了java併發編程的基礎知識。
不可變對象能極大地降低併發編程的複雜性。它們更為簡單且安全,可以任意共享而無須使用加鎖或保護性複製等機制。
不吝指正。