關於聚合和多線程的處理套路

概述

無差異地請求多個外部接口並聚合全部請求結果,應該有屬於它本身的套路,應該將全部多線程的操做屏蔽之,咱們只關心參數和結果。所以,應該拋棄Callable/FutureTask/Future等這些手工模式,這些代碼應該交給框架來實現。java

手工模式

何爲手工模式,咱們以Callable爲例設計請求外部的接口,可能像下面這樣子,參數是NumberParam,兩個外部接口分別是IntToStringCallable和DoubleToStringCallable,git

class IntToStringCallable implements Callable<String> {
        private final NumberParam param;

        IntToStringCallable(NumberParam numberParam) {
            this.param = numberParam;
        }
        @Override
        public String call() {
            return Integer.toHexString(param.getAge());
        }
    }

    class DoubleToStringCallable implements Callable<String> {

        private final NumberParam param;

        DoubleToStringCallable(NumberParam numberParam) {
            this.param = numberParam;
        }
        @Override
        public String call() {
            return Double.toHexString(param.getMoney());
        }
    }

若是採用FutureTask的方式多線程執行這兩個接口,多是這樣子的,github

FutureTask<String> r1 = new FutureTask<>(new IntToStringCallable(numberParam));
        new Thread(r1).start();
        FutureTask<String> r2 = new FutureTask<>(new DoubleToStringCallable(numberParam));
        new Thread(r2).start();
        try {
            List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {

        }

須要首先構造FutureTask,而後使用Thread比較原始的api去執行,固然還能夠再簡化一下,好比使用Future方式,編程

ExecutorService threadPool = Executors.newFixedThreadPool(2);
        Future<String> r1 = threadPool.submit(new IntToStringCallable(numberParam));
        Future<String> r2 = threadPool.submit(new DoubleToStringCallable(numberParam));
        try {
            List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {

        }

我相信這是一種廣泛常見的作法了。這裏沒有必要繼續評論這些作法的問題了。api

Java 8以後

Java 8以後有了更加方便的異步編程方式了,不用再辛苦地去寫Callable的,一句話就能夠表達Callable+FutureTask/...,多線程

CompletableFuture<String> pf = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());

改造以前的作法結果可能就是這個樣子了,框架

CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());
        CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> new DoubleToStringCallable(numberParam).call());
        try {
            List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {

        }

其實能夠看出來,這個時候咱們不必定須要一個Callable了,提供異步的能力是supplyAsync來完成的,咱們只須要正常的入參出參的普通方法就能夠了。異步

Java 8以後再以後

Java 8以後的異步編程方式確實簡單了不少,可是在咱們的業務代碼中仍是出現了和異步編程相關的無關業務邏輯的事情,能否繼續簡化呢。本案的設計靈感來自一樣Java 8的優秀設計——ParallelStream,舉個簡單的例子,ide

Arrays.asList("a", "b", "c").parallelStream().map(String::toUpperCase).collect(Collectors.toList());

異步及多線程是ParallelStream來完成的,用戶只須要完成String::toUpperCase部分。異步編程

本案的設計主要有三個interface來實現,分別是,

public interface MyProvider<T,V> {
    T provide(V v);
}
public interface MyCollector<T> {

    void collectList(T t);

    List<T> retList();
}
public interface MyStream<T,V> {

    List<T> toList(List<MyProvider<T,V>> providers, V v);
}

其實MyProvider表達是請求外部接口,MyStream表示一種相似ParallelStream的思想,一種內化異步多線程的操做模式,MyCollector屬於內部設計api能夠不暴露給用戶;
一個改寫上面的例子的例子,

@Test
    public void testStream() {
        MyProvider<String,NumberParam> p1 = new IntToStringProvider();
        MyProvider<String,NumberParam> p2 = new DoubleToStringProvider();
        List<MyProvider<String, NumberParam>> providers = Arrays.asList(p1, p2);
        MyStream<String, NumberParam> myStream = new CollectStringStream();

        List<String> strings = myStream.toList(providers, numberParam);
        log.info("ret=" + strings);
    }

在這個方法內一點異步編程的內容都沒有的,用戶只須要編程本身關心的邏輯便可,固然是要按照Provider的思路去寫,這或許有一點心智負擔。
這個CollectStringStream幫咱們完成來一些髒活累活,

public List<String> toList(List<MyProvider<String, NumberParam>> myProviders, NumberParam param) {
        MyCollector<String> myCollector = new NoMeaningCollector();
        List<CompletableFuture<Void>> pfs = new ArrayList<>(myProviders.size());
        for (MyProvider<String, NumberParam> provider : myProviders) {
            CompletableFuture<Void> pf = CompletableFuture.runAsync(() -> myCollector.collectList(provider.provide(param)), executor);
            pfs.add(pf);
        }
        try {
            CompletableFuture.allOf(pfs.toArray(new CompletableFuture[0])).get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof TimeoutException) {
                pfs.forEach(p -> {
                    if (!p.isDone()){
                        p.cancel(true);
                    }});
            }
        }
        return myCollector.retList();
    }

這樣看起這個設計又不美了,可是若是有更多的外部接口須要調用,CollectStringStream就顯得頗有價值了,新加入再多的請求外部接口要改動的代碼不多不多,因此這種思想我以爲是值得推廣的。

總結

照例附上參考代碼,不過值得思考的是咱們如何像優秀的代碼學習並運用到本身的項目中。
參考代碼,java-toy

相關文章
相關標籤/搜索