(先埋怨一下微軟大大)咱們作NET開發,十分羨慕JAVA上能有NETTY, SPRING, STRUTS, DUBBO等等優秀框架,而咱們NET就只有乾瞪眼,哎,無賴以前生態圈沒作好,恨鐵不成鋼啊。不過因爲近來Net Core的發佈,慢慢也拉回了一小部分屬於微軟的天下,打住,閒話扯到這兒。html
DotNetty是Azure團隊仿照(幾乎能夠這麼說)JAVA的Netty而出來的(目前已實現Netty的一部分),目前在Github上的Star有1.8K+,地址:https://github.com/Azure/DotNetty,沒有任何文檔,和代碼中少許的註釋。雖然比Netty出來晚了不少年,不過咱們NET程序員們也該慶幸了,在本身的平臺上終於能用上相似Netty這樣強大的通訊框架了。git
咱們使用通用的應用程序或者類庫來實現互相通信,好比,咱們常用一個 HTTP 客戶端庫來從 web 服務器上獲取信息,或者經過 web 服務來執行一個遠程的調用。程序員
然而,有時候一個通用的協議或他的實現並無很好的知足需求。好比咱們沒法使用一個通用的 HTTP 服務器來處理大文件、電子郵件以及近實時消息,好比金融信息和多人遊戲數據。咱們須要一個高度優化的協議來處理一些特殊的場景。例如你可能想實現一個優化了的 Ajax 的聊天應用、媒體流傳輸或者是大文件傳輸器,你甚至能夠本身設計和實現一個全新的協議來準確地實現你的需求。github
另外一個不可避免的狀況是當你不得不處理遺留的專有協議來確保與舊系統的互操做性。在這種狀況下,重要的是咱們如何才能快速實現協議而不犧牲應用的穩定性和性能。web
Netty 是一個提供 asynchronous event-driven (異步事件驅動)的網絡應用框架,是一個用以快速開發高性能、可擴展協議的服務器和客戶端。編程
換句話說,Netty 是一個 NIO 客戶端服務器框架,使用它能夠快速簡單地開發網絡應用程序,好比服務器和客戶端的協議。Netty 大大簡化了網絡程序的開發過程好比 TCP 和 UDP 的 socket 服務的開發。bootstrap
「快速和簡單」並不意味着應用程序會有難維護和性能低的問題,Netty 是一個精心設計的框架,它從許多協議的實現中吸取了不少的經驗好比 FTP、SMTP、HTTP、許多二進制和基於文本的傳統協議.所以,Netty 已經成功地找到一個方式,在不失靈活性的前提下來實現開發的簡易性,高性能,穩定性。緩存
有一些用戶可能已經發現其餘的一些網絡框架也聲稱本身有一樣的優點,因此你可能會問是 Netty 和它們的不一樣之處。答案就是 Netty 的哲學設計理念。Netty 從開始就爲用戶提供了用戶體驗最好的 API 以及實現設計。正是由於 Netty 的哲學設計理念,才讓您得以輕鬆地閱讀本指南並使用 Netty。服務器
(DotNetty的框架和實現是怎麼回事,筆者不太清楚,但徹底可參考Netty官方的文檔來學習和使用DotNetty相關的API接口)網絡
DotNetty.Buffers: 對內存緩衝區管理的封裝。
DotNetty.Codecs: 對編解碼是封裝,包括一些基礎基類的實現,咱們在項目中自定義的協議,都要繼承該項目的特定基類和實現。
DotNetty.Codecs.Mqtt: MQTT(消息隊列遙測傳輸)編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.Protobuf: Protobuf 編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.ProtocolBuffers: ProtocolBuffers編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.Redis: Redis 協議編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Common: 公共的類庫項目,包裝線程池,並行任務和經常使用幫助類的封裝。
DotNetty.Handlers: 封裝了經常使用的管道處理器,好比Tls編解碼,超時機制,心跳檢查,日誌等。
DotNetty.Transport: DotNetty核心的實現,Socket基礎框架,通訊模式:異步非阻塞。
DotNetty.Transport.Libuv: DotNetty本身實現基於Libuv (高性能的,事件驅動的I/O庫) 核心的實現。
經常使用的庫有Codecs, Common, Handlers, Buffers, Transport,目前Azure團隊正在實現其餘Netty中的API(包括非公共Netty的API),讓咱們拭目以待吧。
DotNetty的Example文件夾下有許多官方提供的實例,有拋棄服務實例(Discard),有應答服務實例(echo),有Telnet服務實例等等,爲了實現直接點對點通信,筆者採用了Echo的demo,此後的RPC調用也會基於Echo而實現,註釋詳細,直接上接收端(Server)的代碼:
/*
* Netty 是一個半成品,做用是在須要基於自定義協議的基礎上完成本身的通訊封裝
* Netty 大大簡化了網絡程序的開發過程好比 TCP 和 UDP 的 socket 服務的開發。
* 「快速和簡單」並不意味着應用程序會有難維護和性能低的問題,
* Netty 是一個精心設計的框架,它從許多協議的實現中吸取了不少的經驗好比 FTP、SMTP、HTTP、許多二進制和基於文本的傳統協議。
* 所以,Netty 已經成功地找到一個方式,在不失靈活性的前提下來實現開發的簡易性,高性能,穩定性。
*/
namespace Echo.Server
{
using System;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using Examples.Common;
static class Program
{
static async Task RunServerAsync()
{
ExampleHelper.SetConsoleLogger();
// 申明一個主迴路調度組
var dispatcher = new DispatcherEventLoopGroup();
/*
Netty 提供了許多不一樣的 EventLoopGroup 的實現用來處理不一樣的傳輸。
在這個例子中咱們實現了一個服務端的應用,所以會有2個 NioEventLoopGroup 會被使用。
第一個常常被叫作‘boss’,用來接收進來的鏈接。第二個常常被叫作‘worker’,用來處理已經被接收的鏈接,一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。
如何知道多少個線程已經被使用,如何映射到已經建立的 Channel上都須要依賴於 IEventLoopGroup 的實現,而且能夠經過構造函數來配置他們的關係。
*/
// 主工做線程組,設置爲1個線程
IEventLoopGroup bossGroup = dispatcher; // (1)
// 子工做線程組,設置爲1個線程
IEventLoopGroup workerGroup = new WorkerEventLoopGroup(dispatcher);
try
{
// 聲明一個服務端Bootstrap,每一個Netty服務端程序,都由ServerBootstrap控制,經過鏈式的方式組裝須要的參數
var serverBootstrap = new ServerBootstrap(); // (2)
// 設置主和工做線程組
serverBootstrap.Group(bossGroup, workerGroup);
if (ServerSettings.UseLibuv)
{
// 申明服務端通訊通道爲TcpServerChannel
serverBootstrap.Channel<TcpServerChannel>(); // (3)
}
serverBootstrap
// 設置網絡IO參數等
.Option(ChannelOption.SoBacklog, 100) // (5)
// 在主線程組上設置一個打印日誌的處理器
.Handler(new LoggingHandler("SRV-LSTN"))
// 設置工做線程參數
.ChildHandler(
/*
* ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。
* 也許你想經過增長一些處理類好比DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。
* 當你的程序變的複雜時,可能你會增長更多的處理類到 pipline 上,而後提取這些匿名類到最頂層的類上。
*/
new ActionChannelInitializer<IChannel>( // (4)
channel =>
{
/*
* 工做線程鏈接器是設置了一個管道,服務端主線程全部接收到的信息都會經過這個管道一層層往下傳輸,
* 同時全部出棧的消息 也要這個管道的全部處理器進行一步步處理。
*/
IChannelPipeline pipeline = channel.Pipeline;
// 添加日誌攔截器
pipeline.AddLast(new LoggingHandler("SRV-CONN"));
// 添加出棧消息,經過這個handler在消息頂部加上消息的長度。
// LengthFieldPrepender(2):使用2個字節來存儲數據的長度。
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
/*
入棧消息經過該Handler,解析消息的包長信息,並將正確的消息體發送給下一個處理Handler
1,InitialBytesToStrip = 0, //讀取時須要跳過的字節數
2,LengthAdjustment = -5, //包實際長度的糾正,若是包長包括包頭和包體,則要減去Length以前的部分
3,LengthFieldLength = 4, //長度字段的字節數 整型爲4個字節
4,LengthFieldOffset = 1, //長度屬性的起始(偏移)位
5,MaxFrameLength = int.MaxValue, //最大包長
*/
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
// 業務handler
pipeline.AddLast("echo", new EchoServerHandler());
}));
// bootstrap綁定到指定端口的行爲就是服務端啓動服務,一樣的Serverbootstrap能夠bind到多個端口
IChannel boundChannel = await serverBootstrap.BindAsync(ServerSettings.Port); // (6)
Console.WriteLine("wait the client input");
Console.ReadLine();
// 關閉服務
await boundChannel.CloseAsync();
}
finally
{
// 釋放指定工做組線程
await Task.WhenAll( // (7)
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
);
}
}
static void Main() => RunServerAsync().Wait();
}
}
IEventLoopGroup 是用來處理I/O操做的多線程事件循環器,DotNetty 提供了許多不一樣的 EventLoopGroup 的實現用來處理不一樣的傳輸。在這個例子中咱們實現了一個服務端的應用,所以會有2個 IEventLoopGroup 會被使用。第一個常常被叫作‘boss’,用來接收進來的鏈接。第二個常常被叫作‘worker’,用來處理已經被接收的鏈接,一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。
ServerBootstrap 是一個啓動 Transport 服務的輔助啓動類。你能夠在這個服務中直接使用 Channel,可是這會是一個複雜的處理過程,在不少狀況下你並不須要這樣作。
這裏咱們指定使用 TcpServerChannel類來舉例說明一個新的 Channel 如何接收進來的鏈接。
ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel,當你的程序變的複雜時,可能你會增長更多的處理類到 pipline 上,而後提取這些匿名類到最頂層的類上。
你能夠設置這裏指定的 Channel 實現的配置參數。咱們正在寫一個TCP/IP 的服務端,所以咱們被容許設置 socket 的參數選項好比tcpNoDelay 和 keepAlive。
綁定端口而後啓動服務,這裏咱們在機器上綁定了機器網卡上的設置端口,固然如今你能夠屢次調用 bind() 方法(基於不一樣綁定地址)。
使用完成後,優雅的釋放掉指定的工做組線程,固然,你能夠選擇關閉程序,但這並不推薦。
上一部分代碼中加粗地方的實現
namespace Echo.Server
{
using System;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
/// <summary>
/// 服務端處理事件函數
/// </summary>
public class EchoServerHandler : ChannelHandlerAdapter // ChannelHandlerAdapter 業務繼承基類適配器 // (1)
{
/// <summary>
/// 管道開始讀
/// </summary>
/// <param name="context"></param>
/// <param name="message"></param>
public override void ChannelRead(IChannelHandlerContext context, object message) // (2)
{
if (message is IByteBuffer buffer) // (3)
{
Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
}
context.WriteAsync(message); // (4)
}
/// <summary>
/// 管道讀取完成
/// </summary>
/// <param name="context"></param>
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); // (5)
/// <summary>
/// 出現異常
/// </summary>
/// <param name="context"></param>
/// <param name="exception"></param>
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
}
DiscardServerHandler 繼承自 ChannelInboundHandlerAdapter,這個類實現了IChannelHandler接口,IChannelHandler提供了許多事件處理的接口方法,而後你能夠覆蓋這些方法。如今僅僅只須要繼承 ChannelInboundHandlerAdapter 類而不是你本身去實現接口方法。
這裏咱們覆蓋了 chanelRead() 事件處理方法。每當從客戶端收到新的數據時,這個方法會在收到消息時被調用,這個例子中,收到的消息的類型是 ByteBuf。
爲了響應或顯示客戶端發來的信息,爲此,咱們將在控制檯中打印出客戶端傳來的數據。
而後,咱們將客戶端傳來的消息經過context.WriteAsync寫回到客戶端。
固然,步驟4只是將流緩存到上下文中,並沒執行真正的寫入操做,經過執行Flush將流數據寫入管道,並經過context傳回給傳來的客戶端。
重點看註釋的地方,其餘地方跟Server端沒有任何區別
namespace Echo.Client
{
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Examples.Common;
static class Program
{
static async Task RunClientAsync()
{
ExampleHelper.SetConsoleLogger();
var group = new MultithreadEventLoopGroup();
try
{
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(
new ActionChannelInitializer<ISocketChannel>(
channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast("echo", new EchoClientHandler());
}));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));
// 創建死循環,類同於While(true)
for (;;) // (4)
{
Console.WriteLine("input you data:");
// 根據設置創建緩存區大小
IByteBuffer initialMessage = Unpooled.Buffer(ClientSettings.Size); // (1)
string r = Console.ReadLine();
// 將數據流寫入緩衝區
initialMessage.WriteBytes(Encoding.UTF8.GetBytes(r ?? throw new InvalidOperationException())); // (2)
// 將緩衝區數據流寫入到管道中
await clientChannel.WriteAndFlushAsync(initialMessage); // (3)
if(r.Contains("bye"))
break;
}
Console.WriteLine("byebye");
await clientChannel.CloseAsync();
}
finally
{
await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
}
}
static void Main() => RunClientAsync().Wait();
}
}
初始化一個緩衝區的大小。
默認緩衝區接受的數據類型爲bytes[],固然這樣也更加便於序列化成流。
將緩衝區的流直接數據寫入到Channel管道中。該管道通常爲連接通信的另外一端(C端)。
創建死循環,這樣作的目的是爲了測試每次都必須從客戶端輸入的數據,經過服務端迴路一次後,再進行下一次的輸入操做。
namespace Echo.Client
{
using System;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
public class EchoClientHandler : ChannelHandlerAdapter
{
readonly IByteBuffer initialMessage;
public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is IByteBuffer byteBuffer)
{
Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));
}
}
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
}
很是簡單,將數據流顯示到控制檯。
至此,咱們使用DotNetty框架搭建簡單的應答服務器就這樣作好了,很簡單,實現效果以下:
C端主動向S端主動發送數據後,S端收到數據,在控制檯打印出數據,並回傳給C端,固然,S端還能夠作不少不少的事情。
雖然DotNetty官方沒有提供任何技術文檔,但官方卻提供了詳細的調試記錄,不少時候,咱們學習者其實也能夠經過調試記錄來分析某一個功能的實現流程。咱們能夠經過將DotNetty的內部輸入輸出記錄打印到控制檯上。
InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));
能夠看到服務端的打印記錄一下多出來了許多許多,有大部分是屬於DotNetty內部調試時的打印記錄,咱們只着重看以下的部分。
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] HANDLER_ADDED
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] REGISTERED (1)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] BIND: 0.0.0.0:8007 (2)
wait the client input
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] ACTIVE (3)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] READ (4)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED: [id: 0x7bac2775, 127.0.0.1:64073 :> 127.0.0.1:8007] (5)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED_COMPLETE (6)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] READ (7)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] HANDLER_ADDED (8)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] REGISTERED (9)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] ACTIVE (10)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ (11)
dbug: DotNetty.Buffers.AbstractByteBuffer[0] (12)
-Dio.netty.buffer.bytebuf.checkAccessible: True
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED: 14B (13)
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 00 0C 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |..hello world! |
+--------+-------------------------------------------------+----------------+
Received from client: hello world!
dbug: SRV-CONN[0] (14)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 00 0C |.. |
+--------+-------------------------------------------------+----------------+
dbug: SRV-CONN[0] (15)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 12B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |hello world! |
+--------+-------------------------------------------------+----------------+
dbug: SRV-CONN[0] (16)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED_COMPLETE
dbug: SRV-CONN[0] (17)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] FLUSH
dbug: SRV-CONN[0] (18)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ
咋一看,有18個操做,好像有點太多了,其實否則,還有不少不少的內部調試細節並沒打印到控制檯上。
經過手動創建的工做線程組,並將這組線程註冊到管道中,這個管道能夠是基於SOCKER,能夠基於IChannel(1);
綁定自定的IP地址和端口號到自定義管道上(2);
激活自定義管道(3);
開始讀取(其實也是開始監聽)(4);
收到來自id爲0x7bac2775的客戶端鏈接請求,創建鏈接,並繼續開始監聽(5)(6)(7);
從第8步開始,日誌已經變成id爲0x7bac2775的記錄了,固然同樣包含註冊管道,激活管道,開始監聽等等與S端如出一轍的操做(8)(9)(10)(11)
當筆者輸入一條"hello world!"數據後,DotNetty.Buffers.AbstractByteBuffer會進行數據類型檢查,以便確認能將數據放入到管道中。(12)
將數據發送到S端,數據大小爲14B,hello world前有兩個點,表明這是數據頭,緊接着再發送兩個點,但沒有任何數據,表明數據已經結束。DotNetty將數據的十六進制存儲位用易懂的方式表現了出來,很人性化。(13)(14)
S端收到數據沒有任何加工和處理,立刻將數據回傳到C端。(15)(16)
最後,當這個過程完成後,須要將緩存區的數據強制寫入到管道中,因此會執行一次Flush操做,整個傳輸完成。接下來,不論是C端仍是S端,繼續將本身的狀態改爲READ,用於監聽管道中的各類狀況,好比鏈接狀態,數據傳輸等等(17)。
對於剛開始接觸Socket編程的朋友而言,這是個噩夢,由於Socket編程的複雜性不會比多線程容易,甚至會更復雜。協議,壓縮,傳輸,多線程,監聽,流控制等等一系列問題擺在面前,所以而誕生了Netty這樣優秀的開源框架,可是Netty是個半成品,由於你須要基於他來實現本身想要的協議,傳輸等等自定義操做,而底層的內容,你徹底不用關心。不像某些框架,好比Newtonsoft.Json這樣的功能性框架,不用配置,不用自定義,直接拿來用就能夠了。
雖然DotNetty幫咱們實現了底層大量的操做,但若是不熟悉或者一點也不懂網絡通訊,一樣對上面的代碼是一頭霧水,爲什麼?行情須要,咱們程序員每天都在趕業務,哪有時間去了解和學習更多的細節...經過將調試記錄打印出來,並逐行挨個的對照代碼進行分析,就會慢慢開始理解最簡單的通訊流程了。
本篇只是實現了基於DotNetty最簡單的通信過程,也只是將數據作了一下回路,並沒作到任何與RPC有關的調用,下一篇咱們開始講這個例子深刻,介紹基於DotNetty的RPC調用。
原文地址: https://www.cnblogs.com/SteveLee/p/9860507.html