以前介紹了grpc的四種服務類型,在定義服務的時候能夠定義流式客戶端,實現客戶端以異步的方式返回多個對象,固然能夠返回單個對象以實現異步的操做。在grpc中能夠在客戶端上進行同步異步的調用。html
void blockingCall() { HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel); stub.simpleHello(person).getString(); }
這是最簡單的方式,經過block的形式調用rpc,並在當前線程返回結果。異步
void futureCallDirect() { HelloServiceGrpc.HelloServiceFutureStub stub = HelloServiceGrpc.newFutureStub(channel); ListenableFuture<ProtoObj.Result> response = stub.simpleHello(person); try { response.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (ExecutionException e) { } }
這種方式是經過建立futureStub調用rpc並得到future對象,經過get方法來阻塞調用,這裏的future是guava包中的而不是jdk內置的。async
void futureCallCallback() { final CountDownLatch latch = new CountDownLatch(1); HelloServiceGrpc.HelloServiceFutureStub stub = HelloServiceGrpc.newFutureStub(channel); ListenableFuture<ProtoObj.Result> response = stub.simpleHello(person); Futures.addCallback( response, new FutureCallback<ProtoObj.Result>() { @Override public void onSuccess(@Nullable ProtoObj.Result result) { System.out.println(result.getString()); latch.countDown(); } @Override public void onFailure(Throwable t) { } }, directExecutor()); }
同上前一種方式同樣,得到Future,而後經過Futures方法在其餘。ide
void asyncCall() { HelloServiceGrpc.HelloServiceStub stub = HelloServiceGrpc.newStub(channel); final CountDownLatch latch = new CountDownLatch(1); StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() { @Override public void onNext(ProtoObj.Result value) { System.out.println(value.getString()); latch.countDown(); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { } }; stub.simpleHello(person, responseObserver); if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) { throw new RuntimeException("timeout!"); } }
在以前四種服務類型中已經使用過newStub,當時是用在客戶端流式服務上,其實newStub並不僅限於流式服務,任何rpc調用均可以使用其來實現異步調用。線程
void advancedAsyncCall() { //使用方法的名字METHOD_SIMPLE_HELLO進行調用 ClientCall<ProtoObj.Person, ProtoObj.Result> call = channel.newCall(HelloServiceGrpc.METHOD_SIMPLE_HELLO, CallOptions.DEFAULT); final CountDownLatch latch = new CountDownLatch(1); call.start(new ClientCall.Listener<ProtoObj.Result>() { @Override public void onMessage(ProtoObj.Result message) { System.out.println(Thread.currentThread().getName()); System.out.println(message.getString()); latch.countDown(); } }, new Metadata()); call.request(1); call.sendMessage(person); call.halfClose(); if (!Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS)) { throw new RuntimeException("timeout!"); } }
該種方式只是上面newStub的底層實現,沒有特殊需求不會使用這種方式進行rpc。code