淺談parallelStream

parallelStream是什麼,它是一個集合的併發處理流.其做用是把一個集合中的數據分片,進行一個多線程的處理,增快運行速度.java

好比說這樣一段代碼安全

private Set<SysRole> sysRoles;
private Set<String> permission;

@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
    Collection<GrantedAuthority> collection = Collections.synchronizedSet(new HashSet<>());
    if (!CollectionUtils.isEmpty(sysRoles)) {
        sysRoles.parallelStream().forEach(role -> {
            if (role.getCode().startsWith("ROLE_")) {
                collection.add(new SimpleGrantedAuthority(role.getCode()));
            }else {
                collection.add(new SimpleGrantedAuthority("ROLE_" + role.getCode()));
            }
        });
    }
    return collection;
}

它就是以不一樣的線程來給collection添加SimpleGrantedAuthority的,請注意collection的線程安全性.多線程

固然咱們能夠用下面這個例子來證實parallelStream的確是多線程處理併發

public class App {
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        // 構造一個10000個元素的集合
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        // 統計並行執行list的線程
        Set<Thread> threadSet = new CopyOnWriteArraySet<>();
        // 並行執行
        list.parallelStream().forEach(integer -> {
            Thread thread = Thread.currentThread();
            // System.out.println(thread);
            // 統計並行執行list的線程
            threadSet.add(thread);
        });
        System.out.println("threadSet一共有" + threadSet.size() + "個線程");
        System.out.println("系統一個有"+Runtime.getRuntime().availableProcessors()+"個cpu");
        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list1.add(i);
            list2.add(i);
        }
        Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread threadA = new Thread(() -> {
            list1.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list1" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });
        Thread threadB = new Thread(() -> {
            list2.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list2" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });

        threadA.start();
        threadB.start();
        countDownLatch.await();
        System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "個線程");

        System.out.println("---------------------------");
        System.out.println(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("---------------------------");
        threadSetTwo.addAll(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "個線程");
        System.out.println("系統一個有"+Runtime.getRuntime().availableProcessors()+"個cpu");
    }
}

運行結果以下ide

Hello World!
threadSet一共有3個線程
系統一個有4個cpu
threadSetTwo一共有5個線程---------------------------
[Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main]]
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]]
---------------------------
[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]]
threadSetTwo一共有6個線程
系統一個有4個cpu線程

咱們能夠看到threadSet一共有3個線程,證實get

Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 並行執行
list.parallelStream().forEach(integer -> {
    Thread thread = Thread.currentThread();
    // System.out.println(thread);
    // 統計並行執行list的線程
    threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "個線程");

是3個線程處理的,另外CopyOnWriteArraySet是線程安全的.後面是由顯示線程調用,主線程等待的方式.it

調節parallelStream的併發線程數能夠用參數-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N爲線程數量)io

相關文章
相關標籤/搜索