SpringBoot整合Netty並使用Protobuf進行數據傳輸(附工程)

前言

本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數據傳輸的相關內容。Protobuf會簡單的介紹下用法,至於Netty在以前的文章中已經簡單的介紹過了,這裏就再也不過多細說了。html

Protobuf

介紹

protocolbuffer(如下簡稱PB)是google 的一種數據交換的格式,它獨立於語言,獨立於平臺。google 提供了多種語言的實現:java、c#、c++、go 和python,每一種實現都包含了相應語言的編譯器以及庫文件。因爲它是一種二進制的格式,比使用 xml進行數據交換快許多。能夠把它用於分佈式應用之間的數據通訊或者異構環境下的數據交換。做爲一種效率和兼容性都很優秀的二進制數據傳輸格式,能夠用於諸如網絡傳輸、配置文件、數據存儲等諸多領域。java

官方地址: https://github.com/google/protobufpython

使用

這裏的使用就只介紹Java相關的使用。 首先咱們須要創建一個proto文件,在該文件定義咱們須要傳輸的文件。 例如咱們須要定義一個用戶的信息,包含的字段主要有編號、名稱、年齡。 那麼該protobuf文件的格式以下: :這裏使用的是proto3,相關的註釋我已寫了,這裏便再也不過多講述了。須要注意一點的是proto文件和生成的Java文件名稱不能一致!c++

syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {  
      
     // ID  
     int32 id = 1;  
      
    // 姓名  
     string name = 2;  
      
    // 年齡  
      int32 age = 3;  
	  
	 // 狀態 
     int32 state = 4;  
} 
複製代碼

建立好該文件以後,咱們把該文件和protoc.exe(生成Java文件的軟件)放到E盤目錄下的protobuf文件夾下,而後再到該目錄的dos界面下輸入:protoc.exe --java_out=文件絕對路徑名稱。 例如:git

protoc.exe --java_out=E:\protobuf User.proto
複製代碼

輸入完以後,回車便可在同級目錄看到已經生成好的Java文件,而後將該文件放到項目中該文件指定的路徑下便可。github

注:生成protobuf的文件軟件和測試的protobuf文件我也整合到該項目中了,能夠直接獲取的。spring

Java文件生成好以後,咱們再來看怎麼使用。 這裏我就直接貼代碼了,而且將註釋寫在代碼中,應該更容易理解些吧。。。 代碼示例:json

// 按照定義的數據結構,建立一個對象  
    	UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
    	userInfo.setId(1);
    	userInfo.setName("xuwujing");
    	userInfo.setAge(18);
    	UserInfo.UserMsg userMsg = userInfo.build();  
        // 將數據寫到輸出流 
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        userMsg.writeTo(output);  
        // 將數據序列化後發送 
        byte[] byteArray = output.toByteArray();  
        // 接收到流並讀取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
        // 反序列化  
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
        System.out.println("id:" + userInfo2.getId());  
        System.out.println("name:" + userInfo2.getName());  
        System.out.println("age:" + userInfo2.getAge());  
複製代碼

注:這裏說明一點,由於protobuf是經過二進制進行傳輸,因此須要注意下相應的編碼。還有使用protobuf也須要注意一下一次傳輸的最大字節長度。bootstrap

輸出結果:c#

id:1
name:xuwujing
age:18
複製代碼

SpringBoot整合Netty

說明:若是想直接獲取工程那麼能夠直接跳到底部,經過連接下載工程代碼。

開發準備

環境要求 JDK::1.8 Netty::4.0或以上(不包括5) Protobuf:3.0或以上

若是對Netty不熟的話,能夠看看我以前寫的一些文章。大神請無視~。~ 地址:https://blog.csdn.net/column/details/17640.html

首先仍是Maven的相關依賴:

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<netty.version>4.1.22.Final</netty.version>
		<protobuf.version>3.5.1</protobuf.version>
		<springboot>1.5.9.RELEASE</springboot>
		<fastjson>1.2.41</fastjson>
		<maven.compiler.source>1.8</maven.compiler.source>
   		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>


	<dependencies>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<version>${springboot}</version>
		</dependency>


		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<version>${springboot}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<version>${springboot}</version>
			<optional>true</optional>
		</dependency>

		
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>${netty.version}</version>
		</dependency>

		<dependency>
			<groupId>com.google.protobuf</groupId>
			<artifactId>protobuf-java</artifactId>
			<version>${protobuf.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson}</version>
		</dependency>

		
	<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency> 
</dependencies>
複製代碼

添加了相應的maven依賴以後,配置文件這塊暫時沒有什麼能夠添加的,由於暫時就一個監聽的端口而已。

代碼編寫

代碼模塊主要分爲服務端和客戶端。 主要實現的業務邏輯: 服務端啓動成功以後,客戶端也啓動成功,這時服務端會發送一條protobuf格式的信息給客戶端,而後客戶端給予相應的應答。客戶端與服務端鏈接成功以後,客戶端每一個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,若是客戶端沒有在指定的時間發送信息,服務端會關閉與該客戶端的鏈接。當客戶端沒法鏈接到服務端以後,會每隔一段時間去嘗試重連,只到重連成功!

服務端

首先是編寫服務端的啓動類,相應的註釋在代碼中寫得很詳細了,這裏也再也不過多講述了。不過須要注意的是,在以前的我寫的Netty文章中,是經過main方法直接啓動服務端,所以是直接new一個對象的。而在和SpringBoot整合以後,咱們須要將Netty交給springBoot去管理,因此這裏就用了相應的註解。 代碼以下:

@Service("nettyServer")
public class NettyServer {
	private static final int port = 9876; // 設置服務端端口
	private static EventLoopGroup boss = new NioEventLoopGroup(); // 經過nio方式來接收鏈接和處理鏈接
	private static EventLoopGroup work = new NioEventLoopGroup(); // 經過nio方式來接收鏈接和處理鏈接
	private static ServerBootstrap b = new ServerBootstrap();
	
	@Autowired
	private NettyServerFilter nettyServerFilter;
	
	
	public void run() {
		try {
			b.group(boss, work);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(nettyServerFilter); // 設置過濾器
			// 服務器綁定端口監聽
			ChannelFuture f = b.bind(port).sync();
			System.out.println("服務端啓動成功,端口是:" + port);
			// 監聽服務器關閉監聽
			f.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 關閉EventLoopGroup,釋放掉全部資源包括建立的線程
			work.shutdownGracefully();
			boss.shutdownGracefully();
		}
	}
}
複製代碼

服務端主類編寫完畢以後,咱們再來設置下相應的過濾條件。 這裏須要繼承Netty中ChannelInitializer類,而後重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。 代碼以下:

@Component
	 public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
	
	@Autowired
	private NettyServerHandler nettyServerHandler;
	
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();
      
         //入參說明: 讀超時時間、寫超時時間、全部類型的超時時間、時間格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解碼和編碼,應和客戶端一致
         //傳輸的協議 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());
         
         //業務邏輯實現類
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }
複製代碼

服務相關的設置的代碼寫完以後,咱們再來編寫主要的業務代碼。 使用Netty編寫業務層的代碼,咱們須要繼承ChannelInboundHandlerAdapterSimpleChannelInboundHandler類,在這裏順便說下它們兩的區別吧。 繼承SimpleChannelInboundHandler類以後,會在接收到數據後會自動release掉數據佔用的Bytebuffer資源。而且繼承該類須要指定數據格式。 而繼承ChannelInboundHandlerAdapter則不會自動釋放,須要手動調用ReferenceCountUtil.release()等方法進行釋放。繼承該類不須要指定數據格式。 因此在這裏,我的推薦服務端繼承ChannelInboundHandlerAdapter,手動進行釋放,防止數據未處理完就自動釋放了。並且服務端可能有多個客戶端進行鏈接,而且每個客戶端請求的數據格式都不一致,這時即可以進行相應的處理。 客戶端根據狀況能夠繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的數據格式,就不須要再進行格式的轉換了。

代碼以下:

@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	/** 空閒次數 */
	private int idle_count = 1;
	/** 發送次數 */
	private int count = 1;


	/**
	 * 創建鏈接時,發送一條消息
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("鏈接的客戶端地址:" + ctx.channel().remoteAddress());
		UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
				.build();
		ctx.writeAndFlush(userMsg);
		super.channelActive(ctx);
	}

	/**
	 * 超時處理 若是5秒沒有接受客戶端的心跳,就觸發; 若是超過兩次,則直接關閉;
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
		if (obj instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) obj;
			if (IdleState.READER_IDLE.equals(event.state())) { // 若是讀通道處於空閒狀態,說明沒有接收到心跳命令
				System.out.println("已經5秒沒有接收到客戶端的信息了");
				if (idle_count > 1) {
					System.out.println("關閉這個不活躍的channel");
					ctx.channel().close();
				}
				idle_count++;
			}
		} else {
			super.userEventTriggered(ctx, obj);
		}
	}

	/**
	 * 業務邏輯處理
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("第" + count + "次" + ",服務端接受的消息:" + msg);
		try {
			// 若是是protobuf類型的數據
		  if (msg instanceof UserMsg) {
				UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
				if (userState.getState() == 1) {
					System.out.println("客戶端業務處理成功!");
				} else if(userState.getState() == 2){
					System.out.println("接受到客戶端發送的心跳!");
				}else{
					System.out.println("未知命令!");
				}
			} else {
				System.out.println("未知數據!" + msg);
				return;
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			ReferenceCountUtil.release(msg);
		}
		count++;
	}

	/**
	 * 異常處理
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}
複製代碼

還有個服務端的啓動類,以前是經過main方法直接啓動, 不過這裏改爲了經過springBoot進行啓動,差異不大。 代碼以下:

@SpringBootApplication
public class NettyServerApp {

	public static void main(String[] args) {
		// 啓動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
		ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
		NettyServer nettyServer = context.getBean(NettyServer.class);
		nettyServer.run();
	}

}
複製代碼

到這裏服務端相應的代碼就編寫完畢了。

客戶端

客戶端這邊的代碼和服務端的不少地方都相似,我就再也不過多細說了,主要將一些不一樣的代碼拿出來簡單的講述下。 首先是客戶端的主類,基本和服務端的差很少,也就是多了監聽的端口和一個監聽器(用來監聽是否和服務端斷開鏈接,用於重連)。 主要實現的代碼邏輯以下:

public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
		ChannelFuture f = null;
		try {
			if (bootstrap != null) {
				bootstrap.group(eventLoopGroup);
				bootstrap.channel(NioSocketChannel.class);
				bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
				bootstrap.handler(nettyClientFilter);
				bootstrap.remoteAddress(host, port);
				f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
					final EventLoop eventLoop = futureListener.channel().eventLoop();
					if (!futureListener.isSuccess()) {
						System.out.println("與服務端斷開鏈接!在10s以後準備嘗試重連!");
						eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
					}
				});
				if(initFalg){
					System.out.println("Netty客戶端啓動成功!");
					initFalg=false;
				}
				// 阻塞
				f.channel().closeFuture().sync();
			}
		} catch (Exception e) {
			System.out.println("客戶端鏈接失敗!"+e.getMessage());
		}
	}
複製代碼

注:監聽器這塊的實現用的是JDK1.8的寫法。

客戶端過濾其這塊基本和服務端一直。不過須要注意的是,傳輸協議、編碼和解碼應該一致,還有心跳的讀寫時間應該小於服務端所設置的時間。 改動的代碼以下:

ChannelPipeline ph = ch.pipeline();
        /*
         * 解碼和編碼,應和服務端一致
         * */
        //入參說明: 讀超時時間、寫超時時間、全部類型的超時時間、時間格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); 
複製代碼

客戶端的業務代碼邏輯。 主要實現的幾點邏輯是心跳按時發送以及解析服務發送的protobuf格式的數據。
這裏比服務端多個個註解, 該註解Sharable主要是爲了多個handler能夠被多個channel安全地共享,也就是保證線程安全。 廢話就很少說了,代碼以下:

@Service("nettyClientHandler")
	@ChannelHandler.Sharable
	public class NettyClientHandler extends ChannelInboundHandlerAdapter {
	@Autowired
	private NettyClient nettyClient;
	
	/** 循環次數 */
	private int fcount = 1;
	
	/**
	 * 創建鏈接時
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("創建鏈接時:" + new Date());
		ctx.fireChannelActive();
	}

	/**
	 * 關閉鏈接時
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("關閉鏈接時:" + new Date());
		final EventLoop eventLoop = ctx.channel().eventLoop();
		nettyClient.doConnect(new Bootstrap(), eventLoop);
		super.channelInactive(ctx);
	}

	/**
	 * 心跳請求處理 每4秒發送一次心跳請求;
	 * 
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
		System.out.println("循環請求的時間:" + new Date() + ",次數" + fcount);
		if (obj instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) obj;
			if (IdleState.WRITER_IDLE.equals(event.state())) { // 若是寫通道處於空閒狀態,就發送心跳命令
				UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
				ctx.channel().writeAndFlush(userState);
				fcount++;
			}
		}
	}

	/**
	 * 業務邏輯處理
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 若是不是protobuf類型的數據
		if (!(msg instanceof UserMsg)) {
			System.out.println("未知數據!" + msg);
			return;
		}
		try {

			// 獲得protobuf的數據
			UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
			// 進行相應的業務處理。。。
			// 這裏就從簡了,只是打印而已
			System.out.println(
					"客戶端接受到的用戶信息。編號:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年齡:" + userMsg.getAge());

			// 這裏返回一個已經接受到數據的狀態
			UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
			ctx.writeAndFlush(userState);
			System.out.println("成功發送給服務端!");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			ReferenceCountUtil.release(msg);
		}
	 }
	}
複製代碼

那麼到這裏客戶端的代碼也編寫完畢了。

功能測試

首先啓動服務端,而後再啓動客戶端。 咱們來看看結果是否如上述所說。

服務端輸出結果:

服務端啓動成功,端口是:9876
鏈接的客戶端地址:/127.0.0.1:53319
第1次,服務端接受的消息:state: 1

客戶端業務處理成功!
第2次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
複製代碼

客戶端輸入結果:

Netty客戶端啓動成功!
創建鏈接時:Mon Jul 16 23:31:58 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
循環請求的時間:Mon Jul 16 23:32:02 CST 2018,次數1
循環請求的時間:Mon Jul 16 23:32:06 CST 2018,次數2
循環請求的時間:Mon Jul 16 23:32:10 CST 2018,次數3
循環請求的時間:Mon Jul 16 23:32:14 CST 2018,次數4
複製代碼

經過打印信息能夠看出如上述所說。

接下來咱們再來看看客戶端是否可以實現重連。 先啓動客戶端,再啓動服務端。

客戶端輸入結果:

Netty客戶端啓動成功!
與服務端斷開鏈接!在10s以後準備嘗試重連!
客戶端鏈接失敗!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
創建鏈接時:Mon Jul 16 23:41:33 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
循環請求的時間:Mon Jul 16 23:41:38 CST 2018,次數1
循環請求的時間:Mon Jul 16 23:41:42 CST 2018,次數2
循環請求的時間:Mon Jul 16 23:41:46 CST 2018,次數3
複製代碼

服務端輸出結果:

服務端啓動成功,端口是:9876
鏈接的客戶端地址:/127.0.0.1:53492
第1次,服務端接受的消息:state: 1

客戶端業務處理成功!
第2次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2
複製代碼

結果也如上述所說!

其它

關於SpringBoot整合Netty使用Protobuf進行數據傳輸到這裏就結束了。 SpringBoot整合Netty使用Protobuf進行數據傳輸的項目工程地址: https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

對了,也有不使用springBoot整合的Netty項目工程地址: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

原創不易,若是感受不錯,但願給個推薦!您的支持是我寫做的最大動力! 版權聲明: 做者:虛無境 博客園出處:http://www.cnblogs.com/xuwujing CSDN出處:http://blog.csdn.net/qazwsxpcm     我的博客出處:http://www.panchengming.com

相關文章
相關標籤/搜索