異步和事件驅動是Netty設計的關鍵。html
Channel:一個鏈接就是一個Channel。
Channel是Socket的封裝,提供綁定,讀,寫等操做,下降了直接使用Socket的複雜性。Channel是Socket的抽象,能夠被註冊到一個EventLoop上,EventLoop至關於Selector,每個EventLoop又有本身的處理線程。
json
回調:通知的基礎。bootstrap
EventLoop
咱們以前就講過EventLoop這裏回顧一下:服務器
一個 EventLoopGroup 包含一個或者多個 EventLoop;
一個 EventLoop 在它的生命週期內只和一個 Thread 綁定;
全部由 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理;
一個 Channel 在它的生命週期內只註冊於一個 EventLoop;
一個 EventLoop 可能會被分配給一個或多個 Channel。app
ChannelHandler是處理數據的邏輯容器框架
ChannelInboundHandler是接收並處理入站事件的邏輯容器,能夠處理入站數據以及給客戶端以回覆。異步
ChannelPipeline是將ChannelHandler穿成一串的的容器。async
編碼器和解碼器都實現了ChannelInboundHandler和 ChannelOutboundHandler接口用於處理入站或出站數據。ide
字節級操做,工控協議的話大多都是字節流,咱們之前的方式就是拼,大概就是:對照協議這兩個字節是什麼,後四個字節表示什麼意思。如今DotNetty提供了一個ByteBuffer來簡化咱們對於字節流的操做。適用工控協議。json格式的,可是底層仍是字節流。oop
DotNetty由九個項目構成,在NuGet中都是單獨的包,能夠按需引用,其中比較重要的幾個是如下幾個:
配置成服務器管道,TcpServerSocketChannel,之因此配置成服務器管道緣由是與客戶端管道不一樣,服務器管道多了偵聽服務。將服務端的邏輯處理代碼Handler以pipeline形式添加到channel中去。
using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using System; using System.Threading.Tasks; namespace EchoServer { class Program { static async Task RunServerAsync() { IEventLoopGroup eventLoop; eventLoop = new MultithreadEventLoopGroup(); try { // 服務器引導程序 var bootstrap = new ServerBootstrap(); bootstrap.Group(eventLoop); bootstrap.Channel<TcpServerSocketChannel>() // 保持長鏈接 .ChildOption(ChannelOption.SoKeepalive, true); bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new EchoServerHandler()); })); IChannel boundChannel = await bootstrap.BindAsync(3000); Console.ReadLine(); await boundChannel.CloseAsync(); } catch (Exception ex) { Console.WriteLine(ex); } finally { await eventLoop.ShutdownGracefullyAsync(); } } static void Main(string[] args) => RunServerAsync().Wait(); } }
接收連入服務端代碼的客戶端消息,並將此消息從新返回給客戶端。
using DotNetty.Buffers; using DotNetty.Transport.Channels; using System; using System.Text; namespace EchoServer { /// <summary> /// 由於服務器只須要響應傳入的消息,因此只須要實現ChannelHandlerAdapter就能夠了 /// </summary> public class EchoServerHandler : ChannelHandlerAdapter { /// <summary> /// 每一個傳入消息都會調用 /// 處理傳入的消息須要複寫這個方法 /// </summary> /// <param name="ctx"></param> /// <param name="msg"></param> public override void ChannelRead(IChannelHandlerContext ctx, object msg) { IByteBuffer message = msg as IByteBuffer; Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8)); ctx.WriteAsync(message); } /// <summary> /// 批量讀取中的最後一條消息已經讀取完成 /// </summary> /// <param name="context"></param> public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } /// <summary> /// 發生異常 /// </summary> /// <param name="context"></param> /// <param name="exception"></param> public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine(exception); context.CloseAsync(); } } }
配置須要鏈接的服務端ip地址及其端口號,而且配置客戶端的處理邏輯代碼以pipeline形式添加到channel中去。
using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using System; using System.Net; using System.Threading.Tasks; namespace EchoClient { class Program { static async Task RunClientAsync() { var group = new MultithreadEventLoopGroup(); try { var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel<TcpSocketChannel>() .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("192.168.1.11"), 3000)); Console.ReadLine(); await clientChannel.CloseAsync(); } catch (Exception ex) { Console.WriteLine(ex); } finally { await group.ShutdownGracefullyAsync(); } } static void Main(string[] args) => RunClientAsync().Wait(); } }
客戶端鏈接成功後,向服務端發送消息,接受到服務端消息,對消息計數再發還給服務端。
using DotNetty.Buffers; using DotNetty.Transport.Channels; using System; using System.Collections.Generic; using System.Text; namespace EchoClient { public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer> { public static int i=0; /// <summary> /// Read0是DotNetty特有的對於Read方法的封裝 /// 封裝實現了: /// 1. 返回的message的泛型實現 /// 2. 丟棄非該指定泛型的信息 /// </summary> /// <param name="ctx"></param> /// <param name="msg"></param> protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg) { if (msg != null) { i++; Console.WriteLine($"Receive From Server {i}:" + msg.ToString(Encoding.UTF8)); } ctx.WriteAsync(Unpooled.CopiedBuffer(msg)); } public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } public override void ChannelActive(IChannelHandlerContext context) { Console.WriteLine($"發送客戶端消息"); context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes($"客戶端消息!"))); } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine(exception); context.CloseAsync(); } } }
備註:紅框表示接收到消息的序號。