並行化-你的高併發大殺器

1.前言

想必熱愛遊戲的同窗小時候,都幻想過要是本身要是能像鳴人那樣會多重影分身之術,就能一邊打遊戲一邊上課了,惋惜漫畫就是漫畫,現實中並無這個技術,你要麼只有老老實實的上課,要麼就只有逃課去打遊戲了。雖然在現實中咱們沒法實現多重影分身這樣的技術,可是咱們能夠在計算機世界中實現咱們這樣的願望。java

2.計算機中的分身術

計算機中的分身術不是天生就有了。在1971年,1971年,英特爾推出的全球第一顆通用型微處理器4004,由2300個晶體管構成。當時,公司的聯合創始人之一戈登摩爾就提出大名鼎鼎的「摩爾定律」——每過18個月,芯片上能夠集成的晶體管數目將增長一倍。最初的主頻740kHz(每秒運行74萬次),如今過了快50年了,你們去買電腦的時候會發現如今的主頻都能達到4.0GHZ了(每秒40億次)。可是主頻越高帶來的收益倒是愈來愈小:git

  • 據測算,主頻每增長1G,功耗將上升25瓦,而在芯片功耗超過150瓦後,現有的風冷散熱系統將沒法知足散熱的須要。有部分CPU均可以用來煎雞蛋了。
  • 流水線過長,使得單位頻率效能低下,越大的主頻其實總體性能反而不如小的主頻。
  • 戈登摩爾認爲摩爾定律將來10-20年會失效。

在單核主頻遇到瓶頸的狀況下,多核CPU應運而生,不只提高了性能,而且下降了功耗。因此多核CPU逐漸成爲如今市場的主流,這樣讓咱們的多線程編程也更加的容易。github

說到了多核CPU就必定要說GPU,你們可能對這個比較陌生,可是一說到顯卡就確定不陌生,筆者搞過一段時間的CUDA編程,我才意識到這個纔是真正的並行計算,你們都知道圖片像素點吧,好比19201080的圖片有210萬個像素點,若是想要把一張圖片的每一個像素點都進行轉換一下,那在咱們java裏面可能就要循環遍歷210萬次。 就算咱們用多線程8核CPU,那也得循環幾十萬次。可是若是使用Cuda,最多能夠365535*512=100661760(一億)個線程並行執行,就這種級別的圖片那也是立刻處理完成。可是Cuda通常適合於圖片這種,有大量的像素點須要同時處理,可是其支持指令很少因此邏輯不能太複雜。GPU只是用來擴展介紹,感興趣能夠和筆者交流。面試

3.應用中的並行

一提及讓你的服務高性能的手段,那麼異步化,並行化這些確定會第一時間在你腦海中顯現出來,在以前的文章:《異步化,你的高併發大殺器》中已經介紹過了異步化的優化手段,有興趣的朋友能夠看看。並行化能夠用來配合異步化,也能夠用來單獨作優化。算法

咱們能夠想一想有這麼一個需求,在你下外賣訂單的時候,這筆訂單可能還須要查,用戶信息,折扣信息,商家信息,菜品信息等,用同步的方式調用,以下圖所示:編程

設想一下這5個查詢服務,平均每次消耗50ms,那麼本次調用至少是250ms,咱們細想一下,在這個這五個服務其實並無任何的依賴,誰先獲取誰後獲取均可以,那麼咱們能夠想一想,是否能夠用多重影分身之術,同時獲取這五個服務的信息呢?優化以下:安全

將這五個查詢服務並行查詢,在理想狀況下能夠優化至50ms。固然提及來簡單,咱們真正如何落地呢?

3.1 CountDownLatch/Phaser

CountDownLatch和Phaser是JDK提供的同步工具類Phaser是1.7版本以後提供的工具類而CountDownLatch是1.5版本以後提供的工具類。這裏簡單介紹一下CountDownLatch,能夠將其當作是一個計數器,await()方法能夠阻塞至超時或者計數器減至0,其餘線程當完成本身目標的時候能夠減小1,利用這個機制咱們能夠將其用來作併發。 能夠用以下的代碼實現咱們上面的下訂單的需求:bash

public class CountDownTask {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));
    public static void main(String[] args) throws InterruptedException {
        // 新建一個爲5的計數器
        CountDownLatch countDownLatch = new CountDownLatch(5);
        OrderInfo orderInfo = new OrderInfo();
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Customer,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Discount,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Food,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Tenant,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setTenantInfo(new TenantInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務OtherInfo,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
            countDownLatch.countDown();
        });
        countDownLatch.await(1, TimeUnit.SECONDS);
        System.out.println("主線程:"+ Thread.currentThread().getName());
    }
}

複製代碼

創建一個線程池(具體配置根據具體業務,具體機器配置),進行併發的執行咱們的任務(生成用戶信息,菜品信息等),最後利用await方法阻塞等待結果成功返回。多線程

3.2CompletableFuture

相信各位同窗已經發現,CountDownLatch雖然能實現咱們須要知足的功能可是其任然有個問題是,在咱們的業務代碼須要耦合CountDownLatch的代碼,好比在咱們獲取用戶信息以後咱們會執行countDownLatch.countDown(),很明顯咱們的業務代碼顯然不該該關心這一部分邏輯,而且在開發的過程當中萬一寫漏了,那咱們的await方法將只會被各類異常喚醒。併發

因此在JDK1.8中提供了一個類CompletableFuture,它是一個多功能的非阻塞的Future。(什麼是Future:用來表明異步結果,而且提供了檢查計算完成,等待完成,檢索結果完成等方法。)在我以前的這篇文章中詳細介紹了《異步技巧之CompletableFuture》,有興趣的能夠看這篇文章。咱們將每一個任務的計算完成的結果都用CompletableFuture來表示,利用CompletableFuture.allOf匯聚成一個大的CompletableFuture,那麼利用get()方法就能夠阻塞。

public class CompletableFutureParallel {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        OrderInfo orderInfo = new OrderInfo();
        //CompletableFuture 的List
        List<CompletableFuture> futures = new ArrayList<>();
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當前任務Customer,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當前任務Discount,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
        }, THREAD_POOL));
        futures.add( CompletableFuture.runAsync(() -> {
            System.out.println("當前任務Food,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當前任務Other,線程名字爲:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
        }, THREAD_POOL));
        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        allDoneFuture.get(10, TimeUnit.SECONDS);
        System.out.println(orderInfo);
    }
}
複製代碼

能夠看見咱們使用CompletableFuture能很快的完成的需求,固然這還不夠。

3.3 Fork/Join

咱們上面用CompletableFuture完成了咱們對多組任務並行執行,可是其依然是依賴咱們的線程池,在咱們的線程池中使用的是阻塞隊列,也就是當咱們某個線程執行完任務的時候須要經過這個阻塞隊列進行,那麼確定會發生競爭,因此在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

ForkJoinPool中每一個線程都有本身的工做隊列,而且採用Work-Steal算法防止線程飢餓。 Worker線程用LIFO的方法取出任務,可是會用FIFO的方法去偷取別人隊列的任務,這樣就減小了鎖的衝突。

網上這個框架的例子不少,咱們看看如何使用代碼其完成咱們上面的下訂單需求:

public class OrderTask extends RecursiveTask<OrderInfo> {
    @Override
    protected OrderInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        // 定義其餘五種並行TasK
        CustomerTask customerTask = new CustomerTask();
        TenantTask tenantTask = new TenantTask();
        DiscountTask discountTask = new DiscountTask();
        FoodTask foodTask = new FoodTask();
        OtherTask otherTask = new OtherTask();
        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
        return orderInfo;
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 );
        System.out.println(forkJoinPool.invoke(new OrderTask()));
    }
}
class CustomerTask extends RecursiveTask<CustomerInfo>{

    @Override
    protected CustomerInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        return new CustomerInfo();
    }
}
class TenantTask extends RecursiveTask<TenantInfo>{

    @Override
    protected TenantInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        return new TenantInfo();
    }
}
class DiscountTask extends RecursiveTask<DiscountInfo>{

    @Override
    protected DiscountInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        return new DiscountInfo();
    }
}
class FoodTask extends RecursiveTask<FoodListInfo>{

    @Override
    protected FoodListInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        return new FoodListInfo();
    }
}
class OtherTask extends RecursiveTask<OtherInfo>{

    @Override
    protected OtherInfo compute() {
        System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName());
        return new OtherInfo();
    }
}
複製代碼

咱們定義一個OrderTask而且定義五個獲取信息的任務,在compute中分別fork執行這五個任務,最後在將這五個任務的結果經過Join得到,最後完成咱們的並行化的需求。

3.4 parallelStream

在jdk1.8中提供了並行流的API,當咱們使用集合的時候能很好的進行並行處理,下面舉了一個簡單的例子從1加到100:

public class ParallelStream {
    public static void main(String[] args) {
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        LongAdder sum = new LongAdder();
        list.parallelStream().forEach(integer -> {
//            System.out.println("當前線程" + Thread.currentThread().getName());
            sum.add(integer);
        });
        System.out.println(sum);
    }
}
複製代碼

parallelStream中底層使用的那一套也是Fork/Join的那一套,默認的併發程度是可用CPU數-1。

3.5 分片

能夠想象有這麼一個需求,天天定時對id在某個範圍之間的用戶發券,好比這個範圍之間的用戶有幾百萬,若是給一臺機器發的話,可能所有發完須要好久的時間,因此分佈式調度框架好比:elastic-job。都提供了分片的功能,好比你用50臺機器,那麼id%50=0的在第0臺機器上,=1的在第1臺機器上發券,那麼咱們的執行時間其實就分攤到了不一樣的機器上了。

4.並行化注意事項

  • 線程安全:在parallelStream中咱們列舉的代碼中使用的是LongAdder,並無直接使用咱們的Integer和Long,這個是由於在多線程環境下Integer和Long線程不安全。因此線程安全咱們須要特別注意。
  • 合理參數配置:能夠看見咱們須要配置的參數比較多,好比咱們的線程池的大小,等待隊列大小,並行度大小以及咱們的等待超時時間等等,咱們都須要根據本身的業務不斷的調優防止出現隊列不夠用或者超時時間不合理等等。

5.最後

本文介紹了什麼是並行化,並行化的各類歷史,在Java中如何實現並行化,以及並行化的注意事項。但願你們對並行化有個比較全面的認識。最後給你們提個兩個小問題:

  1. 在咱們並行化當中有某個任務若是某個任務出現了異常應該怎麼辦?
  2. 在咱們並行化當中有某個任務的信息並非強依賴,也就是若是出現了問題這部分信息咱們也能夠不須要,當並行化的時候,這種任務出現了異常應該怎麼辦?

最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:github.com/javagrowing… 麻煩給個小星星喲。

若是你以爲這篇文章對你有文章,能夠關注個人技術公衆號,最近做者收集了不少最新的學習資料視頻以及面試資料,關注以後便可領取,你的關注和轉發是對我最大的支持,O(∩_∩)O

相關文章
相關標籤/搜索