Java 併發編程(一)

#Java 併發編程(一)數據庫

##同步容器 1.Vector,Hashtable。 實現線程安全的方式是:將它們的狀態封裝起來,並對每一個共有方法進行同步,使得每次只有一個線程能訪問容器的狀態。使用了Java監視器模式。編程

2.Vector代碼分析 //根據下標獲取數據,都是使用synchronized實現同步 public synchronized E get(int index) { if (index >= elementCount) throw new ArrayIndexOutOfBoundsException(index); return elementData(index); }設計模式

//添加數據
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

//擴容
private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
                                     capacityIncrement : oldCapacity);
    if (newCapacity - minCapacity < 0)
        newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    elementData = Arrays.copyOf(elementData, newCapacity);
}

3.Hashtable代碼分析數組

//使用synchronized實現同步
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}
//添加元素
public synchronized V put(K key, V value) {
    
    
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    @SuppressWarnings("unchecked")
    //key值存在於數組中
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }
	//key值不存在則進行添加
    addEntry(hash, key, value, index);
    return null;
}	

 private void addEntry(int hash, K key, V value, int index) {
    modCount++;

    Entry<?,?> tab[] = table;
    //若是超過了閾值
    if (count >= threshold) {
        rehash();
        tab = table;
        hash = key.hashCode();
        index = (hash & 0x7FFFFFFF) % tab.length;
    }
		//添加新元素
    @SuppressWarnings("unchecked")
    Entry<K,V> e = (Entry<K,V>) tab[index];
    tab[index] = new Entry<>(hash, key, value, e);
    count++;
}

##併發容器 1.JDK5.0提供了多種併發容器來改進同步容器的性能。同步容器將全部對容器狀態的訪問都串行化,以實現他們的線程安全性。代價是嚴重下降併發性。併發容器是針對多個線程併發設計的。緩存

ConcurrentHashMap用於代替基於散列的同步Map。安全

CopyOnWriteArrayList用於在遍歷操做爲主要操做的狀況下代替同步的List。併發

2.Java8 ConcurrentHashMap 代碼分析框架

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
		//key,value不能爲空
    if (key == null || value == null) throw new NullPointerException();
    //使Key值分散更均勻
    int hash = spread(key.hashCode());
    //記錄Node數量
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //延遲初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //此bucket爲空,不用鎖,使用cas添加
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                  
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

3.CopyOnWriteArrayList適合用在「讀多,寫少」的「併發」應用中,換句話說,它適合使用在讀操做遠遠大於寫操做的場景裏,好比緩存。它不存在「擴容」的概念,每次寫操做(add or remove)都要copy一個副本,在副本的基礎上修改後改變array引用,因此稱爲「CopyOnWrite」,所以在寫操做是加鎖,而且對整個list的copy操做時至關耗時的,過多的寫操做不推薦使用該存儲結構。ide

Java8 CopyOnWriteArrayList 代碼分析函數

//get 讀操做
	private E get(Object[] a, int index) {
    return (E) a[index];
	}

	//add 寫操做
	
	public boolean add(E e) {
		//可重入鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    		//得到數組元素對象
        Object[] elements = getArray();
        int len = elements.length;
        //數組的copy
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //添加元素
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
    		//釋放鎖
        lock.unlock();
    }
}

##阻塞隊列和生產者-消費者模式

1.阻塞隊列提供了可供阻塞的put和take方法,以及支持定時的offer和poll方法,若是隊列已經滿了,那麼put 方法將一直阻塞直到有空間可用;若是隊列爲空,那麼take 方法將會阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠不會被充滿,所以無界隊列上的put方法也永遠不會阻塞。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。BlockingQueue簡化了生產者-消費者設計模式的實現過程,他支持任意數量的生產者和消費者。一張最多見的生產者-消費者設計模式就是線程池和工做隊列的組合,在Executor任務執行框架中就體現了這種模式。

2.生產者-消費者模式代碼示例

public class Client {
public static class People{
	
	BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3);
			
	public void putPeople(Object object){
		try {
			//若是此時隊列已經滿了將阻塞。
			peBlockingQueue.put(object);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("放入人員");
	}
	
	public Object takePeople(){
		Object object=null;
		try {
			//若是此時隊列爲空將阻塞。
			object=peBlockingQueue.take();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("取出人員");
		return object;
	}
}

static class putThread extends Thread{
	
	private  People people; 
	private Object object=new Object();
	public putThread(People people){
		this.people=people;
	}
	@Override
    public void run() {  
    		people.putPeople(object);  
    }  
}
static class takeThread extends Thread{
	private  People people; 
	public takeThread(People people){
		this.people=people;
	}
	@Override
    public void run() {  
    		people.takePeople(); 
    }  
}
public static void main(String args[]){
	People people=new People();
	for (int i = 0; i < 5; i++) {
		new Thread(new putThread(people)).start();
	}
	for (int i = 0; i < 5; i++) {
		new Thread(new takeThread(people)).start();
	}
}
}

輸出結果

放入人員

放入人員

放入人員

取出人員

放入人員

放入人員

取出人員

取出人員

取出人員

取出人員

3.Java 裏的阻塞隊列

ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。

LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。默認大小爲Integer.MAX_VALUE。

PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。

DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue也是一個無界隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。建立公平的隊列:

BlockingQueue<Object> peBlockingQueue=new ArrayBlockingQueue<Object>(3,true);

ArrayBlockingQueue代碼分析:

//可重入鎖	
final ReentrantLock lock;
//等待條件
private final Condition notEmpty;
//等待條件
private final Condition notFull;
//put 方法
 public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    		//若是計數等於數組長度
        while (count == items.length)
        		//非滿阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

 private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //非空喚醒
    notEmpty.signal();
}


public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    //獲取可中斷鎖
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。

咱們能夠將DelayQueue運用在如下應用場景:

緩存系統的設計。能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。

關閉空閒鏈接。

##阻塞方法與中斷方法

##同步工具類

信號量

Semaphore 中管理着一組虛擬的許可,許可的初始數量可經過構造函數來指定,在執行操做時能夠首先得到許可,並在使用以後釋放許可,若是沒有許可,那麼acquire將阻塞直到有許可。

Semaphore 能夠用於實現資源池,例如數據庫鏈接池。咱們能夠構造一個固定長度的資源池,當池爲空,請求資源將會阻塞直到有資源。也可使用Semaphore將任何一種容器變成有界阻塞容器。

public class TestSemaphore {
public static class SemaphoreDemo {
	private ReentrantLock lock = new ReentrantLock();
	private Semaphore semaphore;
	private final ArrayList<Object> resourceList = new ArrayList<Object>();

	public SemaphoreDemo(ArrayList<Object> list) {
		this.resourceList.addAll(list);
		semaphore = new Semaphore(3);
	}
	// 獲取資源
	public Object acquire() throws InterruptedException {
		semaphore.acquire();
		lock.lock();
		try {
			return resourceList.get(resourceList.size() - 1);
		} catch (Exception InterruptedException) {
		} finally {
			lock.unlock();
		}
		return null;
	}
	// 釋放資源
	public void release(Object resource) {
		lock.lock();
		try {
			resourceList.add(resource);
		} finally {
			lock.unlock();
		}
		semaphore.release();
	}
}
public static void main(String[] args) {
	ArrayList<Object> resourceList = new ArrayList();
	resourceList.add("Resource1");
	resourceList.add("Resource2");

	final SemaphoreDemo semaphoreDemo = new SemaphoreDemo(resourceList);
	Runnable task = new Runnable() {
		public void run() {
			Object reObject = null;
			try {
				// 獲取資源
				reObject = semaphoreDemo.acquire();
				System.out.println(Thread.currentThread().getName() + ":" + reObject);
				//休眠
				Thread.sleep(1500);
				System.out.println(Thread.currentThread().getName() + "!" + reObject);
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				semaphoreDemo.release(reObject);
			}
		}
	};
	ExecutorService executorService = Executors.newCachedThreadPool();
	for (int i = 0; i < 9; i++) {
		executorService.submit(task);
    }
	executorService.shutdown();
}

}

代碼輸出

pool-1-thread-1:Resource2

pool-1-thread-3:Resource2

pool-1-thread-2:Resource2

pool-1-thread-1!Resource2

pool-1-thread-3!Resource2

pool-1-thread-2!Resource2

pool-1-thread-4:Resource2

pool-1-thread-5:Resource2

pool-1-thread-5!Resource2

pool-1-thread-4!Resource2

閉鎖

閉鎖是一種同步工具類,能夠延遲線程的進度直到其到達終止狀態。閉鎖能夠用來確保某些活動直到其餘活動都完成之後才繼續執行。

CountDownLatch是一種靈活的閉鎖實現,可使一個或多個線程等待一組事件發生。CountDownLatch有一個正數計數器,countDown方法對計數器作減操做,await方法等待計數器達到0。全部await的線程都會阻塞直到計數器爲0或者等待線程中斷或者超時。

CountDownLatch源碼

//初始化時給定計數器大小
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
//遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。
public void countDown() {
    sync.releaseShared(1);
}
//使當前線程阻塞直到計數器爲零
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

CountDownLatch使用示例

public class TestCountDownLatch {

public static CountDownLatch latch=null;
public static void main(String args[]) throws InterruptedException{
	try {
		latch= new CountDownLatch(5);
		for (int i = 0; i < 5; i++) {
			new TestThread().start();
		}
		latch.await();
		System.out.println("5個線程已經完成");
	} catch (Exception e) {
		
	}finally {
		
	}
}
static class TestThread extends Thread{
	@Override
	public void run() {
		try {
			Thread.sleep(1000);
			System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
			latch.countDown();
		}catch (InterruptedException e) {
			e.printStackTrace();
	    }
	}
	
}
}

輸出結果

Thread-1 sleep 1000ms.

Thread-4 sleep 1000ms.

Thread-3 sleep 1000ms.

Thread-0 sleep 1000ms.

Thread-2 sleep 1000ms.

5個線程已經完成

相關文章
相關標籤/搜索