從3.0 版本開始,MongoDB 開始提供異步方式的驅動(Java Async Driver),這爲應用提供了一種更高性能的選擇。
但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是由於先入爲主的緣由(同步Driver的文檔說明更加的完善),又或者是爲了兼容舊的 MongoDB 版本。
不管如何,因爲 Reactive 的發展,將來使用異步驅動應該是一個趨勢。php
在使用 Async Driver 以前,須要對 Reactive 的概念有一些熟悉。html
響應式(Reactive)是一種異步的、面向數據流的開發方式,最先是來自於.NET 平臺上的 Reactive Extensions 庫,隨後被擴展爲各類編程語言的實現。
在著名的 Reactive Manifesto(響應式宣言) 中,對 Reactive 定義了四個特徵:java
在響應式宣言的所定義的這些系統特徵中,無一不與響應式的流有若干的關係,因而乎就有了 2013年發起的 響應式流規範(Reactive Stream Specification)。react
https://www.reactive-streams.org/git
其中,對於響應式流的處理環節又作了以下定義:github
Java 平臺則是在 JDK 9 版本上發佈了對 Reactive Streams 的支持。mongodb
下面介紹響應式流的幾個關鍵接口:數據庫
當訂閱成功後,可使用 Subscription 的 request(long n) 方法來請求發佈者發佈 n 條數據。發佈者可能產生3種不一樣的消息通知,分別對應 Subscriber 的另外3個回調方法。編程
數據通知:對應 onNext 方法,表示發佈者產生的數據。
錯誤通知:對應 onError 方法,表示發佈者產生了錯誤。
結束通知:對應 onComplete 方法,表示發佈者已經完成了全部數據的發佈。
在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知以後,不會再有其餘通知產生。緩存
這幾個接口的關係以下圖所示:
圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html
MongoDB 的異步驅動爲 mongo-java-driver-reactivestreams 組件,其實現了 Reactive Stream 的上述接口。
> 除了 reactivestream 以外,MongoDB 的異步驅動還包含 RxJava 等風格的版本,有興趣的讀者能夠進一步瞭解
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
接下來,經過一個簡單的例子來演示一下 Reactive 方式的代碼風格:
org.mongodb mongodb-driver-reactivestreams 1.11.0
> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件
//服務器實例表 List servers = new ArrayList(); servers.add(new ServerAddress("localhost", 27018)); //配置構建器 MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); //傳入服務器實例 settingsBuilder.applyToClusterSettings( builder -> builder.hosts(servers)); //構建 Client 實例 MongoClient mongoClient = MongoClients.create(settingsBuilder.build());
//得到數據庫對象 MongoDatabase database = client.getDatabase(databaseName); //得到集合 MongoCollection collection = database.getCollection(collectionName); //異步返回Publisher FindPublisher publisher = collection.find(); //訂閱實現 publisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println("start..."); //執行請求 s.request(Integer.MAX_VALUE); } @Override public void onNext(Document document) { //得到文檔 System.out.println("Document:" + document.toJson()); } @Override public void onError(Throwable t) { System.out.println("error occurs."); } @Override public void onComplete() { System.out.println("finished."); } });
注意到,與使用同步驅動不一樣的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher接口的一層擴展。
並且,在返回 Publisher 對象時,此時並無產生真正的數據庫IO請求。 真正發起請求須要經過調用 Subscription.request()方法。
在上面的代碼中,爲了讀取由 Publisher 產生的結果,經過自定義一個Subscriber,在onSubscribe 事件觸發時就執行 數據庫的請求,以後分別對 onNext、onError、onComplete進行處理。
儘管這種實現方式是純異步的,但在使用上比較繁瑣。試想若是對於每一個數據庫操做都要完成一個Subscriber 邏輯,那麼開發的工做量是巨大的。
爲了儘量複用重複的邏輯,能夠對Subscriber的邏輯作一層封裝,包含以下功能:
代碼以下:
public class ObservableSubscriber implements Subscriber { //響應數據 private final List received; //錯誤信息 private final List errors; //等待對象 private final CountDownLatch latch; //訂閱器 private volatile Subscription subscription; //是否完成 private volatile boolean completed; public ObservableSubscriber() { this.received = new ArrayList(); this.errors = new ArrayList(); this.latch = new CountDownLatch(1); } @Override public void onSubscribe(final Subscription s) { subscription = s; } @Override public void onNext(final T t) { received.add(t); } @Override public void onError(final Throwable t) { errors.add(t); onComplete(); } @Override public void onComplete() { completed = true; latch.countDown(); } public Subscription getSubscription() { return subscription; } public List getReceived() { return received; } public Throwable getError() { if (errors.size() > 0) { return errors.get(0); } return null; } public boolean isCompleted() { return completed; } /** * 阻塞必定時間等待結果 * * @param timeout * @param unit * @return * @throws Throwable */ public List get(final long timeout, final TimeUnit unit) throws Throwable { return await(timeout, unit).getReceived(); } /** * 一直阻塞等待請求完成 * * @return * @throws Throwable */ public ObservableSubscriber await() throws Throwable { return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /** * 阻塞必定時間等待完成 * * @param timeout * @param unit * @return * @throws Throwable */ public ObservableSubscriber await(final long timeout, final TimeUnit unit) throws Throwable { subscription.request(Integer.MAX_VALUE); if (!latch.await(timeout, unit)) { throw new MongoTimeoutException("Publisher onComplete timed out"); } if (!errors.isEmpty()) { throw errors.get(0); } return this; } }
藉助這個基礎的工具類,咱們對於文檔的異步操做就變得簡單多了。
好比對於文檔查詢的操做能夠改造以下:
ObservableSubscriber subscriber = new ObservableSubscriber(); collection.find().subscribe(subscriber); //結果處理 subscriber.get(15, TimeUnit.SECONDS).forEach( d -> { System.out.println("Document:" + d.toJson()); });
固然,這個例子還有能夠繼續完善,好比使用 List 做爲緩存,則要考慮數據量的問題,避免將所有(或超量) 的文檔一次性轉入內存。