原文:http://blog.mygraphql.com/wordpress/?p=106html
Graphql 訂閱(subscriptions)使你能夠讓你訂閱響應式數據源(reactive
source) 。當有新數據時,會發送給訂閱者。java
能夠閱讀 http://graphql.org/blog/subsc...
來了解訂閱的背景知識。react
假設你有一個股票服務。能夠用這個 graphql 語句來訂閱它的數據:git
subscription StockCodeSubscription { stockQuotes(stockCode:"IBM') { dateTime stockCode stockPrice stockPriceChange } }
股票價格變化時,graphql 訂閱 能夠把 ExecutionResult
對象以流的方式傳送給訂閱者。和其它 graphql 查詢同樣,只會發送指定的字段
。github
不一樣的是,一開始的查詢結果是一個響應式流(reactive-streams)Publisher(流發佈者)
對象。經過對象能夠獲取將來的數據。web
你須要使用 SubscriptionExecutionStrategy
策略做爲執行策略(execution
strategy)。由於它支持 reactive-streams APIs.websocket
GraphQL graphQL = GraphQL .newGraphQL(schema) .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy()) .build(); ExecutionResult executionResult = graphQL.execute(query); Publisher<ExecutionResult> stockPriceStream = executionResult.getData();
這裏的 Publisher<ExecutionResult>
就是流事件的發佈者【譯註:原文
publisher of a stream of events】。你須要編寫你本身的流處理代碼,如:網絡
GraphQL graphQL = GraphQL .newGraphQL(schema) .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy()) .build(); String query = "" + " subscription StockCodeSubscription {\n" + " stockQuotes(stockCode:\"IBM') {\n" + " dateTime\n" + " stockCode\n" + " stockPrice\n" + " stockPriceChange\n" + " }\n" + " }\n"; ExecutionResult executionResult = graphQL.execute(query); Publisher<ExecutionResult> stockPriceStream = executionResult.getData(); AtomicReference<Subscription> subscriptionRef = new AtomicReference<>(); stockPriceStream.subscribe(new Subscriber<ExecutionResult>() { @Override public void onSubscribe(Subscription s) { subscriptionRef.set(s); s.request(1); } @Override public void onNext(ExecutionResult er) { // // process the next stock price // processStockPriceChange(er.getData()); // // ask the publisher for one more item please // subscriptionRef.get().request(1); } @Override public void onError(Throwable t) { // // The upstream publishing data source has encountered an error // and the subscription is now terminated. Real production code needs // to decide on a error handling strategy. // } @Override public void onComplete() { // // the subscription has completed. There is not more data // } });
須要編寫 reactive-streams 代碼去消費一源源不斷的ExecutionResults
。你能夠在 http://www.reactive-streams.org/ 中看到更
reactive-streams 代碼的編寫細節。socket
>><<RxJava
是這個流行的 reactive-streams 實現。在
http://reactivex.io/intro.html 中能夠看到更多建立Publishers 數據 和
Subscriptions 數據的細節。ide
graphql-java 只是產出一個流對象。它不關心如何在網絡上用 web sockets
或其它手段發送流數據 。雖然這很重要,但不是做爲基礎 graphql-java
庫應該作的。
咱們編寫了一個 websockets 的(基於 Jetty)
模擬股票報價的示例應用。它使用了 RxJava。
詳見 https://github.com/graphql-ja...
訂閱字段的 DataFetcher
的職責是生成一個 Publisher
。這個 Publisher
輸出的每個對象,將會經過 graphql 查詢來映射。而後做爲執行結果返回。
你會像這樣子去編寫Data Fetcher:
DataFetcher<Publisher<StockInfo>> publisherDataFetcher = new DataFetcher<Publisher<StockInfo>>() { @Override public Publisher<StockInfo> get(DataFetchingEnvironment environment) { String stockCodeArg = environment.getArgument("stockCode"); return buildPublisherForStockCode(stockCodeArg); } };
如何獲取流事件,就由你的 reactive code 來決定 了。graphql-java會幫助你從流對象中獲取 graphql 字段(fields)。像通常的 graphql查詢同樣。