多線程(三)

併發工具類和併發容器

 

爲何要使用ConcurrentHashMap

在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率接近100%,HashMap在併發執行put操做時會引發死循環,是由於多線程會致使HashMap的Entry鏈表php

造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,就會產生死循環獲取Entry。java

HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。算法

一些有用的方法

不少時候咱們但願在元素不存在時插入元素,咱們通常會像下面那樣寫代碼數據庫

synchronized(map){
   if (map.get(key) == null){
      return map.put(key, value);
   } else{
      return map.get(key);
   }
}

putIfAbsent(key,value)方法原子性的實現了一樣的功能

V putIfAbsent(K key, V value)  

若是key對應的value不存在,則put進去,返回null。不然不put,返回已存在的value。  
boolean remove(Object key, Object value)  

若是key對應的值是value,則移除K-V,返回true。不然不移除,返回false。
boolean replace(K key, V oldValue, V newValue)  

若是key對應的當前值是oldValue,則替換爲newValue,返回true。不然不替換,返回false。
public class UseChm {

	HashMap<String, String> hashMap = new HashMap<>();
	ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();

	public String putIfAbsent(String key, String value) {
		int a;
		synchronized (hashMap) {
			if (hashMap.get(key) == null) {
				return hashMap.put(key, value);
			} else {
				return hashMap.get(key);
			}
		}
	}

	// 存值
	public String useChm(String key, String value) {
		return chm.putIfAbsent(key, value);
	}

}

Hash的解釋

散列,任意長度的輸入,經過一種算法,變換成固定長度的輸出。屬於壓縮的映射。編程

Md5,Sha,取餘都是散列算法,ConcurrentHashMap中是wang/jenkins算法數組

ConcurrentHashMap在1.7下的實現

分段鎖的設計思想。緩存

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment實際是一種可重入鎖(ReentrantLock),HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment的結構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。安全

 

ConcurrentHashMap初始化方法是經過initialCapacity、loadFactor和concurrencyLevel(參數concurrencyLevel是用戶估計的併發級別,就是說你以爲最多有多少線程共同修改這個map,根據這個來肯定Segment數組的大小concurrencyLevel默認是DEFAULT_CONCURRENCY_LEVEL = 16;)。數據結構

ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,能夠看到其中的對象屬性要麼是final的,要麼是volatile的。多線程

ConcurrentHashMap在1.8下的實現

改進一:取消segments字段,直接採用transient volatile HashEntry<K,V>[] table保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率。

改進二:將原先table數組+單向鏈表的數據結構,變動爲table數組+單向鏈表+紅黑樹的結構。對於個數超過8(默認值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度能夠下降到O(logN),能夠改進性能。

ConcurrentSkipListMap  和ConcurrentSkipListSet

Skiplist跳錶 能夠提升鏈表的訪問速度,達到紅黑樹的性能

ConcurrentSkipListMap    TreeMap的併發實現

ConcurrentSkipListSet     TreeSet的併發實現

瞭解什麼是SkipList?

二分查找和AVL樹查找

二分查找要求元素能夠隨機訪問,因此決定了須要把元素存儲在連續內存。這樣查找確實很快,可是插入和刪除元素的時候,爲了保證元素的有序性,就須要大量的移動元素了。

若是須要的是一個可以進行二分查找,又能快速添加和刪除元素的數據結構,首先就是二叉查找樹,二叉查找樹在最壞狀況下可能變成一個鏈表。

因而,就出現了平衡二叉樹,根據平衡算法的不一樣有AVL樹,B-Tree,B+Tree,紅黑樹等,可是AVL樹實現起來比較複雜,平衡操做較難理解,這時候就能夠用SkipList跳躍表結構。

傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點須要O(n)的時間,查找操做須要O(n)的時間。

若是咱們使用上圖所示的跳躍表,就能夠減小查找所需時間爲O(n/2),由於咱們能夠先經過每一個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。

好比咱們想查找19,首先和6比較,大於6以後,在和9進行比較,而後在和12進行比較......最後比較到21的時候,發現21大於19,說明查找的點在17和21之間,從這個過程當中,咱們能夠看出,查找的時候跳過了三、七、12等點,所以查找的複雜度爲O(n/2)。

跳躍表其實也是一種經過「空間來換取時間」的一個算法,經過在每一個節點中增長了向前的指針,從而提高查找的效率。

跳躍表又被稱爲機率,或者說是隨機化的數據結構,目前開源軟件 Redis 和 lucence都有用到它。

 

ConcurrentLinkedQueue  無界非阻塞隊列

ConcurrentLinkedQueue   LinkedList 併發版本

Add,offer:添加元素

Peek:get頭元素並不把元素拿走

poll():get頭元素把元素拿走

多用isEmpty()儘可能少用size()

/**
 * isEmpty()和size()的性能差別
 */
public class ConcurrentLinkedQueueTest {
	private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
	private static int count = 50000;
	private static int count2 = 2;
	private static CountDownLatch cd = new CountDownLatch(count2);

	public static void dothis() {
		for (int i = 0; i < count; i++) {
			queue.offer(i);
		}
	}

	public static void main(String[] args) throws InterruptedException {
		long timeStart = System.currentTimeMillis();
		ExecutorService es = Executors.newFixedThreadPool(4);
		ConcurrentLinkedQueueTest.dothis();
		// 啓用兩個線程取數據
		for (int i = 0; i < count2; i++) {
			es.submit(new Poll());
		}
		cd.await();
		System.out.println("cost time "
				+ (System.currentTimeMillis() - timeStart) + "ms");
		es.shutdown();
	}

	static class Poll implements Runnable {
		@Override
		public void run() {
			// while (queue.size() > 0) { //cost time 3648ms
			while (!queue.isEmpty()) {	  //cost time 412ms
				System.out.println(queue.poll());
			}
			cd.countDown();
		}
	}
}

 

CopyOnWriteArrayList和CopyOnWriteArraySet

寫的時候進行復制,能夠進行併發的讀。

適用讀多寫少的場景:好比白名單,黑名單,商品類目的訪問和更新場景,假如咱們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,可是某些關鍵字不容許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單天天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,若是在,則提示不能搜索。

弱點:內存佔用高,數據一致性弱

 

什麼是阻塞隊列

取數據和讀數據不知足要求時,會對線程進行阻塞

方法

拋出異常

返回值

一直阻塞

超時退出

插入

Add

offer

put

offer

移除

remove

poll

take

poll

檢查

element

peek

沒有

沒有


經常使用阻塞隊列

ArrayBlockingQueue: 數組結構組成有界阻塞隊列。

先進先出原則,初始化必須傳大小,take和put時候用的同一把鎖

LinkedBlockingQueue:鏈表結構組成的有界阻塞隊列

先進先出原則,初始化能夠不傳大小,put,take鎖分離

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,

排序,天然順序升序排列,更改順序:類本身實現compareTo()方法,初始化PriorityBlockingQueue指定一個比較器Comparator

DelayQueue: 使用了優先級隊列的無界阻塞隊列

支持延時獲取,隊列裏的元素要實現Delay接口。DelayQueue很是有用,能夠將DelayQueue運用在如下應用場景。

緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

還有訂單到期,限時支付等等。

SynchronousQueue:不存儲元素的阻塞隊列

每一個put操做必需要等take操做

LinkedTransferQueue:鏈表結構組成的界阻塞隊列

Transfer,tryTransfer,生產者put時,當前有消費者take,生產者直接把元素傳給消費者

LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列

能夠在隊列的兩端插入和移除,xxxFirst頭部操做,xxxLast尾部操做。工做竊取模式。

public class User {
    private final String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
public class CacheBean<T> implements Delayed {

	private String id;
	private String name;
	private T data;
	private long activeTime;// 到期時間

	public CacheBean(String id, String name, T data, long activeTime) {
		this.id = id;
		this.name = name;
		this.data = data;
		this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,
				TimeUnit.MILLISECONDS) + System.nanoTime();// 納秒級別轉換
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public T getData() {
		return data;
	}

	public void setData(T data) {
		this.data = data;
	}

	public long getActiveTime() {
		return activeTime;
	}

	public void setActiveTime(long activeTime) {
		this.activeTime = activeTime;
	}

	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(this.activeTime - System.nanoTime(),
				TimeUnit.NANOSECONDS); // 檢查當前還剩多少時間
	}

	@Override
	public int compareTo(Delayed o) {
		long d = getDelay(TimeUnit.NANOSECONDS)
				- o.getDelay(TimeUnit.NANOSECONDS);
		return (d == 0) ? 0 : (d < 0) ? -1 : 1;
	}
}
public class GetFromCache implements Runnable {

	private DelayQueue<CacheBean<User>> queue;

	public GetFromCache(DelayQueue<CacheBean<User>> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				CacheBean<User> item = queue.take();// 阻塞地拿
				System.out.println("GetFromCache" + item.getId() + ":"
						+ item.getName() + "data:"
						+ ((User) item.getData()).getName());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
public class PutInCache implements Runnable {

    private DelayQueue<CacheBean<User>> queue;

    public PutInCache(DelayQueue<CacheBean<User>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        CacheBean cacheBean = new CacheBean("1","5秒",
                new User("Mark"),5000);
        CacheBean cacheBean2 = new CacheBean("1","3秒",
                new User("Mike"),3000);
        queue.offer(cacheBean);
        System.out.println("put in cache:"+cacheBean.getId()+":"+cacheBean.getName());
        queue.offer(cacheBean2);
        System.out.println("put in cache:"+cacheBean2.getId()+":"+cacheBean2.getName());

    }
}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<CacheBean<User>> queue = new DelayQueue<CacheBean<User>>();
        new Thread(new PutInCache(queue)).start();
        new Thread(new GetFromCache(queue)).start();

        for(int i=1;i<20;i++){
            Thread.sleep(500);
            System.out.println(i*500);
        }
    }
}
運行結果
put in cache:1:5秒
put in cache:1:3秒
500
1000
1500
2000
2500
3000
GetFromCache1:3秒data:Mike
3500
4000
4500
5000
GetFromCache1:5秒data:Mark
5500
6000
6500
7000
7500
8000
8500
9000
9500

瞭解阻塞隊列的實現原理

使用了Condition實現。

生產者消費者模式

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生

產線程和消費線程的工做能力來提升程序總體處理數據的速度。

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發

中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理

完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。

生產者和消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而是經過阻塞隊列來進行通訊,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

什麼是Fork/Join框架

並行執行任務的框架,把大任務拆分紅不少的小任務,彙總每一個小任務的結果獲得大任務的結果。

工做竊取算法

工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。

那麼,爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。

好比A線程負責處理A隊列裏的任務。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

Fork/Join框架的使用

Fork/Join使用兩個類來完成以上兩件事情。

①ForkJoinTask:咱們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務

中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask類,只須要繼承它的子類,Fork/Join框架提供瞭如下兩個子類。

·RecursiveAction:用於沒有返回結果的任務。

·RecursiveTask:用於有返回結果的任務。

②ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行。

Fork/Join有同步和異步兩種方式。

public class PanTao {

    private final Color color;
    private final Size size;
    private final int Year;

    public PanTao(Color color, Size size, int year) {
        this.color = color;
        this.size = size;
        Year = year;
    }

    public Color getColor() {
        return color;
    }

    public Size getSize() {
        return size;
    }

    public int getYear() {
        return Year;
    }

    @Override
    public String toString() {
        return "PanTao{" +
                "color=" + color +
                ", size=" + size +
                ", Year=" + Year +
                '}';
    }
}
public enum Color {
    RED,GREEN
}
public enum Size {
    BIG,SMALL
}
public class MakePanTaoArray {

    //數組長度
    public static final int ARRAY_LENGTH  = 40000;
    //做爲基準的值
    public static final int STANDARD_VAL  = 66694523;

    public static PanTao[] makeArray() {

        //new三個隨機數發生器
        Random rColor = new Random();
        Random rSize = new Random();
        Random rYear = new Random();
        PanTao[] result = new PanTao[ARRAY_LENGTH];
        for(int i=0;i<ARRAY_LENGTH;i++){
            //填充數組
            PanTao panTao = new PanTao(
                    rColor.nextBoolean() ? Color.RED:Color.GREEN,
                    rSize.nextBoolean() ? Size.BIG:Size.SMALL,
                    rYear.nextInt(9001));
            result[i] =  panTao;
        }
        return result;
    }
}
public interface IPickTaoZi {
    boolean pick(PanTao[] src, int index);
}
public class WuKongPickImpl implements IPickTaoZi {

    private IProcessTaoZi processTaoZi;

    public WuKongPickImpl(IProcessTaoZi processTaoZi) {
        this.processTaoZi = processTaoZi;
    }

    @Override
    public boolean pick(PanTao[] src, int index) {
        if(src[index].getColor()== Color.RED&&
                src[index].getSize()== Size.BIG&&
                src[index].getYear()>=6000){
            processTaoZi.processTaoZi(src[index]);
            return true;
        }else{
            return false;
        }
    }
}
public interface IProcessTaoZi {
    void processTaoZi(PanTao taoZi);
}
public class WuKongProcessImpl implements IProcessTaoZi {
    @Override
    public void processTaoZi(PanTao taoZi) {
        //看看桃子,放到口袋裏
        inBag();
    }

    private void inBag(){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ForkJoinWuKong {

  private static class XiaoWuKong extends RecursiveTask<Integer>{

      private final static int THRESHOLD = 100;//閾值,數組多小,進行具體的業務操做
      private PanTao[] src;
      private int fromIndex;
      private int toIndex;
      private IPickTaoZi pickTaoZi;

      public XiaoWuKong(PanTao[] src, int fromIndex, int toIndex, IPickTaoZi pickTaoZi) {
          this.src = src;
          this.fromIndex = fromIndex;
          this.toIndex = toIndex;
          this.pickTaoZi = pickTaoZi;
      }

      @Override
      protected Integer compute() {
          if (toIndex-fromIndex<THRESHOLD){
              int count =0 ;
              for(int i=fromIndex;i<toIndex;i++){
                  if (pickTaoZi.pick(src,i)) count++;
              }
              return count;
          }else{
              //fromIndex....mid......toIndex
              int mid = (fromIndex+toIndex)/2;
              XiaoWuKong left = new XiaoWuKong(src,fromIndex,mid,pickTaoZi);
              XiaoWuKong right = new XiaoWuKong(src,mid,toIndex,pickTaoZi);
              invokeAll(left,right);
              return left.join()+right.join();

          }
      }
  }

    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        PanTao[] src = MakePanTaoArray.makeArray();
        IProcessTaoZi processTaoZi = new WuKongProcessImpl();
        IPickTaoZi pickTaoZi = new WuKongPickImpl(processTaoZi);

        long start = System.currentTimeMillis();

        XiaoWuKong xiaoWuKong = new XiaoWuKong(src,0,
                src.length-1,pickTaoZi);
        //同步執行
        pool.invoke(xiaoWuKong);
        //System.out.println("Task is Running.....");

        System.out.println("The count is "+ xiaoWuKong.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }

}
運行結果:
The count is 3294 spend time:511ms

 

public class BaJieProcessImpl implements IProcessTaoZi {
	@Override
	public void processTaoZi(PanTao taoZi) {
		// 看看桃子,一口吃了
		eat();
	}

	// 看看桃子,一口吃了
	private void eat() {
		try {
			Thread.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
public class BaJiePickImpl implements IPickTaoZi {

    private IProcessTaoZi processTaoZi;

    public BaJiePickImpl(IProcessTaoZi processTaoZi) {
        this.processTaoZi = processTaoZi;
    }

    @Override
    public boolean pick(PanTao[] src, int index) {
        if(src[index].getColor()== Color.RED&&
                src[index].getSize()== Size.BIG){
            processTaoZi.processTaoZi(src[index]);
            return true;
        }else{
            return false;
        }
    }
}
public class ForkJoinBaJieAsyn {

	private static class XiaoBaJie extends RecursiveAction {

		private final static int THRESHOLD = 100;
		private PanTao[] src;
		private int fromIndex;
		private int toIndex;
		private IPickTaoZi pickTaoZi;

		public XiaoBaJie(PanTao[] src, int fromIndex, int toIndex,
				IPickTaoZi pickTaoZi) {
			this.src = src;
			this.fromIndex = fromIndex;
			this.toIndex = toIndex;
			this.pickTaoZi = pickTaoZi;
		}

		@Override
		protected void compute() {
			if (toIndex - fromIndex < THRESHOLD) {
				System.out.println(" from index = " + fromIndex + " toIndex="
						+ toIndex);
				int count = 0;
				for (int i = fromIndex; i <= toIndex; i++) {
					if (pickTaoZi.pick(src, i))
						count++;
				}
			} else {
				// fromIndex....mid.....toIndex
				int mid = (fromIndex + toIndex) / 2;
				XiaoBaJie left = new XiaoBaJie(src, fromIndex, mid, pickTaoZi);
				XiaoBaJie right = new XiaoBaJie(src, mid + 1, toIndex,
						pickTaoZi);
				invokeAll(left, right);
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {

		ForkJoinPool pool = new ForkJoinPool();
		PanTao[] src = MakePanTaoArray.makeArray();
		IProcessTaoZi processTaoZi = new BaJieProcessImpl();
		IPickTaoZi pickTaoZi = new BaJiePickImpl(processTaoZi);

		long start = System.currentTimeMillis();

		XiaoBaJie xiaoBaJie = new XiaoBaJie(src, 0, src.length - 1, pickTaoZi);
		// 異步執行
		pool.execute(xiaoBaJie);
		System.out.println("BaJie is picking.....");

		Thread.sleep(2);
		System.out.println("Please waiting.....");

		while (!xiaoBaJie.isDone()) {
			showLog(pool);
			TimeUnit.MILLISECONDS.sleep(100);
		}
		// 關閉鏈接池
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.DAYS);
		showLog(pool);

		xiaoBaJie.join();
		System.out.println(" spend time:"
				+ (System.currentTimeMillis() - start) + "ms");

	}

	// 監控Fork/Join池相關方法
	private static void showLog(ForkJoinPool pool) {
		System.out.printf("**********************\n");

		System.out.printf("線程池的worker線程們的數量:%d\n", pool.getPoolSize());
		System.out.printf("當前執行任務的線程的數量:%d\n", pool.getActiveThreadCount());
		System.out.printf("沒有被阻塞的正在工做的線程:%d\n", pool.getRunningThreadCount());
		System.out.printf("已經提交給池尚未開始執行的任務數:%d\n",
				pool.getQueuedSubmissionCount());
		System.out.printf("已經提交給池已經開始執行的任務數:%d\n", pool.getQueuedTaskCount());
		System.out.printf("線程偷取任務數:%d\n", pool.getStealCount());
		System.out.printf("池是否已經終止 :%s\n", pool.isTerminated());
		System.out.printf("**********************\n");
	}

}
設置數組長度爲400後打印結果
BaJie is picking.....
 from index = 100 toIndex=199
 from index = 0 toIndex=99
 from index = 200 toIndex=299
 from index = 300 toIndex=399
Please waiting.....
**********************
線程池的worker線程們的數量:5
當前執行任務的線程的數量:4
沒有被阻塞的正在工做的線程:0
已經提交給池尚未開始執行的任務數:0
已經提交給池已經開始執行的任務數:0
線程偷取任務數:0
池是否已經終止 :false
**********************
**********************
線程池的worker線程們的數量:0
當前執行任務的線程的數量:0
沒有被阻塞的正在工做的線程:0
已經提交給池尚未開始執行的任務數:0
已經提交給池已經開始執行的任務數:0
線程偷取任務數:4
池是否已經終止 :true
**********************
 spend time:247ms

 

CountDownLatch

容許一個或多個線程等待其餘線程完成操做。CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。

public class CountDownLatchCase {

	static CountDownLatch c = new CountDownLatch(7);

	private static class SubThread implements Runnable {

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getId());
			c.countDown();
			System.out.println(Thread.currentThread().getId() + " is done");
		}
	}

	public static void main(String[] args) throws InterruptedException {

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getId());
				c.countDown();
				System.out.println("sleeping...");
				try {
					Thread.sleep(1500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("sleep is completer");
				c.countDown();
			}
		}).start();

		for (int i = 0; i <= 4; i++) {
			Thread thread = new Thread(new SubThread());
			thread.start();
		}

		c.await();
		System.out.println("Main will gone.....");
	}
}
運行結果
11
sleeping...
12
13
12 is done
13 is done
14
14 is done
16
16 is done
15
15 is done
sleep is completer
Main will gone.....

CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。

public class CyclicBarrriesBase {

	static CyclicBarrier c = new CyclicBarrier(2);

	public static void main(String[] args) throws InterruptedException,
			BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getId());
				try {
					System.out.println(Thread.currentThread().getId()+ " will await");
					c.await();// 等待主線程完成
					System.out.println(Thread.currentThread().getId()+ " is going");
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("sleeping...");

			}
		}).start();

		System.out.println("main will sleep.....");
		Thread.sleep(2000);
		c.await();// //等待子線程完成
		System.out.println("All are complete.");
	}

}
運行結果
11
main will sleep.....
11 will await
All are complete.
11 is going
sleeping...
public class CyclicBarrierSum {

    static CyclicBarrier c = new CyclicBarrier(5,new SumThread());
    //子線程結果存放的緩存
    private static ConcurrentHashMap<String,Integer> resultMap =
            new ConcurrentHashMap<>();

    //全部子線程達到屏障後,會執行這個Runnable的任務
    private static class SumThread implements Runnable{

        @Override
        public void run() {
            int result =0;
            for(Map.Entry<String,Integer> workResult:resultMap.entrySet()){
                result = result+workResult.getValue();
            }
            System.out.println("result = "+result);
            System.out.println("徹底能夠作與子線程,統計無關的事情.....");
        }
    }

    //工做線程,也就是子線程
    private static class WorkThread implements Runnable{

        private Random t = new Random();

        @Override
        public void run() {
            int r = t.nextInt(1000)+1000;
            System.out.println(Thread.currentThread().getId()+":r="+r);
            resultMap.put(Thread.currentThread().getId()+"",r);
            try {
                Thread.sleep(1000+r);
                c.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        for(int i=0;i<=4;i++){
            Thread thread = new Thread(new WorkThread());
            thread.start();
        }
    }
}
運行結果
11:r=1277
14:r=1144
15:r=1818
13:r=1000
12:r=1253
result = 6492
徹底能夠作與子線程,統計無關的事情.....

CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重

置,CountDownLatch.await通常阻塞主線程,全部的工做線程執行countDown,而CyclicBarrierton經過工做線程調用await從而阻塞工做線程,直到全部工做線程達到屏障。

控制併發線程數的Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。應用場景Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制。。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。

Semaphore還提供一些其餘方法,具體以下。

·intavailablePermits():返回此信號量中當前可用的許可證數。

·intgetQueueLength():返回正在等待獲取許可證的線程數。

·booleanhasQueuedThreads():是否有線程正在等待獲取許可證。

·void reducePermits(int reduction):減小reduction個許可證,是個protected方法。

·Collection getQueuedThreads():返回全部等待獲取許可證的線程集合,是個protected方

法。

public class SemaphporeCase<T> {

    private final Semaphore items;//有多少元素可拿
    private final Semaphore space;//有多少空位可放元素
    private List queue = new LinkedList<>();

    public SemaphporeCase(int itemCounts){
        this.items = new Semaphore(0);
        this.space = new Semaphore(itemCounts);
    }

    //放入數據
    public void put(T x) throws InterruptedException {
        space.acquire();//拿空位的許可,沒有空位線程會在這個方法上阻塞
        synchronized (queue){
            queue.add(x);
        }
        items.release();//有元素了,能夠釋放一個拿元素的許可
    }

    //取數據
    public T take() throws InterruptedException {
        items.acquire();//拿元素的許可,沒有元素線程會在這個方法上阻塞
        T t;
        synchronized (queue){
            t = (T)queue.remove(0);
        }
        space.release();//有空位了,能夠釋放一個存在空位的許可
        return t;
    }
}

Exchanger

Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。

public class ExchangeCase {

	static final Exchanger<List<String>> exgr = new Exchanger<>();

	public static void main(String[] args) {

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId() + " insert A1");
					list.add(Thread.currentThread().getId() + " insert A2");
					System.out.println(Thread.currentThread().getId()+ " exchange");
					list = exgr.exchange(list);// 交換數據
					for (String item : list) {
						System.out.println(Thread.currentThread().getId() + ":"+ item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId() + " insert B1");
					list.add(Thread.currentThread().getId() + " insert B2");
					list.add(Thread.currentThread().getId() + " insert B3");
					System.out.println(Thread.currentThread().getId()+ " will sleep");
					Thread.sleep(3500);
					System.out.println(Thread.currentThread().getId()+ " exchange");
					list = exgr.exchange(list);// 交換數據
					for (String item : list) {
						System.out.println(Thread.currentThread().getId() + ":"
								+ item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

	}

}
運行結果
11 exchange
12 will sleep
12 exchange
12:11 insert A1
12:11 insert A2
11:12 insert B1
11:12 insert B2
11:12 insert B3
相關文章
相關標籤/搜索