DotNetty 跨平臺的網絡通訊庫

長久以來,.Net開發人員都很是羨慕Java有Netty這樣,高效,穩定又易用的網絡通訊基礎框架。終於微軟的Azure團隊,使用C#實現的Netty的版本發佈。不但使用了C#和.Net平臺的技術特色,而且保留了Netty原來絕大部分的編程接口。讓咱們在使用時,徹底能夠依照Netty官方的教程來學習和使用DotNetty應用程序。
DotNetty同時也是開源的,它的源代碼託管在Github上:https://github.com/azure/dotnettyjavascript

0x01 項目預覽

從github上下載最新的代碼到本地,使用VS2017或者VSCode打開下載好的代碼,能夠看到如圖所示的代碼那結構,其中源碼部分有9個項目組成,其中java

DotNetty.Common 是公共的類庫項目,包裝線程池,並行任務和經常使用幫助類的封裝
DotNetty.Transport 是DotNetty核心的實現
DotNetty.Buffers 是對內存緩衝區管理的封裝
DotNetty.Codes 是對編解碼是封裝,包括一些基礎基類的實現,咱們在項目中自定義的協議,都要繼承該項目的特定基類和實現
DotNetty.Handlers 封裝了經常使用的管道處理器,好比Tls編解碼,超時機制,心跳檢查,日誌等,若是項目中沒有用到能夠不引用,不過通常都會用到
其餘還有對Redis的編解碼,Mqtt的編解碼,Protobuf2/3的編解碼項目中可根據實際狀況引用
很遺憾Http協議和Websocket協議尚未實現。git

0x02 快速開始-示例-回聲程序的實現

從上一步下載的代碼中,看到有一個sample目錄,有不少例子,都大同小異, 先來看這個最簡單的Echo服務的實現吧.
Echo服務,分爲服務端和客戶端,服務端使用DotNetty框架啓動一個Socket服務,並等待客戶端連接,當客戶端連接並接收客戶端消息,並將接收到的消息原樣返回給客戶端。而客戶端一樣使用DotNetty框架啓動一個Socket客戶端服務,並連接到服務端,併發送一條Hello的字符串信息,並等待服務端返回。如此往復。github

2.1 Echo Server

來一塊兒看一下代碼吧,我把註釋都寫到代碼中:web

static async Task RunServerAsync() { //設置輸出日誌到Console ExampleHelper.SetConsoleLogger(); // 主工做線程組,設置爲1個線程 var bossGroup = new MultithreadEventLoopGroup(1); // 工做線程組,默認爲內核數*2的線程數 var workerGroup = new MultithreadEventLoopGroup(); X509Certificate2 tlsCertificate = null; if (ServerSettings.IsSsl) //若是使用加密通道 { tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); } try { //聲明一個服務端Bootstrap,每一個Netty服務端程序,都由ServerBootstrap控制, //經過鏈式的方式組裝須要的參數 var bootstrap = new ServerBootstrap(); bootstrap .Group(bossGroup, workerGroup) // 設置主和工做線程組 .Channel<TcpServerSocketChannel>() // 設置通道模式爲TcpSocket .Option(ChannelOption.SoBacklog, 100) // 設置網絡IO參數等,這裏能夠設置不少參數,固然你對網絡調優和參數設置很是瞭解的話,你能夠設置,或者就用默認參數吧 .Handler(new LoggingHandler("SRV-LSTN")) //在主線程組上設置一個打印日誌的處理器 .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel => { //工做線程鏈接器 是設置了一個管道,服務端主線程全部接收到的信息都會經過這個管道一層層往下傳輸 //同時全部出棧的消息 也要這個管道的全部處理器進行一步步處理 IChannelPipeline pipeline = channel.Pipeline; if (tlsCertificate != null) //Tls的加解密 { pipeline.AddLast("tls", TlsHandler.Server(tlsCertificate)); } //日誌攔截器 pipeline.AddLast(new LoggingHandler("SRV-CONN")); //出棧消息,經過這個handler 在消息頂部加上消息的長度 pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); //入棧消息經過該Handler,解析消息的包長信息,並將正確的消息體發送給下一個處理Handler,該類比較經常使用,後面單獨說明 pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //業務handler ,這裏是實際處理Echo業務的Handler pipeline.AddLast("echo", new EchoServerHandler()); })); // bootstrap綁定到指定端口的行爲 就是服務端啓動服務,一樣的Serverbootstrap能夠bind到多個端口 IChannel boundChannel = await bootstrap.BindAsync(ServerSettings.Port); Console.ReadLine(); //關閉服務 await boundChannel.CloseAsync(); } finally { //釋放工做組線程 await Task.WhenAll( bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))); } }

來看下實際的業務代碼,比較簡單,也就是打印日誌,並返回收到的字符串編程

public class EchoServerHandler : ChannelHandlerAdapter //管道處理基類,較經常使用 { // 重寫基類的方法,當消息到達時觸發,這裏收到消息後,在控制檯輸出收到的內容,並原樣返回了客戶端 public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = message as IByteBuffer; if (buffer != null) { Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8)); } context.WriteAsync(message);//寫入輸出流 } // 輸出到客戶端,也能夠在上面的方法中直接調用WriteAndFlushAsync方法直接輸出 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); //捕獲 異常,並輸出到控制檯後斷開連接,提示:客戶端意外斷開連接,也會觸發 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }

2.2 Echo Client

客戶端的代碼和服務端的代碼相差不多,體現了Netty統一的編程模型。有幾個不一樣點:bootstrap

  1. 客戶端的Bootstrap不是ServerBootstrap了,
  2. 客戶端不須要主線程組,只有工做線程組,消息處理管道也創建在裏主線程工做組的攔截通道上。
  3. 最後不是bind而是connect
static async Task RunClientAsync() { ExampleHelper.SetConsoleLogger(); var group = new MultithreadEventLoopGroup(); X509Certificate2 cert = null; string targetHost = null; if (ClientSettings.IsSsl) { cert = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); targetHost = cert.GetNameInfo(X509NameType.DnsName, false); } try { var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel<TcpSocketChannel>() .Option(ChannelOption.TcpNodelay, true) .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; if (cert != null) { pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost))); } pipeline.AddLast(new LoggingHandler()); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); pipeline.AddLast("echo", new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port)); Console.ReadLine(); await clientChannel.CloseAsync(); } finally { await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); } }

業務代碼服務器

// 代碼和服務端也相差很少,而且繼承了一樣的基類。 public class EchoClientHandler : ChannelHandlerAdapter { readonly IByteBuffer initialMessage; public EchoClientHandler() { this.initialMessage = Unpooled.Buffer(ClientSettings.Size); byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world"); this.initialMessage.WriteBytes(messageBytes); } //重寫基類方法,當連接上服務器後,立刻發送Hello World消息到服務端 public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage); public override void ChannelRead(IChannelHandlerContext context, object message) { var byteBuffer = message as IByteBuffer; if (byteBuffer != null) { Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8)); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }

0x03 經常使用Handler和基類

從Echo服務的例子中,咱們能夠看到Netty程序無論時服務端仍是客戶端都經過一個Bootstrap/ServerBootstrap來啓動Socket程序,並經過設置處理Handler管道來處理出入的消息,管道中常見的攔截器有加解密,日誌記錄,編解碼,消息頭處理,業務處理等,實際業務中根據狀況能夠自行添加本身的業務邏輯,同時不少處理器代碼在服務端和客戶端是公用的,Netty自己已經提供了一些經常使用處理器和業務處理器的基類來簡化實際開發,咱們一塊兒看一下網絡

3.1 TlsHandler

Netty支持Tls加密傳輸,TlsHandler類能夠在開發人員無須關心加密傳輸時字節碼的變化,只關心本身的業務代碼便可。在管道處理的第一個配置該類便可併發

3.2 LengthFieldPrepender

這個handler 會在實際發送前在將數據的長度放置在數據前,本例中使用2個字節來存儲數據的長度。

3.3 LengthFieldBasedFrameDecoder

這個handler比較經常使用,會在解碼前用於解析數據,用於讀取數據包的頭信息,特別是包長,並等待數據達到包長後再交由下一個handler處理。
參數說明 如下是Amp協議的參數值,並註釋了意義

InitialBytesToStrip = 0, //讀取時須要跳過的字節數
LengthAdjustment = -5, //包實際長度的糾正,若是包長包括包頭和包體,則要減去Length以前的部分
LengthFieldLength = 4, //長度字段的字節數 整型爲4個字節
LengthFieldOffset = 1, //長度屬性的起始(偏移)位
MaxFrameLength = int.MaxValue, // 最大包長

3.4 ChannelHandlerAdapter和SimpleChannelInboundHandler

業務處理的經常使用Handler基類,通常客戶端和服務端的業務處理handler 都要繼承這個這兩個類,其中SimpleChannelInboundHandler是ChannelHandlerAdapter的子類,對其簡單的進行封裝,並進行了類型檢查。

3.5 IdleStateHandler 連接狀態檢查handler

這個handler通常用於檢查連接的狀態,好比寫超時,讀超時。在實際項目中通常在客戶端添加它,並用於發送心跳包。

如下是DotBPE在客戶端管道中 第一個添加IdleStateHandler 並設置觸發時間

var bootstrap = new Bootstrap(); bootstrap .Channel<TcpSocketChannel>() .Option(ChannelOption.TcpNodelay, true) .Option(ChannelOption.ConnectTimeout, TimeSpan.FromSeconds(3)) .Group(new MultithreadEventLoopGroup()) .Handler(new ActionChannelInitializer<ISocketChannel>(c => { var pipeline = c.Pipeline; pipeline.AddLast(new LoggingHandler("CLT-CONN")); MessageMeta meta = _msgCodecs.GetMessageMeta(); // IdleStateHandler pipeline.AddLast("timeout", new IdleStateHandler(0, 0, meta.HeartbeatInterval / 1000)); //消息前處理 pipeline.AddLast( new LengthFieldBasedFrameDecoder( meta.MaxFrameLength, meta.LengthFieldOffset, meta.LengthFieldLength, meta.LengthAdjustment, meta.InitialBytesToStrip ) ); pipeline.AddLast(new ChannelDecodeHandler<TMessage>(_msgCodecs)); pipeline.AddLast(new ClientChannelHandlerAdapter<TMessage>(this)); })); return bootstrap;

而後在業務處理handler中處理UserEventTriggered事件

//ChannelHandlerAdapter 重寫UserEventTriggered public override void UserEventTriggered(IChannelHandlerContext context, object evt){ if(evt is IdleStateEvent){ var eventState = evt as IdleStateEvent; if(eventState !=null){ this._bootstrap.SendHeartbeatAsync(context,eventState); } } }

更多細節能夠參考 《Netty 4.x 用戶指南

相關文章
相關標籤/搜索