Java 9 Reactive Streams容許咱們實現非阻塞異步流處理。這是將響應式編程模型應用於核心java編程的重要一步。java
若是您對響應式編程不熟悉,請閱讀Reactive Manifesto並閱讀Reactive Streams的簡短說明。RxJava和Akka Streams一直是十分優秀的響應流實現庫。如今java 9已經經過java.util.concurrent.Flow
API 引入了響應流支持。react
Reactive Streams是關於流的異步處理,所以應該有一個發佈者(Publisher)和一個訂閱者(Subscriber)。發佈者發佈數據流,訂閱者使用數據。git
有時咱們必須在Publisher和Subscriber之間轉換數據。處理器(Processor)是位於最終發佈者和訂閱者之間的實體,用於轉換從發佈者接收的數據,以便訂閱者能理解它。咱們能夠擁有一系列(chain )處理器。github
從上面的圖中能夠清楚地看出,Processor既能夠做爲訂閱者也能夠做爲發佈者。數據庫
Java 9 Flow API實現了Reactive Streams規範。Flow API是Iterator和Observer模式的組合。Iterator在pull
模型上工做,用於應用程序從源中拉取項目;而Observer在push
模型上工做,並在item
從源推送到應用程序時做出反應。編程
Java 9 Flow API訂閱者能夠在訂閱發佈者時請求N個項目。而後將項目從發佈者推送到訂閱者,直到推送玩全部項目或遇到某些錯誤。
app
讓咱們快速瀏覽一下Flow API類和接口。框架
java.util.concurrent.Flow
:這是Flow API的主要類。該類封裝了Flow API的全部重要接口。這是一個final類,咱們不能擴展它。java.util.concurrent.Flow.Publisher
:這是一個功能接口,每一個發佈者都必須實現它的subscribe
方法,並添加相關的訂閱者以接收消息。java.util.concurrent.Flow.Subscriber
:每一個訂閱者都必須實現此接口。訂閱者中的方法以嚴格的順序進行調用。此接口有四種方法:
onSubscribe
:這是訂閱者訂閱了發佈者後接收消息時調用的第一個方法。一般咱們調用subscription.request
開始從處理器(Processor)接收項目。onNext
:當從發佈者收到項目時調用此方法,這是咱們實現業務邏輯以處理流,而後從發佈者請求更多數據的方法。onError
:當發生不可恢復的錯誤時調用此方法,咱們能夠在此方法中執行清理操做,例如關閉數據庫鏈接。onComplete
:這就像finally
方法,而且在發佈者沒有發佈其餘項目或發佈者關閉時調用。咱們能夠用它來發送流成功處理的通知。java.util.concurrent.Flow.Subscription
:這用於在發佈者和訂閱者之間建立異步非阻塞連接。訂閱者調用其request
方法來向發佈者請求項目。它還有cancel
取消訂閱的方法,即關閉發佈者和訂閱者之間的連接。java.util.concurrent.Flow.Processor
:此接口同時擴展了Publisher
和Subscriber
接口,用於在發佈者和訂閱者之間轉換消息。java.util.concurrent.SubmissionPublisher
:一個Publisher實現,它將提交的項目異步發送給當前訂閱者,直到它關閉爲止。它使用Executor框架,咱們將在響應流示例中使用該類來添加訂閱者,而後向其提交項目。讓咱們從一個簡單的例子開始,咱們將實現Flow API Subscriber接口並使用SubmissionPublisher來建立發佈者和發送消息。異步
假設咱們有一個Employee類,用於建立從發佈者發送到訂閱者的流消息。ide
package com.journaldev.reactive.beans; public class Employee { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Employee(int i, String s) { this.id = i; this.name = s; } public Employee() { } @Override public String toString() { return "[id="+id+",name="+name+"]"; } }
咱們還有一個實用程序類來爲咱們的示例建立一個員工列表。
package com.journaldev.reactive_streams; import java.util.ArrayList; import java.util.List; import com.journaldev.reactive.beans.Employee; public class EmpHelper { public static List<Employee> getEmps() { Employee e1 = new Employee(1, "Pankaj"); Employee e2 = new Employee(2, "David"); Employee e3 = new Employee(3, "Lisa"); Employee e4 = new Employee(4, "Ram"); Employee e5 = new Employee(5, "Anupam"); List<Employee> emps = new ArrayList<>(); emps.add(e1); emps.add(e2); emps.add(e3); emps.add(e4); emps.add(e5); return emps; } }
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Employee; public class MySubscriber implements Subscriber<Employee> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item"); } @Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done"); } public int getCounter() { return counter; } }
Subscription
變量以保持引用,以即可以在onNext
方法中進行請求。counter
變量以記錄處理的項目數,請注意它的值在onNext方法中增長。這將在咱們的main方法中用於在結束主線程以前等待執行完成。onSubscribe
方法中調用訂閱請求以開始處理。另請注意,onNext
在處理項目後再次調用方法,要求對下一個從發佈者發佈的項目進行處理。onError
和onComplete
在例子中沒有太多邏輯,但在實際狀況中,它們應該用於在發生錯誤時執行糾正措施或在處理成功完成時清理資源。 咱們將使用SubmissionPublisherPublisher
做爲示例,讓咱們看一下響應流實現的測試程序。
package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; public class MyReactiveApp { public static void main(String args[]) throws InterruptedException { // Create Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); // Register Subscriber MySubscriber subs = new MySubscriber(); publisher.subscribe(subs); List<Employee> emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // logic to wait till processing of all messages are over while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // close the Publisher publisher.close(); System.out.println("Exiting the app"); } }
在上述代碼中,最重要的部分是發佈者subscribe
和submit
方法的調用。咱們應該始終關閉發佈者以免任何內存泄漏。
執行上述程序時,咱們將獲得如下輸出。
Subscribed Publishing Items to Subscriber onSubscribe requested 1 item Processing Employee [id=1,name=Pankaj] Processing Employee [id=2,name=David] Processing Employee [id=3,name=Lisa] Processing Employee [id=4,name=Ram] Processing Employee [id=5,name=Anupam] Exiting the app All Processing Done
請注意,若是咱們在處理全部項目以前,主線程已經退出了,那麼咱們將獲得不想要的結果。
處理器用於在發佈者和訂閱者之間轉換消息。假設咱們有另外一個用戶但願處理不一樣類型的消息。假設這個新的消息類型是Freelancer
。
package com.journaldev.reactive.beans; public class Freelancer extends Employee { private int fid; public int getFid() { return fid; } public void setFid(int fid) { this.fid = fid; } public Freelancer(int id, int fid, String name) { super(id, name); this.fid = fid; } @Override public String toString() { return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]"; } }
咱們有一個新訂閱者使用Freelancer流數據。
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Freelancer; public class MyFreelancerSubscriber implements Subscriber<Freelancer> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed for Freelancer"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item for Freelancer"); } @Override public void onNext(Freelancer item) { System.out.println("Processing Freelancer "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened in MyFreelancerSubscriber"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done for MyFreelancerSubscriber"); } public int getCounter() { return counter; } }
代碼重要的部分是實現Processor
接口。因爲咱們想要使用它SubmissionPublisher
,咱們會擴展它並在適合的地方使用它。
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> { private Subscription subscription; private Function<Employee,Freelancer> function; public MyProcessor(Function<Employee,Freelancer> function) { super(); this.function = function; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Employee emp) { submit((Freelancer) function.apply(emp)); subscription.request(1); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); } }
Function
將用於將Employee對象轉換爲Freelancer對象。onNext
方法中的Freelancer消息,而後使用SubmissionPublisher
submit
方法將其發送給訂閱者。package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyReactiveAppWithProcessor { public static void main(String[] args) throws InterruptedException { // Create End Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); // Create Processor MyProcessor transformProcessor = new MyProcessor(s -> { return new Freelancer(s.getId(), s.getId() + 100, s.getName()); }); //Create End Subscriber MyFreelancerSubscriber subs = new MyFreelancerSubscriber(); //Create chain of publisher, processor and subscriber publisher.subscribe(transformProcessor); // publisher to processor transformProcessor.subscribe(subs); // processor to subscriber List<Employee> emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // Logic to wait for messages processing to finish while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // Closing publishers publisher.close(); transformProcessor.close(); System.out.println("Exiting the app"); } }
閱讀程序中的註釋以正確理解它,最重要的變化是發佈者 - 處理器 - 訂閱者鏈的建立。執行上述程序時,咱們將獲得如下輸出。
Subscribed for Freelancer Publishing Items to Subscriber onSubscribe requested 1 item for Freelancer Processing Freelancer [id=1,name=Pankaj,fid=101] Processing Freelancer [id=2,name=David,fid=102] Processing Freelancer [id=3,name=Lisa,fid=103] Processing Freelancer [id=4,name=Ram,fid=104] Processing Freelancer [id=5,name=Anupam,fid=105] Exiting the app All Processing Done for MyFreelancerSubscriber Done
咱們能夠使用Subscription cancel
方法中止在訂閱者中接收消息。
請注意,若是咱們取消訂閱,則訂閱者將不會收到
onComplete
或onError
信號。
如下是一個示例代碼,其中訂閱者只消費3條消息,而後取消訂閱。
@Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; if(counter==3) { this.subscription.cancel(); return; } this.subscription.request(1); }
請注意,在這種狀況下,咱們在處理全部消息以前中止主線程的邏輯將進入無限循環。咱們能夠爲此場景添加一些額外的邏輯,若是訂閱者已中止處理或取消訂閱,就使用一些全局變量來標誌該狀態。
當發佈者以比訂閱者消費更快的速度生成消息時,會產生背壓。Flow API不提供任何關於背壓或處理它的信號的機制。但咱們能夠設計本身的策略來處理它,例如微調用戶或下降信息產生率。您能夠閱讀RxJava deals with Back Pressure。
Java 9 Flow API是響應式編程和建立異步非阻塞應用程序的良好舉措。可是,只有在全部系統API都支持它時,才能建立真正的響應式應用程序。
原文地址:Java 9 Reactive Streams written by Pankaj
完整代碼:Github