實現線程有三種方式:使用內核線程實現、使用用戶線程實現和使用用戶線程加輕量級進程混合實現。內核線程是直接由操做系統內核支持的線程,經過內核完成線程切換,內核經過操縱調度器對線程進行調度,並負責將線程的任務映射到各個處理器上。
程序不會直接使用內核線程,而是去使用內核線程的高級接口-輕量級進程。每一個輕量級進程由一個內核線程支持。這種輕量級進程與內核線程之間1:1的關係稱爲一對一的線程模型。Sun JDK 的Windows版和Linux版都是使用一對一的線程模型,一條Java線程就映射到一條輕量級進程中。因爲內核線程的支持,每一個輕量級進程成爲一個獨立的調度單元,一個輕量級進程的阻塞不會影響整個進程。但也是由於基於內核線程實現,各類線程操做,如建立、析構及同步都要進行系統調用,須要在用戶態和內核態中來回切換,調用代價高,其次輕量級進程消耗必定的內核資源,所以一個系統支持輕量級進程的數量有限。java
線程調度是指系統爲線程分配處理器使用權的過程,分爲協同式調度和搶佔式調度。協同式調度的多線程系統,線程執行時間由線程自己控制,線程完成本身的工做以後,主動通知系統切換到另外一個線程上。優勢是實現簡單,切換操做是由線程主動的,對線程可知,沒有線程同步問題。缺點是線程執行時間不可控制,若是一個線程阻塞,可能致使整個系統奔潰。搶佔式調度的多線程系統,每一個線程有系統分配執行時間,線程的切換不禁線程自己決定。(yield可讓出執行時間,但線程自己沒法獲取執行時間)優勢是線程執行時間系統可控。Java使用的線程調度方式就是搶佔式調度。算法
Java線程的6種狀態:編程
線程建立成功但還沒有啓動就是New;Runable狀態的線程可能正在執行,也可能在等待CPU分配執行時間;當線程等待另外一個線程通知調度器一個條件時就進入等待狀態,例如Object.wait、Thread.join;當這些方法指定時間參數時就成了限時等待;當一個線程試圖獲取一個內部的對象鎖,而該鎖被另外一線程持有時,該線程進入阻塞狀態;當線程因run方法正常退出而天然死亡,或者由於沒有捕獲的異常死亡都會致使線程進入Terminated狀態。安全
Java中斷機制是一種協做機制,經過中斷並不能直接終止另外一個線程,而須要被中斷的線程本身處理中斷。當對一個線程調用interrup方法時,線程的中斷狀態將被置位。這是每個線程都具備的boolean標誌位。每一個線程都應該不時地檢查這個標誌,以判斷線程是否被中斷,並及時處理。服務器
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } public static boolean interrupted() { return currentThread().isInterrupted(true); } public boolean isInterrupted() { return isInterrupted(false); } private native boolean isInterrupted(boolean ClearInterrupted);
能夠看到,interrupt方法經過設置中斷位來完成中斷。interrupted方法和isInterrupted方法都是經過調用native方法來檢測中斷的,interrupted是一個靜態方法,用來檢測當前線程是否被中斷,並且interrupted會清除該線程的中斷狀態;isInterrupted是一個實例方法,可用來檢驗是否有線程被中斷,該方法不會改變中斷狀態。多線程
通常來講,咱們中斷線程的目的極可能是想中止線程執行。怎麼中止線程執行呢?咱們能夠在判斷中斷置位後,用return退出run方法。但這樣設計並不優雅,另一種方式,就是拋出InterruptedException並在run方法裏捕獲。捕獲後怎麼處理也是件值得考慮的事,最好的方法是直接拋給調用者處理,但run方法是重寫方法,結構已固定,沒法拋出異常,咱們還能夠在捕獲InterruptedException後從新中斷當前線程,讓調用者檢測。併發
wait方法是掛起當前線程,釋放當前對象的控制權(釋放鎖),而後線程處於等待狀態。notify是通知正在等待對象控制權(鎖)的線程能夠繼續運行。notifyAll是通知全部等待對象控制權的線程繼續運行。這幾個方法是基於monitor監視器鎖來實現的,因此必須在同步塊內執行。app
sleep讓當前線程暫停指定時間。wait方法依賴於同步,sleep能夠直接調用。由於sleep只是暫時讓出CPU的執行權,並不釋放鎖,而wait須要釋放鎖。舉個簡單的例子:異步
public class WaitTest { private static Object o = new Object(); static class Thread1 extends Thread{ @Override public void run() { try { synchronized(o){ System.out.println("Thread1--start"); //o.wait(); Thread.sleep(2000); System.out.println("Thread1--end"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("a"); } } } static class Thread2 extends Thread{ @Override public void run() { synchronized(o){ System.out.println("Thread2--start"); o.notify(); System.out.println("Thread2--end"); } } } public static void main(String[] args) throws InterruptedException{ Thread1 t1 = new Thread1(); Thread2 t2 = new Thread2(); t1.start(); Thread.sleep(100);//保證t1先得到鎖 t2.start(); } }
在線程1裏分別調用sleep和wait會有不一樣的結果,調用sleep時線程1不會釋放鎖,因此會打印完「Thread1 start」、「Thread1 end」,再進入線程2的打印。調用wait時,打印完「Thread1-start」,就會釋放鎖,這時線程2的打印得以繼續進行,會打印「Thread2 start」。ide
Thread.yield()方法會將當前線程從Running轉爲Runnable,讓出當前對進程的使用,以便其餘線程有機會執行,不過調度器能夠忽虐該方法,也不能指定暫停時間,通常只用來調試和測試。
Thread.join()方法用於將異步的線程「合併」爲同步的線程,父線程等待子線程執行完成後再執行。其實並不算合併,而是調用join的線程進入限時等待,不斷檢查子線程狀態,在子線程執行完成後恢復執行。看一下它的實現原理:
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
能夠看到,join是經過子線程不斷輪詢本身狀態一直到執行完畢才返回繼續執行父線程。
在Java中,最基本的互斥同步手段就是synchronized關鍵字,synchronized自動提供一個鎖以及相關的條件。synchronized同步塊對於同一條線程來講是可重入的,其次,同步塊在已進入的線程執行完以前,會阻塞後面其餘線程的進入。前面提到,Java的線程是映射到系統原生線程上,阻塞或喚醒一個線程,都須要操做系統幫忙完成,須要從用戶態轉爲核心態,這須要耗費很長時間。所以synchronized是重量級操做,虛擬機自己會有一些優化手段,好比在阻塞以前加入自旋等待過程,避免頻繁切入核心態之中。
重入鎖(ReentrantLock)與synchronized類似,具有同樣的線程重入性,一個表現爲API層面的互斥鎖,另外一個表現爲原生語法層面的互斥鎖。ReetrantLock增長了一些功能:等待可中斷、公平鎖和鎖綁定多個條件。
公平鎖是指多個線程等待同一個鎖時,必須按照申請鎖的時間順序來依次得到鎖,能夠經過帶布爾值的構造函數要求使用公平鎖。
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
等待可中斷是指持有鎖的線程長期不釋放鎖的時候,正在等待的線程能夠選擇放棄等待:
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTestOne { static int count = 0; static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) throws Exception{ Thread a = new Thread(new CountThread("a")); Thread b = new Thread(new CountThread("b")); a.start(); Thread.sleep(100);//確保b線程後執行,a能先得到鎖 b.start(); Thread.sleep(500);//等待0.5s後,a線程尚未釋放鎖,經過中斷放棄等待 b.interrupt(); } static class CountThread extends Thread{ String name; CountThread(String name){ this.name = name; } @Override public void run() { /*try{ lock.lockInterruptibly(); }catch(InterruptedException e){ System.out.println("Thread "+name+" interrupted"); return; }*/ lock.lock(); System.out.println(Thread.currentThread().isInterrupted()); try{ System.out.println("Thread "+name+" begin"); for(int i=0; i<2000000; i++){ for(int j=0; j<100000; j++){ count++; } } System.out.println("Thread "+name+" end"); }finally{ lock.unlock(); } } } }
咱們先看lock.lock的執行結果:
能夠看見a線程執行完後b纔開始執行,且b線程的中斷位已被置位。說明lock是阻塞式的獲取鎖,只有在成功獲取到鎖之後才處理中斷信息,而且怎麼處理由調用端決定,lock只負責給中斷位置位。
再看一下lock.lockInterruptibly的執行結果:
能夠看到,lockInterruptibly會當即處理中斷信息,拋出InterruptedException,而不用等到獲取鎖。
鎖綁定多個條件是指一個ReentrantLock對象能夠同時綁定多個Condition對象。在synchronized中,鎖對象的wait和notify其實實現一個隱含的條件,若是要和多個條件關聯,必須額外添加鎖。
public class ReentrantLockTestTwo { static ReentrantLock lock = new ReentrantLock(); static Condition productCondition = lock.newCondition(); static Condition customerCondition = lock.newCondition(); static Set<Object> set = new HashSet<Object>(8); public static void main(String[] args) { ProductThread pt = new ProductThread(); CustomerThread ct = new CustomerThread(); new Thread(pt).start(); new Thread(pt).start(); new Thread(ct).start(); new Thread(ct).start(); } static class ProductThread extends Thread{ @Override public void run() { lock.lock(); try{ System.out.println("進入生產線程"); for(;;){ Thread.sleep(1000); if(set.size()>=6){ customerCondition.signalAll(); productCondition.await(); }else{ System.out.println("開始生產"); Object o = new Object(); set.add(o); System.out.println("目前有"+set.size()+"個產品"); } } }catch(Exception e){ }finally{ lock.unlock(); } } } static class CustomerThread extends Thread{ @Override public void run() { lock.lock(); try{ System.out.println("進入使用線程"); for(;;){ Thread.sleep(1000); if(set.size()<=2){ productCondition.signalAll(); customerCondition.await(); }else{ System.out.println("開始使用"); Iterator it = set.iterator(); if(it.hasNext()){ Object o = it.next(); set.remove(o); } System.out.println("目前有"+set.size()+"個產品"); } } }catch(Exception e){ }finally{ lock.unlock(); } } } }
上面展現了ReentrantLock鎖綁定多個條件。能夠看到咱們在產品上加鎖並在鎖上新建了兩個條件:生產條件和使用條件。當產品數量多於6時,讓生產線程等待,小於2時,讓使用線程等待。從執行結果能夠看出,每次喚醒的線程只多是生產或使用線程的一種,而並無喚醒這個鎖上的全部線程。
鎖優化有幾種措施:自旋鎖與自適應鎖、鎖消除、鎖粗化、輕量級鎖和偏向鎖。
前面提到同步塊會阻塞其餘線程,而線程的阻塞和恢復須要系統切換狀態,耗費較長時間。因此若是持有鎖的線程很快就會釋放鎖時,咱們並不須要讓等待線程阻塞,而是讓它執行一個忙循環,這就是所謂的自旋鎖。但自旋鎖雖然避免了線程切換的開銷,卻要佔用處理器時間。當鎖被長時間佔用時,自旋鎖除了浪費處理器資源就沒有做用了。JDK1.6引入了自適應的自旋鎖,有系統決定自旋時間,改善性能。
鎖消除是指虛擬機即時編譯器在運行時,對一些代碼上要求同步,可是被檢測到不可能存在共享數據競爭的鎖進行消除。鎖消除主要斷定依據來源於逃逸分析的數據支持。
若是一系列的連續操做都對同一個對象反覆加鎖和解鎖,甚至加鎖操做是出如今循環體中,頻繁地進行互斥同步操做也會致使沒必要要的性能損耗。這時能夠鎖粗化。
偏向鎖是消除數據在無競爭狀況下的同步,所謂偏向,是指其偏向第一個得到它的線程。假設JVM啓用了偏向鎖,當鎖對象第一次被線程得到的時候,虛擬機將會把對象頭的標誌位設爲「01」,即偏向模式。同時使用CAS操做把獲取到這個鎖的線程的ID記錄在對象的Mark Word中,若是CAS成功,持有偏向鎖的線程之後每次進入這個鎖相關的同步塊時,虛擬機都再也不進行任何同步操做。第二個線程來訪問時,檢查原來持有對象鎖線程是否存活,若已介素則偏向鎖偏向第二個線程,不然第一個線程若是存活,經過線程棧檢查對象是否處於鎖定狀態,若是無鎖,則撤銷偏向恢復到未鎖定對象,若是仍然鎖定,則升級爲輕量級鎖。
輕量級鎖是在無競爭狀況(我的認爲是輕度競爭)下使用CAS操做去消除同步使用的互斥量,線程在執行同步塊以前,虛擬機在當前線程的棧幀中創建Lock Record來存儲對象目前Mark Word的拷貝,而後JVM經過CAS替換對象Mark Word爲Lock Record的指針。若是成功,對象處於輕量級鎖定,失敗說明存在額外線程競爭鎖,則嘗試自旋,若是自旋時間內還未得到鎖,則開始膨脹,修改MarkWord爲重量級鎖的指針,而且阻塞本身。
同步機制採用了「以時間換空間」的方式,而ThreadLocal採用了「以空間換時間」的方式。ThreadLocal會在每一個線程中爲變量建立一個副本,即每一個線程內部都會有一個該變量,且在線程內部任何地方可使用,線程之間互不影響,這樣須要在多線程使用的變量就不存在線程安全問題。
ThreadLocal自己並不存儲變量值,它自己其實只是一個鍵值對的鍵,用來讓線程從ThreadLocalMap中獲取Value,ThreadLocalMap是每一個線程內部的容器。
能夠看一下ThreadLocal的源碼:
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
同時,咱們看一下ThreadLocalMap的源碼,會發現它的key使用的是ThreadLocal的弱引用。至於爲何用弱引用,是由於從上圖咱們能夠看見一共有兩條引用鏈到ThreadLocal變量,若是ThreadLocalRef置空,也就是程序再也不訪問ThreadLocal變量了。此時若是key使用的是強引用,那麼根據判斷對象存亡的可達性分析算法,ThreadLocal並不會被回收,由於還有一條GC root的引用鏈到ThreadLocal上;若是使用的是弱引用,咱們知道弱引用只會存活到下一次JVM GC時,ThreadLocal就能夠被回收。
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
使用弱引用ThreadLocal當然能夠被回收,可是帶來新的問題。ThreadLocal被回收後ThreadLocalMap中會出現key爲null的Entry,意味着沒有辦法訪問這些key爲null的Entry的value,若是當前線程遲遲不結束,value對應的對象不被回收,就會致使內存泄漏。從下面的代碼看到,ThreadLocal的set、get、remove方法在一些時機下會清理這些value,但這不及時,仍是會有一些內存泄漏,最好的辦法時咱們能夠經過每次使用完ThreadLocal後,調用它的remove方法來避免這種狀況。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; } private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
Collections做爲集合的工具類,除了提供一些有效的算法以外,還能夠對集合進行包裝。其中一種就是非同步集合包裝成同步集合。
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); } private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} } private transient Set<K> keySet; private transient Set<Map.Entry<K,V>> entrySet; private transient Collection<V> values; public Set<K> keySet() { synchronized (mutex) { if (keySet==null) keySet = new SynchronizedSet<>(m.keySet(), mutex); return keySet; } } public Set<Map.Entry<K,V>> entrySet() { synchronized (mutex) { if (entrySet==null) entrySet = new SynchronizedSet<>(m.entrySet(), mutex); return entrySet; } } public Collection<V> values() { synchronized (mutex) { if (values==null) values = new SynchronizedCollection<>(m.values(), mutex); return values; } } public boolean equals(Object o) { if (this == o) return true; synchronized (mutex) {return m.equals(o);} } public int hashCode() { synchronized (mutex) {return m.hashCode();} } public String toString() { synchronized (mutex) {return m.toString();} } // Override default methods in Map @Override public V getOrDefault(Object k, V defaultValue) { synchronized (mutex) {return m.getOrDefault(k, defaultValue);} } @Override public void forEach(BiConsumer<? super K, ? super V> action) { synchronized (mutex) {m.forEach(action);} } @Override public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { synchronized (mutex) {m.replaceAll(function);} } @Override public V putIfAbsent(K key, V value) { synchronized (mutex) {return m.putIfAbsent(key, value);} } @Override public boolean remove(Object key, Object value) { synchronized (mutex) {return m.remove(key, value);} } @Override public boolean replace(K key, V oldValue, V newValue) { synchronized (mutex) {return m.replace(key, oldValue, newValue);} } @Override public V replace(K key, V value) { synchronized (mutex) {return m.replace(key, value);} } @Override public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { synchronized (mutex) {return m.computeIfAbsent(key, mappingFunction);} } @Override public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.computeIfPresent(key, remappingFunction);} } @Override public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.compute(key, remappingFunction);} } @Override public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.merge(key, value, remappingFunction);} } private void writeObject(ObjectOutputStream s) throws IOException { synchronized (mutex) {s.defaultWriteObject();} } }
能夠看見,其實包裝的原理很簡單,無非是對原來的全部操做加上同步鎖,這樣非同步集合就成了同步集合。
讀寫鎖其實就是共享鎖和排它鎖。若是對資源加了寫鎖,其餘線程沒法再得到讀鎖或寫鎖,但持有寫鎖的線程,能夠對資源加讀鎖(鎖降級)。若是線程對資源加了讀鎖,其餘線程能夠繼續加讀鎖。舉個例子:幾我的一塊兒開發,SVN服務器上的代碼你們能夠同時查看,但對同一段代碼的修改提交同時只能一我的操做。這裏查看就須要讀鎖,提交就須要加寫鎖,以下代碼。
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; public class ReentrantReadWriteLockTest { static int readCount = 0; static int writeCount = 0; static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); static String code = "hello world"; static ReadLock readlock = lock.readLock(); static WriteLock writelock = lock.writeLock(); public static void main(String[] args) { ReadThread r = new ReadThread(); WriteThread w = new WriteThread(); for(int i=0; i<3; i++){ new Thread(r).start(); new Thread(w).start(); } } static class ReadThread extends Thread{ @Override public void run() { while(true){ readlock.lock(); try{ readCount ++; System.out.println("同時有"+readCount+"個線程同時讀的內容: "+code); String temp = new String(code); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(code.equals(temp)); readCount --; }finally{ readlock.unlock(); } } } } static class WriteThread extends Thread{ @Override public void run() { while(true){ writelock.lock(); try{ writeCount ++; code = code + "a"; System.out.println("同時有"+writeCount+"個線程寫的內容: "+code); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } writeCount --; }finally{ writelock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
咱們來看一下運行結果:
能夠看到,有多個線程同時讀取代碼,但任意時刻只有一個線程進行更改。且讀的時候不容許更改(代碼是經過比較先後兩次讀到的內容來驗證讀寫鎖不兼容的,這不夠嚴謹,暫時沒有想到更好例子)。至於鎖降級,由於在修改數據後寫線程沒有再用到數據,因此上例中沒有用鎖降級,在此摘抄一段話來講明其必要性。
參考:《深刻理解Java虛擬機》、《Java併發編程實戰》、《Java核心技術》。