項目文件結構圖html
通訊主機:bootstrap
這部分由 Netty 實現,Netty是一個異步且非阻塞的通訊框架。TCP通訊實現服務端和客戶端的交互。緩存
Netty 的簡單描述以下:服務器
客戶端(調用方):負責發送要執行的指令。app
服務端(接收方):分爲主從線程。主線程負責接收指令,將指令存入緩存區中,等待執行完成後再通知客戶端(非阻塞);框架
從線程,有不止一個線程(異步),負責從緩存池中取出線程依次執行(按隊列前後順序執行),咱們經過程序來決定先執行哪一個,好比先解碼,後執行,再編碼。異步
上層接口:socket
/// <summary> /// 接受到消息的委託。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> public delegate Task ReceivedDelegate(IMessageSender sender, TransportMessage message); /// <summary> /// 一個抽象的消息監聽者。 /// </summary> public interface IMessageListener { /// <summary> /// 接收到消息的事件。 /// </summary> event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> Task OnReceived(IMessageSender sender, TransportMessage message); }
客戶端:async
/// <summary> /// 消息監聽者。 /// </summary> public class DotNettyClientMessageListener : IMessageListener { #region Implementation of IMessageListener /// <summary> /// 接收到消息的事件。 /// </summary> public event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> public async Task OnReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; await Received(sender, message); } #endregion Implementation of IMessageListener }
服務端:ide
public class DotNettyServerMessageListener : IMessageListener, IDisposable { #region Field private readonly ILogger<DotNettyServerMessageListener> _logger; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ITransportMessageEncoder _transportMessageEncoder; private IChannel _channel; #endregion Field #region Constructor public DotNettyServerMessageListener(ILogger<DotNettyServerMessageListener> logger, ITransportMessageCodecFactory codecFactory) { _logger = logger; _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); } #endregion Constructor #region Implementation of IMessageListener public event ReceivedDelegate Received; /// <summary> /// 觸發接收到消息事件。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一個任務。</returns> public async Task OnReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; await Received(sender, message); } #endregion Implementation of IMessageListener public async Task StartAsync(EndPoint endPoint) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備啓動服務主機,監聽地址:{endPoint}。"); IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1); IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2 var bootstrap = new ServerBootstrap(); bootstrap .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 128) .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default) .Group(bossGroup, workerGroup) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { var pipeline = channel.Pipeline; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); pipeline.AddLast(new ServerHandler(async (contenxt, message) => { var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt); await OnReceived(sender, message); }, _logger)); })); try { _channel = await bootstrap.BindAsync(endPoint); if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"服務主機啓動成功,監聽地址:{endPoint}。"); } catch { _logger.LogError($"服務主機啓動失敗,監聽地址:{endPoint}。 "); } } public void CloseAsync() { Task.Run(async () => { await _channel.EventLoop.ShutdownGracefullyAsync(); await _channel.CloseAsync(); }).Wait(); } #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { Task.Run(async () => { await _channel.DisconnectAsync(); }).Wait(); } #endregion Implementation of IDisposable #region Help Class private class ServerHandler : ChannelHandlerAdapter { private readonly Action<IChannelHandlerContext, TransportMessage> _readAction; private readonly ILogger _logger; public ServerHandler(Action<IChannelHandlerContext, TransportMessage> readAction, ILogger logger) { _readAction = readAction; _logger = logger; } #region Overrides of ChannelHandlerAdapter public override void ChannelRead(IChannelHandlerContext context, object message) { Task.Run(() => { var transportMessage = (TransportMessage)message; _readAction(context, transportMessage); }); } public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { context.CloseAsync();//客戶端主動斷開須要應答,不然socket變成CLOSE_WAIT狀態致使socket資源耗盡 if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError(null,exception, $"與服務器:{context.Channel.RemoteAddress}通訊時發送了錯誤。"); } #endregion Overrides of ChannelHandlerAdapter } #endregion Help Class }
上圖中只有服務端的實現,服務端接收到指令後會回調執行器中的過程。
客戶端的時候須要在業務場景中來實現,根據業務不一樣,接收到服務端消息後執行的過程也不一樣。而後經過控制反轉,由程序自動找到該過程。
上層接口:
/// <summary> /// 一個抽象的服務執行器。 /// </summary> public interface IServiceExecutor { /// <summary> /// 執行。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">調用消息。</param> Task ExecuteAsync(IMessageSender sender, TransportMessage message); }
實現類,代碼中沒有處理邏輯(服務發現、執行,後續有專題來談),只有輸出打印 "服務提供者接收到消息。" :
public class HttpServiceExecutor : IServiceExecutor { #region Field private readonly ILogger<HttpServiceExecutor> _logger; #endregion Field #region Constructor public HttpServiceExecutor(ILogger<HttpServiceExecutor> logger) { _logger = logger; } #endregion Constructor #region Implementation of IServiceExecutor /// <summary> /// 執行。 /// </summary> /// <param name="sender">消息發送者。</param> /// <param name="message">調用消息。</param> public async Task ExecuteAsync(IMessageSender sender, TransportMessage message) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("服務提供者接收到消息。"); return; } #endregion Implementation of IServiceExecutor }
將要發送的消息寫入到通訊管道中,服務端和客戶端都有實現。
上層接口:
/// <summary> /// 一個抽象的發送者。 /// </summary> public interface IMessageSender { /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> Task SendAsync(TransportMessage message); /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> Task SendAndFlushAsync(TransportMessage message); }
實現類基類:
/// <summary> /// 基於DotNetty的消息發送者基類。 /// </summary> public abstract class DotNettyMessageSender { private readonly ITransportMessageEncoder _transportMessageEncoder; protected DotNettyMessageSender(ITransportMessageEncoder transportMessageEncoder) { _transportMessageEncoder = transportMessageEncoder; } protected IByteBuffer GetByteBuffer(TransportMessage message) { var data = _transportMessageEncoder.Encode(message); //var buffer = PooledByteBufferAllocator.Default.Buffer(); return Unpooled.WrappedBuffer(data); } }
服務端:
/// <summary> /// 基於DotNetty服務端的消息發送者。 /// </summary> public class DotNettyServerMessageSender : DotNettyMessageSender, IMessageSender { private readonly IChannelHandlerContext _context; public DotNettyServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IChannelHandlerContext context) : base(transportMessageEncoder) { _context = context; } #region Implementation of IMessageSender /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _context.WriteAsync(buffer); } /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _context.WriteAndFlushAsync(buffer); } #endregion Implementation of IMessageSender }
客戶端:
/// <summary> /// 基於DotNetty客戶端的消息發送者。 /// </summary> public class DotNettyMessageClientSender : DotNettyMessageSender, IMessageSender, IDisposable { private readonly IChannel _channel; public DotNettyMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IChannel channel) : base(transportMessageEncoder) { _channel = channel; } #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { Task.Run(async () => { await _channel.DisconnectAsync(); }).Wait(); } #endregion Implementation of IDisposable #region Implementation of IMessageSender /// <summary> /// 發送消息。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _channel.WriteAndFlushAsync(buffer); } /// <summary> /// 發送消息並清空緩衝區。 /// </summary> /// <param name="message">消息內容。</param> /// <returns>一個任務。</returns> public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _channel.WriteAndFlushAsync(buffer); } #endregion Implementation of IMessageSender }
爲啥沒有服務端工廠呢,由於服務端是由服務端主機(Host)直接建立的,主機直接調用監聽器監聽端口。既然咱們重寫了通訊過程,就不能用微軟原有的WebHost,後續會講到如何搭建本身的主機。
客戶端工廠用來建立客戶端,而後與服務端主機通訊。
上層接口:
/// <summary> /// 一個抽象的傳輸客戶端工廠。 /// </summary> public interface ITransportClientFactory { /// <summary> /// 建立客戶端。 /// </summary> /// <param name="endPoint">終結點。</param> /// <returns>傳輸客戶端實例。</returns> Task<ITransportClient> CreateClientAsync(EndPoint endPoint); }
/// <summary> /// 一個抽象的傳輸客戶端。 /// </summary> public interface ITransportClient { /// <summary> /// 發送消息。 /// </summary> /// <param name="message">遠程調用消息模型。</param> /// <returns>遠程調用消息的傳輸消息。</returns> Task SendAsync(TransportMessage transportMessage); }
實現類:
/// <summary> /// 基於DotNetty的傳輸客戶端工廠。 /// </summary> public class DotNettyTransportClientFactory : ITransportClientFactory, IDisposable { #region Field private readonly ITransportMessageEncoder _transportMessageEncoder; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ILogger<DotNettyTransportClientFactory> _logger; private readonly IServiceExecutor _serviceExecutor; private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>(); private readonly Bootstrap _bootstrap; private static readonly AttributeKey<IMessageSender> messageSenderKey = AttributeKey<IMessageSender>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageSender)); private static readonly AttributeKey<IMessageListener> messageListenerKey = AttributeKey<IMessageListener>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageListener)); private static readonly AttributeKey<EndPoint> origEndPointKey = AttributeKey<EndPoint>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(EndPoint)); #endregion Field #region Constructor public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger) : this(codecFactory, logger, null) { } public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger, IServiceExecutor serviceExecutor) { _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); _logger = logger; _serviceExecutor = serviceExecutor; _bootstrap = GetBootstrap(); _bootstrap.Handler(new ActionChannelInitializer<ISocketChannel>(c => { var pipeline = c.Pipeline; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); pipeline.AddLast(new DefaultChannelHandler(this)); })); } #endregion Constructor #region Implementation of ITransportClientFactory /// <summary> /// 建立客戶端。 /// </summary> /// <param name="endPoint">終結點。</param> /// <returns>傳輸客戶端實例。</returns> public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint) { var key = endPoint; if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備爲服務端地址:{key}建立客戶端。"); try { return await _clients.GetOrAdd(key , k => new Lazy<Task<ITransportClient>>(async () => { //客戶端對象 var bootstrap = _bootstrap; //異步鏈接返回channel var channel = await bootstrap.ConnectAsync(k); var messageListener = new DotNettyClientMessageListener(); //設置監聽 channel.GetAttribute(messageListenerKey).Set(messageListener); //實例化發送者 var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel); //設置channel屬性 channel.GetAttribute(messageSenderKey).Set(messageSender); channel.GetAttribute(origEndPointKey).Set(k); //建立客戶端 var client = new DotNettyTransportClient(messageSender, messageListener, _logger, _serviceExecutor); return client; } )).Value;//返回實例 } catch { throw; } } #endregion Implementation of ITransportClientFactory #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { foreach (var client in _clients.Values.Where(i => i.IsValueCreated)) { (client.Value as IDisposable)?.Dispose(); } } #endregion Implementation of IDisposable private static Bootstrap GetBootstrap() { IEventLoopGroup group; var bootstrap = new Bootstrap(); group = new MultithreadEventLoopGroup(); bootstrap.Channel<TcpServerSocketChannel>(); bootstrap .Channel<TcpSocketChannel>() .Option(ChannelOption.TcpNodelay, true) .Option(ChannelOption.Allocator, PooledByteBufferAllocator.Default) .Group(group); return bootstrap; } protected class DefaultChannelHandler : ChannelHandlerAdapter { private readonly DotNettyTransportClientFactory _factory; public DefaultChannelHandler(DotNettyTransportClientFactory factory) { this._factory = factory; } #region Overrides of ChannelHandlerAdapter public override void ChannelInactive(IChannelHandlerContext context) { _factory._clients.TryRemove(context.Channel.GetAttribute(origEndPointKey).Get(), out var value); } public override void ChannelRead(IChannelHandlerContext context, object message) { var transportMessage = message as TransportMessage; var messageListener = context.Channel.GetAttribute(messageListenerKey).Get(); var messageSender = context.Channel.GetAttribute(messageSenderKey).Get(); messageListener.OnReceived(messageSender, transportMessage); } #endregion Overrides of ChannelHandlerAdapter } }
/// <summary> /// 一個默認的傳輸客戶端實現。 /// </summary> public class DotNettyTransportClient : ITransportClient, IDisposable { #region Field private readonly IMessageSender _messageSender; private readonly IMessageListener _messageListener; private readonly ILogger _logger; private readonly IServiceExecutor _serviceExecutor; #endregion Field #region Constructor public DotNettyTransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger, IServiceExecutor serviceExecutor) { _messageSender = messageSender; _messageListener = messageListener; _logger = logger; _serviceExecutor = serviceExecutor; messageListener.Received += MessageListener_Received; } #endregion Constructor #region Implementation of ITransportClient /// <summary> /// 發送消息。 /// </summary> /// <param name="message">遠程調用消息模型。</param> /// <returns>遠程調用消息的傳輸消息。</returns> public async Task SendAsync(TransportMessage transportMessage) { try { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("準備發送消息。"); try { //發送 await _messageSender.SendAndFlushAsync(transportMessage); } catch (Exception exception) { throw new Exception("與服務端通信時發生了異常。", exception); } if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("消息發送成功。"); } catch (Exception exception) { if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError(null,exception, "消息發送失敗。"); throw; } } #endregion Implementation of ITransportClient #region Implementation of IDisposable /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { (_messageSender as IDisposable)?.Dispose(); (_messageListener as IDisposable)?.Dispose(); } #endregion Implementation of IDisposable #region Private Method private async Task MessageListener_Received(IMessageSender sender, TransportMessage message) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("服務消費者接收到消息。"); if (_serviceExecutor != null) await _serviceExecutor.ExecuteAsync(sender, message); } #endregion Private Method }
上節中咱們用MessagePack實現了序列化與反序列化,本節爲通訊,天然離不開消息序列化。
須要繼承自 DotNetty.Transport.Channels.ChannelHandlerAdapter,才能被 netty 調用:
public class TransportMessageChannelHandlerAdapter : ChannelHandlerAdapter { private readonly ITransportMessageDecoder _transportMessageDecoder; public TransportMessageChannelHandlerAdapter(ITransportMessageDecoder transportMessageDecoder) { _transportMessageDecoder = transportMessageDecoder; } #region Overrides of ChannelHandlerAdapter public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = (IByteBuffer)message; var data = new byte[buffer.ReadableBytes]; buffer.ReadBytes(data); var transportMessage = _transportMessageDecoder.Decode(data); context.FireChannelRead(transportMessage); ReferenceCountUtil.Release(buffer); } #endregion Overrides of ChannelHandlerAdapter }
用於啓動通訊監聽端口
內部包含消息監聽器(_serverMessageListener)和消息執行器(_serverMessageListener)。
接口:
public interface ITransportHost : IDisposable { /// <summary> /// 啓動主機。 /// </summary> /// <param name="endPoint">主機終結點。</param> /// <returns>一個任務。</returns> Task StartAsync(EndPoint endPoint); /// <summary> /// 啓動主機。 /// </summary> /// <param name="endPoint">ip地址。</param> Task StartAsync(string ip, int port); }
實現類:
public class DotNettyTransportHost : ITransportHost { #region Field private IServiceExecutor _serviceExecutor; public IServiceExecutor ServiceExecutor { get => _serviceExecutor; } private readonly Func<EndPoint, Task<IMessageListener>> _messageListenerFactory; private IMessageListener _serverMessageListener; #endregion Field public DotNettyTransportHost(Func<EndPoint, Task<IMessageListener>> messageListenerFactory, IServiceExecutor serviceExecutor) { _messageListenerFactory = messageListenerFactory; _serviceExecutor = serviceExecutor; } /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { (_serverMessageListener as IDisposable)?.Dispose(); } /// <summary> /// 啓動主機。 /// </summary> /// <param name="endPoint">主機終結點。</param> /// <returns>一個任務。</returns> public async Task StartAsync(EndPoint endPoint) { if (_serverMessageListener != null) return; _serverMessageListener = await _messageListenerFactory(endPoint); _serverMessageListener.Received += MessageListener_Received; } public async Task StartAsync(string ip, int port) { if (_serverMessageListener != null) return; _serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), port)); _serverMessageListener.Received += MessageListener_Received; //await StartAsync(new IPEndPoint(IPAddress.Parse(ip), port)); } /// <summary> /// 監聽並回調 /// </summary> /// <param name="sender">消息發送器</param> /// <param name="message">監聽到的消息</param> /// <returns></returns> private async Task MessageListener_Received(IMessageSender sender, TransportMessage message) { await _serviceExecutor.ExecuteAsync(sender, message); } }