Java 併發排序

利用併發來實現排序

<!-- create time: 2016-03-14 09:49:12 -->java

1. 說明

本節主要結合排序的實例來演示多線程執行任務的流程,主要使用了線程池 ExecutorService , 閉鎖 Futrue, 完成服務 CompletionService 以及最多見的冒泡排序算法算法

基本介紹

  • ExecutorService 線程池數組

     

private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("int-sort"), new ThreadPoolExecutor.CallerRunsPolicy());多線程

上面是建立一個線程池的實例,其中幾個參數分別爲:(來自jdk)
        
    ```java
    /**
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ```
    
- Futrue
    
    > 就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時能夠經過get方法獲取執行結果,該方法會阻塞直到任務返回結果
    
    ```java
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • CompletionService併發

    若是向Executor提交了一組計算任務,而且但願在計算完成後得到結果,那麼能夠保留與每一個任務關聯的Future,而後反覆使用get方法,同時將參數timeout指定爲0,從而經過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionServiceless

2. 實例

import io.netty.util.concurrent.DefaultThreadFactory;
import org.junit.Test;

import java.util.concurrent.*;

/**
 * Created by yihui on 16/3/11.
 */
public class RunnableTest {
    private static ExecutorService executorService = new ThreadPoolExecutor(5,
            10,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("sort-calculate"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 隨機生成一些數
     *
     * @param size
     * @return
     */
    private int[] genNums(final int size) {
        int[] num = new int[size];

        for (int i = 0; i < size; i++) {
            num[i] = (int) (Math.random() * 1230);
        }

        return num;
    }


    // 冒泡排序
    private int[] sort(int[] num, int size) {
        if (size <= 1) {
            return num;
        }

        int tmp;
        for (int i = 0; i < size; i++) {
            for (int j = i + 1; j < size; j++) {
                if (num[i] > num[j]) {
                    tmp = num[i];
                    num[i] = num[j];
                    num[j] = tmp;
                }
            }
        }

        return num;
    }

    // 合併兩個排序數組
    public int[] merge(int[] ans, int[] sub) {
        if (ans == null) {
            return sub;
        }


        int ansSize = ans.length;
        int subSize = sub.length;
        int[] result = new int[subSize + ansSize];

        for (int i =0, ansIndex=0, subIndex=0; i < ansSize + subSize; i ++) {
            if (subIndex >= subSize) {
                result[i] = ans[ansIndex ++];
                continue;
            }

            if (ansIndex >= ansSize) {
                result[i] = sub[subIndex ++];
                continue;
            }

            if (ans[ansIndex] < sub[subIndex]) {
                result[i] = ans[ansIndex ++];
            } else {
                result[i] = sub[subIndex ++];
            }
        }
        return result;
    }


    public int[] calculate(int[] numbers, int size) {
        CompletionService<int[]> completionService = new ExecutorCompletionService<int[]>(executorService);
        if (size <= 50) {
            return this.sort(numbers, size);
        }


        // 將數組分割,50個做爲一組,進行排序
        int subNum = (size - 1) / 50 + 1;
        for (int i = 0; i < subNum; i++) {
            int len = 50;
            if (i == subNum - 1) {
                len = size - 50 * i;
            }
            final int[] subNumbers = new int[len];
            System.arraycopy(numbers, i * 50 + 0, subNumbers, 0, len);

            final int finalLen = len;
            Callable<int[]> runnable = new Callable<int[]>() {
                @Override
                public int[] call() throws Exception {
                    return sort(subNumbers, finalLen);
                }
            };
            completionService.submit(runnable);
        }

        int[] ans = null;


        // 開始對提交的排序任務的結果進行合併
        try{
             for (int i = 0; i < subNum; i ++) {
                 // get and remove the result
                 Future<int[]> f = completionService.take();
                 int[] tmp = f.get();
                 ans = this.merge(ans, tmp);
             }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return ans;
    }

    // 輸出數組
    private void print(int[] num, int size, boolean newLine) {
        for (int i = 0; i < size; i++) {
            System.out.print(num[i] + ",");
        }
        if (newLine) {
            System.out.println();
        }
    }

    @Test
    public void tt() {
        int size = 250;
        int[] numbers = this.genNums(size);
        int[] numbers2 = new int[size];
        System.arraycopy(numbers, 0, numbers2, 0, size);

        long start = System.nanoTime();
        this.sort(numbers, size);
        long end = System.nanoTime();
        this.print(numbers, size, true);
        System.out.println("Cost is : " + (end - start) / 1000);

        this.print(numbers2, size, true);
        start = System.nanoTime();
        int[] ans = this.calculate(numbers2, size);
        end = System.nanoTime();
        this.print(ans, size, true);
        System.out.println("cost is : " + (end - start) / 1000);
    }


    // 用於測試排序算法,以及合併算法的正確性
    @Test
    public void test() {
        int size = 10;
        int[] numbers = this.genNums(size);
        int[] ans1 = this.sort(numbers, size);
        this.print(ans1, size, true);

        size += 5;
        int[] numbers2 = this.genNums(size);
        int[] ans2 = this.sort(numbers2, size);
        this.print(ans2, size, true);

        int[] ans = this.merge(ans1, ans2);
        this.print(ans, 25, true);
    }
}

3. 說明

針對上面的實例,咱們重點須要關注的對象集中在 calculate方法中dom

執行流程:ide

  • 對數組進行分割,按照50個一組(最後一組可能不知足50個,因此須要額外注意一下)
  • 將子數組的排序,做爲一個task扔到線程池中執行,由於要保留其返回結果,所以採用Callable 結合 CompletionService 來作,將每一個task返回的結果封裝到Future中,並塞入completionQueue隊列中
  • 從完成隊列中獲取結果,合併排序
  • 完畢後返回最終的結果
// 提交task
completionService.submit(new Callable<int[]>() {
    @Override
    public int[] call() throws Exception {
        return sort(subNumbers, finalLen);
    }
});

// 獲取結果
for (...) {
    // take 表示從阻塞隊列中獲取並移除Future  === get以後remove掉
    Futrue<int[]> futrue = completionService.take();
    int[] ans = futrue.get(); 
}
相關文章
相關標籤/搜索