後端開發 高併發設計

最近面試一些公司,被問到的關於Java併發編程的問題,以及本身總結的回答。javascript

Java線程的狀態及如何轉換。

 
線程狀態及其轉換圖

多個線程之間如何協調?

  • wait()、notify()、notifyAll():這三個方法用於協調多個線程對共享數據的存取,因此必須在同步語句塊內使用。wait方法要等待notify/notifyAll的線程釋放鎖後才能開始繼續往下執行。
// 等待方 synchronized(lockObj){ while(condition is false){ lockObj.wait(); } // do business } // 通知方 synchronized(lockObj){ // change condition lockObj.notifyAll(); } 

說說Java的線程池是如何實現的?

  • 建立線程要花費昂貴的資源和時間,若是任務來了才建立線程那麼響應時間會變長,並且一個進程能建立的線程數有限。爲了不這些問題,在程序啓動的時候就建立若干線程來響應處理,它們被稱爲線程池,裏面的線程叫工做線程。
  • maximumPoolSize和corePoolSize的區別:這個概念很重要,maximumPoolSize爲線程池最大容量,也就是說線程池最多能起多少Worker。corePoolSize是核心線程池的大小,當corePoolSize滿了時,同時workQueue full(ArrayBolckQueue是可能滿的) 那麼此時容許新建Worker去處理workQueue中的Task,可是不能超過maximumPoolSize。超過corePoolSize以外的線程會在空閒超時後終止。能夠經過beforeExecute和afterExecute實現線程池的監聽;
 
線程池
 
線程池處理流程
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 

關於BlockingQueue和TransferQueue的異同。

  • TransferQueue繼承了BlockingQueue並擴展了一些新方法。BlockingQueue是Java 5中加入的接口,它是指這樣的一個隊列:當生產者向隊列添加元素但隊列已滿時,生產者會被阻塞;當消費者從隊列移除元素但隊列爲空時,消費者會被阻塞。
  • TransferQueue則更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事),新添加的transfer方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程transfer到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建Java內存模型中的happens-before關係的方式)。
  • TransferQueue還包括了其餘的一些方法:兩個tryTransfer方法,一個是非阻塞的,另外一個帶有timeout參數設置超時時間的。還有兩個輔助方法hasWaitingConsumer()和getWaitingConsumerCount()。
  • TransferQueue相比SynchronousQueue用處更廣、更好用,由於你能夠決定是使用BlockingQueue的方法(例如put方法)仍是確保一次傳遞完成(即transfer方法)。在隊列中已有元素的狀況下,調用transfer方法,能夠確保隊列中被傳遞元素以前的全部元素都能被處理。Doug Lea說從功能角度來說,LinkedTransferQueue其實是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。並且LinkedTransferQueue更好用,由於它不只僅綜合了這幾個類的功能,同時也提供了更高效的實現。

談談HashMap的實現。

  • 從結構實現來說,HashMap是數組+鏈表+紅黑樹(JDK1.8增長了紅黑樹部分)實現的,以下如所示。
 
HashMap
從源碼可知,HashMap類中有一個很是重要的字段,就是 Node[] table,即哈希桶數組;
Node是HashMap的一個內部類,實現了Map.Entry接口,本質是就是一個映射(鍵值對)。 HashMap就是使用哈希表來存儲的。爲解決衝突,Java中HashMap採用了鏈地址法,簡單來講,就是數組加鏈表的結合。在每一個數組元素上都一個鏈表結構,當數據被Hash後,獲得數組下標,把數據放在對應下標元素的鏈表上。 Node[] table的初始化長度默認值是16,Load factor爲負載因子(默認值是0.75),threshold是HashMap所能容納的最大數據量的Node(鍵值對)個數。threshold = length * Load factor。也就是說,在數組定義好長度以後,負載因子越大,所能容納的鍵值對個數越多。 
  • 肯定哈希桶數組索引位置:取key的hashCode值、高位運算(經過hashCode()的高16位異或低16位實現的)、取模運算。
static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } 
 
數組索引位置
  • HashMap的put方法執行過程能夠經過下圖來理解。
 
put方法執行過程
public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; // 步驟①:tab爲空則建立 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; // 步驟②:計算index,並對null作處理 if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); else { Node<K,V> e; K k; // 步驟③:節點key存在,直接覆蓋value if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; // 步驟④:判斷該鏈爲紅黑樹 else if (p instanceof TreeNode) e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); // 步驟⑤:該鏈爲鏈表 else { for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); //鏈表長度大於8轉換爲紅黑樹進行處理 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } // key已經存在直接覆蓋value if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; // 步驟⑥:超過最大容量 就擴容 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; } 
  • 擴容機制:擴容(resize)就是從新計算容量,向HashMap對象裏不停的添加元素,而HashMap對象內部的數組沒法裝載更多的元素時,對象就須要擴大數組的長度,以便能裝入更多的元素。固然Java裏的數組是沒法自動擴容的,方法是使用一個新的數組代替已有的容量小的數組。在擴充HashMap的時候,不須要像JDK1.7的實現那樣從新計算hash,只須要看看原來的hash值新增的那個bit是1仍是0就行了,是0的話索引沒變,是1的話索引變成「原索引+oldCap」。這個設計確實很是的巧妙,既省去了從新計算hash值的時間,並且同時,因爲新增的1bit是0仍是1能夠認爲是隨機的,所以resize的過程,均勻的把以前的衝突的節點分散到新的bucket了。
final Node<K,V>[] resize() { Node<K,V>[] oldTab = table; int oldCap = (oldTab == null) ? 0 : oldTab.length; int oldThr = threshold; int newCap, newThr = 0; if (oldCap > 0) { // 超過最大值就再也不擴充了,就只好隨你碰撞去吧 if (oldCap >= MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return oldTab; } // 沒超過最大值,就擴充爲原來的2倍 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) newThr = oldThr << 1; // double threshold } else if (oldThr > 0) // initial capacity was placed in threshold newCap = oldThr; else { // zero initial threshold signifies using defaults newCap = DEFAULT_INITIAL_CAPACITY; newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } // 計算新的resize上限 if (newThr == 0) { float ft = (float)newCap * loadFactor; newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } threshold = newThr; @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; table = newTab; if (oldTab != null) { // 把每一個bucket都移動到新的buckets中 for (int j = 0; j < oldCap; ++j) { Node<K,V> e; if ((e = oldTab[j]) != null) { oldTab[j] = null; if (e.next == null) newTab[e.hash & (newCap - 1)] = e; else if (e instanceof TreeNode) ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { // preserve order Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; // 原索引 if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } // 原索引+oldCap else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); // 原索引放到bucket裏 if (loTail != null) { loTail.next = null; newTab[j] = loHead; } // 原索引+oldCap放到bucket裏 if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } return newTab; } 

談談線程安全的ConcurrentHashMap的實現原理。

  • ConcurrentHashMap在jdk1.8中主要作了2方面的改進:改進一是取消segments字段,直接採用transient volatile HashEntry<K,V>[] table保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率;改進二是將原先table數組+單向鏈表的數據結構,變動爲table數組+單向鏈表+紅黑樹的結構,對於hash表來講,最核心的能力在於將key hash以後能均勻的分佈在數組中,若是hash以後散列的很均勻,那麼table數組中的每一個隊列長度主要爲0或者1。但實際狀況並不是老是如此理想,雖然ConcurrentHashMap類默認的加載因子爲0.75,可是在數據量過大或者運氣不佳的狀況下,仍是會存在一些隊列長度過長的狀況,若是仍是採用單向列表方式,那麼查詢某個節點的時間複雜度爲O(n);所以,對於個數超過8(默認值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度能夠下降到O(logN),能夠改進性能。
  • TreeNode類:樹節點類,另一個核心的數據結構。當鏈表長度過長的時候,會轉換爲TreeNode。可是與HashMap不相同的是,它並非直接轉換爲紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的包裝。並且TreeNode在ConcurrentHashMap繼承自Node類,而並不是HashMap中的集成自LinkedHashMap.Entry。
  • 二叉查找樹,也稱有序二叉樹(ordered binary tree),是指一棵空樹或者具備下列性質的二叉樹:
1.若任意節點的左子樹不空,則左子樹上全部結點的值均小於它的根結點的值;
2.若任意節點的右子樹不空,則右子樹上全部結點的值均大於它的根結點的值;
3.任意節點的左、右子樹也分別爲二叉查找樹。
4.沒有鍵值相等的節點(no duplicate nodes)。
  • 紅黑樹雖然本質上是一棵二叉查找樹,但它在二叉查找樹的基礎上增長了着色和相關的性質使得紅黑樹相對平衡,從而保證了紅黑樹的查找、插入、刪除的時間複雜度最壞爲O(log n)。但它是如何保證一棵n個結點的紅黑樹的高度始終保持在logn的呢?這就引出了紅黑樹的5個性質:
1.每一個結點要麼是紅的要麼是黑的。 2.根結點是黑的。 3.每一個葉結點(葉結點即指樹尾端NIL指針或NULL結點)都是黑的。 4.若是一個結點是紅的,那麼它的兩個兒子都是黑的。 5.對於任意結點而言,其到葉結點樹尾端NIL指針的每條路徑都包含相同數目的黑結點。 

什麼是一致性哈希?

  • 環形Hash空間:按照經常使用的hash算法來將對應的key哈希到一個具備232次方個桶的空間中,即0~(232)-1的數字空間中。如今咱們能夠將這些數字頭尾相連,想象成一個閉合的環形;
  • 把數據經過必定的hash算法處理後映射到環上;
  • 將機器經過hash算法映射到環上(通常狀況下對機器的hash計算是採用機器的IP或者機器惟一的別名做爲輸入值),而後以順時針的方向計算,將全部對象存儲到離本身最近的機器中;
  • 機器的刪除與添加:普通hash求餘算法最爲不妥的地方就是在有機器的添加或者刪除以後會照成大量的對象存儲位置失效,這樣就大大的不知足單調性了。經過對節點的添加和刪除的分析,一致性哈希算法在保持了單調性的同時,仍是數據的遷移達到了最小,這樣的算法對分佈式集羣來講是很是合適的,避免了大量數據遷移,減少了服務器的的壓力。
  • 平衡性:在一致性哈希算法中,爲了儘量的知足平衡性,其引入了虛擬節點。它其實是節點在hash空間的複製品,一實際個節點對應了若干個「虛擬節點」,這個對應個數也成爲「複製個數」,「虛擬節點」在hash空間中以hash值排列。
 
一致性哈希

Java有哪些實現鎖的方式?

  • synchronized同步鎖:它沒法中斷一個正在等候得到鎖的線程,也沒法經過投票獲得鎖,若是不想等下去,也就無法獲得鎖。但除非對鎖的某個高級特性有明確的須要,或者有明確的證據代表在特定狀況下,同步已經成爲瓶頸,不然仍是應當繼續使用synchronized。
  • volatile是比synchronized更輕量,由於它沒有上下文切換;其實現是經過lock指令將緩存行數據寫到系統內存且讓其餘緩存數據無效;
  • ReentrantLock能夠支持公平鎖,固然公平鎖性能會有影響,默認爲非公平的;
// 對於非公平鎖,會執行該方法: final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//獲取狀態變量 if (c == 0) {//代表沒有線程佔有該同步狀態 if (compareAndSetState(0, acquires)) {//以原子方式設置該同步狀態 setExclusiveOwnerThread(current);//該線程擁有該FairSync同步狀態 return true; } } else if (current == getExclusiveOwnerThread()) {//當前線程已經擁有該同步狀態 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc);//重複設置狀態變量(鎖的可重入特性) return true; } return false; } // 而對於公平鎖,該方法則是這樣: protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //先判斷該線程節點是不是隊列的頭結點 //是則以原子方式設置同步狀態,獲取鎖 //不然失敗返回 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//重入 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 
  • AQS管理的FIFO等待隊列,獲取鎖狀態失敗的線程會被放入該隊列,等待再次嘗試獲取鎖。而state成員變量,表明着鎖的同步狀態,一個線程成功得到鎖,這個行爲的實質就是該線程成功的設置了state變量的狀態。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 這個if分支實際上是一種優化:CAS操做失敗的話才進入enq中的循環。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 
  • ReentrantReadWriteLock對重入鎖再進一步分離爲讀鎖和寫鎖,在讀多寫少的場景下能顯著提高性能。
  • ReadLock能夠被多個線程持有而且在做用時排斥任何的WriteLock,而WriteLock則是徹底的互斥。
  • 寫線程獲取寫入鎖後能夠獲取讀取鎖,而後釋放寫入鎖,這樣就從寫入鎖變成了讀取鎖,從而實現鎖降級的特性。讀取鎖是不能直接升級爲寫入鎖的。由於獲取一個寫入鎖須要釋放全部讀取鎖,因此若是有兩個讀取鎖視圖獲取寫入鎖而都不釋放讀取鎖時就會發生死鎖。
  • 若是讀取執行狀況不少,寫入不多的狀況下,使用ReentrantReadWriteLock可能會使寫入線程遭遇飢餓問題,也就是寫入線程吃吃沒法競爭到鎖定而一直處於等待狀態。
  • StampedLock控制鎖有三種模式(寫,讀,樂觀讀),一個StampedLock狀態是由版本和模式兩個部分組成,鎖獲取方法返回一個數字做爲票據stamp,它用相應的鎖狀態表示並控制訪問,數字0表示沒有寫鎖被受權訪問。在讀鎖上分爲悲觀鎖和樂觀鎖。所謂的樂觀讀模式,也就是若讀的操做不少,寫的操做不多的狀況下,你能夠樂觀地認爲,寫入與讀取同時發生概率不多,所以不悲觀地使用徹底的讀取鎖定,程序能夠查看讀取資料以後,是否遭到寫入執行的變動,再採起後續的措施(從新讀取變動信息,或者拋出異常) ,這一個小小改進,可大幅度提升程序的吞吐量!
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看樂觀讀鎖案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //得到一個樂觀讀鎖 double currentX = x, currentY = y; //將兩個字段讀入本地局部變量 if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其餘寫鎖發生? stamp = sl.readLock(); //若是沒有,咱們再次得到一個讀悲觀鎖 try { currentX = x; // 將兩個字段讀入本地局部變量 currentY = y; // 將兩個字段讀入本地局部變量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲觀讀鎖案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循環,檢查當前狀態是否符合 long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉爲寫鎖 if (ws != 0L) { //這是確認轉爲寫鎖是否成功 stamp = ws; //若是成功 替換票據 x = newX; //進行狀態改變 y = newY; //進行狀態改變 break; } else { //若是不能成功轉換爲寫鎖 sl.unlockRead(stamp); //咱們顯式釋放讀鎖 stamp = sl.writeLock(); //顯式直接進行寫鎖 而後再經過循環再試 } } } finally { sl.unlock(stamp); //釋放讀鎖或寫鎖 } } } 

AtomicLong、LongAdder和LongAccumulator的實現有何不一樣?

  • AtomicLong是Java 5引入的基於CAS的無鎖的操做長整形值的工具類;
  • LongAdder是Java 8提供的累加器,基於Striped64實現。它經常使用於狀態採集、統計等場景。AtomicLong也能夠用於這種場景,但在線程競爭激烈的狀況下,LongAdder要比AtomicLong擁有更高的吞吐量,但會耗費更多的內存空間。
  • Striped64的設計核心思路就是經過內部的分散計算來避免競爭(好比多線程CAS操做時的競爭)。Striped64內部包含一個基礎值和一個單元哈希表。沒有競爭的狀況下,要累加的數會累加到這個基礎值上;若是有競爭的話,會將要累加的數累加到單元哈希表中的某個單元裏面。因此整個Striped64的值包括基礎值和單元哈希表中全部單元的值的總和。
/** * 存放Cell的hash表,大小爲2的冪。 */ transient volatile Cell[] cells; /** * 基礎值,沒有競爭時會使用(更新)這個值,同時作爲初始化競爭失敗的回退方案。 * 原子更新。 */ transient volatile long base; /** * 自旋鎖,經過CAS操做加鎖,用於保護建立或者擴展Cell表。 */ transient volatile int cellsBusy; 
  • LongAccumulator和LongAdder相似,也基於Striped64實現。但要比LongAdder更加靈活(要傳入一個函數接口),LongAdder至關因而LongAccumulator的一種特例。

CompletableFuture對比Future有哪些改進,怎麼用?

  • Future對象表明一個還沒有完成異步操做的結果。從Java 5以來,JUC包一直提供着最基本的Future,不過它太雞肋了,除了get、cancel、isDone和isCancelled方法以外就沒有其餘的操做了,對於結果的獲取很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果,這樣很不方便。
  • 好在Java 8中引入了具備函數式風格的CompletableFuture,支持一系列的函數式的組合、運算操做,很是方便,能夠寫出函數式風格的代碼而擺脫callback hell。
  • 主動完成計算:CompletableFuture類實現了CompletionStage和Future接口,因此你仍是能夠像之前同樣經過阻塞或者輪詢的方式得到結果,儘管這種方式不推薦使用。
  • 主要的API以下所示:
supplyAsync/runAsync -- 建立CompletableFuture對象;
whenComplete/whenCompleteAsync/exceptionally -- 計算完成或者拋出異常的時能夠執行特定的Action;
thenApply/thenApplyAsync -- 對數據進行一些處理或變換;
thenAccept/thenAcceptAsync -- 純消費,不返回新的計算值;
thenAcceptBoth/thenAcceptBothAsync/runAfterBoth -- 當兩個CompletionStage都正常完成計算的時候,就會執行提供的Action;
thenCompose/thenComposeAsync -- 這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture。 記住,thenCompose返回的對象並不一是函數fn返回的對象,若是原來的CompletableFuture尚未計算出來, 它就會生成一個新的組合後的CompletableFuture。能夠用來實現異步pipline; thenCombine/thenCombineAsync - 並行執行的,它們之間並無前後依賴順序,和thenAcceptBoth的區別在於有返回值; allOf/anyOf -- 全部的/其中一個CompletableFuture都執行完後執行計算
做者:ginobefun 連接:https://www.jianshu.com/p/938100bb0b1e 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索