Java:並行編程及同步使用方法

  • 知道java能夠使用java.util.concurrent包下的
CountDownLatch
ExecutorService
Future

Callable
實現並行編程,並在並行線程同步時,用起來十分簡單的一種 。
實現原理:
一、CountDownLatch 統計並行線程完成數,並提供了await()方法,實現等待全部並行線程完成,或者指定最大等待時間。
二、ExecutorService提供了execute(Callable)執行線程方法,還提供了submit(Callable)提交線程。
三、Future接受實現Callable<V>接口的(可執行線程)返回值,接受Executors.submit(Callable<V>)返回值。並且Future<V>提供get()取回並行子線程返回的參數,還能夠給get指定過時時間。

想到Concurrent,就能想到c#中,命名空間System.Collection,Concurrent,在該命名空間下提供了一些線程安全的集合類。html

  • 代碼示例:

MyTaskResult.javajava

package com.dx.testparallel;

public class MyTaskResult {
    private String name;
    
    public MyTaskResult(String name){
        this.name=name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

TaskItem.java編程

package com.dx.testparallel;

public class TaskItem {
    private int id;
    private String name;
    
    public TaskItem(int id,String name){
        this.id=id;
        this.name=name;
    }
    
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

MyTask.javac#

package com.dx.testparallel;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

public class MyTask implements Callable<MyTaskResult> {
    private final TaskItem taskItem;
    private final CountDownLatch threadsSignal;
    
    public MyTask(CountDownLatch threadsSignal,TaskItem taskItem) {
        this.threadsSignal= threadsSignal;
        this.taskItem=taskItem;
    }
    
    @Override
    public MyTaskResult call() throws Exception {
        MyTaskResult result=new MyTaskResult(this.taskItem.getName());
        
        // 核心處理邏輯處理
        Thread.sleep(2000);
        
        System.out.println("task id:" + taskItem.getId() +" >>>>等待結束");
        System.out.println("task id:" + taskItem.getId() + " >>>>線程名稱:" + Thread.currentThread().getName() + "結束. 還有" + threadsSignal.getCount() + " 個線程");

        // 必須等核心處理邏輯處理完成後才能夠減1
        this.threadsSignal.countDown();
        
        return result;
    }

}

Main.java安全

package com.dx.testparallel;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {    
    public static void main(String[] args) throws InterruptedException {
        List<TaskItem> taskItems=new ArrayList<TaskItem>();
        for(int i=0;i<20;i++){
            taskItems.add(new TaskItem(i, "task "+i));
        }
        
        CountDownLatch threadsSignal = new CountDownLatch(taskItems.size());
        ExecutorService executor = Executors.newFixedThreadPool(taskItems.size());
        List<Future<MyTaskResult>> resultLazyItems=new ArrayList<Future<MyTaskResult>>();
        System.out.println("主線程開始進入並行任務提交");
        for (TaskItem taskItem : taskItems) {
            // 使用future存儲子線程執行後返回結果,必須在全部子線程都完成後才能夠使用get();
            // 若是在這裏使用get(),會形成等待同步。
            Future<MyTaskResult> future = executor.submit(new MyTask(threadsSignal,taskItem));
            resultLazyItems.add(future);
        }
        System.out.println("主線程開始走出並行任務提交");
        System.out.println("主線程進入等待階段(等待全部並行子線程任務完成)。。。。。");
        // 等待全部並行子線程任務完成。
        threadsSignal.await();
        // 並非終止線程的運行,而是禁止在這個Executor中添加新的任務
        executor.shutdown(); 
        System.out.println("主線程走出等待階段(等待全部並行子線程任務完成)。。。。。");
        
        for(Future<MyTaskResult> future :resultLazyItems){
            try {
                MyTaskResult result = future.get();
                System.out.println(result.getName());
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

運行結果:ide

主線程開始進入並行任務提交
主線程開始走出並行任務提交
主線程進入等待階段(等待全部並行子線程任務完成)。。。。。
task id:0 >>>>等待結束
task id:6 >>>>等待結束
task id:8 >>>>等待結束
task id:3 >>>>等待結束
task id:4 >>>>等待結束
task id:2 >>>>等待結束
task id:2 >>>>線程名稱:pool-1-thread-3結束. 還有20 個線程
task id:5 >>>>等待結束
task id:1 >>>>等待結束
task id:7 >>>>等待結束
task id:1 >>>>線程名稱:pool-1-thread-2結束. 還有19 個線程
task id:5 >>>>線程名稱:pool-1-thread-6結束. 還有19 個線程
task id:4 >>>>線程名稱:pool-1-thread-5結束. 還有20 個線程
task id:19 >>>>等待結束
task id:10 >>>>等待結束
task id:18 >>>>等待結束
task id:18 >>>>線程名稱:pool-1-thread-19結束. 還有16 個線程
task id:3 >>>>線程名稱:pool-1-thread-4結束. 還有20 個線程
task id:8 >>>>線程名稱:pool-1-thread-9結束. 還有20 個線程
task id:6 >>>>線程名稱:pool-1-thread-7結束. 還有20 個線程
task id:0 >>>>線程名稱:pool-1-thread-1結束. 還有20 個線程
task id:10 >>>>線程名稱:pool-1-thread-11結束. 還有16 個線程
task id:19 >>>>線程名稱:pool-1-thread-20結束. 還有16 個線程
task id:12 >>>>等待結束
task id:17 >>>>等待結束
task id:17 >>>>線程名稱:pool-1-thread-18結束. 還有9 個線程
task id:11 >>>>等待結束
task id:11 >>>>線程名稱:pool-1-thread-12結束. 還有8 個線程
task id:14 >>>>等待結束
task id:14 >>>>線程名稱:pool-1-thread-15結束. 還有7 個線程
task id:16 >>>>等待結束
task id:16 >>>>線程名稱:pool-1-thread-17結束. 還有6 個線程
task id:15 >>>>等待結束
task id:9 >>>>等待結束
task id:13 >>>>等待結束
task id:7 >>>>線程名稱:pool-1-thread-8結束. 還有19 個線程
task id:13 >>>>線程名稱:pool-1-thread-14結束. 還有5 個線程
task id:9 >>>>線程名稱:pool-1-thread-10結束. 還有5 個線程
task id:15 >>>>線程名稱:pool-1-thread-16結束. 還有5 個線程
task id:12 >>>>線程名稱:pool-1-thread-13結束. 還有9 個線程
主線程走出等待階段(等待全部並行子線程任務完成)。。。。。
task 0
task 1
task 2
task 3
task 4
task 5
task 6
task 7
task 8
task 9
task 10
task 11
task 12
task 13
task 14
task 15
task 16
task 17
task 18
task 19

  註釋如下代碼:this

    // Thread.sleep(2000);        
    // System.out.println("task id:" + taskItem.getId() +" >>>>等待結束");

以後運行結果:spa

主線程開始進入並行任務提交
task id:1 >>>>線程名稱:pool-1-thread-2結束. 還有20 個線程
task id:2 >>>>線程名稱:pool-1-thread-3結束. 還有20 個線程
task id:3 >>>>線程名稱:pool-1-thread-4結束. 還有20 個線程
task id:4 >>>>線程名稱:pool-1-thread-5結束. 還有19 個線程
task id:5 >>>>線程名稱:pool-1-thread-6結束. 還有19 個線程
task id:8 >>>>線程名稱:pool-1-thread-9結束. 還有15 個線程
task id:0 >>>>線程名稱:pool-1-thread-1結束. 還有14 個線程
task id:9 >>>>線程名稱:pool-1-thread-10結束. 還有13 個線程
task id:7 >>>>線程名稱:pool-1-thread-8結束. 還有12 個線程
task id:6 >>>>線程名稱:pool-1-thread-7結束. 還有14 個線程
task id:10 >>>>線程名稱:pool-1-thread-11結束. 還有10 個線程
task id:11 >>>>線程名稱:pool-1-thread-12結束. 還有9 個線程
task id:12 >>>>線程名稱:pool-1-thread-13結束. 還有8 個線程
task id:13 >>>>線程名稱:pool-1-thread-14結束. 還有7 個線程
task id:14 >>>>線程名稱:pool-1-thread-15結束. 還有6 個線程
task id:15 >>>>線程名稱:pool-1-thread-16結束. 還有5 個線程
task id:16 >>>>線程名稱:pool-1-thread-17結束. 還有4 個線程
task id:17 >>>>線程名稱:pool-1-thread-18結束. 還有3 個線程
task id:18 >>>>線程名稱:pool-1-thread-19結束. 還有2 個線程
主線程開始走出並行任務提交
主線程進入等待階段(等待全部並行子線程任務完成)。。。。。
task id:19 >>>>線程名稱:pool-1-thread-20結束. 還有1 個線程
主線程走出等待階段(等待全部並行子線程任務完成)。。。。。
task 0
task 1
task 2
task 3
task 4
task 5
task 6
task 7
task 8
task 9
task 10
task 11
task 12
task 13
task 14
task 15
task 16
task 17
task 18
task 19

 

  • 項目中應用:

定義可執行線程類:.net

public class UploadFileToTask implements Callable<UploadFileToTaskResult> {
    private final Task_UploadFileToTaskItem taskItem;
    private final Log log = LogHelper.getInstance(ImportMain.class);
    private final CountDownLatch threadsSignal;
    private final HDFSUtil hdfsUtil = new HDFSUtil();
    private final static String HADOOP_HDFS_PATH = HdfsConfiguration.getHdfsUrl();

    public UploadFileToTask(CountDownLatch threadsSignal ,Task_UploadFileToTaskItem taskItem){
        this.taskItem=taskItem;
        this.threadsSignal=threadsSignal;
    }

    @Override
    public UploadFileToTaskResult call() throws Exception {
        String area = taskItem.getArea();
        String fileGenerateDate = taskItem.getFileGenerateDate();
        String manufacturer = taskItem.getManufacturer();
        String eNodeBId = taskItem.geteNodeBId();
        String filePath = taskItem.getFilePath();
        FileType fileType = taskItem.getFileType();

        TaskStatus taskStatus= TaskStatus.Success;

        // 不肯定該FileSystem是不是線程安全的,故在每個thread初始化一次。
        Configuration conf = new Configuration();
        Path dstPath = new Path(HADOOP_HDFS_PATH);
        FileSystem hdfs = dstPath.getFileSystem(conf);

        // 核心代碼。。。
        // 上傳MR文件
        // 上傳Signal文件

        // 若是文件路徑不爲空,就開始上傳文件到hdfs
        if(uploadFilePath.length()>0){
            if (!hdfsUtil.uploadFileToHdfs(hdfs, filePath, uploadFilePath)) {
                taskStatus= TaskStatus.Fail;
            }
        }

        TaskGroupInfo taskGroupInfo = new TaskGroupInfo();
        taskGroupInfo.setArea(area);
        taskGroupInfo.setManufacturer(manufacturer);
        taskGroupInfo.setFileGenerateDate(fileGenerateDate);
        taskGroupInfo.setFileType(fileType);

        String key = String.format("%s,%s,%s,%s", taskGroupInfo.getArea(), taskGroupInfo.getManufacturer(), taskGroupInfo.getFileGenerateDate(), String.valueOf(taskGroupInfo.getFileType().getValue()));

        UploadFileToTaskResult result=new UploadFileToTaskResult();

        // 填充返回值
        result.setStatus(taskStatus);
        result.setTaskGroupInfo(taskGroupInfo);
        result.setTaskGroupkey(key);
        result.setTaskOID(taskItem.getOid());

        System.out.println("task id:" + taskItem.getOid() + " >>>>線程名稱:" + Thread.currentThread().getName() + "結束. 還有" + threadsSignal.getCount() + " 個線程");

        // 必須等核心處理邏輯處理完成後才能夠減1
        this.threadsSignal.countDown();

        return result;
    }
}

實現並行線程同步核心代碼:線程

            // 獲取當前節點帶執行任務
            ArrayList<Task_UploadFileToTaskItem> taskItems = uploadFileToTaskItemDao.getTopNTodoTaskItems(this.computeNode.getId(),Configuration.getTaskCount());
            // 批量修改任務狀態爲正在處理狀態(doing)。
            log.info("Start:>>>>>>batch modify task status(doing)>>>>>>");
            log.info("Over:>>>>>>batch modify task status(doing)>>>>>>");

            // 批量處理上傳任務(上傳文件到)
            log.info("Start:>>>>>>each process task(upload to)>>>>>>");
CountDownLatch threadsSignal
= new CountDownLatch(taskItems.size()); ExecutorService executor = Executors.newFixedThreadPool(taskItems.size()); List<Future<UploadFileToTaskResult>> resultLazyItems=new ArrayList<Future<UploadFileToTaskResult>>(); for (Task_UploadFileToTaskItem taskItem : taskItems) { // 使用future存儲子線程執行後返回結果,必須在全部子線程都完成後才能夠使用get(); // 若是在這裏使用get(),會形成等待同步。 Future<UploadFileToTaskResult> future = executor.submit(new UploadFileToTask(threadsSignal,taskItem)); resultLazyItems.add(future); } // 等待全部並行子線程任務完成。 threadsSignal.await();
executor.shutdown();//並非終止線程的運行,而是禁止在這個Executor中添加新的任務  log.info(
"Over:>>>>>>each process task(upload to)>>>>>>"); // 批量修改任務處理狀態 Map<String, TaskGroupInfo> taskGroupItems=new HashMap<String, TaskGroupInfo>(); Map<Integer, TaskStatus> successTaskItems = new HashMap<Integer, TaskStatus>(); Map<Integer, TaskStatus> failTaskItems = new HashMap<Integer, TaskStatus>(); for(Future<UploadFileToTaskResult> future :resultLazyItems){ UploadFileToTaskResult result= future.get(); if(!taskGroupItems.containsKey(result.getTaskGroupkey())){ taskGroupItems.put(result.getTaskGroupkey(),result.getTaskGroupInfo()); } if(result.getStatus()== TaskStatus.Success){ successTaskItems.put(result.getTaskOID(),result.getStatus()); }else{ failTaskItems.put(result.getTaskOID(),result.getStatus()); } }
  •  參考資料:

http://blog.csdn.net/wangmuming/article/details/19832865

http://www.importnew.com/21312.html

相關文章
相關標籤/搜索