grpc客戶端類型

以前介紹了grpc的四種服務類型,在定義服務的時候能夠定義流式客戶端,實現客戶端以異步的方式返回多個對象,固然能夠返回單個對象以實現異步的操做。在grpc中能夠在客戶端上進行同步異步的調用。html

阻塞調用(blockingCall)

void blockingCall() {
    HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel);
    stub.simpleHello(person).getString();
}

這是最簡單的方式,經過block的形式調用rpc,並在當前線程返回結果。異步

Future直接調用(futureCallDirect)

void futureCallDirect() {
    HelloServiceGrpc.HelloServiceFutureStub stub = HelloServiceGrpc.newFutureStub(channel);
    ListenableFuture<ProtoObj.Result> response = stub.simpleHello(person);
    try {
        response.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
    }
}

這種方式是經過建立futureStub調用rpc並得到future對象,經過get方法來阻塞調用,這裏的future是guava包中的而不是jdk內置的。async

Future回調調用(furueCallCallback)

void futureCallCallback() {
    final CountDownLatch latch = new CountDownLatch(1);
    HelloServiceGrpc.HelloServiceFutureStub stub = HelloServiceGrpc.newFutureStub(channel);
    ListenableFuture<ProtoObj.Result> response = stub.simpleHello(person);
    Futures.addCallback(
            response,
            new FutureCallback<ProtoObj.Result>() {
                @Override
                public void onSuccess(@Nullable ProtoObj.Result result) {
                   System.out.println(result.getString());
                    latch.countDown();
                }
                @Override
                public void onFailure(Throwable t) {
                }
            },
            directExecutor());
}

同上前一種方式同樣,得到Future,而後經過Futures方法在其餘。ide

異步調用(asyncCall)

void asyncCall() {
    HelloServiceGrpc.HelloServiceStub stub = HelloServiceGrpc.newStub(channel);
    final CountDownLatch latch = new CountDownLatch(1);
    StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
        @Override
        public void onNext(ProtoObj.Result value) {
            System.out.println(value.getString());
            latch.countDown();
        }
        @Override
        public void onError(Throwable t) {
        }
        @Override
        public void onCompleted() {
        }
    };
    stub.simpleHello(person, responseObserver);
    if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
        throw new RuntimeException("timeout!");
    }
}

在以前四種服務類型中已經使用過newStub,當時是用在客戶端流式服務上,其實newStub並不僅限於流式服務,任何rpc調用均可以使用其來實現異步調用。線程

異步底層方式調用

void advancedAsyncCall() {
	//使用方法的名字METHOD_SIMPLE_HELLO進行調用
    ClientCall<ProtoObj.Person, ProtoObj.Result> call = channel.newCall(HelloServiceGrpc.METHOD_SIMPLE_HELLO, CallOptions.DEFAULT);
    final CountDownLatch latch = new CountDownLatch(1);
    call.start(new ClientCall.Listener<ProtoObj.Result>() {
        @Override
        public void onMessage(ProtoObj.Result message) {

            System.out.println(Thread.currentThread().getName());
            System.out.println(message.getString());
            latch.countDown();
        }
    }, new Metadata());
    call.request(1);
    call.sendMessage(person);
    call.halfClose();
    if (!Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS)) {
        throw new RuntimeException("timeout!");
    }
}

該種方式只是上面newStub的底層實現,沒有特殊需求不會使用這種方式進行rpc。code

相關文章
相關標籤/搜索