探索Java9 模塊系統和反應流

Java9 新特性 ,Java 模塊化,Java 反應流 Reactive,Jigsawreact

模塊系統

Java平臺模塊系統(JPMS)是Java9中的特性,它是Jigsaw項目的產物。簡而言之,它以更簡單和易於維護的方式來組織包和類型。編程

直到Java8,系統仍面臨與類型系統相關的兩個問題:api

1.全部的組件(大可能是Jar包)都處在classpath中,沒有任何的顯式依賴申明。諸如Maven之類的構建工具能夠在開發過程當中幫助組織這些構件。然而,在運行時卻沒有這樣的支持工具。你最終可能會遇到calsspath中缺乏某個類,或者更嚴重的是存在同個類的兩個版本,向這樣的錯誤很難診斷。架構

2.在API級別上不支持封裝。全部的public的類在整個應用中均可以訪問,經管事實上這些類只是想供一部分其餘類調用。另外一方面,私有的類和私有的成員也不是私有的,由於你可使用反射來繞過訪問限制。app

這些就是Java 模塊系統要應對的地方。Oralce的Java平臺首席架構師Mark Reinhold描述了Java模塊系統的目標:異步

1.可靠的配置 - 用程序組件相互聲明顯式依賴的方法替換脆弱,容易出錯的類路徑機制。ide

2.強大的封裝 - 容許組件聲明其中哪些公共類型可供其餘組件訪問,哪些不能夠。模塊化

Java9 容許你使用模塊描述符來定義模塊。函數

模塊描述符

模塊描述符是模塊系統的核心,模塊描述的聲明是在模塊目錄層次結構的根目錄中名爲module-info.java的文件中指定的。

模塊描述的聲明是以module關鍵字開始的,其後緊跟的是模塊的名字。聲明結束標記是一對大括號,裏邊包含零個或者多個模塊。你能夠像這樣聲明一個空模塊:

module com.stackify { }

在模塊聲明中你能夠列出的指令有這些:

  • require 表示它所依賴的模塊,也稱做dependency。
  • transitive 僅與require指令一塊兒使用,代表指明的依賴項也能夠供此模塊的依賴項訪問。
  • exports 聲明一個能夠被其餘模塊訪問的包
  • opens 在運行時曝光一個包,供反射API自省。
  • uses 指定此模塊消費的服務的全限定名。
  • provides with – denotes an implementation, specified by the with keyword, for a service, indicated by provides

模塊化應用程序示例

  • dist
  • src
    • client
      • com
        • stackify
          • client
            • Main.java
      • module-info.java
    • impl
      • com
        • stackify
          • impl
            • AccessImpl.java
      • module-info.java
    • model
      • com
        • stackify
          • model
            • Person.java
      • module-info.java
    • service
      • com
        • stackify
          • service
            • AccessService.java
      • module-info.java

示例應用程序有四個模塊組成——model,service,impl,client。在實際項目中應該使用反向域名模式對模塊命名,以免名稱衝突。本文例子中使用了簡單的名稱,便於掌握。每一個模塊的代碼都在src根目錄中,編譯以後的文件會放在dist中。

咱們先從model模塊開始,這個模塊只有一個包,包裏邊有一個class。

package com.stackify.model;

public class Person {
    private int id;
    private String name;

    public Person(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

模塊的聲明是這樣的:

module model {
    exports com.stackify.model;
    opens com.stackify.model;
}

這個模塊導出了com.stackify.model包,併爲次包開啓了內省。

service模塊中定義了一個接口:

package com.stackify.service;

import com.stackify.model.Person;

public interface AccessService {
    public String getName(Person person);
}

鑑於service模塊使用了com.stackify.model包,因此它必需要依賴model模塊,配置以下:

module service {
    requires transitive model;
    exports com.stackify.service;
}

注意聲明中的 transitive 關鍵字,這個關鍵字的存在代表依賴了service模塊的全部模塊都自動得到了model模塊的訪問許可。爲了使AccessService能被其餘模塊訪問,你必須使用exports處處它所在的包。

impl模塊爲訪問服務提供了一個實現:

package com.stackify.impl;

import com.stackify.service.AccessService;
import com.stackify.model.Person;
import java.lang.reflect.Field;

public class AccessImpl implements AccessService {
    public String getName(Person person) {
        try {
            return extract(person);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String extract(Person person) throws Exception {
        Field field = person.getClass().getDeclaredField("name");
        field.setAccessible(true);
        return (String) field.get(person);
    }
}

因爲model模塊中的opens聲明使得AccessImpl能夠反射Person類。impl模塊的模塊聲明以下:

module impl {
    requires service;
    provides com.stackify.service.AccessService with com.stackify.impl.AccessImpl;
}

service模塊中對model模塊的導入是transitive的,所以,impl模塊只須要引用service模塊就能夠得到這兩個模塊的訪問了。(service,model)

provides 聲明代表impl模塊爲AccessService接口提供了實現,這個實現是AccessImpl類。

client模塊消費這個AccessService服務,須要進行以下聲明:

module client {
    requires service;
    uses com.stackify.service.AccessService;
}

client使用這個服務的示例以下:

package com.stackify.client;

import com.stackify.service.AccessService;
import com.stackify.model.Person;
import java.util.ServiceLoader;

public class Main {
    
    public static void main(String[] args) throws Exception {
        AccessService service = ServiceLoader.load(AccessService.class).findFirst().get();
        Person person = new Person(1, "John Doe");
        String name = service.getName(person);
        assert name.equals("John Doe");
    }
}

你能夠看到main函數中並無使用AccessImpl實現類,事實上,模塊系統在運行時基於模塊定義中的users,provides...指令自動定位到了AccessService類的具體實現。

編譯和執行

本小節介紹編譯和執行剛纔看到的模塊化應用程序的步驟。 請注意,您必須按順序運行項目根目錄中的全部命令(src的父目錄),而後顯示出來。

編譯model模塊,並將生成的類文件放入dist目錄中的命令爲:

javac -d dist/model src/model/module-info.java src/model/com/stackify/model/Person.java

鑑於service模塊依賴model模塊,當你編譯service模塊的時候,你須要使用-p指令指定依賴的模塊的路徑。

javac -d dist/service -p dist src/service/module-info.java src/service/com/stackify/service/AccessService.java

一樣的,下面的命令展現瞭如何編譯impl和client模塊:

javac -d dist/impl -p dist src/impl/module-info.java src/impl/com/stackify/impl/AccessImpl.java 
javac -d dist/client -p dist src/client/module-info.java src/client/com/stackify/client/Main.java

Main類中使用了斷言聲明,因而,你在執行Main程序的時候,須要啓用斷言:

java -ea -p dist -m client/com.stackify.client.Main

注意,你需在Main 類以前加上模塊名稱,而後傳遞給 - m 選項。

向後兼容

在java9以前,全部包的聲明都不存在「模塊」的理念。可是,這並不會妨礙你將這些包部署在新的模塊化系統上。你只須要將它添加在類路徑中,就像你在java8中作的那樣,這個包會成爲「未命名」模塊的一部分。

「未命名」模塊會讀取其餘全部模塊,不管這些模塊處在classpath中仍是模塊路徑中。因而乎,在java8上編譯運行的程序也能夠同樣的在java9上嘚瑟。不過,具備顯式聲明的模塊沒法訪問「未命名」模塊,這裏你須要另一個模塊了——自動模塊。

你能夠將祖傳的不具有模塊聲明的老jar包轉換成自動模塊,方法是將其放入模塊路徑中。這將定義一個名稱來源於jar文件名的模塊。這樣的一個自動模塊能夠訪問模塊路徑中的其餘全部模塊,並公開本身的包。從而實現了包之間的無縫互操做,不管有沒有明確的模塊。

反應流

反應流是一種編程範例——容許以背壓的非阻塞的方式處理異步數據流。實質上,這種機制將接收器置於控制之下,使其可以肯定要傳輸的數據量,而沒必要在每次請求以後等待響應。

Java平臺將反應流做爲Java9的一部分集成了進來。該集成容許您以標準方式利用Reactive Streams,從而各類實現能夠協同工做。

Flow類

Java api將反應流的接口封裝在Flow類中——包括,Publisher,Subscriber,Subscription和Processor。

Publisher提供了條目和相關的控制信息。這個接口只定義了一個方法,即subscribe方法,這個方法添加了一個subscriber(訂閱者),改訂閱者監聽數據和發佈者傳輸的數據。

Subscriber 從一個Publisher接收數據,這個接口定義了四個方法:

  • onSubscribe
  • onNext
  • onError
  • onComplete

Subscription 用來控制發佈者,訂閱者之間的通訊。這個接口定義了兩個方法:request和cancel。request方法請求發佈者發佈特定數量的條目,cancel會致使訂閱者取消訂閱。

有時候,你可能但願在數據條目從發佈者傳輸到訂閱者時對其進行操做。這個時候你可使用Processor。這個接口拓展了Subscriber和Publisher,使其能夠從發佈者的角度充當發佈者,從訂閱者的角度充當訂閱者。

內部實現

Java平臺爲Publisher和Subscription提供了開箱即用的實現。Publisher接口的實現類是SubmissionPublisher.除了Publisher接口中定義的方法,這個類還具備其餘方法,包括:

  • submit 發佈一個條目給每一個subscriber
  • close 給每個subscriber發送一個onComplete信號,並禁止後續的訂閱

Subscription的實現類是一個私有的類,意圖僅供SubmissionPublisher使用。當你調用SubmissionPublisher 的帶有Subscriber 參數的subscribe方法時,一個Subscription對象被建立並傳遞給那個subscriber的onSubscribe 方法。

一個簡單的應用

有了PublisherSubscription 的現成實現,你僅須要聲明一個Subscriber接口的實現類,就能夠建立一個反應流的應用。以下,這個類須要String類型的消息:

public class StringSubscriber implements Subscriber<String> {
    private Subscription subscription;
    private StringBuilder buffer;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.buffer = new StringBuilder();
        subscription.request(1);
    }

    public String getData() {
        return buffer.toString();
    }

    // other methods
}

如你所見,StringSubscriber 將當訂閱一個發佈者時獲得的Subscription 存儲在其私有的成員變量中。同時,它使用了一個buffer成員來存儲它接收的String消息 。你能夠經過getData()方法來取回buffer數據。onSubscribe 方法請求了發佈者發出單個數據條目。

onNext 方法的定義以下:

@Override
public void onNext(String item) {
    buffer.append(item);
    if (buffer.length() < 5) {
        subscription.request(1);
        return;
    }
    subscription.cancel();
}

這個方法接收Publisher發佈的新消息,併疊加在前一消息後面,當接收到5條消息以後,訂閱者便中止了接收。

並不重要的 onErroronComplete 兩個方法的實現以下:

@Override
public void onError(Throwable throwable) {
    throwable.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Data transfer is complete");
}

這個Test驗證了咱們的實現:

@Test
public void whenTransferingDataDirectly_thenGettingString() throws Exception {
    StringSubscriber subscriber = new StringSubscriber();
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(subscriber);

    String[] data = { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" };
    Arrays.stream(data).forEach(publisher::submit);

    Thread.sleep(100);
    publisher.close();

    assertEquals("01234", subscriber.getData());
}

在上面的test方法中,sleep方法啥都沒幹,只是等着異步的數據傳輸完成。

應用Processor

咱們來添加Processor來讓這個簡單應用變得複雜一點,這個processor將發佈的String轉換成Integer,若是轉換失敗就拋出一個異常。轉換以後,processor將這個結果number轉發給subscriber。Subscriber的代碼實現以下:

public class NumberSubscriber implements Subscriber<Integer> {
    private Subscription subscription;
    private int sum;
    private int remaining;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        remaining = 1;
    }

    public int getData() {
        return sum;
    }

    @Override
    public void onNext(Integer item) {
     sum += item;
     if (--remaining == 0) {
           subscription.request(3);
            remaining = 3;
        }
    }   
}

代碼和以前的Subscriber相似,不作解釋。Processor 的實現以下:

public class StringToNumberProcessor extends SubmissionPublisher<Integer> implements Subscriber<String> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    // other methods
}

本例中,這個processor繼承了SubmissionPublisher,所以Subscriber接口的抽象方法。其餘的幾個方法是:

@Override
public void onNext(String item) {
    try {
        submit(Integer.parseInt(item));
    } catch (NumberFormatException e) {
        closeExceptionally(e);
        subscription.cancel();
        return;
    }
    subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
    closeExceptionally(throwable);
}

@Override
public void onComplete() {
    System.out.println("Data conversion is complete");
    close();
}

注意,當publisher關閉的時候,processor也須要關閉,向subscriber發佈onComplete信號。一樣的,當錯誤發生時——不管是processor仍是publisher——processor自己應該通知subscriber這個錯誤。

你能夠經過調用closecloseExceptionally 方法來實現通知流。

測試用例以下:

@Test
public void whenProcessingDataMidway_thenGettingNumber() throws Exception {
    NumberSubscriber subscriber = new NumberSubscriber();
    StringToNumberProcessor processor = new StringToNumberProcessor();
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    processor.subscribe(subscriber);
    publisher.subscribe(processor);

    String[] data = { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" };
    Arrays.stream(data).forEach(publisher::submit);

    Thread.sleep(100);
    publisher.close();

    assertEquals(45, subscriber.getData());
}

API使用

經過上面的應用,你能夠更好的理解反應流。可是,它們決不是用於從頭開始構建反應式程序的指南。實現一個反應流規範並不容易,由於它要解決的問題一點也不簡單。你應該利用有效的庫(例如RxJava或者Project Reactor)來編寫高效的應用。

未來,當許多Reactive庫支持Java 9時,甚至能夠組合來自不一樣工具的各類實現,以充分利用API。

總結

這篇文章涉及了Java9中的兩個核心技術——模塊系統和反應流。

模塊化系統是新的特性,預計不久就會被普遍使用。然而,整個java世界走向模塊化系統是必然的,你應該爲此作好準備。

反應流已經存在了一段時間了,Java 9的推出有助於標準化範例,這可能會加速它的運用。

原文地址:https://stackify.com/exploring-java-9-module-system-and-reactive-streams/

相關文章
相關標籤/搜索