爲何使用ConcurrentHashMapphp
在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率接近100%,HashMap在併發執行put操做時,出發rehash時,可能會引發鏈表成環的現象,一旦造成環形數據結構,Entry的next結點永遠不爲空,就會產生死循環獲取Entry。java
HshTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘的線程也訪問H啊是Table的同步方法時,會進入阻塞或者輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。算法
常見的方法數據庫
putIfAbsent(K key,V value)編程
若是key對應的value不存在,則put進去,返回null。不然不put,返回已存在的value.數組
boolean remove(Object key , Object value)緩存
若是key對應的值是value,則移除k-v,返回true。不然不移除,返回false安全
boolean replace(K key, V oldValue, V newValue)數據結構
若是key對應的當前值是oldValue,則替換爲newValue,返回true。不然不替換,返回false。多線程
Hash的解釋
散列,任意長度的輸入,經過一種算法,變換成固定長度的輸出。屬於壓縮的映射。Md5,Sha,取餘都是散列算法,ConcurrentHashMap中是wang/jenkins算法
ConcurrentHashMap在1.7下的實現
分段鎖的設計思想。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment實際是一種可重入鎖(ReentrantLock),HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment的結構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。
ConcurrentHashMap初始化方法是經過initialCapacity、loadFactor和concurrencyLevel(參數concurrencyLevel是用戶估計的併發級別,就是說你以爲最多有多少線程共同修改這個map,根據這個來肯定Segment數組的大小concurrencyLevel默認是DEFAULT_CONCURRENCY_LEVEL = 16;)。
ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,能夠看到其中的對象屬性要麼是final的,要麼是volatile的。
ConcurrentHashMap1.8版本實現
改進一:取消segments字段,直接採用transient volatile HashEntry<K,V>[] table保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率。
改進二:將原先table數組+單向鏈表的數據結構,變動爲table數組+單向鏈表+紅黑樹的結構。對於個數超過8(默認值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度能夠下降到O(logN),能夠改進性能。
ConcurrentSkipListMap TreeMap的併發實現
ConcurrentSkipListSet TreeSet的併發實現
瞭解什麼是SkipList(跳錶)?
二分查找和AVL樹查找
二分查找要求元素能夠隨機訪問,因此決定了須要把元素存儲在連續內存。這樣查找確實很快,可是插入和刪除元素的時候,爲了保證元素的有序性,就須要大量的移動元素了。
若是須要的是一個可以進行二分查找,又能快速添加和刪除元素的數據結構,首先就是二叉查找樹,二叉查找樹在最壞狀況下可能變成一個鏈表。
因而,就出現了平衡二叉樹,根據平衡算法的不一樣有AVL樹,B-Tree,B+Tree,紅黑樹等,可是AVL樹實現起來比較複雜,平衡操做較難理解,這時候就能夠用SkipList跳躍表結構。
傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點須要O(n)的時間,查找操做須要O(n)的時間。
若是咱們使用上圖所示的跳躍表,就能夠減小查找所需時間爲O(n/2),由於咱們能夠先經過每一個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。
好比咱們想查找19,首先和6比較,大於6以後,在和9進行比較,而後在和17進行比較......最後比較到21的時候,發現21大於19,說明查找的點在17和21之間,從這個過程當中,咱們能夠看出,查找的時候跳過了三、七、12等點,所以查找的複雜度爲O(n/2)。
跳躍表其實也是一種經過「空間來換取時間」的一個算法,經過在每一個節點中增長了向前的指針,從而提高查找的效率。
跳躍表又被稱爲機率,或者說是隨機化的數據結構,目前開源軟件 Redis 和 lucence都有用到它。
add,offer:添加元素
Peek:get頭元素並不把元素拿走
poll():get頭元素把元素拿走
寫的時候進行復制,能夠進行併發的讀。
適用讀多寫少的場景:好比白名單,黑名單,商品類目的訪問和更新場景,假如咱們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,可是某些關鍵字不容許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單天天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,若是在,則提示不能搜索。
弱點:內存佔用高,數據一致性弱
取數據和讀數據不知足要求時,會對線程進行阻塞
方法 |
拋出異常 |
返回值 |
一直阻塞 |
超時退出 |
插入 |
Add |
offer |
put |
offer |
移除 |
Remove |
poll |
take |
poll |
檢查 |
element |
peek |
沒有 |
沒有 |
經常使用阻塞隊列
ArrayBlockingQueue: 數組結構組成有界阻塞隊列。
先進先出原則,初始化必須傳大小,take和put時候用的同一把鎖
LinkedBlockingQueue:鏈表結構組成的有界阻塞隊列
先進先出原則,初始化能夠不傳大小,put,take鎖分離
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,
排序,天然順序升序排列,更改順序:類本身實現compareTo()方法,初始化PriorityBlockingQueue指定一個比較器Comparator
DelayQueue: 使用了優先級隊列的無界阻塞隊列
支持延時獲取,隊列裏的元素要實現Delay接口。DelayQueue很是有用,能夠將DelayQueue運用在如下應用場景。
緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
還有訂單到期,限時支付等等。
假如對User進行緩存:
public class User { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public User(String name) { super(); this.name = name; } }
CacheBean:
由於DelayQueue中存放的是Delayed接口,因此CacheBean要實現Delayed接
public class CacheBean<T> implements Delayed { private String id; private String name; private T data; //數據的到期時間 private Long activeTime; //要求傳入的activeTime爲毫秒,在構造函數中會自動轉換成納秒 public CacheBean(String id, String name, T data, Long activeTime) { super(); this.id = id; this.name = name; this.data = data; this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS)+System.nanoTime(); } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public T getData() { return data; } public void setData(T data) { this.data = data; } public Long getActiveTime() { return activeTime; } public void setActiveTime(Long activeTime) { this.activeTime = activeTime; } @Override public int compareTo(Delayed o) { long d = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (d == 0) ? 0 : (d >0 ? 1 : -1); } //返回還有多少納秒的剩餘時間 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS); } }
定義兩個任務類,分別是向DelayQueue中存數據和取數據。
public class PutUserWork implements Runnable{ private DelayQueue<CacheBean<User>> delayQueue; private List<CacheBean<User>> list; public PutUserWork(DelayQueue<CacheBean<User>> delayQueue, List<CacheBean<User>> list) { super(); this.delayQueue = delayQueue; this.list = list; } @Override public void run() { list.forEach(cacheBean->{ delayQueue.put(cacheBean); System.out.println("放入:"+cacheBean.getData()); }); } } public class GetUserWork implements Runnable{ private DelayQueue<CacheBean<User>> delayque; public GetUserWork(DelayQueue<CacheBean<User>> delayque) { super(); this.delayque = delayque; } @Override public void run() { while(true) { try { CacheBean<User> element = delayque.take(); System.out.println("get element:"+element+" id:"+element.getId()+", name:"+element.getName() +" data:"+element.getName()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
測試類:
public class MainTest { public static void main(String[] args) throws InterruptedException { User u1 = new User("張三"); CacheBean<User> c1 = new CacheBean<User>("1", "張三", u1, 5000L); User u2 = new User("李四"); CacheBean<User> c2 = new CacheBean<User>("2", "李四", u2, 3000L); List<CacheBean<User>> list = new ArrayList<>(); list.add(c1); list.add(c2); DelayQueue<CacheBean<User>> delayQueue = new DelayQueue<>(); new Thread(new PutUserWork(delayQueue,list)).start(); new Thread(new GetUserWork(delayQueue)).start(); CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatch.await(); } }
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序總體處理數據的速度。在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。生產者和消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而是經過阻塞隊列來進行通訊,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
並行執行任務的框架,把大任務拆分紅不少的小任務,彙總每一個小任務的結果獲得大任務的結果。
工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。那麼,爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。好比A線程負責處理A隊列裏的任務。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
Fork/Join使用兩個類來完成以上兩件事情。
①ForkJoinTask:咱們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務
中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask類,只須要繼承它的子類,Fork/Join框架提供瞭如下兩個子類。
·RecursiveAction:用於沒有返回結果的任務。
·RecursiveTask:用於有返回結果的任務。
②ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行。
Fork/Join有同步和異步兩種方式。
例子:計算硬盤中.txt文件的個數。。。。
public class MyRunnable2 { static class CountWork extends RecursiveTask<Integer> { private String filePath; public CountWork(String filePath) { super(); this.filePath = filePath; } @Override protected Integer compute() { File file = new File(filePath); int count = 0; if (file.isDirectory()) { File[] files = file.listFiles(); if (files == null || files.length == 0) return 0; List<CountWork> taskList = new ArrayList<CountWork>(); for (File f : files) { if (f.isDirectory()) { String lastName = f.getName(); String newFilePath = filePath + File.separator + lastName; // count+= count(newFilePath); CountWork countWork = new CountWork(newFilePath); taskList.add(countWork); // invokeAll(countWork); // count += countWork.join(); } else { try { Thread.currentThread().sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } count++; } } if (!taskList.isEmpty()) { //方式1 for (CountWork mtask : invokeAll(taskList)) { count += mtask.join(); } //方式2, // for (CountWork mtask : taskList) { // invokeAll(mtask); // // //Returns the result of the computation when it {@link #isDone is 一直等待到計算完成才返回結果 // count += mtask.join(); // } } }else { count=1; } //System.out.println(Thread.currentThread().getName()); return count; } } //監控Fork/Join池相關方法 private static void showLog(ForkJoinPool pool) { System.out.printf("**********************\n"); System.out.printf("線程池的worker線程們的數量:%d\n", pool.getPoolSize()); System.out.printf("當前執行任務的線程的數量:%d\n", pool.getActiveThreadCount()); System.out.printf("沒有被阻塞的正在工做的線程:%d\n", pool.getRunningThreadCount()); System.out.printf("已經提交給池尚未開始執行的任務數:%d\n", pool.getQueuedSubmissionCount()); System.out.printf("已經提交給池已經開始執行的任務數:%d\n", pool.getQueuedTaskCount()); System.out.printf("線程偷取任務數:%d\n", pool.getStealCount()); System.out.printf("池是否已經終止 :%s\n", pool.isTerminated()); System.out.printf("**********************\n"); } public static void main(String[] args) throws InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(15); File [] roots = File.listRoots(); Long start = System.currentTimeMillis(); //File e=new File("D:\\"); for(File e:roots) { CountWork countWork = new CountWork(e.getAbsolutePath()); forkJoinPool.invoke(countWork); // Thread.sleep(1000); // forkJoinPool.execute(countWork); // showLog(forkJoinPool); System.out.println(countWork.join()); } Long end = System.currentTimeMillis(); System.out.println("用時:"+(end-start)/1000); } }
容許一個或多個線程等待其餘線程完成操做。CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。
CyclicBarrier和CountDownLatch的區別
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置,CountDownLatch.await通常阻塞主線程,全部的工做線程執行countDown,而CyclicBarrierton經過工做線程調用await從而阻塞工做線程,直到全部工做線程達到屏障。
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。應用場景Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制。。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。
Semaphore還提供一些其餘方法,具體以下。
·int availablePermits():返回此信號量中當前可用的許可證數。
·int getQueueLength():返回正在等待獲取許可證的線程數。
·boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
·void reducePermits(int reduction):減小reduction個許可證,是個protected方法。
·Collection getQueuedThreads():返回全部等待獲取許可證的線程集合,是個protected方法。
用法:用用信號量實現有界緩存
public class SemaphporeCase<T> { private final Semaphore items;//有多少元素可拿 private final Semaphore space;//有多少空位可放元素 private List queue = new LinkedList<>(); public SemaphporeCase(int itemCounts){ this.items = new Semaphore(0); this.space = new Semaphore(itemCounts); } //放入數據 public void put(T x) throws InterruptedException { space.acquire();//拿空位的許可,沒有空位線程會在這個方法上阻塞 synchronized (queue){ queue.add(x); } items.release();//有元素了,能夠釋放一個拿元素的許可 } //取數據 public T take() throws InterruptedException { items.acquire();//拿元素的許可,沒有元素線程會在這個方法上阻塞 T t; synchronized (queue){ t = (T)queue.remove(0); } space.release();//有空位了,能夠釋放一個存在空位的許可 return t; } }
Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。
例子:
public class ExchangeCase { static final Exchanger<List<String>> exgr = new Exchanger<>(); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { List<String> list = new ArrayList<>(); list.add(Thread.currentThread().getId()+" insert A1"); list.add(Thread.currentThread().getId()+" insert A2"); list = exgr.exchange(list);//交換數據 for(String item:list){ System.out.println(Thread.currentThread().getId()+":"+item); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { List<String> list = new ArrayList<>(); list.add(Thread.currentThread().getId()+" insert B1"); list.add(Thread.currentThread().getId()+" insert B2"); list.add(Thread.currentThread().getId()+" insert B3"); System.out.println(Thread.currentThread().getId()+" will sleep"); Thread.sleep(1500); list = exgr.exchange(list);//交換數據 for(String item:list){ System.out.println(Thread.currentThread().getId()+":"+item); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
結果:
14 will sleep 14:13 insert A1 13:14 insert B1 13:14 insert B2 13:14 insert B3 14:13 insert A2