CompletionService類有一個方法用來發送任務給執行器,還有一個方法爲下一個已經執行結束的任務獲取Future對象。從內部實現機制來看,CompletionService類使用Executor對象來執行任務。這個行爲的優點是能夠共享CompletionService對象,併發送任務到執行器,而後其餘的對象能夠處理任務的結果。第二個方法有一個不足之處,他只能爲已經執行結束的任務獲取future對象,所以,這些Future對象只能被用來獲取任務的結果。java
Code併發
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * This class simulates the generation of a report. Is a Callable * object that will be executed by the executor inside a * CompletionService * */ public class ReportGenerator implements Callable<String> { /** * The sender of the report */ private String sender; /** * The title of the report */ private String title; /** * Constructor of the class. Initializes the two attributes * @param sender The sender of the report * @param title The title of the report */ public ReportGenerator(String sender, String title){ this.sender=sender; this.title=title; } /** * Main method of the ReportGenerator. Waits a random period of time * and then generates the report as a String. */ @Override public String call() throws Exception { try { Long duration=(long)(Math.random()*10); System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } String ret=sender+": "+title; return ret; } }
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.CompletionService; /** * This class represents every actor that can request a report. For this example, * it simply create three ReportGenerator objects and execute them through a * CompletionService * */ public class ReportRequest implements Runnable { /** * Name of this ReportRequest */ private String name; /** * CompletionService used for the execution of the ReportGenerator tasks */ private CompletionService<String> service; /** * Constructor of the class. Initializes the parameters * @param name Name of the ReportRequest * @param service Service used for the execution of tasks */ public ReportRequest(String name, CompletionService<String> service){ this.name=name; this.service=service; } /** * Main method of the class. Create three ReportGenerator tasks and executes them * through a CompletionService */ @Override public void run() { ReportGenerator reportGenerator=new ReportGenerator(name, "Report"); service.submit(reportGenerator); } }
package com.packtpub.java7.concurrency.chapter4.recipe11.task; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * This class will take the results of the ReportGenerator tasks executed through * a CompletinoService * */ public class ReportProcessor implements Runnable { /** * CompletionService that executes the ReportGenerator tasks */ private CompletionService<String> service; /** * Variable to store the status of the Object. It will executes until the variable * takes the true value */ private boolean end; /** * Constructor of the class. It initializes the attributes of the class * @param service The CompletionService used to execute the ReportGenerator tasks */ public ReportProcessor (CompletionService<String> service){ this.service=service; end=false; } /** * Main method of the class. While the variable end is false, it * calls the poll method of the CompletionService and waits 20 seconds * for the end of a ReportGenerator task */ @Override public void run() { while (!end){ try { //調用CompletionService接口的poll()方法,來獲取下一個已經完成的任務Future對象。 Future<String> result=service.poll(20, TimeUnit.SECONDS); if (result!=null) { String report=result.get(); System.out.printf("ReportReceiver: Report Recived: %s\n",report); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.printf("ReportSender: End\n"); } /** * Method that establish the value of the end attribute * @param end New value of the end attribute. */ public void setEnd(boolean end) { this.end = end; } }
package com.packtpub.java7.concurrency.chapter4.recipe11.core; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportProcessor; import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportRequest; /** * Main class of the example creates all the necessary objects and throws the tasks * */ public class Main { /** * @param args */ public static void main(String[] args) { // Create the executor and thee CompletionService using that executor ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool(); CompletionService<String> service=new ExecutorCompletionService<>(executor); // Crete two ReportRequest objects and two Threads to execute them ReportRequest faceRequest=new ReportRequest("Face", service); ReportRequest onlineRequest=new ReportRequest("Online", service); Thread faceThread=new Thread(faceRequest); Thread onlineThread=new Thread(onlineRequest); // Create a ReportSender object and a Thread to execute it ReportProcessor processor=new ReportProcessor(service); Thread senderThread=new Thread(processor); // Start the Threads System.out.printf("Main: Starting the Threads\n"); faceThread.start(); onlineThread.start(); senderThread.start(); // Wait for the end of the ReportGenerator tasks try { System.out.printf("Main: Waiting for the report generators.\n"); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } // Shutdown the executor System.out.printf("Main: Shuting down the executor.\n"); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } // End the execution of the ReportSender processor.setEnd(true); System.out.printf("Main: Ends\n"); } }
工做原理dom
在範例的主類中,咱們調用了Executors工廠類的newCachedThreadPool()方法建立了ThreadPoolExecutor執行器對象。而後,使用這個對象初始化了CompletionService對象,由於完成服務(Completion Service)使用執行器來執行任務。而後,調用ReportRequest類中的submit()方法,利用「完成服務」來執行任務。ide
當「完成服務」任務結束,這些任務中的一個任務就執行結束了,「完成服務」中存儲着Future對象,用來控制它在隊列中執行。this
調用poll()方法訪問這個隊列,查看是否有任務已經完成,若是有,則返回隊列中的第一個元素(即一個執行完成後的future對象)。當poll()返回Future對象後,他將從隊列中刪除這個Future對象。在這個示例中,咱們調用poll()方法時傳遞了兩個參數,表示當隊列裏完成任務結果爲空時,想要等待任務執行結束的時間。code
一旦建立了CompletionService對象,還要建立兩個ReportRequest對象,用來執行在CompletionService中的lianggeReportGenerator任務。ReportProcessor任務則將處理兩個被髮送到執行器裏的ReportRequest任務所產生的結果。
對象