graphql-java使用手冊:part4 訂閱(Subscriptions)

原文:http://blog.mygraphql.com/wordpress/?p=106html

訂閱(Subscriptions)

訂閱查詢(Subscription Queries)

Graphql 訂閱(subscriptions)使你能夠讓你訂閱響應式數據源(reactive
source) 。當有新數據時,會發送給訂閱者。java

能夠閱讀 http://graphql.org/blog/subsc...
來了解訂閱的背景知識。react

假設你有一個股票服務。能夠用這個 graphql 語句來訂閱它的數據:git

subscription StockCodeSubscription {
    stockQuotes(stockCode:"IBM') {
        dateTime
        stockCode
        stockPrice
        stockPriceChange
    }
}

股票價格變化時,graphql 訂閱 能夠把 ExecutionResult
對象以流的方式傳送給訂閱者。和其它 graphql 查詢同樣,只會發送指定的字段
github

不一樣的是,一開始的查詢結果是一個響應式流(reactive-streams)
Publisher(流發佈者) 對象。經過對象能夠獲取將來的數據。web

你須要使用 SubscriptionExecutionStrategy 策略做爲執行策略(execution
strategy)。由於它支持 reactive-streams APIs.websocket

GraphQL graphQL = GraphQL
        .newGraphQL(schema)
        .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy())
        .build();

ExecutionResult executionResult = graphQL.execute(query);

Publisher<ExecutionResult> stockPriceStream = executionResult.getData();

這裏的 Publisher<ExecutionResult> 就是流事件的發佈者【譯註:原文
publisher of a stream of events】。你須要編寫你本身的流處理代碼,如:網絡

GraphQL graphQL = GraphQL
        .newGraphQL(schema)
        .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy())
        .build();

String query = "" +
        "    subscription StockCodeSubscription {\n" +
        "        stockQuotes(stockCode:\"IBM') {\n" +
        "            dateTime\n" +
        "            stockCode\n" +
        "            stockPrice\n" +
        "            stockPriceChange\n" +
        "        }\n" +
        "    }\n";

ExecutionResult executionResult = graphQL.execute(query);

Publisher<ExecutionResult> stockPriceStream = executionResult.getData();

AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
stockPriceStream.subscribe(new Subscriber<ExecutionResult>() {

    @Override
    public void onSubscribe(Subscription s) {
        subscriptionRef.set(s);
        s.request(1);
    }

    @Override
    public void onNext(ExecutionResult er) {
        //
        // process the next stock price
        //
        processStockPriceChange(er.getData());

        //
        // ask the publisher for one more item please
        //
        subscriptionRef.get().request(1);
    }

    @Override
    public void onError(Throwable t) {
        //
        // The upstream publishing data source has encountered an error
        // and the subscription is now terminated.  Real production code needs
        // to decide on a error handling strategy.
        //
    }

    @Override
    public void onComplete() {
        //
        // the subscription has completed.  There is not more data
        //
    }
});

須要編寫 reactive-streams 代碼去消費一源源不斷的
ExecutionResults。你能夠在 http://www.reactive-streams.org/ 中看到更
reactive-streams 代碼的編寫細節。socket

>><<RxJava是這個流行的 reactive-streams 實現。在
http://reactivex.io/intro.html 中能夠看到更多建立Publishers 數據 和
Subscriptions 數據的細節。ide

graphql-java 只是產出一個流對象。它不關心如何在網絡上用 web sockets
或其它手段發送流數據 。雖然這很重要,但不是做爲基礎 graphql-java
庫應該作的。

咱們編寫了一個 websockets 的(基於 Jetty)
模擬股票報價的示例應用。它使用了 RxJava。

詳見 https://github.com/graphql-ja...

關於訂閱服務的 Data Fetchers

訂閱字段的 DataFetcher 的職責是生成一個 Publisher。這個 Publisher
輸出的每個對象,將會經過 graphql 查詢來映射。而後做爲執行結果返回。

你會像這樣子去編寫Data Fetcher:

DataFetcher<Publisher<StockInfo>> publisherDataFetcher = new DataFetcher<Publisher<StockInfo>>() {
    @Override
    public Publisher<StockInfo> get(DataFetchingEnvironment environment) {
        String stockCodeArg = environment.getArgument("stockCode");
        return buildPublisherForStockCode(stockCodeArg);
    }
};

如何獲取流事件,就由你的 reactive code 來決定 了。graphql-java會幫助你從流對象中獲取 graphql 字段(fields)。像通常的 graphql查詢同樣。

相關文章
相關標籤/搜索