在執行器中分離任務的啓動與結果的處理

CompletionService類有一個方法用來發送任務給執行器,還有一個方法爲下一個已經執行結束的任務獲取Future對象。從內部實現機制來看,CompletionService類使用Executor對象來執行任務。這個行爲的優點是可以共享CompletionService對象,並發送任務到執行器,然後其他的對象可以處理任務的結果。第二個方法有一個不足之處,它只能爲已經執行結束的任務獲取Future對象,所以,這些Future對象只能被用來獲取任務的結果。

Code

package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Simulates the generation of a report. It is a {@link Callable} task meant
 * to be executed by the executor that backs a CompletionService.
 *
 */
public class ReportGenerator implements Callable<String> {

    /**
     * The sender of the report
     */
    private final String sender;
    /**
     * The title of the report
     */
    private final String title;
    
    /**
     * Constructor of the class. Initializes the two attributes
     * @param sender The sender of the report
     * @param title The title of the report
     */
    public ReportGenerator(String sender, String title){
        this.sender=sender;
        this.title=title;
    }

    /**
     * Main method of the ReportGenerator. Sleeps for a random number of
     * seconds in the range 0-9 and then builds the report.
     * <p>
     * If the thread is interrupted while sleeping, the wait is cut short, the
     * report is still returned, and the interrupt status of the thread is
     * restored so that the caller can still observe the interruption.
     *
     * @return the report, formatted as {@code sender + ": " + title}
     * @throws Exception declared by {@link Callable#call()}; this
     *         implementation does not throw a checked exception itself
     */
    @Override
    public String call() throws Exception {
        // A primitive is enough here; there is no reason to box the duration.
        long duration=(long)(Math.random()*10);
        System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // sleep() clears the interrupt status when it throws; set it again
            // so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
        return sender+": "+title;
    }
    
}
package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.CompletionService;

/**
 * This class represents every actor that can request a report. For this example,
 * it simply create three ReportGenerator objects and execute them through a 
 * CompletionService
 *
 */
public class ReportRequest implements Runnable {

    /**
     * Name of this ReportRequest
     */
    private String name;
    
    /**
     * CompletionService used for the execution of the ReportGenerator tasks
     */
    private CompletionService<String> service;
    
    /**
     * Constructor of the class. Initializes the parameters
     * @param name Name of the ReportRequest
     * @param service Service used for the execution of tasks
     */
    public ReportRequest(String name, CompletionService<String> service){
        this.name=name;
        this.service=service;
    }

    /**
     * Main method of the class. Create three ReportGenerator tasks and executes them
     * through a CompletionService
     */
    @Override
    public void run() {
            ReportGenerator reportGenerator=new ReportGenerator(name, "Report");
            service.submit(reportGenerator);
    }

}
package com.packtpub.java7.concurrency.chapter4.recipe11.task;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Takes the results of the tasks executed through a CompletionService and
 * prints each one as it becomes available.
 *
 */
public class ReportProcessor implements Runnable {

    /**
     * CompletionService from which the finished tasks are taken
     */
    private final CompletionService<String> service;
    /**
     * Stop flag. The loop in {@link #run()} keeps going until it becomes true.
     * It is volatile because {@link #setEnd(boolean)} is meant to be called
     * from a different thread than the one executing {@link #run()}; without
     * it the write is not guaranteed to become visible to the running loop.
     */
    private volatile boolean end;
    
    /**
     * Constructor of the class. It initializes the attributes of the class
     * @param service The CompletionService whose results are processed
     */
    public ReportProcessor (CompletionService<String> service){
        this.service=service;
        end=false;
    }

    /**
     * Main method of the class. While the end flag is false, it calls the
     * poll method of the CompletionService, waiting up to 20 seconds for a
     * finished task, and prints the result of every task it obtains.
     * <p>
     * If the thread is interrupted, the loop stops and the interrupt status
     * of the thread is left set. A task that failed with an exception is
     * reported on the standard error stream and processing continues.
     */
    @Override
    public void run() {
        while (!end){
            try {
                // poll() hands back the Future of the next completed task, or
                // null if none completes within the timeout.
                Future<String> result=service.poll(20, TimeUnit.SECONDS);
                if (result!=null) {
                    String report=result.get();
                    System.out.printf("ReportReceiver: Report Recived: %s\n",report);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop: restore the status
                // cleared by the exception and leave the loop instead of
                // going back into another 20-second wait.
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        System.out.printf("ReportSender: End\n");
    }

    /**
     * Method that establish the value of the end attribute
     * @param end New value of the end attribute; true asks run() to finish
     *            once its current poll returns
     */
    public void setEnd(boolean end) {
        this.end = end;
    }
    
    
}
package com.packtpub.java7.concurrency.chapter4.recipe11.core;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportProcessor;
import com.packtpub.java7.concurrency.chapter4.recipe11.task.ReportRequest;

/**
 * Main class of the example creates all the necessary objects and throws the tasks
 *
 */
public class Main {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // Create the executor and thee CompletionService using that executor
        ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
        CompletionService<String> service=new ExecutorCompletionService<>(executor);

        // Crete two ReportRequest objects and two Threads to execute them
        ReportRequest faceRequest=new ReportRequest("Face", service);
        ReportRequest onlineRequest=new ReportRequest("Online", service);
        Thread faceThread=new Thread(faceRequest);
        Thread onlineThread=new Thread(onlineRequest);
        
        // Create a ReportSender object and a Thread to execute  it
        ReportProcessor processor=new ReportProcessor(service);
        Thread senderThread=new Thread(processor);
        
        // Start the Threads
        System.out.printf("Main: Starting the Threads\n");
        faceThread.start();
        onlineThread.start();
        senderThread.start();
        
        // Wait for the end of the ReportGenerator tasks
        try {
            System.out.printf("Main: Waiting for the report generators.\n");
            faceThread.join();
            onlineThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // Shutdown the executor
        System.out.printf("Main: Shuting down the executor.\n");
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // End the execution of the ReportSender
        processor.setEnd(true);
        System.out.printf("Main: Ends\n");

    }

}

工作原理

在範例的主類中,我們調用了Executors工廠類的newCachedThreadPool()方法創建了ThreadPoolExecutor執行器對象。然後,使用這個對象初始化了CompletionService對象,因為完成服務(Completion Service)使用執行器來執行任務。然後,ReportRequest類在run()方法中調用CompletionService的submit()方法,利用「完成服務」來執行任務。

  當這些任務中的某一個執行結束時,「完成服務」會把用來控制該任務執行的Future對象存儲到一個隊列中。

  調用poll()方法訪問這個隊列,查看是否有任務已經完成,如果有,則返回隊列中的第一個元素(即一個執行完成後的Future對象)。當poll()返回Future對象後,它將從隊列中刪除這個Future對象。在這個示例中,我們調用poll()方法時傳遞了兩個參數,表示當隊列裏完成任務結果爲空時,想要等待任務執行結束的時間。

  一旦創建了CompletionService對象,還要創建兩個ReportRequest對象,用來執行在CompletionService中的兩個ReportGenerator任務。ReportProcessor任務則將處理兩個被發送到執行器裏的ReportRequest任務所產生的結果。
對象

相關文章
相關標籤/搜索