Java編程的邏輯 (92) - 函數式數據處理 (上)

本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營連接http://item.jd.com/12299018.htmlhtml


上節咱們介紹了Lambda表達式和函數式接口,本節探討它們的應用,函數式數據處理,針對常見的集合數據處理,Java 8引入了一套新的類庫,位於包java.util.stream下,稱之爲Stream API,這套API操做數據的思路,不一樣於咱們在38節55節介紹的容器類API,它們是函數式的,很是簡潔、靈活、易讀,具體有什麼不一樣呢?因爲內容較多,咱們分爲兩節來介紹,本節先介紹一些基本的API,下節討論一些高級功能。
java

基本概念
nginx

接口Stream相似於一個迭代器,但提供了更爲豐富的操做,Stream API的主要操做就定義在該接口中。 Java 8給Collection接口增長了兩個默認方法,它們能夠返回一個Stream,以下所示:git

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

stream()返回的是一個順序流,parallelStream()返回的是一個並行。順序流就是由一個線程執行操做。而並行流背後可能有多個線程並行執行,與以前介紹的併發技術不一樣,使用並行流不須要顯式管理線程,使用方法與順序流是同樣的。程序員

下面,咱們主要針對順序流,學習Stream接口,包括其用法和基本原理,隨後咱們再介紹下並行流。先來看一些簡單的示例。github

基本示例數據庫

上節演示時使用了學生類Student和學生列表List<Student> lists,本節繼續使用它們。編程

基本過濾swift

返回學生列表中90分以上的,傳統上的代碼通常是這樣的:數組

List<Student> above90List = new ArrayList<>();
for (Student t : students) {
    if (t.getScore() > 90) {
        above90List.add(t);
    }
}

使用Stream API,代碼能夠這樣:

List<Student> above90List = students.stream()
        .filter(t->t.getScore()>90)
        .collect(Collectors.toList());

先經過stream()獲得一個Stream對象,而後調用Stream上的方法,filter()過濾獲得90分以上的,它的返回值依然是一個Stream,爲了轉換爲List,調用了collect方法並傳遞了一個Collectors.toList(),表示將結果收集到一個List中。

代碼更爲簡潔易讀了,這種數據處理方式被稱爲函數式數據處理,與傳統代碼相比,它的特色是:

  • 沒有顯式的循環迭代,循環過程被Stream的方法隱藏了
  • 提供了聲明式的處理函數,好比filter,它封裝了數據過濾的功能,而傳統代碼是命令式的,須要一步步的操做指令
  • 流暢式接口,方法調用連接在一塊兒,清晰易讀

基本轉換

根據學生列表返回名稱列表,傳統上的代碼通常是這樣:

List<String> nameList = new ArrayList<>(students.size());
for (Student t : students) {
    nameList.add(t.getName());
}

使用Stream API,代碼能夠這樣:

List<String> nameList = students.stream()
        .map(Student::getName)
        .collect(Collectors.toList());

這裏使用了Stream的map函數,它的參數是一個Function函數式接口,這裏傳遞了方法引用。       

基本的過濾和轉換組合

返回90分以上的學生名稱列表,傳統上的代碼通常是這樣:

List<String> nameList = new ArrayList<>();
for (Student t : students) {
    if (t.getScore() > 90) {
        nameList.add(t.getName());
    }
}

使用函數式數據處理的思路,能夠將這個問題分解爲由兩個基本函數實現:

  1. 過濾:獲得90分以上的學生列表
  2. 轉換:將學生列表轉換爲名稱列表

使用Stream API,能夠將基本函數filter()和map()結合起來,代碼能夠這樣:

List<String> above90Names = students.stream()
        .filter(t->t.getScore()>90)
        .map(Student::getName)
        .collect(Collectors.toList());

這種組合利用基本函數、聲明式實現集合數據處理功能的編程風格,就是函數式數據處理。

代碼更爲直觀易讀了,但你可能會擔憂它的性能有問題。filter()和map()都須要對流中的每一個元素操做一次,一塊兒使用會不會就須要遍歷兩次呢?答案是否認的,只須要一次。實際上,調用filter()和map()都不會執行任何實際的操做,它們只是在構建操做的流水線,調用collect纔會觸發實際的遍歷執行,在一次遍歷中完成過濾、轉換以及收集結果的任務。

像filter和map這種不實際觸發執行、用於構建流水線、返回Stream的操做被稱爲中間操做(intermediate operation),而像collect這種觸發實際執行、返回具體結果的操做被稱爲終端操做(terminal operation)。Stream API中還有更多的中間和終端操做,下面咱們具體來看下。

中間操做

除了filter和map,Stream API的中間操做還有distinct, sorted, skip, limit, peek, mapToLong, mapToInt, mapToDouble, flatMap等,咱們逐個來看下。

distinct

distinct返回一個新的Stream,過濾重複的元素,只留下惟一的元素,是否重複是根據equals方法來比較的,distinct能夠與其餘函數如filter, map結合使用。

好比,返回字符串列表中長度小於3的字符串、轉換爲小寫、只保留惟一的,代碼能夠爲:

List<String> list = Arrays.asList(new String[]{"abc","def","hello","Abc"});
List<String> retList = list.stream()
        .filter(s->s.length()<=3)
        .map(String::toLowerCase)
        .distinct()
        .collect(Collectors.toList());
System.out.println(retList);

輸出爲:

[abc, def]

雖然都是中間操做,但distinct與filter和map是不一樣的,filter和map都是無狀態的,對於流中的每個元素,它的處理都是獨立的,處理後即交給流水線中的下一個操做,但distinct不一樣,它是有狀態的,在處理過程當中,它須要在內部記錄以前出現過的元素,若是已經出現過,即重複元素,它就會過濾掉,不傳遞給流水線中的下一個操做。

對於順序流,內部實現時,distinct操做會使用HashSet記錄出現過的元素,若是流是有順序的,須要保留順序,會使用LinkedHashSet

sorted

有兩個sorted方法:

Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)

它們都對流中的元素排序,都返回一個排序後的Stream,第一個方法假定元素實現了Comparable接口,第二個方法接受一個自定義的Comparator。

好比,過濾獲得90分以上的學生,而後按分數從高到低排序,分數同樣的,按名稱排序,代碼能夠爲:

List<Student> list = students.stream()
        .filter(t->t.getScore()>90)
        .sorted(Comparator.comparing(Student::getScore)
                .reversed()
                .thenComparing(Student::getName))
        .collect(Collectors.toList());

這裏,使用了Comparator的comparing, reversed和thenComparing構建了Comparator。

與distinct同樣,sorted也是一個有狀態的中間操做,在處理過程當中,須要在內部記錄出現過的元素,與distinct不一樣的是,每碰到流中的一個元素,distinct都能當即作出處理,要麼過濾,要麼立刻傳遞給下一個操做,但sorted不能,它須要先排序,爲了排序,它須要先在內部數組中保存碰到的每個元素,到流結尾時,再對數組排序,而後再將排序後的元素逐個傳遞給流水線中的下一個操做。

skip/limit

它們的定義爲:

Stream<T> skip(long n)
Stream<T> limit(long maxSize)

skip跳過流中的n個元素,若是流中元素不足n個,返回一個空流,limit限制流的長度爲maxSize。

好比,將學生列表按照分數排序,返回第3名到第5名,代碼能夠爲:

List<Student> list = students.stream()
        .sorted(Comparator.comparing(
            Student::getScore).reversed())
        .skip(2)
        .limit(3)
        .collect(Collectors.toList());

skip和limit都是有狀態的中間操做。對前n個元素,skip的操做就是過濾,對後面的元素,skip就是傳遞給流水線中的下一個操做。limit的一個特色是,它不須要處理流中的全部元素,只要處理的元素個數達到maxSize,後面的元素就不須要處理了,這種能夠提早結束的操做被稱爲短路操做。

peek

peek的定義爲:

Stream<T> peek(Consumer<? super T> action)

它返回的流與以前的流是同樣的,沒有變化,但它提供了一個Consumer,會將流中的每個元素傳給該Consumer。這個方法的主要目的是支持調試,可使用該方法觀察在流水線中流轉的元素,好比:

List<String> above90Names = students.stream()
        .filter(t->t.getScore()>90)
        .peek(System.out::println)
        .map(Student::getName)
        .collect(Collectors.toList());

mapToLong/mapToInt/mapToDouble

map函數接受的參數是一個Function<T, R>,爲避免裝箱/拆箱,提升性能,Stream還有以下返回基本類型特定流的方法:

DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper)
LongStream mapToLong(ToLongFunction<? super T> mapper)

DoubleStream/IntStream/LongStream是基本類型特定的流,有一些專門的更爲高效的方法。好比,求學生列表的分數總和,代碼能夠爲:

double sum = students.stream()
        .mapToDouble(Student::getScore)
        .sum();

flatMap                

flatMap的定義爲:

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)

它接受一個函數mapper,對流中的每個元素,mapper會將該元素轉換爲一個流Stream,而後把新生成流的每個元素傳遞給下一個操做。好比:

List<String> lines = Arrays.asList(new String[]{
        "hello abc",
        "老馬  編程"
    });
List<String> words = lines.stream()
        .flatMap(line -> Arrays.stream(line.split("\\s+")))
        .collect(Collectors.toList());
System.out.println(words);

這裏的mapper將一行字符串按空白符分隔爲了一個單詞流,Arrays.stream能夠將一個數組轉換爲一個流,輸出爲:

[hello, abc, 老馬, 編程]

能夠看出,實際上,flatMap完成了一個1到n的映射。

針對基本類型,flatMap還有以下相似方法:

DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper)
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)

終端操做

中間操做不觸發實際的執行,返回值是Stream,而終端操做觸發執行,返回一個具體的值,除了collect,Stream API的終端操做還有max, min, count, allMatch, anyMatch, noneMatch, findFirst, findAny, forEach, toArray, reduce等,咱們逐個來看下。

max/min

max/min的定義爲:

Optional<T> max(Comparator<? super T> comparator)
Optional<T> min(Comparator<? super T> comparator)

它們返回流中的最大值/最小值,值的注意的是,它的返回值類型是Optional<T>,而不是T。

java.util.Optional是Java 8引入的一個新類,它是一個泛型容器類,內部只有一個類型爲T的單一變量value,可能爲null,也可能不爲null。Optional有什麼用呢?它用於準確地傳遞程序的語義,它清楚地代表,其表明的值可能爲null,程序員應該進行適當的處理。

Optional定義了一些方法,好比:

// value不爲null時返回true
public boolean isPresent()
// 返回實際的值,若是爲null,拋出異常NoSuchElementException
public T get()
// 若是value不爲null,返回value,不然返回other
public T orElse(T other)
// 構建一個空的Optional,value爲null
public static<T> Optional<T> empty()
// 構建一個非空的Optional, 參數value不能爲null
public static <T> Optional<T> of(T value)
// 構建一個Optional,參數value能夠爲null,也能夠不爲null
public static <T> Optional<T> ofNullable(T value)

在max/min的例子中,經過聲明返回值爲Optional,咱們就知道,具體的返回值不必定存在,這發生在流中不含任何元素的狀況下。

看個簡單的例子,返回分數最高的學生,代碼能夠爲:

Student student = students.stream()
        .max(Comparator.comparing(Student::getScore).reversed())
        .get();

這裏,假定students不爲空。

count

count很簡單,就是返回流中元素的個數。好比,統計大於90分的學生個數,代碼能夠爲:

long above90Count = students.stream()
        .filter(t->t.getScore()>90)
        .count();

allMatch/anyMatch/noneMatch

這幾個函數都接受一個謂詞Predicate,返回一個boolean值,用於斷定流中的元素是否知足必定的條件,它們的區別是:

  • allMatch: 只有在流中全部元素都知足條件的狀況下才返回true
  • anyMatch: 只要流中有一個元素知足條件就返回true
  • noneMatch: 只有流中全部元素都不知足條件才返回true

若是流爲空,這幾個函數的返回值都是true。

好比,判斷是否是全部學生都及格了(不小於60分),代碼能夠爲:

boolean allPass = students.stream()
        .allMatch(t->t.getScore()>=60);

這幾個操做都是短路操做,都不必定須要處理全部元素就能得出結果,好比,對於allMatch,只要有一個元素不知足條件,就能返回false。

findFirst/findAny

它們的定義爲:

Optional<T> findFirst()
Optional<T> findAny()

它們的返回類型都是Optional,若是流爲空,返回Optional.empty()。findFirst返回第一個元素,而findAny返回任一元素,它們都是短路操做。

隨便找一個不及格的學生,代碼能夠爲:

Optional<Student> student = students.stream()
        .filter(t->t.getScore()<60)
        .findAny();
if(student.isPresent()){
    // 不及格的學生....
}

forEach

有兩個foreach方法:

void forEach(Consumer<? super T> action)
void forEachOrdered(Consumer<? super T> action)

它們都接受一個Consumer,對流中的每個元素,傳遞元素給Consumer,區別在於,在並行流中,forEach不保證處理的順序,而forEachOrdered會保證按照流中元素的出現順序進行處理。

好比,逐行打印大於90分的學生,代碼能夠爲:

students.stream()
        .filter(t->t.getScore()>90)
        .forEach(System.out::println);

toArray

toArray將流轉換爲數組,有兩個方法:

Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)

不帶參數的toArray返回的數組類型爲Object[],這常常不是指望的結果,若是但願獲得正確類型的數組,須要傳遞一個類型爲IntFunction的generator,IntFunction的定義爲:

public interface IntFunction<R> {
    R apply(int value);
}

generator接受的參數是流的元素個數,它應該返回對應大小的正確類型的數組。

好比,獲取90分以上的學生數組,代碼能夠爲:

Student[] above90Arr = students.stream()
        .filter(t->t.getScore()>90)
        .toArray(Student[]::new);

Student[]::new就是一個類型爲IntFunction<Student[]>的generator。

reduce

reduce表明歸約或者叫摺疊,它是max/min/count的更爲通用的函數,將流中的元素歸約爲一個值,有三個reduce函數: 

Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity,
    BiFunction<U, ? super T, U> accumulator,
    BinaryOperator<U> combiner); 

第一個基本等同於調用:

boolean foundAny = false;
T result = null;
for (T element : this stream) {
    if (!foundAny) {
        foundAny = true;
        result = element;
    }
    else
        result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();

好比,使用reduce求分數最高的學生,代碼能夠爲:

Student topStudent = students.stream().reduce((accu, t) -> {
    if (accu.getScore() >= t.getScore()) {
        return accu;
    } else {
        return t;
    }
}).get();

第二個reduce函數多了一個identity參數,表示初始值,它基本等同於調用:

T result = identity;
for (T element : this stream)
    result = accumulator.apply(result, element)
return result;

第一個和第二個reduce的返回類型只能是流中元素的類型,而第三個更爲通用,它的歸約類型能夠自定義,另外,它多了一個combiner參數,combiner用在並行流中,用於合併子線程的結果,對於順序流,它基本等同於調用:

U result = identity;
for (T element : this stream)
    result = accumulator.apply(result, element)
return result;

注意與第二個reduce函數相區分,它的結果類型不是T,而是U。好比,使用reduce函數計算學生分數的和,代碼能夠爲:

double sumScore = students.stream().reduce(0d,
        (sum, t) -> sum += t.getScore(),
        (sum1, sum2) -> sum1 += sum2
    );

以上,能夠看出,reduce雖然更爲通用,但比較費解,難以使用,通常狀況,應該優先使用其餘函數。

collect函數比reduce更爲通用、強大和易用,關於它,咱們下節再詳細介紹。

構建流

前面咱們提到,能夠經過Collection接口的stream/parallelStream獲取流,還有一些其餘的方式能夠獲取流。

Arrays有一些stream方法,能夠將數組或子數組轉換爲流,好比: 

public static IntStream stream(int[] array)
public static DoubleStream stream(double[] array, int startInclusive, int endExclusive)
public static <T> Stream<T> stream(T[] array)

好比,輸出當前目錄下全部普通文件的名字,代碼能夠爲:

File[] files = new File(".").listFiles();
Arrays.stream(files)
    .filter(File::isFile)    
    .map(File::getName)
    .forEach(System.out::println);

Stream也有一些靜態方法,能夠構建流:

//返回一個空流
public static<T> Stream<T> empty()
//返回只包含一個元素t的流
public static<T> Stream<T> of(T t)
//返回包含多個元素values的流
public static<T> Stream<T> of(T... values)
//經過Supplier生成流,流的元素個數是無限的
public static<T> Stream<T> generate(Supplier<T> s)
//一樣生成無限流,第一個元素爲seed,第二個爲f(seed),第三個爲f(f(seed)),依次類推
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)

好比,輸出10個隨機數,代碼能夠爲:

Stream.generate(()->Math.random())
    .limit(10)
    .forEach(System.out::println);

輸出100個遞增的奇數,代碼能夠爲:

Stream.iterate(1, t->t+2)
    .limit(100)
    .forEach(System.out::println);

並行流

前面咱們主要使用的是Collection的stream()方法,換作parallelStream()方法,就會使用並行流,接口方法都是通用的。但並行流內部會使用多線程,線程個數通常與系統的CPU核數同樣,以充分利用CPU的計算能力。

進一步來講,並行流內部會使用Java 7引入的fork/join框架,簡單來講,處理由fork和join兩個階段組成,fork就是將要處理的數據拆分爲小塊,多線程按小塊進行並行計算,join就是將小塊的計算結果進行合併,具體咱們就不探討了。使用並行流,不須要任何線程管理的代碼,就能實現並行。

函數式數據處理思惟

看的出來,使用Stream API處理數據集合,與直接使用容器類API處理數據的思路是徹底不同的。

流定義了不少數據處理的基本函數,對於一個具體的數據處理問題,解決的主要思路就是組合利用這些基本函數,實現指望的功能,這種思路就是函數式數據處理思惟,相比直接利用容器類API的命令式思惟,思考的層次更高。

Stream API的這種思路也不是新發明,它與數據庫查詢語言SQL是很像的,都是聲明式地操做集合數據,不少函數都能在SQL中找到對應,好比filter對應SQL的where,sorted對應order by等。SQL通常都支持分組(group by)功能,Stream API也支持,但關於分組,咱們下節再介紹。

Stream API也與各類基於Unix系統的管道命令相似,熟悉Unix系統的都知道,Unix有不少命令,大部分命令只是專一於完成一件事情,但能夠經過管道的方式將多個命令連接起來,完成一些複雜的功能,好比:

cat nginx_access.log | awk '{print $1}' | sort | uniq -c | sort -rnk 1 | head -n 20

以上命令能夠分析nginx訪問日誌,統計出訪問次數最多的前20個IP地址及其訪問次數。具體來講,cat命令輸出nginx訪問日誌到流,一行爲一個元素,awk輸出行的第一列,這裏爲IP地址,sort按IP進行排序,"uniq -c"按IP統計計數,"sort -rnk 1"按計數從高到低排序,"head -n 20"輸出前20行。

小結

本節初步介紹了Java 8引入的函數式數據處理類庫,Stream API,它相似於Unix的管道命令,也相似於數據庫查詢語言SQL,經過組合利用基本函數,能夠在更高的層次上思考問題,以聲明式的方式簡潔地實現指望的功能。

對於collect方法,本節只是演示了最基本的應用,它還有不少高級功能,好比實現相似SQL的group by功能,具體怎麼實現?實現的原理是什麼呢?

(與其餘章節同樣,本節全部代碼位於 https://github.com/swiftma/program-logic,位於包shuo.laoma.java8.c92下)

----------------

未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。

相關文章
相關標籤/搜索