invokeAll和CompletionService

需求場景:服務A依賴於其餘三個服務:服務B、服務C、服務D,而服務A的調用方要求服務A在100ms內返回結果。服務A須要在100ms內把已經有結果的服務返回,取消無結果的服務。this

使用ExecutorService.invokeAll()方法,該方法輸入是一個Callable任務的集合,返回的是Future集合,Future集合的順序與輸入的任務同樣。invokeAll()的超時時限是對這一整組集合來講的。
該方法會阻塞,當全部任務執行完畢或者超時的時候,方法就會返回,如有未完成的任務,invokeAll()方法中會調用cancel(true)方法來取消任務。咱們能夠對返回的Future調用isCancelled()方法來看該任務是否已經執行完畢。code

public void testCustomerExecutorException() {
        List<SayHello> tasks = Lists.newArrayList();
        for (int i = 5; i >= 1; i--) {
            tasks.add(new SayHello(i));
        }

        List<Future<String>> futures = Lists.newArrayList();
        try {
            futures = fixedExecutorService.invokeAll(tasks, 250, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            System.out.println("Main: " + e);
        }

        for (Future<String> future :futures) {
            try {
                String s = future.get();
                System.out.println("get ok: " + s);
            } catch (Exception e) {
                System.out.println("get error: " + e);
            }
        }
    }

    private class SayHello implements Callable<String> {
        private int id;
        public SayHello(int id) {
            this.id = id;
        }
        public String call() throws Exception {
            try {
                Thread.sleep(id*100);
            } catch (Exception e) {
                System.out.println(id + "; SayHello: " + e);
                return "hello " + id;
            }
            return "hello " + id;
        }
    }

CompletionService與invokeAll的不一樣點在於:
CompletionService:任務執行完畢立刻返回
invokeAll:須要等所有任務執行完畢或者超時再返回get

public ExecutorService fixedExecutorService = Executors.newFixedThreadPool(5);
    private final BlockingQueue<Future<String>> queue = new LinkedBlockingDeque<Future<String>>(10);
    private final CompletionService<String> completionService = new ExecutorCompletionService<String>(fixedExecutorService, queue);

    public void testCustomerExecutorException() {
        for (int i = 5; i >= 1; i--) {
            completionService.submit(new SayHello(i));
        }

        for (int i = 0; i < 5; i++)
        {
            try {
                //誰最早執行完成,直接返回
                Future<String> f = completionService.take();
                System.out.println(f.get());
            } catch (Exception e) {

            }
        }
    }

    private class SayHello implements Callable<String> {
        private int id;
        public SayHello(int id) {
            this.id = id;
        }
        public String call() throws Exception {
            try {
                Thread.sleep(id*100);
            } catch (Exception e) {
                return "hello " + id;
            }
            return "hello " + id;
        }
    }
//輸出
//hello1
//hello2
//hello3
//hello4
//hello5
相關文章
相關標籤/搜索