Vert.x 用RxJava響應式編程 譯<十一>

TIP:相應的代碼在step-8文件夾中(https://github.com/vert-x3/vertx-guide-for-java-devs)java

到目前爲止,咱們已經探討了Vert.x的多個領域,使用基於APIs的回調方法,這種編程模型在多鍾編程語言中能夠良好實現。而後,它會變得有的冗長乏味,尤爲是處理多個時間源或者多個數據流。git

這就是RxJava閃亮的地方,Vert.x能夠與之無縫鏈接。github

使用RxJava APIssql

除了回調的API,Vert.x模塊還提供了一個"Rxified"API,須要咱們在maven的pom.xml中加入依賴:數據庫

而後咱們把Verticles修改一下,須要將原來類中繼承的io.vertx.core.AbstractVerticle修改爲io.vertx.rxjava.core.AbstractVerticle,這有什麼不一樣呢?前面的類繼承了後者而且加入io.vertx.rxjava.core.Vertx屬性。編程

io.vertx.rxjava.core.Vertx定義了額外的rxSomething(…​)方法,等同於callback-based的存在。微信

讓咱們看看MainVerticle,來找一找在實踐中更好的實現:異步

rxDeploy方法沒有把Handler<AsyncResult<String>>做爲一個final的參數,並放回一個Single<String>maven

以及,這個操做並非在方法被調用執行,在你訂閱這個Single執行,當這個操做完成,它返回部署的 id 或者異常拋出的信息。編程語言

依次部署verticles

重構MainVerticle,咱們須要肯定部署操做觸發有序:

    1.flatMap中執行dbVerticleDeployment方法的結果,HttpServerVerticle部署的任務平常調度。

    2.當被訂閱的時候執行,成功或者失敗,MainVerticle的future會是完成或者失敗。

Rxifying" HttpServerVerticle

若是你按序看這個guide,編輯這個代碼,那麼你的HttpServerVerticle使用的仍是基於callback-based的API。在使用RxJava API執行異步的操做以前,你須要先重構HttpServerVerticle

Import RxJava versions of Vert.x classes

    1.咱們的backupHandler()方法依舊使用HttpResponse類,所以這是必須導入的。RxJava版本中提供的HttpResponse在某些狀況下能夠被替換,在step-8文件夾中的沒有導入這個類,由於經過lambda表達式推到的緣由。

委託"Rxified" vertx實例

當你有一個io.vertx.rxjava.core.Vertx的時候能夠調用一個io.vertx.core.Vertx實例,須要調整Verticle’s srart()建立WikiDatabaseService實例的方法:

同時執行受權查詢

在前面的例子中,咱們看到使用RxJava operators 和 Rxified Vert.x API 來依次執行異步操做。可是有時候這是不須要的,或者你只是想讓他們同時運行出於性能的緣由。

對於這樣的情景,HttpServerVerticle中的JWT 貼可能生成程序是個好的例子。爲了建立一個token,咱們須要權限查詢完成,可是查詢是獨立的:

    1.建立了三個Single 對象,表明了不一樣的權限查詢

    2.當三個操做完成,zip操做帶着results回調。

數據庫鏈接

從鏈接池中獲取一個數據庫鏈接,須要作的事情就是JDBCClient調用rxGetConnection

Single<SQLConnection> connection = dbClient.rxGetConnection();

這個方法返回Single<Connection>,這可使你容易的sql查詢

Single<ResultSet> resultSet = dbClient.rxQueryWithParams(
  sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));

當SQLConnection沒有用的時候,咱們怎麼釋放這個鏈接呢?一個簡單方便的方式是調用關閉close方法當

Single<SQLConnection>沒有被訂閱:

    1.當鏈接被請求的時候,咱們把它放入Single對象中

    2.當沒有被訂閱的時候,Single調用close方法

如今當咱們須要的額時候就可使用getConnection來執行sql查詢

爲callbacks和RxJava之間搭橋

這個時候,你可能混淆了RxJava 代碼和callback-based API,例如service proxy 接口定義爲callbacks,可是實現使用了Vert.x Rxified API。

在這個例子中,io.vertx.rx.java.RxHelper能夠適配Handler<AsyncResult<T>>到RxJava Subscriber<T>:

1.fetchAllPagesData是一個一個異步的service proxy操做,在Handler<AsyncResult<List<JsonObject>>>調用中定義

2.toSubscriber適配resultHandler到Subscriber<List<JsonObject>>

數據流

RxJava不只僅在組合不一樣的時間源作的很好,對於數據流也是很是好的,不像Vert.x或者JDK中的fututre,Observable能夠發佈事件流,而不是單個,它配備了一套普遍的數據操做,咱們可使用其中的把一部分來重構咱們的fetchAllPages database verticle 方法:

    1.利用flatMapObservable咱們能夠從Single<Result>發起中建立Observable

    2.from方法將數據庫results迭代入Observable

    3.由於咱們只須要網頁名稱,能夠映射的每一個JSONObject到第一列

    4.客戶端指望數據是按字母順序排列

    5.event bus service由一個JsonArray組成,collect用來JsonArray::new建立一個新的,而後添加JsonArray::add

 

 

原文連接:http://vertx.io/docs/guide-for-java-devs/

個人微信公衆號:

相關文章
相關標籤/搜索