Callable實現JAVA多線程

最近項目用到一個功能須要實現多線程分發任務且須要任務的返回值,以前一直都是實現Runnable接口,但裏面的run方法是返回void的。後來在網上查了下JAVA1.5開始就有了Callable。html

下面來看看如何倒騰下這個東西。java

import java.util.concurrent.Callable;

/**
 * @類說明 線程業務處理
 * @author DavenTsang
 * @date 2016-11-16
 * 
 */
public class PoolTask implements Callable<String> {

    private String id;

    @Override
    public String call() throws Exception {
        return "當前線程名:" + Thread.currentThread().getName() + ":" + id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}
View Code

先創建一個類實現Callable接口。一下是JDK的API對Callable接口的描述:數組

public interface Callable<V>

返回結果而且可能拋出異常的任務。實現者定義了一個不帶任何參數的叫作 call 的方法。多線程

Callable 接口相似於 Runnable,二者都是爲那些其實例可能被另外一個線程執行的類設計的。可是 Runnable 不會返回結果,而且沒法拋出通過檢查的異常。ide

Executors 類包含一些從其餘普通形式轉換成 Callable 類的實用方法。ui

V是須要返回的對象。this

須要執行這個實現類,咱們須要建立一個線程池spa

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class  PoolUtils {

        //可伸縮線程池
        private static ExecutorService cachedPool = Executors.newCachedThreadPool();
        public static CompletionService<String> completionService = new ExecutorCompletionService<String>(cachedPool);
        private PoolTask task;
        public String addTask() throws InterruptedException, ExecutionException{
            //添加任務
            Future<String> future = completionService.submit(task);
            //檢查是否出現第二個線程進來
            Thread.sleep(1000);
            List<Future<String>> list = new ArrayList<Future<String>>();
            System.out.println(completionService.take().get());
            list.add(future);
            return completionService.take().get();
        }
        
        public PoolTask getTask() {
            return task;
        }
        public void setTask(PoolTask task) {
            this.task = task;
        }
        
}
View Code

Thread.sleep();調用這個方法是由於在前面添加任務是用一個線程數組調用,看下Executors.newCachedThreadPool();這個是否能夠本身去根據須要建立線程。線程

咱們再來看下submit()方法的源碼,設計

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

 

  private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 追一下源碼會發現裏面有個隊列存在。submit添加一個任務就會放到隊列裏面。這樣就不用咱們顯示的去建立一個隊列。

調用類:

import java.util.concurrent.ExecutionException;

public class Test {
    
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
            Thread[] thread = new Thread[10];
            for(int i = 0;i<thread.length;i++){
                thread[i] = new Thread(new A());
            }
            for(Thread th : thread){
                th.start();
            }
        
        
    }
}

class A implements Runnable{
    @Override
    public void run() {
        PoolTask task = new PoolTask();
        task.setId("daven");
        PoolUtils utils = new PoolUtils();
        utils.setTask(task);
        try {
            String a = utils.addTask();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
View Code


調用類用多個線程去調用是由於模擬項目的場景了。

以上是一些記錄供本身回憶用。遇到問題先本身查找緣由,再去網上找下而後再找API追源碼。

相關文章
相關標籤/搜索