次時代Java編程(一):續 vertx-sync實踐

以前介紹過quasar,若是你但願在vert.x項目裏使用coroutine的話,建議使用vertx-sync。本篇將介紹vertx-sync。java

圖片描述

原本打算另起一篇,寫其餘方面的東西,可是最近比較忙,就先寫一篇實踐方面的文章。git

vertx-sync是什麼

上一篇咱們已經講了 Fiber 相關的知識,想必你們對Java實現相似Golang的coroutine已經有印象了,既然Java世界裏有第三方提供了這麼好的庫,那咱們就看看怎麼跟 vert.x 結合起來使用。 github

vert.x官方爲了解決異步代碼編寫的困難,使之更加同步化對開發人員更友好,便基於quasar包裝了一個的同步庫,vertx-sync,該庫的做者一樣也是vert.x的原做者,因此完成度仍是很高的。 sql

vertx-sync 對外只是暴露了幾個簡單的靜態API,來完成對vert.x體系內一系列的操做包裝,其實主要也就是三靜態API而已。數據庫

  • Sync.fiberHandler 若是你但願你的handler裏有一些邏輯須要在Fiber裏運行,則你的handler必須用這個方法包一下。編程

  • Sync.awaitEvent 從Handler裏返回一個事件結果(同步的),且 不會阻塞EventLoop後端

  • Sync.awaitResult 從Handler裏返回一個異步事件結果(同步的),且 不會阻塞EventLoop微信

直接看介紹可能有點不直觀,下面跑幾個例子。架構

使用vertx-sync

以前介紹過quasar,若是你但願在項目裏使用coroutine的話,須要在JVM裏設置一個參數,用於應用啓動前修改字節碼(注入一些中斷方法),從而能夠達到協程的目的。框架

具體方法也很簡單。

-javaagent:/path/to/the/quasar-core-0.7.5-jdk8.jar

若是是基於Maven跑單元測試,那隻須要引用quasar instrument的插件就能夠裏

<plugin>
    <groupId>com.vlkan</groupId>
    <artifactId>quasar-maven-plugin</artifactId>
    <version>0.7.3</version>
    <configuration>
        <check>true</check>
        <debug>true</debug>
        <verbose>true</verbose>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>instrument</goal>
            </goals>
        </execution>
    </executions>
    <dependencies>
        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.5</version>
        </dependency>
    </dependencies>
</plugin>

上面是一些很是必要的準備工做,不然你沒法使用quasar以及vertx-sync。

vertx定時器例子

以前經過vert.x調用定時器,須要傳一個回調handler,而後全部的代碼邏輯都包在裏面。

vertx.setTimer(1000L, h -> {
    System.out.println("time's up");
});

如今咱們來從新塑造一下三觀。

awaitEvent(h -> vertx.setTimer(1000L, h));
System.out.println("time's up");

這裏定時器會阻塞在awaitEvent這一行,直到一秒後纔會執行下面的一行。有點相似執行 Thread.sleep(1000L),可是並不會阻塞 EventLoop 由於quasar會在EventLoop基礎之上再開啓一個fiber。

下面我看個稍微複雜點的例子。

HTTP Client請求例子

咱們先用傳統的回調方式使用vert.x的HttpClient API。

HttpClientRequest httpClientRequest = vertx.createHttpClient().get("leapcloud.cn");
httpClientRequest.handler(response -> {
    response.handler(responseBody -> {
        System.out.println(responseBody.toString());
    });
}).end();

這裏有兩層回調嵌套,一層是獲得Http的Response,另外一層是從Response裏獲得詳細的body。由於有lambda表達式才使得Java如今看起來並非那麼噁心。可是若是咱們須要根據body的內容進一步作判斷去繼續請求其餘頁面,則嵌套會變的很是的深。下面嘗試改形成sync方式看看。

HttpClientRequest httpClientRequest = vertx.createHttpClient().get("leapcloud.cn");
HttpClientResponse response = awaitEvent(Sync::fiberHandler);
Buffer body = awaitEvent(response::handler);
System.out.println(body.toString());

額,是否是感受看着很舒服,無嵌套,直接順序下來,很是的直觀,加上Java8特有的方法引用,會讓代碼更精簡。

經過vertx-sync使用Vert.x JDBC

寫過vert.x同窗確定知道其vertx-jdbc-client爲了使其兼容異步開發模型,將JDBC的底層線程池用異步方式包裝了一下,也就是說JDBC層仍是經過線程池去訪問數據庫的,可是是經過vert.x的context作了層封裝,使其能夠將結果放到對應的 EventLoop 裏,這樣比較符合vert.x的開發風格。可是帶來的弊端就是嵌套太深。

final JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver")
.put("max_pool_size", 30));

client.getConnection(conn -> {
    if (conn.failed()) {
        System.err.println(conn.cause().getMessage());
        return;
    }

    final SQLConnection connection = conn.result();
    connection.execute("create table test(id int primary key, name varchar(255))", res -> {
        if (res.failed()) {
            throw new RuntimeException(res.cause());
        }
        // insert some test data
        connection.execute("insert into test values(1, 'Hello')", insert -> {
            // query some data
            connection.query("select * from test", rs -> {
                for (JsonArray line : rs.result().getResults()) {
                    System.out.println(line.encode());
                }

                // and close the connection
                connection.close(done -> {
                    if (done.failed()) {
                        throw new RuntimeException(done.cause());
                    }
                });
            });
        });
    });
});

上面代碼能夠是否是有點噁心呢?嘗試改造一下吧。

final JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver")
.put("max_pool_size", 30));

try (SQLConnection conn = awaitResult(jdbc::getConnection)) {
    awaitResult(h -> conn.execute("create table test(id int primary key, name varchar(255))", h));
    awaitResult(h -> conn.execute("insert into test values(1, 'Hello')", h));
    ResultSet query = awaitResult(h -> conn.query("select * from test", h));
    for (JsonArray line : query.result.getResults()) {
        System.out.println(line.encode());
    }
    AsyncResult done = awaitResult(h -> conn.close(h));
    if (done.failed()) {
        throw new RuntimeException(done.cause())
    }
} catch (Exception e) {
    e.printStackTrace();
}

除了一個try catch,其餘都沒有嵌套,總體邏輯的可讀性很是高,徹底是線性的。

如何將邏輯放倒Fiber裏

你也許會發現咱們彷佛一直都沒有用到 fiberHandler 這個靜態方法,上面雖然寫了定義,可能你們仍是不可以理解,這裏結合場景也許能更好理解。

咱們嘗試實現一個操做很耗時的邏輯而後包到fiber裏,避免 EventLoop 被阻塞。這裏你也許會很好奇,既然 Fiber 這麼廉價開啓10萬8萬的無所謂啊,恩,這裏再提一下quasar的重點部分: fiber能夠很廉價的被創造出來,可是他本質上仍是跑在一個線程上面,若是其中一個fiber執行了很是耗時的操做,則後面的fiber會一直等待,從而形成整個線程阻塞。 也就是說一個fiber不能執行很是耗時的操做,好比計算100萬之內的素數之和,對於這種操做,咱們能夠經過直接將邏輯放到vert.x的worker線程裏單獨去跑,而後經過fiber包裝一下就能夠了。

AsyncResult<Long> result = awaitResult(fiberHandler(h -> vertx.executeBlocking((Handler<Future<Long>>) event -> {
    //求百萬之內素數之和,這裏的邏輯會在vert.x的worker線程裏跑。隨便耗時多久,都不會阻塞EventLoop
    long sum = sumOfPrime(1, 000, 000);
    event.complete(sum);
}, h)));
//打印結果
System.out.println(result.result());

這裏你會注意到 awaitReslt 裏用了 fiberHandler ,由於executeBlocking裏的 handler 邏輯自己並無跑在fiber體系下,因此會致使無效,而fiberHandler的做用就是將一段vert.x的handler包到 fiber 裏。使以後續的await能夠將其結果返回,這裏使用awaitResult返回結果。
咱們再深刻一點看看 fiberHandler 方法裏到底幹了什麼。

@Suspendable
public static <T> Handler<T> fiberHandler(Handler<T> handler) {
  FiberScheduler scheduler = getContextScheduler();
  return p -> new Fiber<Void>(scheduler, () -> handler.handle(p)).start();
}

這裏獲取Fiber的調度器,而後直接new了一個 Fiber ,避免了咱們本身對邏輯作Fiber包裝。是否是很簡單呢。

總結

相比較傳統的回調Handler,vertx-sync的包裝十分優雅,基本能夠恢復到同步方法調用級別,很好的減輕了異步回調帶來的心智負擔。

可是這個畢竟不是JVM級別的實現,因此或多或少仍是有點門檻的,好比部署的時候,須要經過設置JVM參數來修改部分字節碼,同時還要注意一些須要掛起的方法上面加註釋或者強行讓其拋出可中斷異常。我的建議在一些不重要的工具級項目裏使用,很是重要的項目不推薦使用,固然了若是你以爲你的業務只須要依賴vert.x那麼強烈你推薦你使用,只要記得打開 BlockingChecker 就好,能夠即時的發現潛在的阻塞邏輯。

責編:另外7.24號,咱們力譜宿雲攜手Vert.x中國用戶組在太庫·上海的贊助下舉辦了一場關於Vert.x的技術,以後還會有系列活動,有興趣的同窗們能夠關注咱們的微信公衆號:MaxLeap_yidongyanfa,瞭解更多資訊。


做者往期佳做
次時代Java編程(一) Java裏的協程


做者信息
本文系力譜宿雲 LeapCloud旗下MaxLeap團隊_UX成員:劉小溪 【原創】
力譜宿雲首發地址:https://blog.maxleap.cn/archi...
劉小溪,Maxleap的高級開發工程師,喜歡倒騰一些有意思的技術框架,對新的技術以及語言很是有興趣,之前在shopex擔任架構師,目前在Maxleap負責基礎架構以及服務框架這塊技術,同時也會對Vert.x的社區提供一些開源上的支持。

關於MaxLeap
MaxLeap 是力譜宿雲推出,爲移動應用開發、運營提供一站式後端雲服務, 包括應用開發所需的後端雲數據庫、雲數據源、雲代碼、雲容器、 IM、移動支付、應用內社交、第三方登陸、社交分享等基礎服務,以及針對應用運營的數據分析、推送營銷,用戶支持等服務, 覆蓋移動應用的研發、運營完整生命週期。
MaxLeap 致力於讓移動應用開發運營更快速簡單。

官網:https://maxleap.cn/
歡迎掃二維碼,關注咱們

clipboard.png

相關文章
相關標籤/搜索