#Java 併發編程(一)數據庫
##同步容器 1.Vector,Hashtable。 實現線程安全的方式是:將它們的狀態封裝起來,並對每一個共有方法進行同步,使得每次只有一個線程能訪問容器的狀態。使用了Java監視器模式。編程
2.Vector代碼分析 //根據下標獲取數據,都是使用synchronized實現同步 public synchronized E get(int index) { if (index >= elementCount) throw new ArrayIndexOutOfBoundsException(index); return elementData(index); }設計模式
//添加數據 public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; } //擴容 private void grow(int minCapacity) { // overflow-conscious code int oldCapacity = elementData.length; int newCapacity = oldCapacity + ((capacityIncrement > 0) ? capacityIncrement : oldCapacity); if (newCapacity - minCapacity < 0) newCapacity = minCapacity; if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); elementData = Arrays.copyOf(elementData, newCapacity); }
3.Hashtable代碼分析數組
//使用synchronized實現同步 public synchronized V get(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; } //添加元素 public synchronized V put(K key, V value) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") //key值存在於數組中 Entry<K,V> entry = (Entry<K,V>)tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } //key值不存在則進行添加 addEntry(hash, key, value, index); return null; } private void addEntry(int hash, K key, V value, int index) { modCount++; Entry<?,?> tab[] = table; //若是超過了閾值 if (count >= threshold) { rehash(); tab = table; hash = key.hashCode(); index = (hash & 0x7FFFFFFF) % tab.length; } //添加新元素 @SuppressWarnings("unchecked") Entry<K,V> e = (Entry<K,V>) tab[index]; tab[index] = new Entry<>(hash, key, value, e); count++; }
##併發容器 1.JDK5.0提供了多種併發容器來改進同步容器的性能。同步容器將全部對容器狀態的訪問都串行化,以實現他們的線程安全性。代價是嚴重下降併發性。併發容器是針對多個線程併發設計的。緩存
ConcurrentHashMap用於代替基於散列的同步Map。安全
CopyOnWriteArrayList用於在遍歷操做爲主要操做的狀況下代替同步的List。併發
2.Java8 ConcurrentHashMap 代碼分析框架
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //key,value不能爲空 if (key == null || value == null) throw new NullPointerException(); //使Key值分散更均勻 int hash = spread(key.hashCode()); //記錄Node數量 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //延遲初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //此bucket爲空,不用鎖,使用cas添加 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
3.CopyOnWriteArrayList適合用在「讀多,寫少」的「併發」應用中,換句話說,它適合使用在讀操做遠遠大於寫操做的場景裏,好比緩存。它不存在「擴容」的概念,每次寫操做(add or remove)都要copy一個副本,在副本的基礎上修改後改變array引用,因此稱爲「CopyOnWrite」,所以在寫操做是加鎖,而且對整個list的copy操做時至關耗時的,過多的寫操做不推薦使用該存儲結構。ide
Java8 CopyOnWriteArrayList 代碼分析函數
//get 讀操做 private E get(Object[] a, int index) { return (E) a[index]; } //add 寫操做 public boolean add(E e) { //可重入鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //得到數組元素對象 Object[] elements = getArray(); int len = elements.length; //數組的copy Object[] newElements = Arrays.copyOf(elements, len + 1); //添加元素 newElements[len] = e; setArray(newElements); return true; } finally { //釋放鎖 lock.unlock(); } }
##阻塞隊列和生產者-消費者模式
1.阻塞隊列提供了可供阻塞的put和take方法,以及支持定時的offer和poll方法,若是隊列已經滿了,那麼put 方法將一直阻塞直到有空間可用;若是隊列爲空,那麼take 方法將會阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠不會被充滿,所以無界隊列上的put方法也永遠不會阻塞。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。BlockingQueue簡化了生產者-消費者設計模式的實現過程,他支持任意數量的生產者和消費者。一張最多見的生產者-消費者設計模式就是線程池和工做隊列的組合,在Executor任務執行框架中就體現了這種模式。
2.生產者-消費者模式代碼示例
public class Client { public static class People{ BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3); public void putPeople(Object object){ try { //若是此時隊列已經滿了將阻塞。 peBlockingQueue.put(object); } catch (Exception e) { e.printStackTrace(); } System.out.println("放入人員"); } public Object takePeople(){ Object object=null; try { //若是此時隊列爲空將阻塞。 object=peBlockingQueue.take(); } catch (Exception e) { e.printStackTrace(); } System.out.println("取出人員"); return object; } } static class putThread extends Thread{ private People people; private Object object=new Object(); public putThread(People people){ this.people=people; } @Override public void run() { people.putPeople(object); } } static class takeThread extends Thread{ private People people; public takeThread(People people){ this.people=people; } @Override public void run() { people.takePeople(); } } public static void main(String args[]){ People people=new People(); for (int i = 0; i < 5; i++) { new Thread(new putThread(people)).start(); } for (int i = 0; i < 5; i++) { new Thread(new takeThread(people)).start(); } } }
輸出結果
放入人員
放入人員
放入人員
取出人員
放入人員
放入人員
取出人員
取出人員
取出人員
取出人員
3.Java 裏的阻塞隊列
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。默認大小爲Integer.MAX_VALUE。
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue也是一個無界隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。建立公平的隊列:
BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3,true);
ArrayBlockingQueue代碼分析:
//可重入鎖 final ReentrantLock lock; //等待條件 private final Condition notEmpty; //等待條件 private final Condition notFull; //put 方法 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //若是計數等於數組長度 while (count == items.length) //非滿阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //非空喚醒 notEmpty.signal(); } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //獲取可中斷鎖 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。
咱們能夠將DelayQueue運用在如下應用場景:
緩存系統的設計。能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。
關閉空閒鏈接。
##阻塞方法與中斷方法
##同步工具類
信號量
Semaphore 中管理着一組虛擬的許可,許可的初始數量可經過構造函數來指定,在執行操做時能夠首先得到許可,並在使用以後釋放許可,若是沒有許可,那麼acquire將阻塞直到有許可。
Semaphore 能夠用於實現資源池,例如數據庫鏈接池。咱們能夠構造一個固定長度的資源池,當池爲空,請求資源將會阻塞直到有資源。也可使用Semaphore將任何一種容器變成有界阻塞容器。
public class TestSemaphore { public static class SemaphoreDemo { private ReentrantLock lock = new ReentrantLock(); private Semaphore semaphore; private final ArrayList<Object> resourceList = new ArrayList<Object>(); public SemaphoreDemo(ArrayList<Object> list) { this.resourceList.addAll(list); semaphore = new Semaphore(3); } // 獲取資源 public Object acquire() throws InterruptedException { semaphore.acquire(); lock.lock(); try { return resourceList.get(resourceList.size() - 1); } catch (Exception InterruptedException) { } finally { lock.unlock(); } return null; } // 釋放資源 public void release(Object resource) { lock.lock(); try { resourceList.add(resource); } finally { lock.unlock(); } semaphore.release(); } } public static void main(String[] args) { ArrayList<Object> resourceList = new ArrayList(); resourceList.add("Resource1"); resourceList.add("Resource2"); final SemaphoreDemo semaphoreDemo = new SemaphoreDemo(resourceList); Runnable task = new Runnable() { public void run() { Object reObject = null; try { // 獲取資源 reObject = semaphoreDemo.acquire(); System.out.println(Thread.currentThread().getName() + ":" + reObject); //休眠 Thread.sleep(1500); System.out.println(Thread.currentThread().getName() + "!" + reObject); } catch (Exception e) { e.printStackTrace(); } finally { semaphoreDemo.release(reObject); } } }; ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 9; i++) { executorService.submit(task); } executorService.shutdown(); }
}
代碼輸出
pool-1-thread-1:Resource2
pool-1-thread-3:Resource2
pool-1-thread-2:Resource2
pool-1-thread-1!Resource2
pool-1-thread-3!Resource2
pool-1-thread-2!Resource2
pool-1-thread-4:Resource2
pool-1-thread-5:Resource2
pool-1-thread-5!Resource2
pool-1-thread-4!Resource2
閉鎖
閉鎖是一種同步工具類,能夠延遲線程的進度直到其到達終止狀態。閉鎖能夠用來確保某些活動直到其餘活動都完成之後才繼續執行。
CountDownLatch是一種靈活的閉鎖實現,可使一個或多個線程等待一組事件發生。CountDownLatch有一個正數計數器,countDown方法對計數器作減操做,await方法等待計數器達到0。全部await的線程都會阻塞直到計數器爲0或者等待線程中斷或者超時。
CountDownLatch源碼
//初始化時給定計數器大小 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } //遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。 public void countDown() { sync.releaseShared(1); } //使當前線程阻塞直到計數器爲零 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
CountDownLatch使用示例
public class TestCountDownLatch { public static CountDownLatch latch=null; public static void main(String args[]) throws InterruptedException{ try { latch= new CountDownLatch(5); for (int i = 0; i < 5; i++) { new TestThread().start(); } latch.await(); System.out.println("5個線程已經完成"); } catch (Exception e) { }finally { } } static class TestThread extends Thread{ @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); latch.countDown(); }catch (InterruptedException e) { e.printStackTrace(); } } } }
輸出結果
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
5個線程已經完成