今天要講的Stream指的是java.util.stream包中的諸多類。Stream能夠方便的將以前的結合類以轉換爲Stream並以流式方式進行處理,大大的簡化了咱們的編程,Stream包中,最核心的就是interface Stream<T> java
從上面的圖中咱們能夠看到Stream繼承自BaseStream。Stream中定義了不少很是實用的方法,好比filter,map,flatmap,forEach,reduce,collect等等。接下來咱們將會逐一講解。git
Stream的建立有不少方式,java引入Stream以後全部的集合類都添加了一個stream()方法,經過這個方法能夠直接獲得其對應的Stream。也能夠經過Stream.of方法來建立:github
//Stream Creation String[] arr = new String[]{"a", "b", "c"}; Stream<String> stream = Arrays.stream(arr); stream = Stream.of("a", "b", "c");
若是咱們想使用多線程來處理集合類的數據,Stream提供了很是方便的多線程方法parallelStream():編程
//Multi-threading List<String> list =new ArrayList(); list.add("aaa"); list.add("bbb"); list.add("abc"); list.add("ccc"); list.add("ddd"); list.parallelStream().forEach(element -> doPrint(element));
Stream的操做能夠分爲兩類,一類是中間操做,中間操做返回Stream<T>,所以能夠級聯調用。 另外一類是終止操做,這類操做會返回Stream定義的類型。多線程
//Operations long count = list.stream().distinct().count();
上面的例子中,distinct()返回一個Stream,因此能夠級聯操做,最後的count()是一個終止操做,返回最後的值。併發
Stream提供了anyMatch(), allMatch(), noneMatch()這三種match方式,咱們看下怎麼使用:app
//Matching boolean isValid = list.stream().anyMatch(element -> element.contains("h")); boolean isValidOne = list.stream().allMatch(element -> element.contains("h")); boolean isValidTwo = list.stream().noneMatch(element -> element.contains("h"));
filter() 方法容許咱們對Stream中的數據進行過濾,從而獲得咱們須要的:框架
Stream<String> filterStream = list.stream().filter(element -> element.contains("d"));
上面的例子中咱們從list中選出了包含「d」字母的String。dom
map就是對Stream中的值進行再加工,而後將加工事後的值做爲新的Stream返回。ide
//Mapping Stream<String> mappingStream = list.stream().map(element -> convertElement(element)); private static String convertElement(String element) { return "element"+"abc"; }
上的例子中咱們把list中的每一個值都加上了「abc」而後返回一個新的Stream。
flatMap和Map很相似,可是他們兩個又有不一樣,看名字咱們能夠看到flatMap意思是打平以後再作Map。
怎麼理解呢?
假如咱們有一個CustBook類:
@Data public class CustBook { List<String> bookName; }
CustBook定義了一個bookName字段。
先看一下Map返回的結果:
List<CustBook> users = new ArrayList<>(); users.add(new CustBook()); Stream<Stream<String>> userStreamMap = users.stream().map(user -> user.getBookName().stream());
在上面的代碼中,map將每個user都轉換成了stream,因此最後的結果是返回Stream的Stream。
若是咱們只想返回String,則可使用FlatMap:
List<CustBook> users = new ArrayList<>(); users.add(new CustBook()); Stream<String> userStream = users.stream().map(user -> user.getBookName().stream());
簡單點講FlatMap就是將層級關係鋪平重來。
使用reduce() 方法能夠方便的對集合的數據進行運算,reduce()接收兩個參數,第一個是開始值,後面是一個函數表示累計。
//Reduction List<Integer> integers = Arrays.asList(1, 1, 1); Integer reduced = integers.stream().reduce(100, (a, b) -> a + b);
上面的例子咱們定義了3個1的list,而後調用reduce(100, (a, b) -> a + b)方法,最後的結果是103.
collect()方法能夠方便的將Stream再次轉換爲集合類,方便處理和展現:
List<String> resultList = list.stream().map(element -> element.toUpperCase()).collect(Collectors.toList());
java 8引入了lambda表達式,lambda表達式實際上表示的就是一個匿名的function。
在java 8以前,若是須要使用到匿名function須要new一個類的實現,可是有了lambda表達式以後,一切都變的很是簡介。
咱們看一個以前講線程池的時候的一個例子:
//ExecutorService using class ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new Runnable() { @Override public void run() { log.info("new runnable"); } });
executorService.submit須要接收一個Runnable類,上面的例子中咱們new了一個Runnable類,並實現了它的run()方法。
上面的例子若是用lambda表達式來重寫,則以下所示:
//ExecutorService using lambda executorService.submit(()->log.info("new runnable"));
看起是否是很簡單,使用lambda表達式就能夠省略匿名類的構造,而且可讀性更強。
那麼是否是全部的匿名類均可以用lambda表達式來重構呢?也不是。
咱們看下Runnable類有什麼特色:
@FunctionalInterface public interface Runnable
Runnable類上面有一個@FunctionalInterface註解。這個註解就是咱們今天要講到的Functional Interface。
Functional Interface是指帶有 @FunctionalInterface 註解的interface。它的特色是其中只有一個子類必需要實現的abstract方法。若是abstract方法前面帶有default關鍵字,則不作計算。
其實這個也很好理解,由於Functional Interface改寫成爲lambda表達式以後,並無指定實現的哪一個方法,若是有多個方法須要實現的話,就會有問題。
@Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface FunctionalInterface {}
Functional Interface通常都在java.util.function包中。
根據要實現的方法參數和返回值的不一樣,Functional Interface能夠分爲不少種,下面咱們分別來介紹。
Function接口定義了一個方法,接收一個參數,返回一個參數。
@FunctionalInterface public interface Function<T, R> { /** * Applies this function to the given argument. * * @param t the function argument * @return the function result */ R apply(T t);
通常咱們在對集合類進行處理的時候,會用到Function。
Map<String, Integer> nameMap = new HashMap<>(); Integer value = nameMap.computeIfAbsent("name", s -> s.length());
上面的例子中咱們調用了map的computeIfAbsent方法,傳入一個Function。
上面的例子還能夠改寫成更短的:
Integer value1 = nameMap.computeIfAbsent("name", String::length);
Function沒有指明參數和返回值的類型,若是須要傳入特定的參數,則可使用IntFunction, LongFunction, DoubleFunction:
@FunctionalInterface public interface IntFunction<R> { /** * Applies this function to the given argument. * * @param value the function argument * @return the function result */ R apply(int value); }
若是須要返回特定的參數,則可使用ToIntFunction, ToLongFunction, ToDoubleFunction:
@FunctionalInterface public interface ToDoubleFunction<T> { /** * Applies this function to the given argument. * * @param value the function argument * @return the function result */ double applyAsDouble(T value); }
若是要同時指定參數和返回值,則可使用DoubleToIntFunction, DoubleToLongFunction, IntToDoubleFunction, IntToLongFunction, LongToIntFunction, LongToDoubleFunction:
@FunctionalInterface public interface LongToIntFunction { /** * Applies this function to the given argument. * * @param value the function argument * @return the function result */ int applyAsInt(long value); }
若是須要接受兩個參數,一個返回值的話,可使用BiFunction:BiFunction, ToDoubleBiFunction, ToIntBiFunction, ToLongBiFunction等。
@FunctionalInterface public interface BiFunction<T, U, R> { /** * Applies this function to the given arguments. * * @param t the first function argument * @param u the second function argument * @return the function result */ R apply(T t, U u);
咱們看一個BiFunction的例子:
//BiFunction Map<String, Integer> salaries = new HashMap<>(); salaries.put("alice", 100); salaries.put("jack", 200); salaries.put("mark", 300); salaries.replaceAll((name, oldValue) -> name.equals("alice") ? oldValue : oldValue + 200);
若是什麼參數都不須要,則可使用Supplier:
@FunctionalInterface public interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
Consumer接收一個參數,可是不返回任何值,咱們看下Consumer的定義:
@FunctionalInterface public interface Consumer<T> { /** * Performs this operation on the given argument. * * @param t the input argument */ void accept(T t);
看一個Consumer的具體應用:
//Consumer nameMap.forEach((name, age) -> System.out.println(name + " is " + age + " years old"));
Predicate接收一個參數,返回boolean值:
@FunctionalInterface public interface Predicate<T> { /** * Evaluates this predicate on the given argument. * * @param t the input argument * @return {@code true} if the input argument matches the predicate, * otherwise {@code false} */ boolean test(T t);
若是用在集合類的過濾上面那是極好的:
//Predicate List<String> names = Arrays.asList("A", "B", "C", "D", "E"); List<String> namesWithA = names.stream() .filter(name -> name.startsWith("A")) .collect(Collectors.toList());
Operator接收和返回一樣的類型,有不少種Operator:UnaryOperator BinaryOperator ,DoubleUnaryOperator, IntUnaryOperator, LongUnaryOperator, DoubleBinaryOperator, IntBinaryOperator, LongBinaryOperator等。
@FunctionalInterface public interface IntUnaryOperator { /** * Applies this operator to the given operand. * * @param operand the operand * @return the operator result */ int applyAsInt(int operand);
咱們看一個BinaryOperator的例子:
//Operator List<Integer> values = Arrays.asList(1, 2, 3, 4, 5); int sum = values.stream() .reduce(0, (i1, i2) -> i1 + i2);
Lambda表達式java 8引入的函數式編程框架。以前的文章中咱們也講過Lambda表達式的基本用法。
本文將會在以前的文章基礎上更加詳細的講解Lambda表達式在實際應用中的最佳實踐經驗。
以前的文章咱們講到了,java在java.util.function包中定義了不少Function接口。基本上涵蓋了咱們可以想到的各類類型。
假如咱們自定義了下面的Functional interface:
@FunctionalInterface public interface Usage { String method(String string); }
而後咱們須要在一個test方法中傳入該interface:
public String test(String string, Usage usage) { return usage.method(string); }
上面咱們定義的函數接口須要實現method方法,接收一個String,返回一個String。這樣咱們徹底可使用Function來代替:
public String test(String string, Function<String, String> fn) { return fn.apply(string); }
使用標準接口的好處就是,不要重複造輪子。
雖然@FunctionalInterface不是必須的,不使用@FunctionalInterface也能夠定義一個Functional Interface。
可是使用@FunctionalInterface能夠在違背Functional Interface定義的時候報警。
若是是在維護一個大型項目中,加上@FunctionalInterface註解能夠清楚的讓其餘人瞭解這個類的做用。
從而使代碼更加規範和更加可用。
因此咱們須要這樣定義:
@FunctionalInterface public interface Usage { String method(String string); }
而不是:
public interface Usage { String method(String string); }
Functional Interface是指只有一個未實現的抽象方法的接口。
若是該Interface中有多個方法,則可使用default關鍵字爲其提供一個默認的實現。
可是咱們知道Interface是能夠多繼承的,一個class能夠實現多個Interface。 若是多個Interface中定義了相同的default方法,則會報錯。
一般來講default關鍵字通常用在升級項目中,避免代碼報錯。
仍是上面的例子:
@FunctionalInterface public interface Usage { String method(String string); }
要實例化Usage,咱們可使用new關鍵詞:
Usage usage = new Usage() { @Override public String method(String string) { return string; } };
可是最好的辦法就是用lambda表達式:
Usage usage = parameter -> parameter;
怎麼理解呢? 咱們看下面兩個方法:
public class ProcessorImpl implements Processor { @Override public String process(Callable<String> c) throws Exception { // implementation details } @Override public String process(Supplier<String> s) { // implementation details } }
兩個方法的方法名是同樣的,只有傳入的參數不一樣。可是兩個參數都是Functional Interface,均可以用一樣的lambda表達式來表示。
在調用的時候:
String result = processor.process(() -> "test");
由於區別不了到底調用的哪一個方法,則會報錯。
最好的辦法就是將兩個方法的名字修改成不一樣的。
雖然咱們以前講到使用lambda表達式能夠替換內部類。可是二者的做用域範圍是不一樣的。
在內部類中,會建立一個新的做用域範圍,在這個做用域範圍以內,你能夠定義新的變量,而且能夠用this引用它。
可是在Lambda表達式中,並無定義新的做用域範圍,若是在Lambda表達式中使用this,則指向的是外部類。
咱們舉個例子:
private String value = "Outer scope value"; public String scopeExperiment() { Usage usage = new Usage() { String value = "Inner class value"; @Override public String method(String string) { return this.value; } }; String result = usage.method(""); Usage usageLambda = parameter -> { String value = "Lambda value"; return this.value; }; String resultLambda = usageLambda.method(""); return "Results: result = " + result + ", resultLambda = " + resultLambda; }
上面的例子將會輸出「Results: result = Inner class value, resultLambda = Outer scope value」
一般來講一行代碼便可。若是你有很是多的邏輯,能夠將這些邏輯封裝成一個方法,在lambda表達式中調用該方法便可。
由於lambda表達式說到底仍是一個表達式,表達式固然越短越好。
java經過類型推斷來判斷傳入的參數類型,因此咱們在lambda表達式的參數中儘可能不傳參數類型,像下面這樣:
(a, b) -> a.toLowerCase() + b.toLowerCase();
而不是:
(String a, String b) -> a.toLowerCase() + b.toLowerCase();
若是隻有一個參數的時候,不須要帶括號:
a -> a.toLowerCase();
而不是:
(a) -> a.toLowerCase();
返回值不須要帶return:
a -> a.toLowerCase();
而不是:
a -> {return a.toLowerCase()};
爲了讓lambda表達式更加簡潔,在可使用方法引用的時候,咱們可使用方法引用:
a -> a.toLowerCase();
能夠被替換爲:
String::toLowerCase;
若是在lambda表達式中引用了non-final變量,則會報錯。
effectively final是什麼意思呢?這個是一個近似final的意思。只要一個變量只被賦值一次,那麼編譯器將會把這個變量看做是effectively final的。
String localVariable = "Local"; Usage usage = parameter -> { localVariable = parameter; return localVariable; };
上面的例子中localVariable被賦值了兩次,從而不是一個Effectively Final 變量,會編譯報錯。
爲何要這樣設置呢?由於lambda表達式一般會用在並行計算中,當有多個線程同時訪問變量的時候Effectively Final 變量能夠防止不能夠預料的修改。
在Stream處理中,咱們一般會遇到if/else的判斷狀況,對於這樣的問題咱們怎麼處理呢?
還記得咱們在上一篇文章lambda最佳實踐中提到,lambda表達式應該越簡潔越好,不要在其中寫臃腫的業務邏輯。
接下來咱們看一個具體的例子。
假如咱們有一個1 to 10的list,咱們想要分別挑選出奇數和偶數出來,傳統的寫法,咱們會這樣使用:
public void inForEach(){ List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); ints.stream() .forEach(i -> { if (i.intValue() % 2 == 0) { System.out.println("i is even"); } else { System.out.println("i is old"); } }); }
上面的例子中,咱們把if/else的邏輯放到了forEach中,雖然沒有任何問題,可是代碼顯得很是臃腫。
接下來看看怎麼對其進行改寫。
咱們能夠把if/else的邏輯改寫爲兩個filter:
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); Stream<Integer> evenIntegers = ints.stream() .filter(i -> i.intValue() % 2 == 0); Stream<Integer> oddIntegers = ints.stream() .filter(i -> i.intValue() % 2 != 0);
有了這兩個filter,再在filter事後的stream中使用for each:
evenIntegers.forEach(i -> System.out.println("i is even")); oddIntegers.forEach(i -> System.out.println("i is old"));
怎麼樣,代碼是否是很是簡潔明瞭。
Map是java中很是經常使用的一個集合類型,咱們一般也須要去遍歷Map去獲取某些值,java 8引入了Stream的概念,那麼咱們怎麼在Map中使用Stream呢?
Map有key,value還有表示key,value總體的Entry。
建立一個Map:
Map<String, String> someMap = new HashMap<>();
獲取Map的entrySet:
Set<Map.Entry<String, String>> entries = someMap.entrySet();
獲取map的key:
Set<String> keySet = someMap.keySet();
獲取map的value:
Collection<String> values = someMap.values();
上面咱們能夠看到有這樣幾個集合:Map,Set,Collection。
除了Map沒有stream,其餘兩個都有stream方法:
Stream<Map.Entry<String, String>> entriesStream = entries.stream(); Stream<String> valuesStream = values.stream(); Stream<String> keysStream = keySet.stream();
咱們能夠經過其餘幾個stream來遍歷map。
咱們先給map添加幾個值:
someMap.put("jack","20"); someMap.put("bill","35");
上面咱們添加了name和age字段。
若是咱們想查找age=20的key,則能夠這樣作:
Optional<String> optionalName = someMap.entrySet().stream() .filter(e -> "20".equals(e.getValue())) .map(Map.Entry::getKey) .findFirst(); log.info(optionalName.get());
由於返回的是Optional,若是值不存在的狀況下,咱們也能夠處理:
optionalName = someMap.entrySet().stream() .filter(e -> "Non ages".equals(e.getValue())) .map(Map.Entry::getKey).findFirst(); log.info("{}",optionalName.isPresent());
上面的例子咱們經過調用isPresent來判斷age是否存在。
若是有多個值,咱們能夠這樣寫:
someMap.put("alice","20"); List<String> listnames = someMap.entrySet().stream() .filter(e -> e.getValue().equals("20")) .map(Map.Entry::getKey) .collect(Collectors.toList()); log.info("{}",listnames);
上面咱們調用了collect(Collectors.toList())將值轉成了List。
上面咱們獲取的map的key,一樣的咱們也能夠獲取map的value:
List<String> listAges = someMap.entrySet().stream() .filter(e -> e.getKey().equals("alice")) .map(Map.Entry::getValue) .collect(Collectors.toList()); log.info("{}",listAges);
上面咱們匹配了key值是alice的value。
java 8 stream做爲流式操做有兩種操做類型,中間操做和終止操做。這兩種有什麼區別呢?
咱們看一個peek的例子:
Stream<String> stream = Stream.of("one", "two", "three","four"); stream.peek(System.out::println);
上面的例子中,咱們的本意是打印出Stream的值,但實際上沒有任何輸出。
爲何呢?
一個java 8的stream是由三部分組成的。數據源,零個或一個或多箇中間操做,一個或零個終止操做。
中間操做是對數據的加工,注意,中間操做是lazy操做,並不會立馬啓動,須要等待終止操做纔會執行。
終止操做是stream的啓動操做,只有加上終止操做,stream纔會真正的開始執行。
因此,問題解決了,peek是一箇中間操做,因此上面的例子沒有任何輸出。
咱們看下peek的文檔說明:peek主要被用在debug用途。
咱們看下debug用途的使用:
Stream.of("one", "two", "three","four").filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)) .map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)) .collect(Collectors.toList());
上面的例子輸出:
Filtered value: three Mapped value: THREE Filtered value: four Mapped value: FOUR
上面的例子咱們輸出了stream的中間值,方便咱們的調試。
爲何只做爲debug使用呢?咱們再看一個例子:
Stream.of("one", "two", "three","four").peek(u -> u.toUpperCase()) .forEach(System.out::println);
上面的例子咱們使用peek將element轉換成爲upper case。而後輸出:
one two three four
能夠看到stream中的元素並無被轉換成大寫格式。
再看一個map的對比:
Stream.of("one", "two", "three","four").map(u -> u.toUpperCase()) .forEach(System.out::println);
輸出:
ONE TWO THREE FOUR
能夠看到map是真正的對元素進行了轉換。
固然peek也有例外,假如咱們Stream裏面是一個對象會怎麼樣?
@Data @AllArgsConstructor static class User{ private String name; }
List<User> userList=Stream.of(new User("a"),new User("b"),new User("c")).peek(u->u.setName("kkk")).collect(Collectors.toList()); log.info("{}",userList);
輸出結果:
10:25:59.784 [main] INFO com.flydean.PeekUsage - [PeekUsage.User(name=kkk), PeekUsage.User(name=kkk), PeekUsage.User(name=kkk)]
咱們看到若是是對象的話,實際的結果會被改變。
爲何peek和map有這樣的區別呢?
咱們看下peek和map的定義:
Stream<T> peek(Consumer<? super T> action) <R> Stream<R> map(Function<? super T, ? extends R> mapper);
peek接收一個Consumer,而map接收一個Function。
Consumer是沒有返回值的,它只是對Stream中的元素進行某些操做,可是操做以後的數據並不返回到Stream中,因此Stream中的元素仍是原來的元素。
而Function是有返回值的,這意味着對於Stream的元素的全部操做都會做爲新的結果返回到Stream中。
這就是爲何peek String不會發生變化而peek Object會發送變化的緣由。
java 8中引入了lambda表達式,lambda表達式可讓咱們的代碼更加簡介,業務邏輯更加清晰,可是在lambda表達式中使用的Functional Interface並無很好的處理異常,由於JDK提供的這些Functional Interface一般都是沒有拋出異常的,這意味着須要咱們本身手動來處理異常。
由於異常分爲Unchecked Exception和checked Exception,咱們分別來討論。
Unchecked exception也叫作RuntimeException,出現RuntimeException一般是由於咱們的代碼有問題。RuntimeException是不須要被捕獲的。也就是說若是有RuntimeException,沒有捕獲也能夠經過編譯。
咱們看一個例子:
List<Integer> integers = Arrays.asList(1,2,3,4,5); integers.forEach(i -> System.out.println(1 / i));
這個例子是能夠編譯成功的,可是上面有一個問題,若是list中有一個0的話,就會拋出ArithmeticException。
雖然這個是一個Unchecked Exception,可是咱們仍是想處理一下:
integers.forEach(i -> { try { System.out.println(1 / i); } catch (ArithmeticException e) { System.err.println( "Arithmetic Exception occured : " + e.getMessage()); } });
上面的例子咱們使用了try,catch來處理異常,簡單可是破壞了lambda表達式的最佳實踐。代碼變得臃腫。
咱們將try,catch移到一個wrapper方法中:
static Consumer<Integer> lambdaWrapper(Consumer<Integer> consumer) { return i -> { try { consumer.accept(i); } catch (ArithmeticException e) { System.err.println( "Arithmetic Exception occured : " + e.getMessage()); } }; }
則原來的調用變成這樣:
integers.forEach(lambdaWrapper(i -> System.out.println(1 / i)));
可是上面的wrapper固定了捕獲ArithmeticException,咱們再將其改編成一個更通用的類:
static <T, E extends Exception> Consumer<T> consumerWrapperWithExceptionClass(Consumer<T> consumer, Class<E> clazz) { return i -> { try { consumer.accept(i); } catch (Exception ex) { try { E exCast = clazz.cast(ex); System.err.println( "Exception occured : " + exCast.getMessage()); } catch (ClassCastException ccEx) { throw ex; } } }; }
上面的類傳入一個class,並將其cast到異常,若是能cast,則處理,不然拋出異常。
這樣處理以後,咱們這樣調用:
integers.forEach( consumerWrapperWithExceptionClass( i -> System.out.println(1 / i), ArithmeticException.class));
checked Exception是必需要處理的異常,咱們仍是看個例子:
static void throwIOException(Integer integer) throws IOException { }
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5); integers.forEach(i -> throwIOException(i));
上面咱們定義了一個方法拋出IOException,這是一個checked Exception,須要被處理,因此在下面的forEach中,程序會編譯失敗,由於沒有處理相應的異常。
最簡單的辦法就是try,catch住,以下所示:
integers.forEach(i -> { try { throwIOException(i); } catch (IOException e) { throw new RuntimeException(e); } });
固然,這樣的作法的壞處咱們在上面已經講過了,一樣的,咱們能夠定義一個新的wrapper方法:
static <T> Consumer<T> consumerWrapper( ThrowingConsumer<T, Exception> throwingConsumer) { return i -> { try { throwingConsumer.accept(i); } catch (Exception ex) { throw new RuntimeException(ex); } }; }
咱們這樣調用:
integers.forEach(consumerWrapper(i -> throwIOException(i)));
咱們也能夠封裝一下異常:
static <T, E extends Exception> Consumer<T> consumerWrapperWithExceptionClass( ThrowingConsumer<T, E> throwingConsumer, Class<E> exceptionClass) { return i -> { try { throwingConsumer.accept(i); } catch (Exception ex) { try { E exCast = exceptionClass.cast(ex); System.err.println( "Exception occured : " + exCast.getMessage()); } catch (ClassCastException ccEx) { throw new RuntimeException(ex); } } }; }
而後這樣調用:
integers.forEach(consumerWrapperWithExceptionClass( i -> throwIOException(i), IOException.class));
以前的文章咱們講到,在stream中處理異常,須要將checked exception轉換爲unchecked exception來處理。
咱們是這樣作的:
static <T> Consumer<T> consumerWrapper( ThrowingConsumer<T, Exception> throwingConsumer) { return i -> { try { throwingConsumer.accept(i); } catch (Exception ex) { throw new RuntimeException(ex); } }; }
將異常捕獲,而後封裝成爲RuntimeException。
封裝成RuntimeException感受老是有那麼一點點問題,那麼有沒有什麼更好的辦法?
java的類型推斷你們應該都知道,若是是<T extends Throwable> 這樣的形式,那麼T將會被認爲是RuntimeException!
咱們看下例子:
public class RethrowException { public static <T extends Exception, R> R throwException(Exception t) throws T { throw (T) t; // just throw it, convert checked exception to unchecked exception } }
上面的類中,咱們定義了一個throwException方法,接收一個Exception參數,將其轉換爲T,這裏的T就是unchecked exception。
接下來看下具體的使用:
@Slf4j public class RethrowUsage { public static void main(String[] args) { try { throwIOException(); } catch (IOException e) { log.error(e.getMessage(),e); RethrowException.throwException(e); } } static void throwIOException() throws IOException{ throw new IOException("io exception"); } }
上面的例子中,咱們將一個IOException轉換成了一個unchecked exception。
在java stream中,咱們一般須要將處理後的stream轉換成集合類,這個時候就須要用到stream.collect方法。collect方法須要傳入一個Collector類型,要實現Collector仍是很麻煩的,須要實現好幾個接口。
因而java提供了更簡單的Collectors工具類來方便咱們構建Collector。
下面咱們將會具體講解Collectors的用法。
假如咱們有這樣兩個list:
List<String> list = Arrays.asList("jack", "bob", "alice", "mark"); List<String> duplicateList = Arrays.asList("jack", "jack", "alice", "mark");
上面一個是無重複的list,一個是帶重複數據的list。接下來的例子咱們會用上面的兩個list來說解Collectors的用法。
List<String> listResult = list.stream().collect(Collectors.toList()); log.info("{}",listResult);
將stream轉換爲list。這裏轉換的list是ArrayList,若是想要轉換成特定的list,須要使用toCollection方法。
Set<String> setResult = list.stream().collect(Collectors.toSet()); log.info("{}",setResult);
toSet將Stream轉換成爲set。這裏轉換的是HashSet。若是須要特別指定set,那麼須要使用toCollection方法。
由於set中是沒有重複的元素,若是咱們使用duplicateList來轉換的話,會發現最終結果中只有一個jack。
Set<String> duplicateSetResult = duplicateList.stream().collect(Collectors.toSet()); log.info("{}",duplicateSetResult);
上面的toMap,toSet轉換出來的都是特定的類型,若是咱們須要自定義,則可使用toCollection()
List<String> custListResult = list.stream().collect(Collectors.toCollection(LinkedList::new)); log.info("{}",custListResult);
上面的例子,咱們轉換成了LinkedList。
toMap接收兩個參數,第一個參數是keyMapper,第二個參數是valueMapper:
Map<String, Integer> mapResult = list.stream() .collect(Collectors.toMap(Function.identity(), String::length)); log.info("{}",mapResult);
若是stream中有重複的值,則轉換會報IllegalStateException異常:
Map<String, Integer> duplicateMapResult = duplicateList.stream() .collect(Collectors.toMap(Function.identity(), String::length));
怎麼解決這個問題呢?咱們能夠這樣:
Map<String, Integer> duplicateMapResult2 = duplicateList.stream() .collect(Collectors.toMap(Function.identity(), String::length, (item, identicalItem) -> item)); log.info("{}",duplicateMapResult2);
在toMap中添加第三個參數mergeFunction,來解決衝突的問題。
collectingAndThen容許咱們對生成的集合再作一次操做。
List<String> collectAndThenResult = list.stream() .collect(Collectors.collectingAndThen(Collectors.toList(), l -> {return new ArrayList<>(l);})); log.info("{}",collectAndThenResult);
Joining用來鏈接stream中的元素:
String joinResult = list.stream().collect(Collectors.joining()); log.info("{}",joinResult); String joinResult1 = list.stream().collect(Collectors.joining(" ")); log.info("{}",joinResult1); String joinResult2 = list.stream().collect(Collectors.joining(" ", "prefix","suffix")); log.info("{}",joinResult2);
能夠不帶參數,也能夠帶一個參數,也能夠帶三個參數,根據咱們的須要進行選擇。
counting主要用來統計stream中元素的個數:
Long countResult = list.stream().collect(Collectors.counting()); log.info("{}",countResult);
SummarizingDouble/Long/Int爲stream中的元素生成了統計信息,返回的結果是一個統計類:
IntSummaryStatistics intResult = list.stream() .collect(Collectors.summarizingInt(String::length)); log.info("{}",intResult);
輸出結果:
22:22:35.238 [main] INFO com.flydean.CollectorUsage - IntSummaryStatistics{count=4, sum=16, min=3, average=4.000000, max=5}
averagingDouble/Long/Int()對stream中的元素作平均:
Double averageResult = list.stream().collect(Collectors.averagingInt(String::length)); log.info("{}",averageResult);
summingDouble/Long/Int()對stream中的元素作sum操做:
Double summingResult = list.stream().collect(Collectors.summingDouble(String::length)); log.info("{}",summingResult);
maxBy()/minBy()根據提供的Comparator,返回stream中的最大或者最小值:
Optional<String> maxByResult = list.stream().collect(Collectors.maxBy(Comparator.naturalOrder())); log.info("{}",maxByResult);
GroupingBy根據某些屬性進行分組,並返回一個Map:
Map<Integer, Set<String>> groupByResult = list.stream() .collect(Collectors.groupingBy(String::length, Collectors.toSet())); log.info("{}",groupByResult);
PartitioningBy是一個特別的groupingBy,PartitioningBy返回一個Map,這個Map是以boolean值爲key,從而將stream分紅兩部分,一部分是匹配PartitioningBy條件的,一部分是不知足條件的:
Map<Boolean, List<String>> partitionResult = list.stream() .collect(Collectors.partitioningBy(s -> s.length() > 3)); log.info("{}",partitionResult);
看下運行結果:
22:39:37.082 [main] INFO com.flydean.CollectorUsage - {false=[bob], true=[jack, alice, mark]}
結果被分紅了兩部分。
在以前的java collectors文章裏面,咱們講到了stream的collect方法能夠調用Collectors裏面的toList()或者toMap()方法,將結果轉換爲特定的集合類。
今天咱們介紹一下怎麼自定義一個Collector。
咱們先看一下Collector的定義:
Collector接口須要實現supplier(),accumulator(),combiner(),finisher(),characteristics()這5個接口。
同時Collector也提供了兩個靜態of方法來方便咱們建立一個Collector實例。
咱們能夠看到兩個方法的參數跟Collector接口須要實現的接口是一一對應的。
下面分別解釋一下這幾個參數:
Supplier是一個函數,用來建立一個新的可變的集合。換句話說Supplier用來建立一個初始的集合。
accumulator定義了累加器,用來將原始元素添加到集合中。
combiner用來將兩個集合合併成一個。
finisher將集合轉換爲最終的集合類型。
characteristics表示該集合的特徵。這個不是必須的參數。
Collector定義了三個參數類型,T是輸入元素的類型,A是reduction operation的累加類型也就是Supplier的初始類型,R是最終的返回類型。 咱們畫個圖來看一下這些類型之間的轉換關係:
有了這幾個參數,咱們接下來看看怎麼使用這些參數來構造一個自定義Collector。
咱們利用Collector的of方法來建立一個不變的Set:
public static <T> Collector<T, Set<T>, Set<T>> toImmutableSet() { return Collector.of(HashSet::new, Set::add, (left, right) -> { left.addAll(right); return left; }, Collections::unmodifiableSet); }
上面的例子中,咱們HashSet::new做爲supplier,Set::add做爲accumulator,自定義了一個方法做爲combiner,最後使用Collections::unmodifiableSet將集合轉換成不可變集合。
上面咱們固定使用HashSet::new做爲初始集合的生成方法,實際上,上面的方法能夠更加通用:
public static <T, A extends Set<T>> Collector<T, A, Set<T>> toImmutableSet( Supplier<A> supplier) { return Collector.of( supplier, Set::add, (left, right) -> { left.addAll(right); return left; }, Collections::unmodifiableSet); }
上面的方法,咱們將supplier提出來做爲一個參數,由外部來定義。
看下上面兩個方法的測試:
@Test public void toImmutableSetUsage(){ Set<String> stringSet1=Stream.of("a","b","c","d") .collect(ImmutableSetCollector.toImmutableSet()); log.info("{}",stringSet1); Set<String> stringSet2=Stream.of("a","b","c","d") .collect(ImmutableSetCollector.toImmutableSet(LinkedHashSet::new)); log.info("{}",stringSet2); }
輸出:
INFO com.flydean.ImmutableSetCollector - [a, b, c, d] INFO com.flydean.ImmutableSetCollector - [a, b, c, d]
Stream API提供了一些預約義的reduce操做,好比count(), max(), min(), sum()等。若是咱們須要本身寫reduce的邏輯,則可使用reduce方法。
本文將會詳細分析一下reduce方法的使用,並給出具體的例子。
Stream類中有三種reduce,分別接受1個參數,2個參數,和3個參數,首先來看一個參數的狀況:
Optional<T> reduce(BinaryOperator<T> accumulator);
該方法接受一個BinaryOperator參數,BinaryOperator是一個@FunctionalInterface,須要實現方法:
R apply(T t, U u);
accumulator告訴reduce方法怎麼去累計stream中的數據。
舉個例子:
List<Integer> intList = Arrays.asList(1,2,3); Optional<Integer> result1=intList.stream().reduce(Integer::sum); log.info("{}",result1);
上面的例子輸出結果:
com.flydean.ReduceUsage - Optional[6]
一個參數的例子很簡單。這裏再也不多說。
接下來咱們再看一下兩個參數的例子:
T reduce(T identity, BinaryOperator<T> accumulator);
這個方法接收兩個參數:identity和accumulator。多出了一個參數identity。
也許在有些文章裏面有人告訴你identity是reduce的初始化值,能夠隨便指定,以下所示:
Integer result2=intList.stream().reduce(100, Integer::sum); log.info("{}",result2);
上面的例子,咱們計算的值是106。
若是咱們將stream改爲parallelStream:
Integer result3=intList.parallelStream().reduce(100, Integer::sum); log.info("{}",result3);
得出的結果就是306。
爲何是306呢?由於在並行計算的時候,每一個線程的初始累加值都是100,最後3個線程加出來的結果就是306。
並行計算和非並行計算的結果竟然不同,這確定不是JDK的問題,咱們再看一下JDK中對identity的說明:
identity必須是accumulator函數的一個identity,也就是說必須知足:對於全部的t,都必須知足 accumulator.apply(identity, t) == t
因此這裏咱們傳入100是不對的,由於sum(100+1)!= 1。
這裏sum方法的identity只能是0。
若是咱們用0做爲identity,則stream和parallelStream計算出的結果是同樣的。這就是identity的真正意圖。
下面再看一下三個參數的方法:
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
和前面的方法不一樣的是,多了一個combiner,這個combiner用來合併多線程計算的結果。
一樣的,identity須要知足combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
你們可能注意到了爲何accumulator的類型是BiFunction而combiner的類型是BinaryOperator?
public interface BinaryOperator<T> extends BiFunction<T,T,T>
BinaryOperator是BiFunction的子接口。BiFunction中定義了要實現的apply方法。
其實reduce底層方法的實現只用到了apply方法,並無用到接口中其餘的方法,因此我猜想這裏的不一樣只是爲了簡單的區分。
雖然reduce是一個很經常使用的方法,可是你們必定要遵循identity的規範,並非全部的identity都是合適的。
Spliterator是在java 8引入的一個接口,它一般和stream一塊兒使用,用來遍歷和分割序列。
只要用到stream的地方都須要Spliterator,好比List,Collection,IO channel等等。
咱們先看一下Collection中stream方法的定義:
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }
咱們能夠看到,無論是並行stream仍是非並行stream,都是經過StreamSupport來構造的,而且都須要傳入一個spliterator的參數。
好了,咱們知道了spliterator是作什麼的以後,看一下它的具體結構:
spliterator有四個必須實現的方法,咱們接下來進行詳細的講解。
tryAdvance就是對stream中的元素進行處理的方法,若是元素存在,則對他進行處理,並返回true,不然返回false。
若是咱們不想處理stream後續的元素,則在tryAdvance中返回false便可,利用這個特徵,咱們能夠中斷stream的處理。這個例子我將會在後面的文章中講到。
trySplit嘗試對現有的stream進行分拆,通常用在parallelStream的狀況,由於在併發stream下,咱們須要用多線程去處理stream的不一樣元素,trySplit就是對stream中元素進行分拆處理的方法。
理想狀況下trySplit應該將stream拆分紅數目相同的兩部分才能最大提高性能。
estimateSize表示Spliterator中待處理的元素,在trySplit以前和以後通常是不一樣的,後面咱們會在具體的例子中說明。
characteristics表示這個Spliterator的特徵,Spliterator有8大特徵:
public static final int ORDERED = 0x00000010;//表示元素是有序的(每一次遍歷結果相同) public static final int DISTINCT = 0x00000001;//表示元素不重複 public static final int SORTED = 0x00000004;//表示元素是按必定規律進行排列(有指定比較器) public static final int SIZED = 0x00000040;// 表示大小是固定的 public static final int NONNULL = 0x00000100;//表示沒有null元素 public static final int IMMUTABLE = 0x00000400;//表示元素不可變 public static final int CONCURRENT = 0x00001000;//表示迭代器能夠多線程操做 public static final int SUBSIZED = 0x00004000;//表示子Spliterators都具備SIZED特性
一個Spliterator能夠有多個特徵,多個特徵進行or運算,最後獲得最終的characteristics。
上面咱們討論了Spliterator一些關鍵方法,如今咱們舉一個具體的例子:
@AllArgsConstructor @Data public class CustBook { private String name; }
先定義一個CustBook類,裏面放一個name變量。
定義一個方法,來生成一個CustBook的list:
public static List<CustBook> generateElements() { return Stream.generate(() -> new CustBook("cust book")) .limit(1000) .collect(Collectors.toList()); }
咱們定義一個call方法,在call方法中調用了tryAdvance方法,傳入了咱們自定義的處理方法。這裏咱們修改book的name,並附加額外的信息。
public String call(Spliterator<CustBook> spliterator) { int current = 0; while (spliterator.tryAdvance(a -> a.setName("test name" .concat("- add new name")))) { current++; } return Thread.currentThread().getName() + ":" + current; }
最後,寫一下測試方法:
@Test public void useTrySplit(){ Spliterator<CustBook> split1 = SpliteratorUsage.generateElements().spliterator(); Spliterator<CustBook> split2 = split1.trySplit(); log.info("before tryAdvance: {}",split1.estimateSize()); log.info("Characteristics {}",split1.characteristics()); log.info(call(split1)); log.info(call(split2)); log.info("after tryAdvance {}",split1.estimateSize()); }
運行的結果以下:
23:10:08.852 [main] INFO com.flydean.SpliteratorUsage - before tryAdvance: 500 23:10:08.857 [main] INFO com.flydean.SpliteratorUsage - Characteristics 16464 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - after tryAdvance 0
List總共有1000條數據,調用一次trySplit以後,將List分紅了兩部分,每部分500條數據。
注意,在tryAdvance調用以後,estimateSize變爲0,表示全部的元素都已經被處理完畢。
再看一下這個Characteristics=16464,轉換爲16進制:Ox4050 = ORDERED or SIZED or SUBSIZED 這三個的或運算。
這也是ArrayList的基本特徵。
咱們一般須要在java stream中遍歷處理裏面的數據,其中foreach是最最經常使用的方法。
可是有時候咱們並不想處理完全部的數據,或者有時候Stream可能很是的長,或者根本就是無限的。
一種方法是先filter出咱們須要處理的數據,而後再foreach遍歷。
那麼咱們如何直接break這個stream呢?今天本文重點講解一下這個問題。
上篇文章咱們在講Spliterator的時候提到了,在tryAdvance方法中,若是返回false,則Spliterator將會中止處理後續的元素。
經過這個思路,咱們能夠建立自定義Spliterator。
假如咱們有這樣一個stream:
Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
咱們想定義一個操做,當x > 5的時候就中止。
咱們定義一個通用的Spliterator:
public class CustomSpliterator<T> extends Spliterators.AbstractSpliterator<T> { private Spliterator<T> splitr; private Predicate<T> predicate; private volatile boolean isMatched = true; public CustomSpliterator(Spliterator<T> splitr, Predicate<T> predicate) { super(splitr.estimateSize(), 0); this.splitr = splitr; this.predicate = predicate; } @Override public synchronized boolean tryAdvance(Consumer<? super T> consumer) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem) && isMatched) { consumer.accept(elem); } else { isMatched = false; } }); return hadNext && isMatched; } }
在上面的類中,predicate是咱們將要傳入的判斷條件,咱們重寫了tryAdvance,經過將predicate.test(elem)加入判斷條件,從而當條件不知足的時候返回false.
看下怎麼使用:
@Slf4j public class CustomSpliteratorUsage { public static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<T> predicate) { CustomSpliterator<T> customSpliterator = new CustomSpliterator<>(stream.spliterator(), predicate); return StreamSupport.stream(customSpliterator, false); } public static void main(String[] args) { Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> result = takeWhile(ints, x -> x < 5 ) .collect(Collectors.toList()); log.info(result.toString()); } }
咱們定義了一個takeWhile方法,接收Stream和predicate條件。
只有當predicate條件知足的時候纔會繼續,咱們看下輸出的結果:
[main] INFO com.flydean.CustomSpliteratorUsage - [1, 2, 3, 4]
除了使用Spliterator,咱們還能夠自定義forEach方法來使用本身的遍歷邏輯:
public class CustomForEach { public static class Breaker { private volatile boolean shouldBreak = false; public void stop() { shouldBreak = true; } boolean get() { return shouldBreak; } } public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) { Spliterator<T> spliterator = stream.spliterator(); boolean hadNext = true; Breaker breaker = new Breaker(); while (hadNext && !breaker.get()) { hadNext = spliterator.tryAdvance(elem -> { consumer.accept(elem, breaker); }); } } }
上面的例子中,咱們在forEach中引入了一個外部變量,經過判斷這個外部變量來決定是否進入spliterator.tryAdvance方法。
看下怎麼使用:
@Slf4j public class CustomForEachUsage { public static void main(String[] args) { Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> result = new ArrayList<>(); CustomForEach.forEach(ints, (elem, breaker) -> { if (elem >= 5 ) { breaker.stop(); } else { result.add(elem); } }); log.info(result.toString()); } }
上面咱們用新的forEach方法,並經過判斷條件來重置判斷flag,從而達到break stream的目的。
Predicate是一個FunctionalInterface,表明的方法須要輸入一個參數,返回boolean類型。一般用在stream的filter中,表示是否知足過濾條件。
boolean test(T t);
咱們先看下在stream的filter中怎麼使用Predicate:
@Test public void basicUsage(){ List<String> stringList=Stream.of("a","b","c","d").filter(s -> s.startsWith("a")).collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子很基礎了,這裏就很少講了。
若是咱們有多個Predicate條件,則可使用多個filter來進行過濾:
public void multipleFilters(){ List<String> stringList=Stream.of("a","ab","aac","ad").filter(s -> s.startsWith("a")) .filter(s -> s.length()>1) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,咱們又添加了一個filter,在filter又添加了一個Predicate。
Predicate的定義是輸入一個參數,返回boolean值,那麼若是有多個測試條件,咱們能夠將其合併成一個test方法:
@Test public void complexPredicate(){ List<String> stringList=Stream.of("a","ab","aac","ad") .filter(s -> s.startsWith("a") && s.length()>1) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,咱們把s.startsWith("a") && s.length()>1 做爲test的實現。
Predicate雖然是一個interface,可是它有幾個默認的方法能夠用來實現Predicate之間的組合操做。
好比:Predicate.and(), Predicate.or(), 和 Predicate.negate()。
下面看下他們的例子:
@Test public void combiningPredicate(){ Predicate<String> predicate1 = s -> s.startsWith("a"); Predicate<String> predicate2 = s -> s.length() > 1; List<String> stringList1 = Stream.of("a","ab","aac","ad") .filter(predicate1.and(predicate2)) .collect(Collectors.toList()); log.info("{}",stringList1); List<String> stringList2 = Stream.of("a","ab","aac","ad") .filter(predicate1.or(predicate2)) .collect(Collectors.toList()); log.info("{}",stringList2); List<String> stringList3 = Stream.of("a","ab","aac","ad") .filter(predicate1.or(predicate2.negate())) .collect(Collectors.toList()); log.info("{}",stringList3); }
實際上,咱們並不須要顯示的assign一個predicate,只要是知足
predicate接口的lambda表達式均可以看作是一個predicate。一樣能夠調用and,or和negate操做:
List<String> stringList4 = Stream.of("a","ab","aac","ad") .filter(((Predicate<String>)a -> a.startsWith("a")) .and(a -> a.length() > 1)) .collect(Collectors.toList()); log.info("{}",stringList4);
若是咱們有一個Predicate集合,咱們可使用reduce方法來對其進行合併運算:
@Test public void combiningPredicateCollection(){ List<Predicate<String>> allPredicates = new ArrayList<>(); allPredicates.add(a -> a.startsWith("a")); allPredicates.add(a -> a.length() > 1); List<String> stringList = Stream.of("a","ab","aac","ad") .filter(allPredicates.stream().reduce(x->true, Predicate::and)) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,咱們調用reduce方法,對集合中的Predicate進行了and操做。
在java中,咱們能夠將特定的集合轉換成爲stream,那麼在有些狀況下,好比測試環境中,咱們須要構造必定數量元素的stream,須要怎麼處理呢?
這裏咱們能夠構建一個無限的stream,而後調用limit方法來限定返回的數目。
先看一個使用Stream.iterate來建立無限Stream的例子:
@Test public void infiniteStream(){ Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1); List<Integer> collect = infiniteStream .limit(10) .collect(Collectors.toList()); log.info("{}",collect); }
上面的例子中,咱們經過調用Stream.iterate方法,建立了一個0,1,2,3,4....的無限stream。
而後調用limit(10)來獲取其中的前10個。最後調用collect方法將其轉換成爲一個集合。
看下輸出結果:
INFO com.flydean.InfiniteStreamUsage - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
若是咱們想輸出自定義類型的集合,該怎麼處理呢?
首先,咱們定義一個自定義類型:
@Data @AllArgsConstructor public class IntegerWrapper { private Integer integer; }
而後利用Stream.generate的生成器來建立這個自定義類型:
public static IntegerWrapper generateCustType(){ return new IntegerWrapper(new Random().nextInt(100)); } @Test public void infiniteCustType(){ Supplier<IntegerWrapper> randomCustTypeSupplier = InfiniteStreamUsage::generateCustType; Stream<IntegerWrapper> infiniteStreamOfCustType = Stream.generate(randomCustTypeSupplier); List<IntegerWrapper> collect = infiniteStreamOfCustType .skip(10) .limit(10) .collect(Collectors.toList()); log.info("{}",collect); }
看下輸出結果:
INFO com.flydean.InfiniteStreamUsage - [IntegerWrapper(integer=46), IntegerWrapper(integer=42), IntegerWrapper(integer=67), IntegerWrapper(integer=11), IntegerWrapper(integer=14), IntegerWrapper(integer=80), IntegerWrapper(integer=15), IntegerWrapper(integer=19), IntegerWrapper(integer=72), IntegerWrapper(integer=41)]
以前咱們講到parallelStream的底層使用到了ForkJoinPool來提交任務的,默認狀況下ForkJoinPool爲每個處理器建立一個線程,parallelStream若是沒有特別指明的狀況下,都會使用這個共享線程池來提交任務。
那麼在特定的狀況下,咱們想使用自定義的ForkJoinPool該怎麼處理呢?
假如咱們想作一個從1到1000的加法,咱們能夠用並行stream這樣作:
List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); Integer total= integerList.parallelStream().reduce(0, Integer::sum); log.info("{}",total);
輸出結果:
INFO com.flydean.CustThreadPool - 499500
上面的例子使用的共享的thread pool。 咱們看下怎麼使用自定義的thread pool來提交併行stream:
List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); Integer actualTotal = customThreadPool.submit( () -> integerList.parallelStream().reduce(0, Integer::sum)).get(); log.info("{}",actualTotal);
上面的例子中,咱們定義了一個4個線程的ForkJoinPool,並使用它來提交了這個parallelStream。
輸出結果:
INFO com.flydean.CustThreadPool - 499500
若是不想使用公共的線程池,則可使用自定義的ForkJoinPool來提交。
本文統一介紹了Stream和lambda表達式的使用,涵蓋了Stream和lambda表達式的各個小的細節,但願你們可以喜歡。
本文的代碼https://github.com/ddean2009/learn-java-streams/
本文的PDFjava-stream-lambda-all-in-one.pdf
最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!歡迎關注個人公衆號:「程序那些事」,懂技術,更懂你!