多線程系列五:併發工具類和併發容器

1、併發容器

1.ConcurrentHashMap

爲何要使用ConcurrentHashMap

在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率接近100%,HashMap在併發執行put操做時會引發死循環,是由於多線程會致使HashMap的Entry鏈表php

造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,就會產生死循環獲取Entry。java

HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。算法

ConcurrentHashMap的一些有用的方法

不少時候咱們但願在元素不存在時插入元素,咱們通常會像下面那樣寫代碼數據庫

synchronized(map){編程

  if (map.get(key) == null){數組

      return map.put(key, value);緩存

  } else{安全

      return map.get(key);數據結構

  }多線程

}

putIfAbsent(key,value)方法原子性的實現了一樣的功能

putIfAbsent(K key, V value)  

 若是key對應的value不存在,則put進去,返回null。不然不put,返回已存在的value。  

boolean remove(Object key, Object value)  

  若是key對應的值是value,則移除K-V,返回true。不然不移除,返回false。  

boolean replace(K key, V oldValue, V newValue)  

 若是key對應的當前值是oldValue,則替換爲newValue,返回true。不然不替換,返回false

Hash的解釋

散列,任意長度的輸入,經過一種算法,變換成固定長度的輸出。屬於壓縮的映射。

hash算法示例圖演示:

 

相似於HaspMap的實現就是使用散列,好比把1000個元素放到長度爲10的hashmap裏面去,放入以前會把這1000個數通過hash算法映射到10個數組裏面去,這時候就會存在相同的映射值在一個數組的相同位置,就會產生hash碰撞,此時hashmap就會在產生碰撞的數組的後面使用Entry鏈表來存儲相同映射的值,而後使用equals方法來判斷同一個鏈表存儲的值是否同樣來獲取值,鏈表就是hashmap用來解決碰撞的方法,因此咱們通常在寫一個類的時候要寫本身的hashcode方法和equals方法,若是鍵的hashcode相同,再使用鍵的equals方法判斷鍵內容是否是同樣的,同樣的就獲取值

Md5,Sha,取餘都是散列算法,ConcurrentHashMap中是wang/jenkins算法

 ConcurrentHashMap在1.7下的實現

分段鎖的設計思想。

分段鎖的思想示例圖:

說明:

a)傳統的hashtable是很小空間的數組整段鎖住,這樣性能比較低

b)ConcurrentHashMap是在很小空間數組的前面再加一個數組,映射的時候先映射到前面的數組,而後再映射到後面的很小空間的數組;讀取的時候只須要把前面的數組鎖住就能夠了。這就是分段鎖的思想

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment實際是一種可重入鎖(ReentrantLock),也就是用於分段的鎖。HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment的結構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。

 

說明:上圖存在兩次散列的過程:好比插入一個1000的數,首先是把1000的位數(最可能是高16位)作一次散列找到在segments數組中的位置,而後再把1000自己作一次散列找到在table中的位置

獲取值時同樣

ConcurrentHashMap初始化方法是經過initialCapacity、loadFactor和concurrencyLevel(參數concurrencyLevel是用戶估計的併發級別,就是說你以爲最多有多少線程共同修改這個map,根據這個來肯定Segment數組的大小concurrencyLevel默認是DEFAULT_CONCURRENCY_LEVEL = 16;)。

ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,能夠看到其中的對象屬性要麼是final的,要麼是volatile的。

總結:ConcurrentHashMap在1.7及如下的實現使用數組+鏈表的方式,採用了分段鎖的思想

ConcurrentHashMap在1.8下的實現

改進一:取消segments字段,直接採用transient volatile HashEntry<K,V>[] table保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率。

改進二:將原先table數組+單向鏈表的數據結構,變動爲table數組+單向鏈表+紅黑樹的結構。對於個數超過8(默認值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度能夠下降到O(logN),能夠改進性能。

總結:ConcurrentHashMap在1.8下的實現使用數組+鏈表+紅黑樹的方式,當鏈表個數超過8的時候就把原來的鏈表轉成紅黑樹,使用紅黑樹來存取,採用了元素鎖的思想

2. ConcurrentSkipListMap  和ConcurrentSkipListSet

ConcurrentSkipListMap    TreeMap的併發實現

ConcurrentSkipListSet     TreeSet的併發實現

瞭解什麼是SkipList?

二分查找和AVL樹查找

二分查找要求元素能夠隨機訪問,因此決定了須要把元素存儲在連續內存。這樣查找確實很快,可是插入和刪除元素的時候,爲了保證元素的有序性,就須要大量的移動元素了。

若是須要的是一個可以進行二分查找,又能快速添加和刪除元素的數據結構,首先就是二叉查找樹,二叉查找樹在最壞狀況下可能變成一個鏈表。

因而,就出現了平衡二叉樹,根據平衡算法的不一樣有AVL樹,B-Tree,B+Tree,紅黑樹等,可是AVL樹實現起來比較複雜,平衡操做較難理解,這時候就能夠用SkipList跳躍表結構。

傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點須要O(n)的時間,查找操做須要O(n)的時間。

 

若是咱們使用上圖所示的跳躍表,就能夠減小查找所需時間爲O(n/2),由於咱們能夠先經過每一個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。

好比咱們想查找19,首先和6比較,大於6以後,在和9進行比較,而後在和12進行比較......最後比較到21的時候,發現21大於19,說明查找的點在17和21之間,從這個過程當中,咱們能夠看出,查找的時候跳過了三、七、12等點,所以查找的複雜度爲O(n/2)。

跳躍表其實也是一種經過「空間來換取時間」的一個算法,經過在每一個節點中增長了向前的指針,從而提高查找的效率。

跳躍表又被稱爲機率,或者說是隨機化的數據結構,目前開源軟件 Redis 和 lucence都有用到它。

3. ConcurrentLinkedQueue  無界非阻塞隊列

ConcurrentLinkedQueue   LinkedList 併發版本

Add,offer:添加元素

Peek():get頭元素並不把元素拿走

poll():get頭元素把元素拿走

4. CopyOnWriteArrayList和CopyOnWriteArraySet

寫的時候進行復制,能夠進行併發的讀。

適用讀多寫少的場景:好比白名單,黑名單,商品類目的訪問和更新場景,假如咱們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,可是某些關鍵字不容許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單天天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,若是在,則提示不能搜索。

弱點:內存佔用高,數據一致性弱

總結:寫的時候從新複製一份數據,而後在複製的數據裏面寫入數據,寫完之後再把原來的數據的引用執行復制的數據,因此存在數據的弱一致性,適用於讀多寫少的場景

5.什麼是阻塞隊列

取數據和存數據不知足要求時,會對線程進行阻塞。例如取數據時發現隊列裏面沒有數據就在那裏阻塞等着有數據了再取;存數據時發現隊列已經滿了就在那裏阻塞等着有數據被取走時再存

方法

拋出異常

返回值

一直阻塞

超時退出

插入

Add

offer

put

offer

移除

remove

poll

take

poll

檢查

element

peek

沒有

沒有

經常使用阻塞隊列

ArrayBlockingQueue: 數組結構組成有界阻塞隊列。

先進先出原則,初始化必須傳大小,take和put時候用的同一把鎖

LinkedBlockingQueue:鏈表結構組成的有界阻塞隊列

先進先出原則,初始化能夠不傳大小,put,take鎖分離

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,

排序,天然順序升序排列,更改順序:類本身實現compareTo()方法,初始化PriorityBlockingQueue指定一個比較器Comparator

DelayQueue: 使用了優先級隊列的無界阻塞隊列

支持延時獲取,隊列裏的元素要實現Delay接口。DelayQueue很是有用,能夠將DelayQueue運用在如下應用場景。

緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

還有訂單到期,限時支付等等。

SynchronousQueue:不存儲元素的阻塞隊列

每一個put操做必需要等take操做

LinkedTransferQueue:鏈表結構組成的界阻塞隊列

Transfer,tryTransfer,生產者put時,當前有消費者take,生產者直接把元素傳給消費者

LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列

能夠在隊列的兩端插入和移除,xxxFirst頭部操做,xxxLast尾部操做。工做竊取模式。

瞭解阻塞隊列的實現原理

使用了Condition實現。

生產者消費者模式

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生

產線程和消費線程的工做能力來提升程序總體處理數據的速度。

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發

中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理

完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。

生產者和消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而是經過阻塞隊列來進行通訊,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

什麼是Fork/Join框架

並行執行任務的框架,把大任務拆分紅不少的小任務,彙總每一個小任務的結果獲得大任務的結果。

 

工做竊取算法

工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行,執行完之後把結果放回去

那麼,爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。

好比A線程負責處理A隊列裏的任務。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

Fork/Join框架的使用

Fork/Join使用兩個類來完成以上兩件事情。

①ForkJoinTask:咱們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務

中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask類,只須要繼承它的子類,Fork/Join框架提供瞭如下兩個子類。

·RecursiveAction:用於沒有返回結果的任務。

·RecursiveTask:用於有返回結果的任務。

②ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行。

Fork/Join有同步和異步兩種方式。

案例1:孫悟空摘桃子fork/join的案例

複製代碼

1 /**
 2  * 孫悟空摘桃子fork/join的案例,孫悟空去摘桃子時發現桃子太多就讓猴子猴孫去幫忙在桃子,
 3  * 摘完之後再統一彙總求和
 4  */
 5 public class ForkJoinWuKong {
 6 
 7   private static class XiaoWuKong extends RecursiveTask<Integer>{
 8 
 9       private final static int THRESHOLD = 100;//閾值,數組多小的時候,再也不進行任務拆分操做
10       private PanTao[] src;
11       private int fromIndex;
12       private int toIndex;
13       private IPickTaoZi pickTaoZi;
14 
15       public XiaoWuKong(PanTao[] src, int fromIndex, int toIndex, IPickTaoZi pickTaoZi) {
16           this.src = src;
17           this.fromIndex = fromIndex;
18           this.toIndex = toIndex;
19           this.pickTaoZi = pickTaoZi;
20       }
21 
22       @Override
23       protected Integer compute() {
24           //計算完之後結果彙總
25           if (toIndex-fromIndex<THRESHOLD){
26               int count =0 ;
27               for(int i=fromIndex;i<toIndex;i++){
28                   if (pickTaoZi.pick(src,i)) count++;
29               }
30               return count;
31           }
32           //大任務拆分紅小任務
33           else{
34               //fromIndex....mid......toIndex
35               int mid = (fromIndex+toIndex)/2;
36               XiaoWuKong left = new XiaoWuKong(src,fromIndex,mid,pickTaoZi);
37               XiaoWuKong right = new XiaoWuKong(src,mid,toIndex,pickTaoZi);
38               invokeAll(left,right);
39               return left.join()+right.join();
40 
41           }
42       }
43   }
44 
45     public static void main(String[] args) {
46 
47         ForkJoinPool pool = new ForkJoinPool();
48         PanTao[] src = MakePanTaoArray.makeArray();
49         IProcessTaoZi processTaoZi = new WuKongProcessImpl();
50         IPickTaoZi pickTaoZi = new WuKongPickImpl(processTaoZi);
51 
52         long start = System.currentTimeMillis();
53 
54         //構造一個ForkJoinTask
55         XiaoWuKong xiaoWuKong = new XiaoWuKong(src,0,
56                 src.length-1,pickTaoZi);
57 
58         //ForkJoinTask交給ForkJoinPool來執行。
59         pool.invoke(xiaoWuKong);
60 
61         System.out.println("The count is "+ xiaoWuKong.join()
62                 +" spend time:"+(System.currentTimeMillis()-start)+"ms");
63 
64     }
65 
66 }

複製代碼

案例2:使用Fork/Join框架實現計算1+2+3+....+100的結果

複製代碼

package com.study.demo.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join框架設計思路:
 * 第一步:分割任務。首先咱們須要有一個fork類來把大任務分割成子任務,有可能子任務仍是很大,因此還須要
 *         不停的分割,直到分割出的子任務足夠小。
 * 第二步:執行任務併合並結果。分割的子任務分別放在雙端隊列裏,而後啓動幾個線程分別從雙端隊列裏獲取任務執行。
 *         子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據。 
 * 
 * Fork/Join框架的具體實現:
 * Fork/Join使用兩個類來完成以上兩件事情:
 * ForkJoinTask:咱們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()
 *               操做的機制,一般狀況下咱們不須要直接繼承ForkJoinTask類,而只須要繼承它的子類,Fork/Join框架提供瞭如下兩個子類:
 *               RecursiveAction:用於沒有返回結果的任務。
 *               RecursiveTask :用於有返回結果的任務。
 * ForkJoinPool :ForkJoinTask須要經過ForkJoinPool來執行,任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,
 *                進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務。
 *                
 * 實戰:使用Fork/Join框架實現計算1+2+3+....+100的結果-100個數拆分紅10個(閾值)子任務來執行最後彙總結果
 *
 */
public class CountTask extends RecursiveTask<Integer> {

    /**
     * 序列化
     */
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 10;// 閾值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {

        int sum = 0;

        // 若是任務足夠小就計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }

        } else {

            // 若是任務大於閥值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待子任務執行完,並獲得其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合併子任務
            sum = leftResult + rightResult;

        }

        return sum;

    }

    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 生成一個計算任務,負責計算1+2+3+4
        CountTask task = new CountTask(1, 100);

        // 執行一個任務
        Future result = forkJoinPool.submit(task);

        try {

            System.out.println(result.get());

        } catch (InterruptedException e) {

        } catch (ExecutionException e) {

        }

    }

}

複製代碼

 

2、併發工具類

1. CountDownLatch

容許一個或多個線程等待其餘線程完成操做。CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。

因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。

複製代碼

1 public class CountDownLatchCase {
 2 
 3     static CountDownLatch c = new CountDownLatch(7);
 4 
 5     private static class SubThread implements Runnable{
 6 
 7         @Override
 8         public void run() {
 9             System.out.println(Thread.currentThread().getId());
10             c.countDown();
11             System.out.println(Thread.currentThread().getId()+" is done");
12         }
13     }
14 
15     public static void main(String[] args) throws InterruptedException {
16 
17         new Thread(new Runnable() {
18             @Override
19             public void run() {
20                 System.out.println(Thread.currentThread().getId());
21                 c.countDown();
22                 System.out.println("sleeping...");
23                 try {
24                     Thread.sleep(1500);
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 }
28                 System.out.println("sleep is completer");
29                 c.countDown();
30             }
31         }).start();
32 
33         for(int i=0;i<=4;i++){
34             Thread thread = new Thread(new SubThread());
35             thread.start();
36         }
37 
38         c.await();
39         System.out.println("Main will gone.....");
40     }
41 }

複製代碼

 

2. CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。

複製代碼

1 public class CyclicBarrriesBase {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(2);
 4 
 5     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 6         new Thread(new Runnable() {
 7             @Override
 8             public void run() {
 9                 System.out.println(Thread.currentThread().getId());
10                 try {
11                     c.await();//等待主線程完成
12                     System.out.println(Thread.currentThread().getId()+"is going");
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 } catch (BrokenBarrierException e) {
16                     e.printStackTrace();
17                 }
18                 System.out.println("sleeping...");
19 
20             }
21         }).start();
22 
23         System.out.println("main will sleep.....");
24         Thread.sleep(2000);
25         c.await();////等待子線程完成
26 
27         System.out.println("All are complete.");
28     }
29 
30 
31 
32 }

複製代碼

 

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。

複製代碼

1 public class CyclicBarrierSum {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(5,new SumThread());
 4     //子線程結果存放的緩存
 5     private static ConcurrentHashMap<String,Integer> resultMap =
 6             new ConcurrentHashMap<>();
 7 
 8     //全部子線程達到屏障後,會執行這個Runnable的任務
 9     private static class SumThread implements Runnable{
10 
11         @Override
12         public void run() {
13             int result =0;
14             for(Map.Entry<String,Integer> workResult:resultMap.entrySet()){
15                 result = result+workResult.getValue();
16             }
17             System.out.println("result = "+result);
18             System.out.println("徹底能夠作與子線程,統計無關的事情.....");
19         }
20     }
21 
22     //工做線程,也就是子線程
23     private static class WorkThread implements Runnable{
24 
25         private Random t = new Random();
26 
27         @Override
28         public void run() {
29             int r = t.nextInt(1000)+1000;
30             System.out.println(Thread.currentThread().getId()+":r="+r);
31             resultMap.put(Thread.currentThread().getId()+"",r);
32             try {
33                 Thread.sleep(1000+r);
34                 c.await();
35             } catch (InterruptedException e) {
36                 e.printStackTrace();
37             } catch (BrokenBarrierException e) {
38                 e.printStackTrace();
39             }
40 
41         }
42     }
43 
44     public static void main(String[] args) {
45         for(int i=0;i<=4;i++){
46             Thread thread = new Thread(new WorkThread());
47             thread.start();
48         }
49     }
50 }

複製代碼

 

CyclicBarrierCountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置,CountDownLatch.await通常阻塞主線程,全部的工做線程執行countDown,而CyclicBarrierton經過工做線程調用await從而阻塞工做線程,直到全部工做線程達到屏障。

4. 控制併發線程數的Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。應用場景Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制。。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。

複製代碼

1 public class SemaphporeCase<T> {
 2 
 3     private final Semaphore items;//有多少元素可拿
 4     private final Semaphore space;//有多少空位可放元素
 5     private List queue = new LinkedList<>();
 6 
 7     public SemaphporeCase(int itemCounts){
 8         this.items = new Semaphore(0);
 9         this.space = new Semaphore(itemCounts);
10     }
11 
12     //放入數據
13     public void put(T x) throws InterruptedException {
14         space.acquire();//拿空位的許可,沒有空位線程會在這個方法上阻塞
15         synchronized (queue){
16             queue.add(x);
17         }
18         items.release();//有元素了,能夠釋放一個拿元素的許可
19     }
20 
21     //取數據
22     public T take() throws InterruptedException {
23         items.acquire();//拿元素的許可,沒有元素線程會在這個方法上阻塞
24         T t;
25         synchronized (queue){
26             t = (T)queue.remove(0);
27         }
28         space.release();//有空位了,能夠釋放一個存在空位的許可
29         return t;
30     }
31 }

複製代碼

 

Semaphore還提供一些其餘方法,具體以下。

·intavailablePermits():返回此信號量中當前可用的許可證數。

·intgetQueueLength():返回正在等待獲取許可證的線程數。

·booleanhasQueuedThreads():是否有線程正在等待獲取許可證。

·void reducePermits(int reduction):減小reduction個許可證,是個protected方法。

·Collection getQueuedThreads():返

5. Exchanger

Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。

複製代碼

1 public class ExchangeCase {
 2 
 3     static final Exchanger<List<String>> exgr = new Exchanger<>();
 4 
 5     public static void main(String[] args) {
 6 
 7         new Thread(new Runnable() {
 8 
 9             @Override
10             public void run() {
11                 try {
12                     List<String> list = new ArrayList<>();
13                     list.add(Thread.currentThread().getId()+" insert A1");
14                     list.add(Thread.currentThread().getId()+" insert A2");
15                     list = exgr.exchange(list);//交換數據
16                     for(String item:list){
17                         System.out.println(Thread.currentThread().getId()+":"+item);
18                     }
19                 } catch (InterruptedException e) {
20                     e.printStackTrace();
21                 }
22             }
23         }).start();
24 
25         new Thread(new Runnable() {
26 
27             @Override
28             public void run() {
29                 try {
30                     List<String> list = new ArrayList<>();
31                     list.add(Thread.currentThread().getId()+" insert B1");
32                     list.add(Thread.currentThread().getId()+" insert B2");
33                     list.add(Thread.currentThread().getId()+" insert B3");
34                     System.out.println(Thread.currentThread().getId()+" will sleep");
35                     Thread.sleep(1500);
36                     list = exgr.exchange(list);//交換數據
37                     for(String item:list){
38                         System.out.println(Thread.currentThread().getId()+":"+item);
39                     }
40                 } catch (InterruptedException e) {
41                     e.printStackTrace();
42                 }
43             }
44         }).start();
45 
46     }
47 
48 }

複製代碼

相關文章
相關標籤/搜索