Java8的CompletionService使用與原理

爲了防止無良網站的爬蟲抓取文章,特此標識,轉載請註明文章出處。LaplaceDemon/ShiJiaqi。html

https://www.cnblogs.com/shijiaqi1066/p/10454237.htmljava




CompletionService是Java8的新增接口,JDK爲其提供了一個實現類ExecutorCompletionService。這個類是爲線程池中Task的執行結果服務的,即爲Executor中Task返回Future而服務的。CompletionService的實現目標是任務先完成可優先獲取到,即結果按照完成前後順序排序。ide

CompletionService的使用很是簡單。從源碼查看ExecutorCompletionService類,該類只有三個成員變量:網站

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    
    ...
}

能夠看到ExecutorCompletionService主要是加強executor線程池的。Task包裝後被塞入completionQueue,當Task結束,其Future就能夠從completionQueue中獲取到。this

其基本原理能夠參看下圖:
線程

CompletionService接口源碼:code

public interface CompletionService<V> {
    // 提交
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    // 獲取
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}





例:向CompletionService中提交10個Task,當Task有任務返回則會優先從CompletionService內部的隊列中獲取到Task的Future。htm

package test;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCompletionService {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //開啓3個線程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            // 結果集
            List<Integer> list = new ArrayList<Integer>();
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
            
            // 1.定義CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);  
            
            // 2.添加任務
            for(int i=0;i<taskCount;i++){
                Future<Integer> future = completionService.submit(new Task(i+1));
                futureList.add(future);
            }

            // 3.獲取結果
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();
                System.out.println("任務i=="+result+"完成!"+new Date());
                list.add(result);
            }
            
            System.out.println("list="+list);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉線程池
            exs.shutdown();
        }
        
    }

    static class Task implements Callable<Integer>{
        Integer i;
        
        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5) {
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!");  
            return i;
        }
        
    }
}
相關文章
相關標籤/搜索