java和golang經過protobuf協議相互通訊

由於設備的通訊協議準備採用protobuf,因此準備這篇protobuf的使用入門,golang做爲客戶端,java做爲服務端,這才能真正體現出protobuf的無關語言特性。java

本文采用protobuf2,注重於如何快速入門使用,並不會涉及到具體的細節知識點。git


總體結構說明

golang做爲客戶端,java做爲服務端,protobuf2爲二者的通訊協議格式。github


protobuf2文件

  • protobuf2簡介golang

    詳細說明spring

  • helloworld.protoshell

    syntax = "proto2";
    
    package proto;
    
    message ProtocolMessage {
        message SearchRequest{
            required string name = 1;
            optional int32 search = 2 ;
        }
    
        message ActionRequest{
            required string name = 1;
            optional int32 action = 2 ;
        }
    
        message SearchResponse{
            required string name = 1;
            optional int32 search = 2 ;
        }
    
        message ActionResponse{
            required string name = 1;
            optional int32 action = 2 ;
        }
    
    
        optional SearchRequest searchRequest = 1;
        optional ActionRequest actionRequest = 2;
        optional SearchResponse searchResponse = 3;
        optional ActionResponse actionResponse = 4;
    }
    • SearchRequestSearchResponse爲對應的請求和相應message;
    • ActionRequestActionResponse爲對應的請求和相應message;
    • 因爲服務端使用netty框架,限制了只能接受一個message進行編碼解碼,因此把SearchRequestSearchResponseActionRequestActionResponse都內嵌到ProtocolMessage中,經過對ProtocolMessage編碼解碼進行數據交互。

golang客戶端

目錄結構

client_proto/
├── api
│   ├── proto # 存放proto協議文件以及生產的pd.go文件
│   	├── helloworld.pb.go
│   	└── helloworld.proto
├── cmd
│   	├── main.go
│   	├── util
│   		└── util.go

採用go mod 進行開發apache

生成pb.go文件

  • 安裝protobootstrap

    自行百度......api

  • 在.proto文件處,輸入protoc --go_out=./ helloworld.proto服務器

  • 便可生成helloworld.pb.go文件

main.go

package main

import (
	"github.com/gin-gonic/gin"
	proto "grpc/api/grpc_proto"
	"grpc/cmd/demo3/util"
	"net/http"
	"time"
)

func init()  {
	util.InitTransfer()
}

func main() {
	router := gin.Default()
	// search 測試
	router.GET("/search", func(c *gin.Context) {
		name := "search"
		search := int32(12)
		message := &proto.ProtocolMessage{
			SearchRequest:&proto.ProtocolMessage_SearchRequest{
				Name:&name,
				Search:&search,
			},
		}

		if err := util.G_transfer.SendMsg(message); err != nil {
			c.JSON(500, gin.H{
				"err": err.Error(),
			})
			return
		}


		if err := util.G_transfer.ReadResponse(message); err != nil {
			c.JSON(500, gin.H{
				"err": err.Error(),
			})

			return
		}

		c.JSON(200, gin.H{
			"message": message.SearchResponse.Name,
		})
	})

	// action測試
	router.GET("/action", func(c *gin.Context) {
		name := "action"
		action := int32(34)
		message := &proto.ProtocolMessage{
			ActionRequest: &proto.ProtocolMessage_ActionRequest{
				Name:   &name,
				Action: &action,
			},
		}

		if err := util.G_transfer.SendMsg(message); err != nil {
			c.JSON(500, gin.H{
				"err": err.Error(),
			})
		}

		if err := util.G_transfer.ReadResponse(message); err != nil {
			c.JSON(500, gin.H{
				"err": err.Error(),
			})
		}

		c.JSON(200, gin.H{
			"message": message.ActionResponse.Name,
		})
	})


	ReadTimeout := time.Duration(60) * time.Second
	WriteTimeout := time.Duration(60) * time.Second

	s := &http.Server{
		Addr:          ":8090",
		Handler:        router,
		ReadTimeout:    ReadTimeout,
		WriteTimeout:   WriteTimeout,
		MaxHeaderBytes: 1 << 20,
	}

	s.ListenAndServe()
}

util.go

package util

import (
	"encoding/binary"
	"errors"
	"github.com/gogo/protobuf/proto"
	grpc_proto "grpc/api/grpc_proto"
	"net"
)
var (
	G_transfer *Transfer
)

func InitTransfer()  {
	var (
		pTCPAddr *net.TCPAddr
		conn net.Conn
		err error
	)
	if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil {
		return
	}

	if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil {
		return
	}

	// 定義 Transfer 指針變量
	G_transfer = &Transfer{
		Conn:  conn,
	}
}

// 聲明 Transfer 結構體
type Transfer struct {
	Conn          net.Conn       // 鏈接
	Buf           [1024 * 2]byte // 傳輸時,使用的緩衝
}


// 獲取並解析服務器的消息
func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) {
	_, err = transfer.Conn.Read(transfer.Buf[:4])
	if err != nil {
		return
	}

	// 根據 buf[:4] 轉成一個 uint32 類型
	var pkgLen uint32
	pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4])
	//根據pkglen 讀取消息內容
	n, err := transfer.Conn.Read(transfer.Buf[:pkgLen])
	if n != int(pkgLen) || err != nil {
		return
	}

	if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil {
		return
	}
	return
}

// 發送消息到服務器
func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) {
	var (
		sendBytes []byte
		readLen   int
	)
	//sendBytes, ints := action.Descriptor()
	if sendBytes, err = proto.Marshal(action); err != nil {
		return
	}

	pkgLen := uint32(len(sendBytes))
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:4],pkgLen)

	if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil {
		if readLen == 0 {
			return errors.New("發送數據長度發生異常,長度爲0")
		}
		return
	}
	// 發送消息
	if readLen, err = transfer.Conn.Write(sendBytes); err != nil {
		if readLen == 0 {
			return errors.New("檢查到服務器關閉,客戶端也關閉")
		}
		return
	}
	return
}
  • 這裏發送消息和讀取消息都須要先發送/解析數據的長度,而後發送/解析數據自己;
  • 這裏與服務端怎麼樣解析/發送數據有關,這是因爲netty框架中定義的編碼解碼器決定的。

java服務端

目錄結構

server_proto/
├── src
│   ├── main 
│   	├── java
│   		├── com
│   			├── dust
│   				├── proto_server
│   					├── config
│   						└── NettyConfig.java 
│   					├── netty
│   						└── NettyServerListener.java 
│   						└── SocketServerHandler.java 
│   					├── proto
│   						└── Helloworld.java 
│   						└── helloworld.proto # proto配置文件
│   					└── Application.java # 啓動配置類
│   	├── resources
│   		└── application.yml #配置文件
│   ├── test
└── pom.xml # maven配置文件

採用springBoot+netty+maven開發

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.dust</groupId>
    <artifactId>proto_server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>proto_server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- protobuf依賴-->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.googlecode.protobuf-java-format</groupId>
            <artifactId>protobuf-java-format</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.19.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • 注意:protobuf-java的版本爲3.8.0,必須和安裝proto.exe的版本保持一致。

application.yml

# netty配置
netty:
  # 端口號
  port: 3210
  # 最大線程數
  maxThreads: 1024
  # 數據包的最大長度
  max_frame_length: 65535

NettyConfig.java

package com.dust.proto_server.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
    private int port;
}

生成Helloworld.java

  • 在.proto文件處,輸入protoc --java_out=./ helloworld.proto
  • 便可生成Helloworld.java文件

SocketServerHandler.java

package com.dust.proto_server.netty;

import com.dust.proto_server.proto.Helloworld;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


@Component
@ChannelHandler.Sharable
public class SocketServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);

    public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        LOGGER.info(channel.id().toString()+"加入");
        CHANNEL_GROUP.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        LOGGER.info(channel.id().toString()+"退出");
        CHANNEL_GROUP.remove(channel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
//
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        LOGGER.info("開始讀取客戶端發送過來的數據");
        Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
        Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();

        if (protocolMessage.getSearchRequest().getSerializedSize() != 0) {
            Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
            LOGGER.info("searchRequest--{}",searchRequest);

            Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build();
            builder.setSearchResponse(searchResponse);

        } else if (protocolMessage.getActionRequest().getSerializedSize() != 0) {
            Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
            LOGGER.info("actionRequest--{}",actionRequest);

            Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build();
            builder.setActionResponse(actionResponse);
        }

        Helloworld.ProtocolMessage message = builder.build();
        // 發送數據長度
        ctx.channel().writeAndFlush(message.toByteArray().length);
         // 發送數據自己
        ctx.channel().writeAndFlush(message);
    }
}

NettyServerListener.java

package com.dust.proto_server.netty;

import com.dust.proto_server.config.NettyConfig;

import com.dust.proto_server.proto.Helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

@Component
public class NettyServerListener {
    /**
     * NettyServerListener 日誌輸出器
     *
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
    /**
     * 建立bootstrap
     */
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    /**
     * BOSS
     */
    EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * Worker
     */
    EventLoopGroup work = new NioEventLoopGroup();

    @Resource
    private SocketServerHandler socketServerHandler;

    /**
     * NETT服務器配置類
     */
    @Resource
    private NettyConfig nettyConfig;

    /**
     * 關閉服務器方法
     */
    @PreDestroy
    public void close() {
        LOGGER.info("關閉服務器....");
        //優雅退出
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

    /**
     * 開啓及服務線程
     */
    public void start() {
        // 從配置文件中(application.yml)獲取服務端監聽端口號
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 負責經過4字節Header指定的Body長度將消息切割
                        pipeline.addLast("frameDecoder",
                                new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
                        // 負責將frameDecoder處理後的完整的一條消息的protobuf字節碼轉成ProtocolMessage對象
                        pipeline.addLast("protobufDecoder",
                                new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
                        // 負責將寫入的字節碼加上4字節Header前綴來指定Body長度
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                        // 負責將ProtocolMessage對象轉成protobuf字節碼
                        pipeline.addLast("protobufEncoder", new ProtobufEncoder());

                        pipeline.addLast(socketServerHandler);
                    }

                }).option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            LOGGER.info("netty服務器在[{}]端口啓動監聽", port);
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.info("[出現異常] 釋放資源");
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
  • 這個類就定義服務端是怎麼樣處理接受和發送數據的;

  • frameDecoderprotobufDecoder對應的handler用於解碼Protobuf package數據包,他們都是Upstream Handles:先處理長度,而後再處理數據自己;

  • frameEncoderprotobufEncoder對應的handler用於編碼Protobuf package數據包,他們都是Downstream Handles;

  • 此外還有一個handler,是一個自定義的Upstream Handles,用於開發者從網絡數據中解析獲得本身所需的數據socketServerHandler;

  • 上例Handles的執行順序爲

    upstream:frameDecoder,protobufDecoder,handler   //解碼從Socket收到的數據 
    downstream:frameEncoder,protobufEncoder         //編碼要經過Socket發送出去的數據

Application.java

package com.dust.proto_server;

import com.dust.proto_server.netty.NettyServerListener;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    @Resource
    private NettyServerListener nettyServerListener;

    @Override
    public void run(String... args) throws Exception {
        nettyServerListener.start();
    }
}

測試

  • 先啓動服務端,再啓動客戶端

  • search測試

  • action測試

相關文章
相關標籤/搜索