由於設備的通訊協議準備採用
protobuf
,因此準備這篇protobuf
的使用入門,golang
做爲客戶端,java
做爲服務端,這才能真正體現出protobuf
的無關語言特性。java本文采用
protobuf2
,注重於如何快速入門使用,並不會涉及到具體的細節知識點。git
golang
做爲客戶端,java
做爲服務端,protobuf2
爲二者的通訊協議格式。github
protobuf2簡介golang
詳細說明spring
helloworld.protoshell
syntax = "proto2"; package proto; message ProtocolMessage { message SearchRequest{ required string name = 1; optional int32 search = 2 ; } message ActionRequest{ required string name = 1; optional int32 action = 2 ; } message SearchResponse{ required string name = 1; optional int32 search = 2 ; } message ActionResponse{ required string name = 1; optional int32 action = 2 ; } optional SearchRequest searchRequest = 1; optional ActionRequest actionRequest = 2; optional SearchResponse searchResponse = 3; optional ActionResponse actionResponse = 4; }
SearchRequest
和SearchResponse
爲對應的請求和相應message;ActionRequest
和ActionResponse
爲對應的請求和相應message;netty
框架,限制了只能接受一個message進行編碼解碼,因此把SearchRequest
、SearchResponse
、ActionRequest
和ActionResponse
都內嵌到ProtocolMessage
中,經過對ProtocolMessage
編碼解碼進行數據交互。client_proto/ ├── api │ ├── proto # 存放proto協議文件以及生產的pd.go文件 │ ├── helloworld.pb.go │ └── helloworld.proto ├── cmd │ ├── main.go │ ├── util │ └── util.go
採用go mod
進行開發apache
安裝protobootstrap
自行百度......api
在.proto文件處,輸入protoc --go_out=./ helloworld.proto
服務器
便可生成helloworld.pb.go
文件
package main import ( "github.com/gin-gonic/gin" proto "grpc/api/grpc_proto" "grpc/cmd/demo3/util" "net/http" "time" ) func init() { util.InitTransfer() } func main() { router := gin.Default() // search 測試 router.GET("/search", func(c *gin.Context) { name := "search" search := int32(12) message := &proto.ProtocolMessage{ SearchRequest:&proto.ProtocolMessage_SearchRequest{ Name:&name, Search:&search, }, } if err := util.G_transfer.SendMsg(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) return } if err := util.G_transfer.ReadResponse(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) return } c.JSON(200, gin.H{ "message": message.SearchResponse.Name, }) }) // action測試 router.GET("/action", func(c *gin.Context) { name := "action" action := int32(34) message := &proto.ProtocolMessage{ ActionRequest: &proto.ProtocolMessage_ActionRequest{ Name: &name, Action: &action, }, } if err := util.G_transfer.SendMsg(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) } if err := util.G_transfer.ReadResponse(message); err != nil { c.JSON(500, gin.H{ "err": err.Error(), }) } c.JSON(200, gin.H{ "message": message.ActionResponse.Name, }) }) ReadTimeout := time.Duration(60) * time.Second WriteTimeout := time.Duration(60) * time.Second s := &http.Server{ Addr: ":8090", Handler: router, ReadTimeout: ReadTimeout, WriteTimeout: WriteTimeout, MaxHeaderBytes: 1 << 20, } s.ListenAndServe() }
package util import ( "encoding/binary" "errors" "github.com/gogo/protobuf/proto" grpc_proto "grpc/api/grpc_proto" "net" ) var ( G_transfer *Transfer ) func InitTransfer() { var ( pTCPAddr *net.TCPAddr conn net.Conn err error ) if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil { return } if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil { return } // 定義 Transfer 指針變量 G_transfer = &Transfer{ Conn: conn, } } // 聲明 Transfer 結構體 type Transfer struct { Conn net.Conn // 鏈接 Buf [1024 * 2]byte // 傳輸時,使用的緩衝 } // 獲取並解析服務器的消息 func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) { _, err = transfer.Conn.Read(transfer.Buf[:4]) if err != nil { return } // 根據 buf[:4] 轉成一個 uint32 類型 var pkgLen uint32 pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4]) //根據pkglen 讀取消息內容 n, err := transfer.Conn.Read(transfer.Buf[:pkgLen]) if n != int(pkgLen) || err != nil { return } if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil { return } return } // 發送消息到服務器 func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) { var ( sendBytes []byte readLen int ) //sendBytes, ints := action.Descriptor() if sendBytes, err = proto.Marshal(action); err != nil { return } pkgLen := uint32(len(sendBytes)) var buf [4]byte binary.BigEndian.PutUint32(buf[:4],pkgLen) if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil { if readLen == 0 { return errors.New("發送數據長度發生異常,長度爲0") } return } // 發送消息 if readLen, err = transfer.Conn.Write(sendBytes); err != nil { if readLen == 0 { return errors.New("檢查到服務器關閉,客戶端也關閉") } return } return }
netty
框架中定義的編碼解碼器決定的。server_proto/ ├── src │ ├── main │ ├── java │ ├── com │ ├── dust │ ├── proto_server │ ├── config │ └── NettyConfig.java │ ├── netty │ └── NettyServerListener.java │ └── SocketServerHandler.java │ ├── proto │ └── Helloworld.java │ └── helloworld.proto # proto配置文件 │ └── Application.java # 啓動配置類 │ ├── resources │ └── application.yml #配置文件 │ ├── test └── pom.xml # maven配置文件
採用springBoot
+netty
+maven
開發
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.dust</groupId> <artifactId>proto_server</artifactId> <version>0.0.1-SNAPSHOT</version> <name>proto_server</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- protobuf依賴--> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.8.0</version> </dependency> <dependency> <groupId>com.googlecode.protobuf-java-format</groupId> <artifactId>protobuf-java-format</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.19.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
protobuf-java
的版本爲3.8.0
,必須和安裝proto.exe
的版本保持一致。# netty配置 netty: # 端口號 port: 3210 # 最大線程數 maxThreads: 1024 # 數據包的最大長度 max_frame_length: 65535
package com.dust.proto_server.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "netty") public class NettyConfig { private int port; }
protoc --java_out=./ helloworld.proto
Helloworld.java
文件package com.dust.proto_server.netty; import com.dust.proto_server.proto.Helloworld; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component @ChannelHandler.Sharable public class SocketServerHandler extends ChannelInboundHandlerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class); public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx){ Channel channel = ctx.channel(); LOGGER.info(channel.id().toString()+"加入"); CHANNEL_GROUP.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx){ Channel channel = ctx.channel(); LOGGER.info(channel.id().toString()+"退出"); CHANNEL_GROUP.remove(channel); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } // @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { LOGGER.info("開始讀取客戶端發送過來的數據"); Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg; Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder(); if (protocolMessage.getSearchRequest().getSerializedSize() != 0) { Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest(); LOGGER.info("searchRequest--{}",searchRequest); Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build(); builder.setSearchResponse(searchResponse); } else if (protocolMessage.getActionRequest().getSerializedSize() != 0) { Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest(); LOGGER.info("actionRequest--{}",actionRequest); Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build(); builder.setActionResponse(actionResponse); } Helloworld.ProtocolMessage message = builder.build(); // 發送數據長度 ctx.channel().writeAndFlush(message.toByteArray().length); // 發送數據自己 ctx.channel().writeAndFlush(message); } }
package com.dust.proto_server.netty; import com.dust.proto_server.config.NettyConfig; import com.dust.proto_server.proto.Helloworld; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; @Component public class NettyServerListener { /** * NettyServerListener 日誌輸出器 * */ private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class); /** * 建立bootstrap */ ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * BOSS */ EventLoopGroup boss = new NioEventLoopGroup(); /** * Worker */ EventLoopGroup work = new NioEventLoopGroup(); @Resource private SocketServerHandler socketServerHandler; /** * NETT服務器配置類 */ @Resource private NettyConfig nettyConfig; /** * 關閉服務器方法 */ @PreDestroy public void close() { LOGGER.info("關閉服務器...."); //優雅退出 boss.shutdownGracefully(); work.shutdownGracefully(); } /** * 開啓及服務線程 */ public void start() { // 從配置文件中(application.yml)獲取服務端監聽端口號 int port = nettyConfig.getPort(); serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 負責經過4字節Header指定的Body長度將消息切割 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); // 負責將frameDecoder處理後的完整的一條消息的protobuf字節碼轉成ProtocolMessage對象 pipeline.addLast("protobufDecoder", new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance())); // 負責將寫入的字節碼加上4字節Header前綴來指定Body長度 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 負責將ProtocolMessage對象轉成protobuf字節碼 pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast(socketServerHandler); } }).option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)); try { LOGGER.info("netty服務器在[{}]端口啓動監聽", port); ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.info("[出現異常] 釋放資源"); boss.shutdownGracefully(); work.shutdownGracefully(); } } }
frameDecoder
和protobufDecoder
對應的handler用於解碼Protobuf package數據包,他們都是Upstream Handles:先處理長度,而後再處理數據自己;
frameEncoder
和protobufEncoder
對應的handler用於編碼Protobuf package數據包,他們都是Downstream Handles;
此外還有一個handler,是一個自定義的Upstream Handles,用於開發者從網絡數據中解析獲得本身所需的數據socketServerHandler
;
上例Handles的執行順序爲
upstream:frameDecoder,protobufDecoder,handler //解碼從Socket收到的數據 downstream:frameEncoder,protobufEncoder //編碼要經過Socket發送出去的數據
package com.dust.proto_server; import com.dust.proto_server.netty.NettyServerListener; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.Resource; @SpringBootApplication public class Application implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Resource private NettyServerListener nettyServerListener; @Override public void run(String... args) throws Exception { nettyServerListener.start(); } }
先啓動服務端,再啓動客戶端
search測試