订阅(Subscriptions)
订阅查询(Subscription Queries)
Graphql 订阅(subscriptions)使你可以让你订阅响应式数据源(reactive
source) 。当有新数据时,会发送给订阅者。可以阅读
来了解订阅的背景知识。假设你有一个股票服务。可以用这个 graphql 语句来订阅它的数据:
subscription StockCodeSubscription { stockQuotes(stockCode:"IBM') { dateTime stockCode stockPrice stockPriceChange }}
股票价格变化时,graphql 订阅 可以把 ExecutionResult
不同的是,一开始的查询结果是一个响应式流(reactive-streams)
Publisher(流发布者)
对象。通过对象可以获取未来的数据。 你需要使用 SubscriptionExecutionStrategy
策略作为执行策略(execution
GraphQL graphQL = GraphQL .newGraphQL(schema) .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy()) .build();ExecutionResult executionResult = graphQL.execute(query);PublisherstockPriceStream = executionResult.getData();
这里的 Publisher<ExecutionResult>
就是流事件的发布者【译注:原文
GraphQL graphQL = GraphQL .newGraphQL(schema) .subscriptionExecutionStrategy(new SubscriptionExecutionStrategy()) .build();String query = "" + " subscription StockCodeSubscription {\n" + " stockQuotes(stockCode:\"IBM') {\n" + " dateTime\n" + " stockCode\n" + " stockPrice\n" + " stockPriceChange\n" + " }\n" + " }\n";ExecutionResult executionResult = graphQL.execute(query);PublisherstockPriceStream = executionResult.getData();AtomicReference subscriptionRef = new AtomicReference<>();stockPriceStream.subscribe(new Subscriber () { @Override public void onSubscribe(Subscription s) { subscriptionRef.set(s); s.request(1); } @Override public void onNext(ExecutionResult er) { // // process the next stock price // processStockPriceChange(er.getData()); // // ask the publisher for one more item please // subscriptionRef.get().request(1); } @Override public void onError(Throwable t) { // // The upstream publishing data source has encountered an error // and the subscription is now terminated. Real production code needs // to decide on a error handling strategy. // } @Override public void onComplete() { // // the subscription has completed. There is not more data // }});
需要编写 reactive-streams 代码去消费一源源不断的
ExecutionResults
。你可以在 中看到更reactive-streams 代码的编写细节。 >><<RxJava
是这个流行的 reactive-streams 实现。在
graphql-java 只是产出一个流对象。它不关心如何在网络上用 web sockets
或其它手段发送流数据 。虽然这很重要,但不是作为基础 graphql-java库应该做的。我们编写了一个 websockets 的(基于 Jetty)
模拟股票报价的示例应用。它使用了 RxJava。详见
关于订阅服务的 Data Fetchers
订阅字段的 DataFetcher
的职责是生成一个 Publisher
。这个 Publisher
你会像这样子去编写Data Fetcher:
DataFetcher> publisherDataFetcher = new DataFetcher >() { @Override public Publisher get(DataFetchingEnvironment environment) { String stockCodeArg = environment.getArgument("stockCode"); return buildPublisherForStockCode(stockCodeArg); }};
如何获取流事件,就由你的 reactive code 来决定 了。graphql-java
会帮助你从流对象中获取 graphql 字段(fields)。像一般的 graphql查询一样。