基於Netty和SpringBoot實現一個輕量級RPC框架-Client端請求響應同步化處理

前提

前置文章:java

前一篇文章簡單介紹了經過動態代理完成了Client端契約接口調用轉換爲發送RPC協議請求的功能。這篇文章主要解決一個遺留的技術難題:請求-響應同步化處理。git

須要的依賴以下:github

  • JDK1.8+
  • Netty:4.1.44.Final
  • SpringBoot:2.2.2.RELEASE

簡單分析Netty請求-響應的處理流程

圖中已經忽略了編碼解碼器和其餘入站出站處理器,不一樣顏色的線程表明徹底不相同的線程,不一樣線程之間的處理邏輯是徹底異步,也就是Netty IO線程(n-l-g-1)接收到Server端的消息而且解析完成的時候,用戶調用線程(u-t-1)沒法感知到解析完畢的消息包,那麼這裏要作的事情就是讓用戶調用線程(u-t-1)獲取到Netty IO線程(n-l-g-1)接收而且解析完成的消息包。shell

這裏能夠用一個簡單的例子來講明模擬Client端調用線程等待Netty IO線程的處理結果再同步返回的過程。bootstrap

@Slf4j
public class NettyThreadSyncTest {

    @ToString
    private static class ResponseFuture {

        private final long beginTimestamp = System.currentTimeMillis();
        @Getter
        private final long timeoutMilliseconds;
        @Getter
        private final String requestId;
        @Setter
        @Getter
        private volatile boolean sendRequestSucceed = false;
        @Setter
        @Getter
        private volatile Throwable cause;
        @Getter
        private volatile Object response;

        private final CountDownLatch latch = new CountDownLatch(1);

        public ResponseFuture(String requestId, long timeoutMilliseconds) {
            this.requestId = requestId;
            this.timeoutMilliseconds = timeoutMilliseconds;
        }

        public boolean timeout() {
            return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
        }

        public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException {
            latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
            return response;
        }

        public void putResponse(Object response) throws InterruptedException {
            this.response = response;
            latch.countDown();
        }
    }

    static ExecutorService REQUEST_THREAD;
    static ExecutorService NETTY_IO_THREAD;
    static Callable<Object> REQUEST_TASK;
    static Runnable RESPONSE_TASK;

    static String processBusiness(String name) {
        return String.format("%s say hello!", name);
    }

    private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();

    @BeforeClass
    public static void beforeClass() throws Exception {
        String requestId = UUID.randomUUID().toString();
        String requestContent = "throwable";
        REQUEST_TASK = () -> {
            try {
                // 3秒沒有獲得響應認爲超時
                ResponseFuture responseFuture = new ResponseFuture(requestId, 3000);
                RESPONSE_FUTURE_TABLE.put(requestId, responseFuture);
                // 這裏忽略發送請求的操做,只打印日誌和模擬耗時1秒
                Thread.sleep(1000);
                log.info("發送請求成功,請求ID:{},請求內容:{}", requestId, requestContent);
                // 更新標記屬性
                responseFuture.setSendRequestSucceed(true);
                // 剩餘2秒等待時間 - 這裏只是粗略計算
                return responseFuture.waitResponse(3000 - 1000);
            } catch (Exception e) {
                log.info("發送請求失敗,請求ID:{},請求內容:{}", requestId, requestContent);
                throw new RuntimeException(e);
            }
        };
        RESPONSE_TASK = () -> {
            String responseContent = processBusiness(requestContent);
            try {
                ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId);
                if (null != responseFuture) {
                    log.warn("處理響應成功,請求ID:{},響應內容:{}", requestId, responseContent);
                    responseFuture.putResponse(responseContent);
                } else {
                    log.warn("請求ID[{}]對應的ResponseFuture不存在,忽略處理", requestId);
                }
            } catch (Exception e) {
                log.info("處理響應失敗,請求ID:{},響應內容:{}", requestId, responseContent);
                throw new RuntimeException(e);
            }
        };
        REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "REQUEST_THREAD");
            thread.setDaemon(true);
            return thread;
        });
        NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "NETTY_IO_THREAD");
            thread.setDaemon(true);
            return thread;
        });
    }

    @Test
    public void testProcessSync() throws Exception {
        log.info("異步提交請求處理任務......");
        Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK);
        // 模擬請求耗時
        Thread.sleep(1500);
        log.info("異步提交響應處理任務......");
        NETTY_IO_THREAD.execute(RESPONSE_TASK);
        // 這裏能夠設置超時
        log.info("同步獲取請求結果:{}", future.get());
        Thread.sleep(Long.MAX_VALUE);
    }
}

執行testProcessSync()方法,控制檯輸出以下:緩存

2020-01-18 13:17:07 [main] INFO  c.t.client.NettyThreadSyncTest - 異步提交請求處理任務......
2020-01-18 13:17:08 [REQUEST_THREAD] INFO  c.t.client.NettyThreadSyncTest - 發送請求成功,請求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,請求內容:throwable
2020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 異步提交響應處理任務......
2020-01-18 13:17:09 [NETTY_IO_THREAD] WARN  c.t.client.NettyThreadSyncTest - 處理響應成功,請求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,響應內容:throwable say hello!
2020-01-18 13:17:09 [main] INFO  c.t.client.NettyThreadSyncTest - 同步獲取請求結果:throwable say hello!

上面這個例子裏面的線程同步處理主要參考主流的Netty框架客戶端部分的實現邏輯:RocketMQ(具體是NettyRemotingClient類)以及Redisson(具體是RedisExecutor類),它們就是用這種方式使得異步線程處理轉化爲同步處理。框架

Client端請求響應同步化處理

按照前面的例子,首先新增一個ResponseFuture用於承載已發送但未響應的請求:dom

@ToString
public class ResponseFuture {

    private final long beginTimestamp = System.currentTimeMillis();
    @Getter
    private final long timeoutMilliseconds;
    @Getter
    private final String requestId;
    @Setter
    @Getter
    private volatile boolean sendRequestSucceed = false;
    @Setter
    @Getter
    private volatile Throwable cause;
    @Getter
    private volatile ResponseMessagePacket response;

    private final CountDownLatch latch = new CountDownLatch(1);

    public ResponseFuture(String requestId, long timeoutMilliseconds) {
        this.requestId = requestId;
        this.timeoutMilliseconds = timeoutMilliseconds;
    }

    public boolean timeout() {
        return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
    }

    public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException {
        latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
        return response;
    }

    public void putResponse(ResponseMessagePacket response) throws InterruptedException {
        this.response = response;
        latch.countDown();
    }
}

接着須要新增一個HashMap去緩存這些返送成功可是未獲得響應處理的ResponseFuture異步

Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();

這裏的KEY選用requestId,而requestId以前已經定義爲UUID,確保每一個請求不會重複。爲了簡單起見,目前全部的邏輯都編寫在契約代理工廠ContractProxyFactory,添加下面的功能:ide

  • 添加一個同步發送方法sendRequestSync()處理消息包的發送和同步響應,RequestMessagePacket轉換爲調用代理目標方法返回值類型的邏輯暫時也編寫在此方法中。
  • 添加一個核心線程數量爲邏輯核心數量 * 2的線程池用於處理請求。
  • 添加一個單線程的調度線程池用於定時清理那些過時的ResponseFuture,清理方法爲scanResponseFutureTable()

修改後的ContractProxyFactory以下:

@Slf4j
public class ContractProxyFactory {

    private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
    private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
    static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
    // 定義請求的最大超時時間爲3秒
    private static final long REQUEST_TIMEOUT_MS = 3000;
    private static final ExecutorService EXECUTOR;
    private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER;
    private static final Serializer SERIALIZER = FastJsonSerializer.X;


    @SuppressWarnings("unchecked")
    public static <T> T ofProxy(Class<T> interfaceKlass) {
        // 緩存契約接口的代理類實例
        return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
                    RequestArgumentExtractInput input = new RequestArgumentExtractInput();
                    input.setInterfaceKlass(interfaceKlass);
                    input.setMethod(method);
                    RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
                    // 封裝請求參數
                    RequestMessagePacket packet = new RequestMessagePacket();
                    packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
                    packet.setVersion(ProtocolConstant.VERSION);
                    packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
                    packet.setMessageType(MessageType.REQUEST);
                    packet.setInterfaceName(output.getInterfaceName());
                    packet.setMethodName(output.getMethodName());
                    packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
                    packet.setMethodArguments(args);
                    Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
                    return sendRequestSync(channel, packet, method.getReturnType());
                }));
    }

    /**
     * 同步發送請求
     *
     * @param channel channel
     * @param packet  packet
     * @return Object
     */
    static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) {
        long beginTimestamp = System.currentTimeMillis();
        ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS);
        RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture);
        try {
            // 獲取到承載響應Packet的Future
            Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> {
                channel.writeAndFlush(packet).addListener((ChannelFutureListener)
                        future -> responseFuture.setSendRequestSucceed(true));
                return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp));
            });
            ResponseMessagePacket responsePacket = packetFuture.get(
                    REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS);
            if (null == responsePacket) {
                // 超時致使響應包獲取失敗
                throw new SendRequestException(String.format("ResponseMessagePacket獲取超時,請求ID:%s", packet.getSerialNumber()));
            } else {
                ByteBuf payload = (ByteBuf) responsePacket.getPayload();
                byte[] bytes = ByteBufferUtils.X.readBytes(payload);
                return SERIALIZER.decode(bytes, returnType);
            }
        } catch (Exception e) {
            log.error("同步發送請求異常,請求包:{}", JSON.toJSONString(packet), e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new SendRequestException(e);
            }
        }
    }

    static void scanResponseFutureTable() {
        log.info("開始執行ResponseFutureTable清理任務......");
        Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, ResponseFuture> entry = iterator.next();
            ResponseFuture responseFuture = entry.getValue();
            if (responseFuture.timeout()) {
                iterator.remove();
                log.warn("移除過時的請求ResponseFuture,請求ID:{}", entry.getKey());
            }
        }
        log.info("執行ResponseFutureTable清理任務結束......");
    }

    static {
        int n = Runtime.getRuntime().availableProcessors();
        EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50), runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("CLIENT_REQUEST_EXECUTOR");
            return thread;
        });
        CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("CLIENT_HOUSE_KEEPER");
            return thread;
        });
        CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS);
    }
}

接着添加一個客戶端入站處理器,用於經過reuqestId匹配目標ResponseFuture實例,同時設置ResponseFuture實例中的response屬性爲響應包,同時釋放閉鎖:

@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
        log.info("接收到響應包,內容:{}", JSON.toJSONString(packet));
        ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber());
        if (null != responseFuture) {
            responseFuture.putResponse(packet);
        } else {
            log.warn("接收響應包查詢ResponseFuture不存在,請求ID:{}", packet.getSerialNumber());
        }
    }
}

最後,客戶端啓動類ClientApplication中添加ClientHandlerNetty的處理器流水線中便可:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
        ch.pipeline().addLast(new ResponseMessagePacketDecoder());
        ch.pipeline().addLast(new ClientHandler());
    }
});

先運行以前- 《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》中編寫好的ServerApplication,再啓動ClientApplication,日誌輸出以下:

// 服務端
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO  club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}

// 客戶端
2020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO  club.throwable.client.ClientHandler - 接收到響應包,內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
2020-01-18 14:32:59 [main] INFO  c.throwable.client.ClientApplication - HelloService[throwable]調用結果:"throwable say hello!"
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO  c.t.client.ContractProxyFactory - 開始執行ResponseFutureTable清理任務......
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN  c.t.client.ContractProxyFactory - 移除過時的請求ResponseFuture,請求ID:21d131d26fc74f91b4691e0207826b90

可見異步線程模型已經被改造爲同步化,如今能夠經過契約接口經過RPC同步調用服務端。

小結

Client端的請求-響應同步化處理基本改造完畢,到此爲止,一個RPC框架大體已經完成,接下來會對Client端和Server端進行一些改造,讓契約相關組件託管到IOC容器,實現契約接口自動注入等等功能。

Demo項目地址:

(本文完e-a-20200118 c-2-d)

相關文章
相關標籤/搜索