Java 多線程執行

背景:

如果一個任務由多個子任務組成,子任務全部執行完成後,再由主線程對全部子任務結果進行封裝,可以採用以下幾種方式:

一、基於 Guava ListenableFuture 進行;

二、基於 FutureTask 和 CountDownLatch 進行;

三、基於 FutureTask 進行;

四、基於 CompletionService 進行;

五、基於 BlockingQueue 進行。

說明:

二、三的區別在於線程池是否每次都新建並 shutdown;

四、五本質上是同一種做法。

 

 /**
  * Submits 20 {@code CountTask} instances to a Guava listening executor, prints each result
  * from a per-task callback, then waits for all of them and prints the combined list.
  *
  * <p>Per the Guava documentation, {@code Futures.successfulAsList} yields {@code null} in
  * the position of any task that failed or was cancelled instead of failing the whole list.
  *
  * <p>Failures are printed rather than propagated. The pool is always shut down, even when
  * waiting for the results fails.
  */
 public static void listenableFuture() {
        // Created outside the try block so the finally clause can always release it.
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
        try {
            List<ListenableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final ListenableFuture<Integer> future = pool.submit(new CountTask());
                futures.add(future);
                // NOTE(review): newer Guava releases require an explicit Executor argument
                // for addCallback — confirm against the Guava version this project uses.
                Futures.addCallback(future, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(Integer result) {
                        System.out.println(result);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        t.printStackTrace();
                    }
                });
            }
            System.out.println("submit success");
            ListenableFuture<List<Integer>> ret = Futures.successfulAsList(futures);
            // Blocks the calling thread until every submitted task has finished.
            List<Integer> res = ret.get();
            System.out.println(res);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Previously skipped on failure, which left the pool's worker threads alive.
            pool.shutdown();
            System.out.println("shutdown success");
        }
    }

    /**
     * Runs 20 {@code CountTask} instances on a cached thread pool, waits on a shared
     * {@link CountDownLatch}, then prints every task's result in submission order.
     *
     * <p>The executor is shut down in a {@code finally} block so it is released even when
     * waiting is interrupted or a task fails.
     *
     * @throws Exception if the wait is interrupted or a task's result cannot be retrieved
     */
    public static void countDownCount() throws Exception {
        int threadNum = 20;
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch count = new CountDownLatch(threadNum);
            List<FutureTask<Integer>> futureTasks = new ArrayList<>(threadNum);
            for (int i = 0; i < threadNum; i++) {
                // Each task receives the latch; presumably it counts down when it finishes
                // — confirm against CountTask.
                CountTask task = new CountTask(count);
                FutureTask<Integer> futureTask = new FutureTask<>(task);
                executor.submit(futureTask);
                futureTasks.add(futureTask);
            }
            // Blocks the main thread until the latch reaches zero.
            count.await();
            System.out.println("執行完成");
            for (FutureTask<Integer> futureTask : futureTasks) {
                Integer ret = futureTask.get();
                System.out.println(ret);
            }
        } finally {
            // Previously skipped when await() or get() threw, leaking the pool.
            executor.shutdown();
        }
        System.out.println("測試完成");
    }

    /**
     * Runs 20 {@code CountTask} instances, each wrapped in a {@link FutureTask}, on a cached
     * thread pool and prints every result in submission order.
     *
     * <p>{@code shutdown()} only stops the pool from accepting new work; it does not block.
     * The waiting happens in each {@code get()} call below.
     *
     * @throws Exception if the wait is interrupted or a task's result cannot be retrieved
     */
    public static void futureTaskCount() throws Exception {
        final int taskTotal = 20;
        ExecutorService workers = Executors.newCachedThreadPool();
        List<FutureTask<Integer>> pending = new ArrayList<>();

        int submitted = 0;
        while (submitted < taskTotal) {
            FutureTask<Integer> wrapped = new FutureTask<>(new CountTask());
            workers.submit(wrapped);
            pending.add(wrapped);
            submitted++;
        }

        // Already-submitted tasks keep running after this call; no new ones are accepted.
        workers.shutdown();
        System.out.println("shutdown");

        // get() blocks until the corresponding task has produced its value.
        for (int idx = 0; idx < pending.size(); idx++) {
            Integer value = pending.get(idx).get();
            System.out.println(value);
        }
        System.out.println("測試完成");
    }

    /**
     * Runs 20 {@code CountTask} instances through a {@link CompletionService} backed by a
     * fixed thread pool and prints each result as its task completes.
     *
     * <p>The executor is shut down in a {@code finally} block so the fixed pool's worker
     * threads are released even when retrieving a result fails.
     *
     * @throws Exception if the wait is interrupted or a task's result cannot be retrieved
     */
    public static void completionCount() throws Exception {
        int threadNum = 20;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        try {
            CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(executor);
            for (int i = 0; i < threadNum; i++) {
                pool.submit(new CountTask());
            }
            // take() blocks until the next finished task is available, so exactly
            // threadNum calls drain every submitted task.
            for (int i = 0; i < threadNum; i++) {
                Integer ret = pool.take().get();
                System.out.println("輸出結果" + ret);
            }
            System.out.println("測試完成");
        } finally {
            // Previously skipped when take()/get() threw, leaking the pool's threads.
            executor.shutdown();
        }
    }

    /**
     * Stores the {@link Future} of every submitted {@code CountTask} in a blocking queue,
     * then drains the queue and prints the sum of all results.
     *
     * <p>The executor is shut down in a {@code finally} block so it is released even when
     * retrieving a result fails.
     *
     * @throws Exception if the wait is interrupted or a task's result cannot be retrieved
     */
    public static void blockingQueueCount() throws Exception {
        int taskCount = 10;
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
            for (int i = 0; i < taskCount; i++) {
                Future<Integer> future = exec.submit(new CountTask());
                queue.add(future);
            }
            int sum = 0;
            // Capture the size first: take() shrinks the queue while we iterate.
            int queueSize = queue.size();
            for (int i = 0; i < queueSize; i++) {
                // get() blocks until that task has produced its value.
                sum += queue.take().get();
            }
            System.out.println("總數爲:" + sum);
        } finally {
            // Previously skipped when get() threw, leaking the pool.
            exec.shutdown();
        }
    }
相關文章
相關標籤/搜索