匿名函數java
λ演算git
在Java
中流式編程的基本原理有兩點。github
IntStream.rangeClosed(1, 100) // 1. 構建流 .mapToObj(String::valueOf)// 2. 數據流轉(流水線) .collect(joining()); // 3. 規約
@Test fun t1() { // 英雄的主位置一共有幾類,分別是什麼 // 映射 val roleMains = heroes.map(Hero::getRoleMain) // 過濾爲空的數據 .filter(Objects::nonNull) // 去重 .distinct() println(roleMains.size) println(roleMains) }
@Test public void t1() { // 英雄的主位置一共有幾類,分別是什麼 List<String> roleMains = heroes.stream() // 映射 .map(Hero::getRoleMain) // 過濾爲空的數據 .filter(Objects::nonNull) // 去重 .distinct() // 收集列表 .collect(toList()); System.out.println(roleMains.size()); System.out.println(roleMains); }
@Test fun t2() { // 英雄按主次位置分組後,輸出每一個分組有多少英雄,其中:近戰英雄有多少位,遠程英雄有多少位 // 主次位置分組的英雄數量 val groupHeroCount = heroes.groupingBy { Pair.of(it.roleMain, it.roleAssist) }.eachCount() // 主次分組後,再按攻擊範圍分組的英雄數量 val groupThenGroupCount = heroes.groupBy { Pair.of(it.roleMain, it.roleAssist) }.map { val value = it.value.groupingBy(Hero::getAttackRange).eachCount() Pair.of(it.key, value) }.associateBy({ it.left }, { it.value }) // 遍歷輸出 groupThenGroupCount.forEach { (groupKey, groupValue) -> val groupingCount = groupHeroCount[groupKey] print("英雄分組key爲:$groupKey;英雄數量:$groupingCount;") groupValue.forEach { (countKey, countValue) -> print("英雄攻擊範圍:$countKey;英雄數量:$countValue;") } println() } }
@Test public void t2() { // 英雄按主次位置分組後,輸出每一個分組有多少英雄,其中:近戰英雄有多少位,遠程英雄有多少位 // 主次位置分組的英雄數量 Map<Pair<String, String>, Long> groupHeroCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting())); // 主次分組後,再按攻擊範圍分組的英雄數量 Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), groupingBy(Hero::getAttackRange, counting()))); // 遍歷輸出 groupThenGroupCount.forEach((groupKey, groupValue) -> { Long groupingCount = groupHeroCount.get(groupKey); System.out.print("英雄分組key爲:" + groupKey + ";英雄數量:" + groupingCount + ";"); groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻擊範圍:" + countKey + ";英雄數量:" + countValue + ";")); System.out.println(); }); }
@Test fun t3() { // 求近戰英雄HP初始值的加總 val sum = heroes.filter { "近戰" == it.attackRange } .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal::add) println("近戰英雄HP初始值的加總爲:$sum") }
@Test public void t3() { // 求近戰英雄HP初始值的加總 BigDecimal sum = heroes.stream() .filter(hero -> "近戰".equals(hero.getAttackRange())) .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add); System.out.println("近戰英雄HP初始值的加總爲:" + sum); }
@Test public void t4() { // 經過最小列表收集器獲取最小列表 List<BigDecimal> minAttackGrowth = heroes.stream() .map(Hero::getAttackGrowth) .collect(new MinListCollector<>()); System.out.println(minAttackGrowth); List<Hero> minHero = heroes.stream() .collect(new MinListCollector<>()); System.out.println(minHero); }
import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; import static java.util.stream.Collector.Characteristics.*; /** * 最小列表收集器 * * @author switch * @since 2020/8/18 */ public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> { /** * 收集器的特性 * * @see Characteristics */ private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH)); private final static int ZERO = 0; /** * 最小值 */ private final AtomicReference<T> min = new AtomicReference<>(); @Override public Supplier<List<T>> supplier() { // supplier參數用於生成結果容器,容器類型爲A return ArrayList::new; } @Override public BiConsumer<List<T>, T> accumulator() { // accumulator用於消費元素,也就是概括元素,這裏的T就是元素,它會將流中的元素一個一個與結果容器A發生操做 return (list, element) -> { // 獲取最小值 T minValue = min.get(); if (Objects.isNull(minValue)) { // 第一次比較 list.add(element); min.set(element); } else if (element.compareTo(minValue) < ZERO) { // 發現更小的值 list.clear(); list.add(element); min.compareAndSet(minValue, element); } else if (element.compareTo(minValue) == ZERO) { // 與最小值相等 list.add(element); } }; } @Override public BinaryOperator<List<T>> combiner() { // combiner用於兩個兩個合併並行執行的線程的執行結果,將其合併爲一個最終結果A return (left, right) -> { // 最小值列表合併 List<T> leftList = getMinList(left); List<T> rightList = getMinList(right); leftList.addAll(rightList); return leftList; }; } private List<T> getMinList(List<T> list) { return list.stream() .filter(element -> element.compareTo(min.get()) == ZERO) .collect(Collectors.toList()); } @Override public Function<List<T>, List<T>> finisher() { // finisher用於將以前整合完的結果R轉換成爲A return Function.identity(); } @Override public Set<Characteristics> characteristics() { // characteristics表示當前Collector的特徵值,這是個不可變Set return CHARACTERISTICS; } }
import org.junit.Test; import java.util.Optional; /** * @author switch * @since 2020/8/18 */ public class OptionalTests { @Test public void t1() { // orElse System.out.println(Optional.ofNullable(null).orElse("張三")); System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四")); System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new)); } @Test public void t2() { // isPresent Optional<String> name = Optional.ofNullable("張三"); if (name.isPresent()) { System.out.println(name.get()); } } @Test public void t3() { // map Optional<Integer> number = Optional.of("123456").map(Integer::valueOf); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t4() { // flatMap Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s))); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t5() { // 過濾 String number = "123456"; String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321"); System.out.println(filterNumber); } }
CompletableFuture
該模型適用於百萬級量級的任務。超過千萬數據,能夠考慮分組,多機器並行執行。
基本流程:web
- 從數據庫獲取Id列表
- 拆分紅n個子Id列表
- 經過子Id列表獲取關聯數據(注意:都須要提供批量查詢接口)
- 映射到須要處理的Model(提交到CompletableFuture)->處理數據->收集成list)(java 8流式處理)
- 收集的list進行join操做
- 收集list
模型原理:Stream+CompletableFuture+lambdaspring
簡要解釋:數據庫
- CompletableFuture是java8提供的一個工具類,主要是用於異步處理流程編排的。
- Stream是java8提供的一個集合流式處理工具類,主要用於數據的流水線處理。
- lambda在java中是基於內部匿名類實現的,能夠大幅減小重複代碼。
- 總結:在該模型中Stream用於集合流水線處理、CompletableFuture解決異步編排問題(非阻塞)、lambda簡化代碼。
List<List<String>> -> Stream<List<String>> -> Stream<List<Model>> -> Stream<CompletableFuture<List<Model>>> -> Stream<CompletableFuture<List<映射類型>>> -> List<CompletableFuture<Void>>
ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; public final class ThreadPoolUtil { public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setAllowCoreThreadTimeOut(true); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
ThreadPoolConfig
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class ThreadPoolConfig { /** * 計算規則:N(thread) = N(cpu) * U(cpu) * (1 + w/c) * N(thread):線程池大小 * N(cpu):處理器核數 * U(cpu):指望CPU利用率(該值應該介於0和1之間) * w/c:是等待時間與計算時間的比率,好比說IO操做即爲等待時間,計算處理即爲計算時間 */ private static final Integer TASK_POOL_SIZE = 50; private static final Integer TASK_MAX_POOL_SIZE = 100; private static final Integer TASK_QUEUE_CAPACITY = 1000; @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY); } }
#getFuturesStream
public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) { return idSubLists.stream() .map(ids -> CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor) ); }
#standardisation
public void standardisation() { List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists) .map(future -> future.thenApply(this::listByNormalize)) .map(future -> future.thenAccept(modelService::batchUpdateData)) .collect(Collectors.toList()); List<Void> results = batchFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }
《Java併發編程實戰》一書中,Brian Goetz和合著者們爲線程池大小的優化提供了很多中肯的建議。這很是重要,若是線程池中線程的數量過多,最終它們會競爭稀缺的處理器和內存資源,浪費大量的時間在上下文切換上。反之,若是線程的數目過少,正如你的應用所面臨的狀況,處理器的一些核可能就沒法充分利用。Brian Goetz建議,線程池大小與處理器的利用率之比可使用下面的公式進行估算:
$$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$編程其中:api
- $N_{CPU}$是處理器的核的數目,能夠經過
Runtime.getRuntime().availableProcessors()
獲得- $U_{CPU}$是指望的CPU利用率(該值應該介於0和1之間)
- $\frac{W}{C}$是等待時間與計算時間的比率,好比說IO操做即爲等待時間,計算處理即爲計算時間
對集合進行並行計算有兩種方式:要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做。後者提供了更多的靈活性,能夠調整線程池的大小,而這能幫助確保總體的計算不會由於線程都在等待I/O而發生阻塞。網絡
使用這些API的建議以下:多線程
- 若是進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
- 反之,若是並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好,能夠依據等待/計算,或者$\frac{W}{C}$的比率設定須要使用的線程數。這種狀況不使用並行流的另外一個緣由是,處理流的流水線中若是發生I/O等待,流的延遲特性很難判斷到底何時觸發了等待。
使用指南:https://www.yuque.com/docs/share/ee5ef8a7-d261-4593-bd08-2a7a7d2c11ca?#(密碼:gtag) 《時區工具類使用指南》
GitHub:java8-fluent
分享並記錄所學所見