grpc入門(三)

grpc入門(三)java

一.介紹

  本文是關於grpc的第三篇博文,是對前兩篇博文的具體代碼實現。秉持我一向的風格,沒有太多抒情和總結,直接上代碼。

  文章代碼參考:https://github.com/grpc/grpc-java/tree/master/examples/src/main/java/io/grpc/examples

二.grpc具體實現

2.1 服務器端的編寫

/**
 * Minimal gRPC server that hosts {@link DemoService} on a fixed port.
 *
 * <p>Typical use: {@link #start()}, then {@link #blockingUtilShutdown()} to keep
 * the calling thread alive while the server runs.
 */
public class GrpcServer {

    /** Port the server listens on. */
    private static final int PORT = 8899;

    /** The running server, or {@code null} until {@link #start()} has been called. */
    private Server server;

    /**
     * Builds the server with {@link DemoService} registered, starts it, and
     * registers a JVM shutdown hook that calls {@link #stop()} so the server is
     * shut down when the process exits (for example on Ctrl-C).
     *
     * @throws IOException if the server cannot be started
     */
    public void start() throws IOException {
        server = ServerBuilder.forPort(PORT)
                .addService(new DemoService())
                .build()
                .start();
        // Without this hook nothing ever calls stop(), so the server would
        // never be asked to shut down when the JVM terminates.
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * Initiates shutdown of the server. Does nothing if the server was never started.
     */
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Blocks the calling thread until the server has terminated. Returns
     * immediately if the server was never started.
     *
     * <p>The method name (with its "Util" spelling) is kept as-is because it is
     * part of the public interface.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void blockingUtilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * Starts the server and blocks until it terminates.
     *
     * @param args unused
     * @throws Exception if the server fails to start or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.blockingUtilShutdown();
    }
}

2.2 服務器端對接口的實現

/**
 * Server-side implementation of the Demo service. Every method replies with
 * hard-coded sample data and prints what it received from the client.
 */
public class DemoService extends DemoGrpc.DemoImplBase {

    /**
     * rpc GetUserById(MyRequest) returns (MyResponse);
     *
     * <p>Prints the requested id and replies with a single fixed response.
     */
    @Override
    public void getUserById(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("客戶端的參數: " + request.getId());
        responseObserver.onNext(MyResponse.newBuilder().setRealname("張三").build());
        responseObserver.onCompleted();
    }

    /**
     * rpc GetInfos(InfoRequest) returns (InfoResponse);
     *
     * <p>Prints the request message and replies with one response holding two
     * {@code Info} entries, each carrying its own map of extra values.
     */
    @Override
    public void getInfos(InfoRequest request, StreamObserver<InfoResponse> responseObserver) {
        System.out.println("客戶端請求數據: " + request.getMsg());

        Map<Long, String> m1 = new HashMap<>();
        m1.put(1L, "AAAA");
        m1.put(2L, "BBBB");
        Info i1 = Info.newBuilder().setAge(10).setName("張三").setFlag(false).putAllOthers(m1).build();

        Map<Long, String> m2 = new HashMap<>();
        m2.put(3L, "XXXX");
        m2.put(4L, "YYYY");
        Info i2 = Info.newBuilder().setAge(20).setName("李四").setFlag(true).putAllOthers(m2).build();

        responseObserver.onNext(InfoResponse.newBuilder().addAllInfos(Arrays.asList(i1, i2)).build());
        responseObserver.onCompleted();
    }

    /**
     * rpc Greeting(stream GreetRequest) returns (GreetResponse);
     *
     * <p>Returns the observer that receives the client's request stream. The
     * single response is sent only once the client has completed its stream.
     */
    @Override
    public StreamObserver<GreetRequest> greeting(StreamObserver<GreetResponse> responseObserver) {
        return new StreamObserver<GreetRequest>() {
            // Invoked after the client calls onCompleted() on its request stream;
            // this is where the one response is sent and the call is finished.
            @Override
            public void onCompleted() {
                responseObserver.onNext(GreetResponse.newBuilder().setDate(new Date() + "").setMsg("data1").build());
                responseObserver.onCompleted();
            }

            // Report the failure instead of silently dropping it.
            @Override
            public void onError(Throwable throwable) {
                System.err.println("greeting request stream failed: " + throwable);
            }

            // Invoked each time the client calls onNext() on its request stream.
            @Override
            public void onNext(GreetRequest request) {
                System.out.println("客戶端的數據: " + request.getName());
            }
        };
    }

    /**
     * rpc GetPeoplesByName(PeopleRequest) returns (stream PeopleList);
     *
     * <p>Prints the requested name and sends one {@code PeopleList} containing
     * three fixed entries before completing the response stream.
     */
    @Override
    public void getPeoplesByName(PeopleRequest request, StreamObserver<PeopleList> responseObserver) {
        System.out.println("客戶端請求的數據: " + request.getName());

        People p1 = People.newBuilder().setAge(10).setHeight(166.67f).setName("張三").setMoney(45).setIsMarried(true).build();
        People p2 = People.newBuilder().setAge(20).setHeight(176.67f).setName("李四").setMoney(4500).setIsMarried(true).build();
        People p3 = People.newBuilder().setAge(30).setHeight(186.67f).setName("王五").setMoney(45.90).setIsMarried(false).build();

        PeopleList list = PeopleList.newBuilder().addAllPeoples(Arrays.asList(p1, p2, p3)).build();

        responseObserver.onNext(list);
        responseObserver.onCompleted();
    }

    /**
     * rpc GetStudents(stream StudentRequest) returns (stream StudentList);
     *
     * <p>Returns the observer that receives the client's request stream. Each
     * request's info map is printed; one {@code StudentList} is sent back once
     * the client has completed its stream.
     */
    @Override
    public StreamObserver<StudentRequest> getStudents(StreamObserver<StudentList> responseObserver) {
        return new StreamObserver<StudentRequest>() {

            // Invoked after the client completes its request stream.
            @Override
            public void onCompleted() {
                Student s1 = Student.newBuilder().setName("張三").setScore(100).build();
                Student s2 = Student.newBuilder().setName("李四").setScore(90).build();

                StudentList l1 = StudentList.newBuilder().addAllStudents(Arrays.asList(s1, s2)).build();

                responseObserver.onNext(l1);
                responseObserver.onCompleted();
            }

            // Report the failure instead of silently dropping it.
            @Override
            public void onError(Throwable throwable) {
                System.err.println("getStudents request stream failed: " + throwable);
            }

            // Invoked for every request message the client sends.
            @Override
            public void onNext(StudentRequest studentRequest) {
                System.out.println(studentRequest.getInfosMap());
            }
        };
    }
}

2.3 客戶端的編寫

/**
 * Demo client for the Demo service. It holds one channel and two stubs built on
 * it, and exposes one method per RPC that sends a fixed request and prints the
 * reply.
 */
public class GrpcClient {

    private final ManagedChannel channel;

    // Blocking stub: per the original author's note, suitable for unary calls
    // and for any call whose request is not a stream.
    private final DemoGrpc.DemoBlockingStub blockingStub;

    // Non-blocking stub: per the original author's note, usable for every
    // method and required whenever the request is a stream.
    private final DemoGrpc.DemoStub demoStub;

    /**
     * Creates a client connected to the given address over a plaintext channel.
     *
     * @param host server host name
     * @param port server port
     */
    public GrpcClient(String host, int port) {
        // NOTE(review): newer grpc-java releases offer a no-arg usePlaintext()
        // in place of this boolean overload - confirm against the grpc-java
        // version this project builds with before changing it.
        this(ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext(true)
                .build());
    }

    /**
     * Creates a client on top of an existing channel.
     *
     * @param channel channel used by both stubs
     */
    public GrpcClient(ManagedChannel channel) {
        this.channel = channel;
        this.blockingStub = DemoGrpc.newBlockingStub(channel);
        this.demoStub = DemoGrpc.newStub(channel);
    }

    /**
     * Shuts the channel down and waits up to five seconds for it to terminate.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
    }

    /**
     * Unary method, an ordinary blocking call.
     * rpc GetUserById(MyRequest) returns (MyResponse);
     */
    public void getUserById() {
        MyResponse resp = this.blockingStub.getUserById(MyRequest.newBuilder().setId(80000).build());
        System.out.println("服務端返回的數據: " + resp.getRealname());
    }

    /**
     * rpc GetInfos(InfoRequest) returns (InfoResponse);
     *
     * <p>Blocking call; prints every returned info followed by the entries of
     * its extra-values map.
     */
    public void getInfos() {
        InfoResponse resp = this.blockingStub.getInfos(InfoRequest.newBuilder().setMsg("信息一").build());
        resp.getInfosList().forEach(info -> {
            System.out.println(info.getAge() + ";;" + info.getName() + ";;" + info.getFlag());
            info.getOthersMap().forEach((k, v) -> {
                System.out.println(k + " == " + v);
            });
            System.out.println("------------------------------------------------");
        });
    }

    /**
     * rpc Greeting(stream GreetRequest) returns (GreetResponse);
     *
     * <p>Sends one request on the request stream and then completes it; the
     * response is printed by the observer passed to the stub.
     */
    public void greeting() {
        StreamObserver<GreetRequest> requestStream = demoStub.greeting(new StreamObserver<GreetResponse>() {
            // Invoked after the server calls onNext().
            @Override
            public void onNext(GreetResponse resp) {
                System.out.println(resp.getDate() + ";;" + resp.getMsg());
            }

            // Report the failure instead of silently dropping it.
            @Override
            public void onError(Throwable throwable) {
                System.err.println("greeting failed: " + throwable);
            }

            // Invoked after the server calls onCompleted().
            @Override
            public void onCompleted() {
                System.out.println("服務端的onComplete()方法執行完畢");
            }
        });

        requestStream.onNext(GreetRequest.newBuilder().setName("張三號").build());
        requestStream.onCompleted();
    }

    /**
     * rpc GetPeoplesByName(PeopleRequest) returns (stream PeopleList);
     *
     * <p>Issues the call through the non-blocking stub and prints every person
     * of each list the server sends.
     */
    public void getPeoplesByName() {
        PeopleRequest req = PeopleRequest.newBuilder().setName("劉XX").build();

        demoStub.getPeoplesByName(req, new StreamObserver<PeopleList>() {
            @Override
            public void onCompleted() {
                // Nothing to do once the server finishes the stream.
            }

            // Report the failure instead of silently dropping it.
            @Override
            public void onError(Throwable throwable) {
                System.err.println("getPeoplesByName failed: " + throwable);
            }

            @Override
            public void onNext(PeopleList peopleList) {
                List<People> list = peopleList.getPeoplesList();
                list.forEach(p -> {
                    System.out.println(p.getAge() + ";;" + p.getHeight() + ";;" + p.getMoney()
                                            + ";;" + p.getIsMarried() + ";;" + p.getName());
                });
            }
        });
    }

    /**
     * rpc GetStudents(stream StudentRequest) returns (stream StudentList);
     *
     * <p>Sends one request carrying a small info map, completes the request
     * stream, and prints every student of each list the server sends.
     *
     * @throws InterruptedException declared for interface compatibility; the
     *         current body does not block
     */
    public void getStudentsList() throws InterruptedException {
        StreamObserver<StudentRequest> requestStream = demoStub.getStudents(new StreamObserver<StudentList>() {
            @Override
            public void onCompleted() {
                // Nothing to do once the server finishes the stream.
            }

            // Report the failure instead of silently dropping it.
            @Override
            public void onError(Throwable throwable) {
                System.err.println("getStudents failed: " + throwable);
            }

            @Override
            public void onNext(StudentList studentList) {
                for (Student s : studentList.getStudentsList()) {
                    System.out.println(s.getName() + ";;" + s.getScore());
                }
            }
        });

        Map<String, String> map = new HashMap<>();
        map.put("xx", "XX");
        map.put("zz", "ZZ");

        requestStream.onNext(StudentRequest.newBuilder().putAllInfos(map).build());
        requestStream.onCompleted();
    }

    /**
     * Runs the bidirectional-streaming demo against localhost:8899, waits for
     * the asynchronous reply, then shuts the channel down.
     *
     * @param args unused
     * @throws InterruptedException if the wait or the shutdown is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        GrpcClient client = new GrpcClient("localhost", 8899);
        try {
            // Other demos; uncomment to run them instead.
            //client.getUserById();
            //client.getInfos();
            //client.greeting();
            client.getStudentsList();
            // The call above is asynchronous; keep the JVM alive long enough
            // for the response callbacks to print.
            Thread.sleep(10000);
        } finally {
            // Release the channel, which was previously never shut down.
            client.shutdown();
        }
    }
}
相關文章
相關標籤/搜索