Vert.x: an elegant fix for TCP packet splitting

Vert.x version

<vertx.version>3.8.0</vertx.version>

Server side

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServerOptions;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;

public class TcpServer extends AbstractVerticle {

  public static void main(String[] args) {
    Vertx.vertx().deployVerticle(new TcpServer());
  }
  @Override
  public void start() throws Exception {
    super.start();

    vertx.createNetServer(new NetServerOptions().setTcpKeepAlive(true))
      .connectHandler(conn -> {
        final FrameParser parser = new FrameParser(res -> {
          if (res.succeeded()) {
            JsonObject body = res.result();
            System.out.println(body);
            FrameHelper.writeFrame(body.put("server", true), conn);
          } else {
            res.cause().printStackTrace();
          }
        });

        conn.handler(parser);
        conn.endHandler(end -> System.out.println("conn[" + conn.writeHandlerID() + "] end"));
        conn.exceptionHandler(ex -> System.out.println("conn error: " + ex.getMessage()));

      }).listen(9092, s -> {
      if (s.succeeded()) {
        System.out.println("tcp server ok port: 9092");
      } else {
        s.cause().printStackTrace();
      }
    });
  }
}

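I used to split the stream myself on a delimiter. Then I came across FrameParser and FrameHelper, two handy classes from vertx-tcp-eventbus-bridge. Each packet is laid out as len+json: a length field followed by the JSON body, as you can see by stepping through the FrameHelper source. FrameParser is the counterpart: it reads the length and then unpacks exactly that many bytes, so a message that arrives split across several TCP reads, or several messages that arrive in one read, are still delivered to the handler one complete JsonObject at a time.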
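To make the len+json layout concrete, here is a minimal sketch of length-prefixed framing in plain Java, with no Vert.x dependency. It is not the FrameHelper/FrameParser source: the class `LengthPrefixedFraming`, its `encode` method, and its `Decoder` are hypothetical names for illustration, and the 4-byte big-endian length prefix is an assumption that should be checked against the FrameHelper source for your version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of len+json framing; not the Vert.x implementation.
public class LengthPrefixedFraming {

  // Encode one message as [length][UTF-8 payload].
  // Assumption: the length is a 4-byte big-endian int (ByteBuffer's default order).
  static byte[] encode(String json) {
    byte[] payload = json.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + payload.length)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  // Buffers incoming chunks and returns only the payloads that are complete.
  static class Decoder {
    private byte[] pending = new byte[0];

    List<String> feed(byte[] chunk) {
      // Join the leftover bytes from earlier reads with the new chunk.
      byte[] joined = new byte[pending.length + chunk.length];
      System.arraycopy(pending, 0, joined, 0, pending.length);
      System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

      List<String> frames = new ArrayList<>();
      ByteBuffer buf = ByteBuffer.wrap(joined);
      while (buf.remaining() >= 4) {
        buf.mark();
        int len = buf.getInt();
        if (len < 0) {
          throw new IllegalStateException("negative frame length: " + len);
        }
        if (buf.remaining() < len) {
          buf.reset(); // body not fully received yet; keep the header for next time
          break;
        }
        byte[] payload = new byte[len];
        buf.get(payload);
        frames.add(new String(payload, StandardCharsets.UTF_8));
      }
      // Whatever is left is an incomplete frame; keep it for the next call.
      pending = new byte[buf.remaining()];
      buf.get(pending);
      return frames;
    }
  }

  public static void main(String[] args) {
    byte[] a = encode("{\"n\":1}");
    byte[] b = encode("{\"n\":2}");
    byte[] stream = new byte[a.length + b.length];
    System.arraycopy(a, 0, stream, 0, a.length);
    System.arraycopy(b, 0, stream, a.length, b.length);

    // Simulate TCP splitting the stream in the middle of the second length header.
    int cut = a.length + 3;
    Decoder decoder = new Decoder();
    System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, cut)));
    System.out.println(decoder.feed(Arrays.copyOfRange(stream, cut, stream.length)));
  }
}
```

The first `feed` call yields only the first message, because the second frame's header is still incomplete. The second call completes it. Unlike delimiter-based splitting, the payload can contain any bytes, since the boundaries come from the length field alone.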

Client side

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;

import java.time.LocalDateTime;

public class TcpClient extends AbstractVerticle {

  public static void main(String[] args) {
    Vertx.vertx().deployVerticle(new TcpClient());
  }

  private NetSocket netSocket;

  @Override
  public void start() throws Exception {
    super.start();
    vertx.createNetClient(new NetClientOptions().setTcpKeepAlive(true)).connect(9092, "127.0.0.1", s -> {
      if (s.succeeded()) {
        netSocket = s.result();
        final FrameParser parser = new FrameParser(res -> {
          if (res.succeeded()) {
            JsonObject body = res.result();
            System.out.println(body);
          } else {
            res.cause().printStackTrace();
          }
        });

        netSocket.handler(parser);
        netSocket.endHandler(end -> System.out.println("client[" + netSocket.writeHandlerID() + "] end"));
        netSocket.exceptionHandler(ex -> System.out.println("error client: " + ex.getMessage()));
      } else {
        s.cause().printStackTrace();
      }
    });

    vertx.setPeriodic(6000L, t -> {
      if (netSocket != null) {
        FrameHelper.writeFrame(new JsonObject()
            .put("clientId", netSocket.writeHandlerID())
            .put("time", LocalDateTime.now().toString()),
          netSocket);
      }
    });
  }
}

pom.xml

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-core</artifactId>
  <version>${vertx.version}</version>
</dependency>
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-tcp-eventbus-bridge</artifactId>
  <version>${vertx.version}</version>
</dependency>