3 JDK併發包

JDK內部提供了大量實用的API和框架。本章主要介紹這些JDK內部功能,主要分爲3大部分:java

首先,介紹有關同步控制的工具,以前介紹的synchronized就是一種同步控制手段,將介紹更加豐富的多線程控制方法。node

其次,將詳細介紹JDK對線程池的支持,使用線程池,將很大程度提升線程調度的性能。程序員

第三,介紹JDK的一些併發容器。這些容器專爲並行訪問所設計,絕對是高效、安全、穩定的實用工具。算法

多線程團隊協做:同步控制

以前提到的synchronized是最簡單的同步控制的方法。本節中,首先介紹synchronized、Object.wait()、Object.notify()方法的替代品(或者說加強版)——重入鎖數據庫

重入鎖(ReentrantLock):synchronized的功能擴展

jdk6.0以前,重入鎖的性能遠遠好於synchronized,可是以後,二者差距並不大。數組

下面是一段重入鎖使用案例:安全

public class ReenterLock implements Runnable{
   public static ReentrantLock lock=new ReentrantLock();
   public static int i=0;
   @Override
   public void run() {
      for(int j=0;j<10000000;j++){
         lock.lock();   //1
         try{
            i++;
         }finally{
            lock.unlock();  //2
         }
      }
   }
   public static void main(String[] args) throws InterruptedException {
      ReenterLock tl=new ReenterLock();
      Thread t1=new Thread(tl);
      Thread t2=new Thread(tl);
      t1.start();t2.start();
      t1.join();t2.join();
      System.out.println(i);
   }
}

1處加鎖,2處釋放鎖。數據結構

重入鎖是可讓線程反覆進入的,這裏的反覆僅僅侷限於一個線程。能夠寫成下面的形式:多線程

lock.lock();
lock.lock();
try{
   i++;
}finally{
   lock.unlock();
   lock.unlock();
}

這種狀況下,一個線程連續兩次得到同一把鎖,這是容許的!同時,釋放也必須釋放兩次,釋放次數多了,拋出異常,次數少了,至關於線程還持有當前鎖,其餘線程沒法進入臨界區。併發

重入鎖除了靈活,還提供了中斷處理的能力:

中斷響應

對於synchronized來講,若是一個線程在等待鎖,那麼結果只有兩種狀況,要麼它得到這把鎖繼續執行,要麼它保持等待。而重入鎖提供了另外一種可能,那就是線程能夠被中斷。也就是在等待鎖的過程當中,程序能夠根據須要取消對鎖的請求。有些時候,這麼作頗有必要。若是一個線程正在等待鎖,那麼它依然能夠收到一個通知,被告知無需再等待,能夠中止工做了。這種狀況對於處理死鎖是有必定幫助的。

下面代碼產生了一個死鎖,但得益於鎖中斷,咱們能夠輕鬆解決這個死鎖:

public class IntLock implements Runnable {
    //重入鎖ReentrantLock
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    public IntLock(int lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();  //1
                Thread.sleep(500);
                lock2.lockInterruptibly();
                System.out.println("lock1 is working....");
            } else {
                lock2.lockInterruptibly();
                Thread.sleep(500);
                lock1.lockInterruptibly();
                System.out.println("lock2 is working....");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (lock1.isHeldByCurrentThread()) {
                lock1.unlock();    //釋放鎖
            }
            if (lock2.isHeldByCurrentThread()) {
                lock2.unlock();
            }
            System.out.println(Thread.currentThread().getId() + ":線程退出");
        }

    }
    public static void main(String[] args) throws InterruptedException {
        IntLock r1 = new IntLock(1);
        IntLock r2 = new IntLock(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        t2.interrupt(); //2
    }
}

線程t1和t2啓動後,t1先佔用lock1,再佔用lock2;t2先佔用lock2,再請求lock1。這很容易照成t一、t2互相等待,造成死鎖。這裏,統一使用1處的lockInterruptibly()方法,這是一個能夠對中斷進行相應的鎖申請動做,即在等待鎖的過程當中,能夠響應中斷。

在2處,t2線程被中斷,放棄對lock1的鎖申請,同時釋放已得到的lock2。這時t1就能順利執行完剩餘程序

鎖申請等待限時

除了外部通知以外,避免死鎖還有另一種方法,就是限時等待。咱們可使用tryLock()方法進行一次限時的等待。

public class TimeLock implements Runnable {
   public static ReentrantLock lock = new ReentrantLock();
   @Override
   public void run() {
      try {
         if (lock.tryLock(5, TimeUnit.SECONDS)) {   //2
            Thread.sleep(6000);     //1
         } else {
            System.out.println("get lock failed");
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      } finally {
         if(lock.isHeldByCurrentThread())
            lock.unlock();
      }
   }
   public static void main(String[] args) {
      TimeLock tl = new TimeLock();
      Thread t1 = new Thread(tl);
      Thread t2 = new Thread(tl);
      t1.start();
      t2.start();
   }
}

在這裏2處,tryLock()方法接收兩個參數,一個表示等待時長,另一個表示計時單位。沒個進入臨界區的線程須要佔用6秒的鎖(1處),而t2因爲等待5秒沒有等到想要的鎖(2處),便返回false。若等待時間改成比5秒大,將返回true,並得到鎖。

公平鎖

在大多數狀況下,鎖的申請是非公平的。系統知識隨機挑選一個,不保證其公平性。公平的鎖,會按照時間的前後順序,保證先到者先得,後到者後得。公平鎖的一大特色是:不會產生飢餓現象。咱們使用synchronized關鍵字獲得的就是非公平鎖,而重入鎖能夠對公平性設置。它有一個構造函數:

public ReentrantLock(boolean fair)     //爲true時是公平鎖

實現公平鎖要維護一個有序隊列,所以實現公平鎖的成本較高,性能相對低下,所以,默認狀況下,鎖時非公平的。

public class FairLock implements Runnable{
    //建立公平鎖
    private static ReentrantLock lock=new ReentrantLock(true);  //1
    public void run() {
        while(true){
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"得到鎖");
            }finally{
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        FairLock lft=new FairLock();
        Thread th1=new Thread(lft);
        Thread th2=new Thread(lft);
        th1.start();
        th2.start();
    }
}/**
Thread-0得到鎖
Thread-1得到鎖
Thread-0得到鎖
Thread-1得到鎖
Thread-0得到鎖
Thread-1得到鎖
*/

你運行上面的程序,會看到結果頗有規律。

若是不使用公平鎖,根據系統的調度,一個線程會傾向於再次獲取已經持有的鎖,這種分配方式是高效的。可是無公平性可言,將上面1中的true改爲false便可。

對ReentrantLock的幾個重要方法整理以下:

  • lock():得到鎖,若是鎖已經被佔用,則等待。
  • lockInterruptibly():得到鎖,但優先響應中斷。
  • tryLock():嘗試得到鎖,若是成功,返回true,失敗返回false。不等待,當即返回。
  • tryLock(long time, TimeUnit unit):在給定時間內嘗試得到鎖。
  • unlock():釋放鎖。

重入鎖的實現

就重入鎖的實現來看,它主要集中在java層面。主要包含三個要素:

  • 第一,是原子狀態。原子狀態使用CAS操做來存儲當前鎖的狀態,判斷鎖是否已經被別的線程持有。
  • 第二,是等待隊列。
  • 第三,是阻塞語句park()和unpark(),用來掛起和恢復線程。沒有獲得鎖的線程將會被掛起。有關park()和unpark()的詳細介紹,能夠參考線程阻塞工具類:LockSupport。

重入鎖的好搭檔:Condition條件

Condition的做用和wait()和notify()方法的做用是大體相同的。不一樣的是wait()和notify()方法是和synchronized關鍵字合做使用的,而Condition是與重入鎖合做的。經過Lock接口(重入鎖實現了該接口)的newCondition()方法能夠生成一個與當前重入鎖綁定的Condition實例。

Condition接口提供的基本方法:

  • await():使當前線程等待,同時釋放當前鎖,當其餘線程使用signal()或者signalAll()方法時,線程會從新得到鎖並繼續執行。當線程中斷時,也能跳出等待,和Object.wait()很是類似。
  • awaitUninterruptibly():與await()基本相同,可是不會響應等待過程當中的中斷。
  • signal():喚醒一個等待中的線程,signalAll()會喚醒全部等待中的線程。

下面是Condition的演示:

public class ReenterLockCondition implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            System.out.println("Thread is start...");
            condition.await();
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReenterLockCondition tl = new ReenterLockCondition();
        Thread t1 = new Thread(tl);
        t1.start();
        Thread.sleep(2000);
        //通知線程t1繼續執行
        lock.lock();   //1
        condition.signal();
        lock.unlock();
    }
}

和Object.wait()和notify()方法同樣,當線程使用Condition.await()時,要求線程持有相關的重入鎖,在Condition.await()調用後,這個線程會釋放這把鎖。同理,在Condition.signal()方法調用時,也要求線程先得到相關的鎖。在siganl()方法調用後,系統會從當前Condition對象的等待隊列中,喚醒一個線程,一旦線程被喚醒,它會從新嘗試得到與之綁定的重入鎖,一旦成功得到,就能夠繼續執行了。所以,通常調用完condition.signal()後,都須要釋放重入鎖。

容許多個線程同時訪問:信號量(Semaphore)

廣義上講,信號量是對鎖的擴展。不管是內部鎖synchronized仍是重入鎖ReentrantLock,一次都只容許一個線程訪問一個資源,而信號量能夠指定多個線程,同時訪問某個資源

主要提供了兩個構造函數:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)     //第二個參數能夠指定是否公平

在構造信號量對象時,必需要指定信號量的准入數,即同時能申請多少個許可。信號量的主要邏輯方法有:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long, TimeUnit unit)
public void release()
  • acquire():嘗試得到一個准入的許可。若沒法得到,則線程會等待,直到有線程釋放一個許可或當前線程被中斷。
  • acquireUninterruptibly():和acquire()方法相似,可是不響應中斷。
  • tryAcquire():嘗試得到一個許可,當即返回結果
  • release():釋放一個許可。
public class SemapDemo implements Runnable{
    final Semaphore semp = new Semaphore(5);    //3
    @Override
    public void run() {
        try {
            semp.acquire(); //1
            //模擬耗時操做
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId()+":done!");    //2
            semp.release();     //4
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemapDemo demo=new SemapDemo();
        for(int i=0;i<20;i++){
            exec.submit(demo);
        }
    }
}

上述代碼中,1處到2處爲臨界區管理代碼,程序會限制這段代碼的線程數。在第3處,申明瞭一個包含5個許可的信號量。這意味着1~2處只能同時有5個線程進入。線程在使用完acquire(),在離開時,務必使用release()釋放信號量。這和釋放鎖是一個道理。

讀寫鎖:ReadWriteLock

ReadWriteLock是JDK5中提供的讀寫分離鎖。讀寫分離鎖能夠有效地減小鎖競爭,以提高系統性能。用鎖分離的機制來提高性能很容易理解,若是使用重入鎖或內部鎖,理論上全部讀—讀、讀—寫、寫—寫都是串行操做。而讀寫鎖,容許多個線程同時讀

好比A一、A二、A3進行寫操做,B一、B二、B3進行讀操做。讀寫鎖容許B一、B二、B3之間並行。可是,考慮數據完整性,寫寫操做和讀寫操做間依然是須要相互等待和持有鎖的。總結以下:

  • 讀-讀不互斥:可並行;

  • 讀-寫互斥;

  • 寫-寫互斥;

    public class ReadWriteLockDemo {
       private static Lock lock = new ReentrantLock();
       private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
       private static Lock readLock = readWriteLock.readLock();
       private static Lock writeLock = readWriteLock.writeLock();
       private int value;
    
       public Object handleRead(Lock lock) throws InterruptedException {
          try {
             lock.lock(); // 模擬讀操做
             Thread.sleep(1000); // 讀操做的耗時越多,讀寫鎖的優點越明顯
             System.out.println(Thread.currentThread().getName()+" read end!");
             return value;
          } finally {
             lock.unlock();
          }
       }
    
       public void handleWrite(Lock lock, int index) throws InterruptedException {
          try {
             lock.lock(); // 模擬寫操做
             Thread.sleep(1000);
             System.out.println(Thread.currentThread().getName()+"  wrait end!");
             value = index;
          } finally {
             lock.unlock();
             System.out.println(value);
          }
       }
    
       public static void main(String[] args) {
          // TODO Auto-generated method stub
          final ReadWriteLockDemo demo = new ReadWriteLockDemo();
          Runnable readRunnable = () -> {
             // TODO Auto-generated method stub
             try {
                demo.handleRead(readLock);
                // demo.handleRead(lock);
             } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
             }
          };
          Runnable writeRunnable = () -> {
             // TODO Auto-generated method stub
             try {
                demo.handleWrite(writeLock, new Random().nextInt());
                // demo.handleWrite(lock, new Random().nextInt());
             } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
             }
    
          };
          for (int i = 0; i < 18; i++) {
             new Thread(readRunnable).start();    //1
          }
          for (int i = 18; i < 20; i++) {
             new Thread(writeRunnable).start();   //2
          }
       }
    }

上面代碼中,讀和寫的線程使用耗時的操做來模擬,在1處開啓同時讀的線程,能夠從結果看出讀的速度能夠是並行的,而2處則不行。

倒計時鎖:CountDownLatch

這個工具稱爲倒計數器:一般用來控制線程等待,它可讓某一個線程等待直到倒計時結束,再開始執行。

使用場景:好比火箭就很適合使用CountDownLatch。火箭發射前,每每要進行各項設備、儀器檢查,只有檢查完畢後,引擎才能點火。

CountDownLatch的構造函數接收一個整數,即當前這個計數器的計數個數:

public CountDownLatch(int count)

演示:

public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);   //1
    static final CountDownLatchDemo demo = new CountDownLatchDemo();    

    @Override
    public void run() {
        try {
            //模擬檢查任務
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("check complete");
            end.countDown();    //2
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            exec.submit(demo);
        }
        //等待檢查
        end.await();    //3
        //發射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

在1處,生成一個CountDownLatch,計數數量爲10,表示須要10個線程完成任務,等待在CountDownLatch上的線程才能繼續執行。2處表示一個線程已完成,計數器減一。在3處,要求主線程等待10個線程所有完成任務後,主線程才繼續執行。

111_2019-08-26_15-58-26

主線程在CountDownLatch上等待,當全部檢查任務所有完成後,主線程方能繼續執行。

循環柵欄:CyclicBarrier

循環柵欄(CyclicBarrier)和倒計時鎖(CountDownLatch)很是相似:只是循環柵欄的計數器能夠反覆使用。好比假設咱們將計數器設置爲10,那麼湊齊第一批10個線程後,計數器就會歸零,而後接着湊齊下一批10個線程,這就是循環柵欄的內在含義。

使用場景:好比司令下達命令,要10個士兵去完成一項任務,士兵要先集合報道完,接着去執行任務。當10個士兵把手頭任務都執行完成了,司令才能對象宣佈,任務完成!

這裏有兩步:1,士兵集合報道;2,士兵把任務完成。當這兩步前後完成,司令才認爲任務完成。

構造函數:比CountDownLatch稍微強大一些。CyclicBarrier能夠接收一個參數做爲barrierAction(系統當計數器一次計數完成後,系統會執行的動做):

public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }

        public void run() {
            try {
                //士兵報道
                System.out.println(soldier + " 報道");
                //等待全部士兵到齊
                cyclic.await();     //2
                doWork();           
                //等待全部士兵完成任務    
                cyclic.await();     //3
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任務完成");
        }
    }

    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;

        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }

        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "個,任務完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "集合完畢]");
                flag = true;
            }
        }
    }

    public static void main(String args[]) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));   //1
        //設置屏障點,主要是爲了執行這個方法
        System.out.println("隊伍集合");
        for (int i = 0; i < N; ++i) {
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();  //4
        }
    }
}

在1處,建立了CyclicBarrier實例,並將計數器設置爲10,並要求在計數器達到指標時,執行BarrierRun。在2處,每個士兵線程都會等待,知道全部士兵集合完畢,集合完畢後,意味着CyclicBarrier的一次計數完成,當再一次調用CyclicBarrier()時,會進行下一次計數。在3處,會等待全部士兵完成任務。還能夠第三次第四次調用 cyclic.await();

整個工做過程圖示:

2222_2019-08-26_17-02-53

CyclicBarrier.await可能會拋出兩個異常,第一是中斷異常,能夠響應外部緊急事件。大部分迫使線程等待的方法均可能拋出這個異常。第二是它特有的BrokenBarrierException,這個異常說明當前的CyclicBarrier已經破損了,可能沒有辦法等待全部線程到齊了。若是繼續等待,就白等了。

能夠在4處上方插入:

if (i == 5)
    allSoldier[0].interrupt();

這樣作,咱們能夠獲得1箇中斷異常和9個BrokenBarrierException,1個士兵處於中斷,其餘9個須要等待這個線程,拋出BrokenBarrierException能夠避免其餘9個線程進行永久的,無謂的等待。

線程阻塞工具類:LockSupport

LockSupport是一個很實用的線程阻塞工具,能夠在線程的任何位置讓線程阻塞。

和Thread.suspend()相比,它尼補了因爲resume()在前生成的致使線程沒法繼續執行的問題。和Object.wait()相比,它不須要先得到某個對象鎖,也不會拋出InterruptedException。

public class LockSupportDemo {
   public static Object u = new Object();
   static ChangeObjectThread t1 = new ChangeObjectThread("t1");
   static ChangeObjectThread t2 = new ChangeObjectThread("t2");
 
   public static class ChangeObjectThread extends Thread {
      public ChangeObjectThread(String name){
         super.setName(name);
      }
      @Override
      public void run() {
         synchronized (u) {
            System.out.println("in "+getName());
            LockSupport.park(this);     //1
         }
      }
   }
   public static void main(String[] args) throws InterruptedException {
      t1.start();
      Thread.sleep(1000);
      t2.start();
      LockSupport.unpark(t1);       //2
      LockSupport.unpark(t2);       //3
      t1.join();
      t2.join();
   }
}

咱們將原來的suspend和resume方法用park()和unpark()代替,在1處,咱們掛起了當前線程,在2處,咱們分別繼續執行t1和t2,從結果能夠看出,它不會由於unpark在park執行前而致使線程永久掛起。

爲何LockSupport不會致使線程永久掛起?

由於LockSupport使用了相似信號量的機制(不一樣的是不能累加),它爲每一個線程準備了一個許可。

  • 若許可可用—>park()會當即返回,將許可變爲不可用—>線程阻塞;
  • 調用unpark()—>使許可變爲可用

這個特色使得:即便unpark()操做發生在park()以前,它也可使下一次park()操做當即返回。這就是不會致使線程永久掛起的緣由。

同時,處於park()掛起狀態的線程不會像suspend()那樣給出使人費解的Runnable狀態,它會很是明確的給出一個WAITING狀態,甚至會標註是park()引發的。這讓問題很容易分析。

1111_2019-08-27_10-11-12

LockSupport除了阻塞功能外,還支持中斷響應。可是和其餘接收中斷的函數不同,它不拋出中斷異常,而是默默返回,但能夠從Thread.interrupted()等方法得到中斷標記。

public class LockSupportIntDemo {
  public static Object u = new Object();
  static ChangeObjectThread t1 = new ChangeObjectThread("t1");
  static ChangeObjectThread t2 = new ChangeObjectThread("t2");

  public static class ChangeObjectThread extends Thread {
      public ChangeObjectThread(String name) {
          super.setName(name);
      }

      public void run() {
          synchronized (u) {
              System.out.println("in " + getName());
              LockSupport.park();
              if (Thread.interrupted()) {
                  System.out.println(getName() + " 被中斷了!");
              }
          }
          System.out.println(getName() + " 執行結束");
      }
  }

  public static void main(String[] args) throws InterruptedException {
      t1.start();
      Thread.sleep(100);
      t2.start();
      t1.interrupt();
      LockSupport.unpark(t2);
  }
}

線程複用:線程池

多線程在多核的處理下有助於性能,但若是不加控制的使用線程,反而會對系統性能產生不利影響。

爲何會形成不利影響?

  1. 線程建立和關閉須要花費時間,少數沒關係,系統級別的(線程不少的)就很耗時了。
  2. 線程自己須要佔用內存。有拋出out of memory的危險

在實際生產環境中,線程的數量必須獲得控制。

什麼是線程池

聯想一下數據庫鏈接池,就知道線程池是啥了。

在線程池中,總有那麼幾個活躍的線程。當須要時,就從池中取出空閒線程,當完成工做後,再還回去,方便其餘人使用。

JDK對線程池的支持

JDK提供了一套Executor框架,用來對線程池的支持。它的核心成員以下:

Executor框架結構_2019-08-27_11-14-53

上面是jdk併發包的核心類。其中ThreadPoolExecutor表示一個線程池。Excecutor扮演線程池工廠的角色,經過它能夠取得一個擁有特定功能的線程池。

Executors提供了各類類型的線程池:

線程池類型 做用
newFixedThreadPool() 返回固定線程數量的線程池。線程池中線程數量保持不變。有新任務時,有空閒線程,則執行。若沒有則暫存一個任務隊列,等到有空閒線程,再執行
newSingleThreadExecutor() 返回只有一個線程的線程池。如有多餘任務提交線程池,則存入任務隊列,待線程空閒,按先入先出的順序執行任務隊列中的任務。
newCachedThreadPool() 返回一個可根據實際狀況調整線程數量的線程池。如有空閒線程可用,則用空閒線程。若沒有,建立新的線程處理任務。全部線程完成任務後,將返回線程池進行復用。
newSingleThreadScheduledExecutor() 返回一個ScheduleExecutorService對象,線程池大小爲1。它會在給定時間執行某任務。如在固定延時以後,或週期性執行某個任務
newScheduledThreadPool() 返回ScheduleExecutorService對象,但能夠指定線程池數量

一、固定大小的線程池

以newFixedThreadPool()爲例,簡單展現線程池的使用:

public class ThreadPoolDemo {
   public static class MyTask implements Runnable {
      @Override
      public void run() {
         System.out.println(System.currentTimeMillis() + ":Thread ID:"
               + Thread.currentThread().getId());
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   public static void main(String[] args) {
      MyTask task = new MyTask();
      ExecutorService es = Executors.newFixedThreadPool(5);     //1
      for (int i = 0; i < 10; i++) {
         es.submit(task);   //2
      }
   }
}

在1處,建立了一個固定大小的線程池,內有5個線程。在2處,依次向線程池提交了10個任務。

上面程序,前5個任務和後5個任務的執行時間正好相差1秒。

二、計劃任務

newScheduledThreadPool()。它返回一個ScheduledExecutorService對象,能夠根據時間須要進行調度,它其實起到了計劃任務的做用。它的一些主要方法以下:

public ScheduledFuture<?> schedule(Runnable command, long delay, 
          TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
          long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
          long initialDelay, long delay, TimeUnit unit);

scheduleAtFixedRate()和scheduleWithFixedDelay()都會對任務進行週期性調度,不過它們有小小區別:

FixedRate和FixedDelay的區別_2019-08-27_14-28-23

下面是scheduleAtFixedRate()的例子,任務執行1秒,調用週期2秒。即每2秒,任務會被執行一次。

public class ScheduledExecutorServiceDemo {
   public static void main(String[] args) {
      ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
      ses.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            try {
               Thread.sleep(1000);
               System.out.println(System.currentTimeMillis()/1000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }, 0, 2, TimeUnit.SECONDS);
   }
}

上面的執行結果是每次打印時間間隔爲2秒。

那若是任務的執行時間超過調度時間,會發生什麼呢?會出現堆疊的狀況嗎,不會,若出現這種狀況,任務的週期將變成8秒,即任務完成那一刻纔開始下一次任務的調度。

若是採用scheduleWithFixedDelay(),任務的實際間隔將是10秒。

刨根究底:核心線程池的內部實現

對於核心的幾個線程池,其內部都使用了ThreadPoolExecutor實現。這裏就不給出它們的實現方式了。

下面是ThreadPoolExecutor最重要的構造函數:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

參數含義:

  • corePoolSize:指定線程池中的線程數量。
  • maximumPoolSize:指定了線程池中的最大線程數量。
  • keepAliveTime:超過corePoolSize的空閒線程,在多長時間內,會被銷燬。
  • unit:keepAliveTime的單位。
  • workQueue:任務隊列,被提交但還沒有被執行的任務。
  • threadFactory:線程工廠,用來建立線程,用默認的便可。
  • handler:拒絕策略。太多任務而處理不及時,如何拒絕任務。

參數workQueue:被提交但未被執行的任務隊列,它是一個BlockingQueue接口的對象,僅用於存放Runnable對象。ThreadPoolExecutor的構造函數中可使用下面幾種BlockingQueue:

  • 直接提交的隊列:該功能由SynchronousQueue提供。SynchronousQueue沒有容量,若使用它,提交的任務不會被真實的保存,而老是將任務提交給線程執行。若沒有空線程,則建立;若線程數量達到頂峯,則執行拒絕策略。一般,使用SynchronousQueue須要很大的maximumPoolSize值,不然很容易執行拒絕策略。

  • 有界的任務隊列:可使用ArrayBlockingQueue實現。它的構造函數必須帶一個容量參數,表示最大容量。若實際線程數小於corePoolSize,則優先建立新線程,若大於corePoolSize,則將新任務加入等待隊列。若等待隊列已滿,在總線程數<=maximumPoolSize的前提下,建立新的進行執行任務。若>maximumPoolSize,執行拒絕策略。

    可見,有界隊列僅在任務隊列裝滿時,才肯呢過將線程數提高至corePoolSize以上。換句話說,除非系統很是繁忙,不然核心線程數維持在corePoolSize。

  • 無界的任務隊列:可經過LinkedBlockingQueue實現。與有界隊列相比,除非系統資源耗盡,不然無界隊列不存在任務入隊失敗的狀況。

    當系統線程數<corePoolSize,建立新線程;

    若>=corePoolSize,不增長新線程,加入等待隊列,若任務建立和處理速度差別太大,無界隊列會保持快速增加,知道耗盡系統內存。

  • 優先任務隊列(帶有執行優先級的隊列):經過PriorityBlockingQueue實現,能夠控制任務的執行先後順序。高優先級的任務線執行。

回顧一下:

newFixedThreadPool()方法:它的corePoolSize和maximumPoolSize大小同樣,由於固定大小的線程池不存在線程數量的動態變化。同時,它使用無界隊列存聽任務列表,從而在任務提交頻繁的狀況下有可能耗盡系統資源。

newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()的退化,只是簡單將線程數設爲1。

newCachedThreadPool()方法返回corePoolSize爲0,maximumPoolSize無窮大的線程池。剛開始該線程池無線程,它會將提交的線程加入SynchronousQueue,這是一種當即提交的隊列,它會迫使線程池增長新的線程執行任務。當任務執行完畢,在60秒內將線程池不用的線程回收(不留任何空閒線程)。所以,當同時有大量任務提交時,任務執行又不快,那麼系統便會開啓燈亮線程處理,很快就會耗盡系統資源。

超負載了?使用拒絕策略

JDK內置的拒絕策略:

  • AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工做。
  • CallerRunsPolicy策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的線程。顯然,不會真正丟棄任務,可是,任務提交線程的性能可能急劇降低。
  • DiscardOledestPolicy策略:該策略將丟棄最老的一個請求,即將要執行的一個任務,並嘗試再次提交當前任務。
  • DiscardPolicy策略:該策略默默丟棄沒法處理的任務,不予任何處理。(若容許丟失,多是最好的一種方式了)

能夠本身擴展RejectedExecutionHandle接口實現本身的拒絕策略,下面代碼簡單演示了自定義線程池和拒絕策略的使用:

public class RejectThreadPoolDemo {
   public static class MyTask implements Runnable {
      @Override
      public void run() {
         System.out.println(System.currentTimeMillis() + ":Thread ID:"
               + Thread.currentThread().getId());
         try {
            Thread.sleep(100);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   public static void main(String[] args) throws InterruptedException {
      MyTask task = new MyTask();
      ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler(){
               @Override
               public void rejectedExecution(Runnable r,
                     ThreadPoolExecutor executor) {
                  System.out.println(r.toString()+" is discard");
               }
      });       //1
      for (int i = 0; i < Integer.MAX_VALUE; i++) {
         es.submit(task);
         Thread.sleep(10);
      }
   }
}

上述1處自定義了一個線程池。該線程池有5個常駐線程,而且最大線程數量也是5個。這和固定大小的線程池是同樣的。可是它卻擁有一個只有10個容器的等待隊列。在這裏,咱們自定義了拒絕策略,只是比DiscardPolicy高級一點點,把拒絕的信息打印出來,在實際應用中,咱們能夠將其記錄到日誌上。用來分析系統的負載和任務丟失狀況。

自定義線程建立:ThreadFactory

線程池的主要做用是爲了線程複用,也就是避免了線程的頻繁建立。可是,線程池最開始的線程從何而來呢?答案就是ThreadFactory。

ThreadFactory是一個接口,它只有一個方法,用來建立線程:

Thread newThread(Runnable r);

當線程池須要新建線程時,就會調用這個方法。

咱們使用自定義線程能夠更自由地設置池子中全部線程的狀態,甚至能夠設置爲守護線程:

public class TFThreadPoolDemo {
   public static class MyTask implements Runnable {
      @Override
      public void run() {
         System.out.println(System.currentTimeMillis() + ":Thread ID:"
               + Thread.currentThread().getId());
         try {
            Thread.sleep(100);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   public static void main(String[] args) throws InterruptedException {
      MyTask task = new MyTask();
      ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory(){
                   @Override
                   public Thread newThread(Runnable r) {
                      Thread t= new Thread(r);
                      t.setDaemon(true);
                      System.out.println("create "+t);
                      return t;
                   }
                }
               );
      for (int i = 0; i < 5; i++) {
         es.submit(task);
      }
      Thread.sleep(2000);
   }
}

擴展線程池

雖然JDK已經幫咱們實現了這個穩定的高性能線程池。但若是咱們須要對線程池進行一些擴展。好比,想監控每一個任務執行的開始和結束時間,或者其餘一些自定義加強功能,怎麼辦呢?

ThreadPoolExecutor:它也是一個能夠擴展的線程池。它提供了beforeExecute()、afterExecute()和terminated()三個接口對線程池進行控制。

在默認的ThreadPoolExecutor實現中,提供了空的beforeExecute()、afterExecute()實現。在實際引用中,能夠對其擴展實現對線程池運行狀態的跟蹤,輸出一些有用的調試信息,用以幫助系統故障診斷。下面演示對線程池的擴展,在這個擴展中,將記錄每個任務的執行日誌:

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId()
                    + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:" + ((MyTask) r).name);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執行完成:" + ((MyTask) r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}/**
......
正在執行:Thread ID:13,Task Name=TASK-GEYM-2
準備執行:TASK-GEYM-3
正在執行:Thread ID:14,Task Name=TASK-GEYM-3
準備執行:TASK-GEYM-4
正在執行:Thread ID:15,Task Name=TASK-GEYM-4
執行完成:TASK-GEYM-0
執行完成:TASK-GEYM-1
執行完成:TASK-GEYM-2
執行完成:TASK-GEYM-3
執行完成:TASK-GEYM-4
線程池退出
*/

能夠看到,全部任務執行先後都捕獲到了。這對於應用的調試和診斷是很是有幫助的。

合理的選擇:優化線程池線程數量

線程池的大小對系統的性能有必定的影響。過大或太小的線程數量都沒法發揮最優的系統性能。可是也不用作得很是精確,只要避免極大和極小兩種狀況便可。

最優池的大小計算公式_2019-08-27_19-16-27

堆棧去哪裏了:挖出線程池中被淹沒的異常堆棧

先說明一下要解決的問題!

public class DivTask implements Runnable {
    int a, b;
    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }
    @Override
    public void run() {
        double re = a / b;      //1
        System.out.println(re);
    }
    public static void main(String[] args) {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for(int i=0; i<5; i++)
            pools.submit(new DivTask(100, i));
//            pools.execute(new DivTask(100, i));
    }
}/**
100.0
25.0
50.0
33.0
*/

在上面程序中,只有四個輸出結果,少了一個,然而沒有報錯信息。使用submit會出現這樣的狀況(execute會拋出異常,具體緣由後面再看吧)。

再說下解決方案!

對於程序員來講,沒有異常堆棧是最頭疼的事。咱們能夠經過兩種方法來討回異常堆棧:

1 是放棄submit()改用execute(),如註釋所示;

pools.execute(new DivTask(100, i));

2 是改造submit():

Future<?> submit = pools.submit(new DivTask(100, i));
            submit.get();

以上兩種均可以獲得部分堆棧信息:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at geym.ch3.DivTask.main(DivTask.java:21)
Caused by: java.lang.ArithmeticException: / by zero
    at geym.ch3.DivTask.run(DivTask.java:13)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

注意,上面說的是部分!咱們只知道異常是在哪裏拋出的,也就是代碼的1處,可是不肯定線程是在哪裏提交的,任務的具體提交的位置被淹沒了!

三、本身動手,擴展ThreadPoolExecutor線程池(完全解決的辦法)

爲了少加班!咱們仍是本身動手,把堆棧的信息完全挖出來吧!擴展咱們的ThreadPoolExecutor線程池,讓它在調度任務以前,先保存一下提交任務線程的堆棧信息。

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
   public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                           long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
      super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
   }
   @Override
   public void execute(Runnable task) {
      // TODO Auto-generated method stub
      super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));    //包裝器
   }
   @Override
   public Future<?> submit(Runnable task) {
      // TODO Auto-generated method stub
      return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
   }
   private Exception clientTrace() {
      return new Exception("Client stack trace");
   }
   private Runnable wrap(final Runnable task,final Exception clientStack, String clientThreadName) {    //1
      return new Runnable() {
         @Override
         public void run() {
            try {
               task.run();
            } catch (Exception e) {
               clientStack.printStackTrace();
               try {
                  throw e;
               } catch (Exception e1) {
                  // TODO Auto-generated catch block
                  e1.printStackTrace();
               }
            }
         }
      };
   }
   public static class DivTask implements Runnable {
      int a,b;
      public DivTask(int a,int b) {
         this.a = a;
         this.b = b;
      }
      @Override
      public void run() {
         double re = a/b;
         System.out.println(re);
      }
   }
   public static void main(String[] args) {
      ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
            0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
      for(int i=0; i<5; i++)
         pools.execute(new DivTask(100, i));
   }
}

在wrap()(1處)方法的第2個參數爲一個異常,裏面保存着提交任務的線程的堆棧信息。該方法將咱們傳入的Runnable任務進行一層包裝,使之能處理異常信息。當任務發生異常時,這個異常會被打印。

分而治之:Fork/Join框架

fork()用來開啓分支線程來處理任務,通常會提交給ForkJoinPool線程池進行處理,以節省系統資源。

Join()用來等待fork()的執行分支執行結束。

使用Fork/Join進行數據處理時的整體結構如圖所示:

ForkJoin執行邏輯_2019-08-28_15-06-28

因爲線程池的優化,提交的任務和線程數量不是一對一的關係。一般是一個線程處理多個任務,每一個線程都有一個任務隊列。當線程A把任務完成,而線程B還在有一堆任務處理時,線程A會幫助B。B從任務隊列頂部拿數據,而A則是從任務隊列的底部拿數據,這樣有利於避免數據競爭。

互相幫助的線程_2019-08-28_15-13-28

ForkJoinPool的一個重要的接口,能夠提交一個ForkJoinTask,ForkJoinTask支持fork()分解以及join()等待的任務,它有兩個重要子類:RecursiveAction(無返回值)和RecursiveTask(返回v類型)。

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);

使用:

public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            //求和總數小於THRESHOLD,直接求和
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //分紅100個小任務
            // 好比start=0,end=100,則每一小步計算2個數
            //i=0,lastOne=0+2=2, pos=2+1=3
            //i=1,lastOne=2+2=4, pos=4+1=5
            //...
            //i=100
            long step = (start + end) / 100;    //
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) lastOne = end;
                CountTask subTask = new CountTask(pos, lastOne);
                pos = lastOne + 1;
                subTask.fork();
                sum += subTask.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);  //1
        try {
            long res = result.get();
            System.out.println("sum=" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在1處,使用forkJoinPool提交了CountTask,CountTask構造一個計算1到200000求和的任務。在compute()方法中,遵循了下面的邏輯:

if (canCompute) {
    //求和總數夠小,直接求和
} else {
    //分紅若干個小任務
}

在使用ForkJoinPool時須要注意,若是任務的劃分層次很深,一直沒有返回,可能出現兩種狀況:

  1. 系統內線程數量越積越多,致使性能嚴重降低。
  2. 函數調用層次變得很深,致使棧溢出。

JDK的併發容器

JDK提供了好用的併發容器類,使用也很方便,這裏主要講講這些工具的具體實現。

併發集合介紹

先簡單認識一下併發集合:

  • ConcurrentHashMap:高效的併發HashMap。即線程安全的HashMap
  • CopyOnWriteArrayList:屬於List,和ArrayList是一族的。在讀多少寫的場合性能很是好,遠遠好於Vector。
  • ConcurrentLinkedQueue:線程安全的LinkedList。
  • BlockingQueue:這是一個接口,JDK內部經過鏈表、數組等方式實現這個接口。表示阻塞隊列,很是適合用於做爲數據共享的通道。
  • ConcurrentSkipListMap:跳錶的實現。這是一個Map,使用跳錶的數據結構進行快速查找。
  • Vector也是線程安全的,另外Collections工具類能夠幫助咱們將任意集合包裝成線程安全的工具類

線程安全的HashMap:ConcurrentHashMap

若是得到一個線程安全的HashMap?

第一種方法是:使用Collections.synchronizedMap()方法來包裝HashMap

static Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());

Collections.synchronizedMap()會生成一個名爲SynchronizedMap的Map。它使用委託,將本身全部Map相關的功能交給傳入的HashMap實現,本身則主要負責保證線程安全。

第二種方法是使用ConcurrentHashMap代替HashMap,這種方式更專業,更適合併發場合。

線程安全的List

Vector是線程安全的List,也可使用Collections.synchronizedList()方法來包裝任意List。

高效讀寫隊列:ConcurrentLinkedQueue

ConcurrentLinkedQueue算是高併發中性能最好的隊列了。

具體實現:

一、節點

做爲一個鏈表,天然須要定義一個節點:

private static class Node<E>{
    volatile E item;
    volatile Node<E> next;

item用來表示目標元素,好比:放入String,item就是String元素。next表示Node的下一個元素。這樣Node就環環相扣,串在一塊兒了。

二、CAS操做

首先,說明一下CAS操做的原理:CAS操做包含三個操做數—— 內存位置的值(V)、預期原值(A)和新值(B)。若是內存位置的值與預期原值相匹配,那麼處理器會自動將該位置更新爲新值。不然,處理器不作任何操做。不管哪一種狀況,它都會在CAS指令以前返回該位置的值。CAS有效地說明了「我認爲位置V應該包含值A;若是包含該值,則將B放到這個位置;不然,不要更改該位置,只告訴我這個位置如今的值便可。」

CAS是一種樂觀鎖。樂觀鎖在少寫的狀況下適用,如果多寫的狀況,會致使CAS算法不斷的進行retry,反而下降了系統性能,多寫的狀況適合適用悲觀鎖。

CAS操做_2019-08-29_09-49-50

casItem()表示設置當前Node的item值。cmp爲指望值,第二個參數val爲目標值。噹噹前值等於cmp指望值時,就會將目標值設置爲val。第二個方法相似。只是它用來設置next字段。

ConcurrentLinkedQueue內部有兩個重要的字段,head和tail,分別表示頭部和尾部。tail的更新不是及時的,而是有延遲,每次更新會跳躍兩個元素。以下圖:

Snipaste_2019-09-05_19-09-42

原書中的源碼分析我沒怎麼看懂,有看懂的童鞋歡迎在評論中分享心得

u=3405433144,4098183174&fm=26&gp=0

高效讀取:不變模式下的CopyOnWriteArrayList

在不少應用場景中,讀操做每每會遠遠大於寫操做。因此這種狀況下,咱們但願讀的性能好些,而寫的性能差些也無所謂。

咱們知道:在讀寫鎖ReadWriteLock中,讀讀不互斥,而讀寫,寫寫是互斥的。

而如今,JDK還提供了另一個讀寫工具類,將讀取性能發揮到極致CopyOnWriteArrayList,它的讀讀不阻塞,讀寫也不會互相阻塞,只有寫寫須要同步等待。

它是怎麼作到讀寫不阻塞的?

CopyOnWrite在寫入操做時,對原有的數據進行復製成一個副本(而不修改原來的數據),將修改的內容寫入複製後的副本中。寫完後,再用副本替換原來的數據,這樣就不會影響讀了。

讀取的實現:

Snipaste_2019-09-06_11-15-40

讀取沒有任何同步和鎖的操做,理由是內部數組array不會發生修改,只會被另一個array替換。

寫人的實現:

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);    //1
        newElements[len] = e;//2
        setArray(newElements);//3
        return true;
    } finally {
        lock.unlock();
    }
}

首先,寫入操做使用鎖,這個鎖僅限於控制寫-寫的狀況。重點在1處,進行了內部元素的複製,而生成一個新數組newElements。2處,將新元素增長到數組末尾,而後,將新數組替換老數組,修改就完成了。整個過程不會影響讀取,而且讀取線程會實時查看到這個修改(array變量是volatile的)

數據共享通道:BlockingQueue

如何實現多個線程間的數據共享呢?好比,線程A但願給線程B發一個消息,用什麼方式好呢?

咱們但願線程A可以通知到線程B,又但願線程A不知道線程B的存在。這樣對於之後線程B的升級或維護,而不用再修改線程A有幫助。爲了實現這一點,咱們可使用一箇中間件BlockingQueue來實現。它就至關於一個意見箱,用來做爲發表意見者與接收意見者溝通的橋樑。

BlockingQueue和以前提到的ConcurrentLinkedQueue和CopyOnWriteArrayList不一樣,它是一個接口,而不是具體的實現。它的主要實現以下圖:

Snipaste_2019-09-06_13-38-28

ArrayBlockingQueue:基於數組實現。更適合作有界隊列,擴展比較不方便

LinkedBlockingQueue:基於鏈表。更適合作無界隊列,由於其內部元素可動態增長。

BlockingQueue爲何適合做爲數據共享的通道呢?緣由在於Blocking(阻塞)。

當服務線程(指獲取隊列中消息並進行處理的線程)處理完隊列中全部的消息後,服務線程是如何知道下一條消息的到來的?BlockingQueue會讓服務線程在隊列爲空時,進行等待,當有新的消息進入隊列後,自動將線程喚醒。

它是如何工做的?以ArrayBlockingQueue爲例說明:

寫入數據:

它有一個items,items就是用來存放數據的隊列。offer()在列隊滿時,返回false。咱們關注的是put()方法,put()也是將元素壓入隊列隊尾,但隊列滿了,它會一直等待,直到隊列中有空閒位置。

讀取數據:

poll()、take()兩個方法都能從隊列中的頭部彈出一個元素。不一樣的是:若是隊列爲空poll()方法直接返回null。而take()方法會等待,直到隊列內有可用元素。

從上面能夠看出,put()take()方法纔是Blocking的關鍵。爲了作好通知和等待兩件事,ArrayBlockingQueue定義了三個字段:

Snipaste_2019-09-06_14-18-02

take()操做:

當隊列爲空時,讓當前線程等待在notEmpty,新元素入隊時,則進行一次notEmpty上的通知。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();   //1
        return dequeue();
    } finally {
        lock.unlock();
    }
}

在1處,要求線程在notEmpty對象中等待。下面是元素入隊的一段代碼:

/**
 * 在當前put位置插入元素、進給和信號。
 * 只有在持有鎖時才調用。
 */
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();  //1
}

在1處,當新元素入列後,須要通知等待在notEmpty上的線程,讓它們繼續工做。

put()操做:

當隊列滿時,須要讓 壓入線程 等待:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();    //1
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

在1處,隊列滿時,在notFull對象中等待。

固然,當元素從隊列中挪走時,隊列中有空位時,天然也要通知等待入隊的線程:

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();   //1
    return x;
}

咱們還會在「5.3 生產者消費者」一節中,看到他們的身影。在那裏,咱們能夠更清楚地看到如何使用BlockingQueue解耦生產者和消費者。

隨機數據結構—跳錶:SkipList

介紹跳錶

除了經常使用的哈希表外,還有一種有趣的數據結構:跳錶。跳錶的本質是同時維護了多個鏈表,而且鏈表是分層的。跳錶的查詢性能要比哈希表好。以下圖

跳錶結構_2019-09-06_16-48-36

最低層的鏈表維護了跳錶中全部的元素,每上面一層都是下面一層的子集,一個元素插入哪一層徹底隨機,運氣很差可能獲得性能最差的結構。可是實際工做中,它仍是表現得很好的。

跳錶內全部元素都是排序的。查找時,從頂級鏈表開始找,一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續查找。好比要查找上面跳錶結構中的7。查找過程以下圖所示:

跳錶的查找過程_2019-09-06_16-55-07

跳錶顯然是一種空間換時間的算法。

使用跳錶實現的Map和使用哈希算法實現的Map的另外一個不一樣之處是:跳錶實現的Map是會排序的,而哈希實現的Map不排序。若須要一個有序的Map,那就選擇跳錶。

使用:ConcurrentSkipListMap

實現這一數據結構的類是ConcurrentSkipListMap。簡單使用:

Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
for (int i = 0; i<30; i++)
    map.put(i,i);
for (Map.Entry<Integer, Integer> entry: map.entrySet()
     ) {
    System.out.println(entry.getKey());
}

跳錶有三個關鍵的數據結構組成:

  • Node<K,V>:(節點,含有key、value、next元素,對Node的全部操做,都使用CAS方法)
  • Index<K,V>:(表示索引,它的內部包裝了node,同時增長了向下和向右的引用),整個跳錶就是根據Index進行全網的組織的。
  • HeadIndex:表示鏈表頭部的第一個Index。它繼承自Index。

下面是三種數據結構的代碼:

Node_2019-09-06_17-20-04

Index_2019-09-06_17-20-23

HeadIndex_2019-09-06_17-20-38

相關文章
相關標籤/搜索