(七)分佈式通訊----Netty實現NIO通訊

==>>點擊查看本系列文章目錄

 

目錄

1. 消息監聽器

2. 指令執行器

3. 消息發送器

4. 客戶端工廠

5. 序列化工具

6. 通訊主機

 

項目文件結構圖

 

 通訊主機:bootstrap

 

 

1. 消息監聽器(黃色框)

這部分由 Netty 實現,Netty 是一個異步且非阻塞的通訊框架,通過 TCP 通訊實現服務端和客戶端的交互。

Netty 的簡單描述如下:

客戶端(調用方):負責發送要執行的指令。

服務端(接收方):分爲主從線程。主線程負責接收指令,將指令存入緩存區中,等待執行完成後再通知客戶端(非阻塞);

                從線程,有不止一個線程(異步),負責從緩存區中取出指令依次執行(按隊列先後順序執行),我們通過程序來決定先執行哪一個,比如先解碼,後執行,再編碼。

上層接口:

    /// <summary>
    /// Delegate invoked when a message has been received.
    /// </summary>
    /// <param name="sender">The message sender.</param>
    /// <param name="message">The received message.</param>
    public delegate Task ReceivedDelegate(IMessageSender sender, TransportMessage message);

    /// <summary>
    /// An abstract message listener.
    /// </summary>
    public interface IMessageListener
    {
        /// <summary>
        /// Event raised when a message has been received.
        /// </summary>
        event ReceivedDelegate Received;

        /// <summary>
        /// Raises the message-received event.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The received message.</param>
        /// <returns>A task representing the notification.</returns>
        Task OnReceived(IMessageSender sender, TransportMessage message);
    }

 

客戶端:

/// <summary>
    /// Client-side message listener: forwards every received message to the
    /// subscribers of <see cref="Received"/>.
    /// </summary>
    public class DotNettyClientMessageListener : IMessageListener
    {
        #region Implementation of IMessageListener

        /// <summary>
        /// Event raised when a message has been received.
        /// </summary>
        public event ReceivedDelegate Received;

        /// <summary>
        /// Raises the message-received event and waits for every subscriber to finish.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The received message.</param>
        /// <returns>A task that completes once all subscribers have completed.</returns>
        public async Task OnReceived(IMessageSender sender, TransportMessage message)
        {
            // Snapshot the delegate so a concurrent unsubscribe between the null
            // check and the invocation cannot cause a NullReferenceException.
            var handlers = Received;
            if (handlers == null)
                return;

            // Invoking a multicast delegate directly only returns the LAST handler's
            // task; call each subscriber explicitly so every task is awaited and no
            // failure goes unobserved.
            var subscribers = handlers.GetInvocationList();
            var pending = new Task[subscribers.Length];
            for (var i = 0; i < subscribers.Length; i++)
                pending[i] = ((ReceivedDelegate)subscribers[i])(sender, message);

            await Task.WhenAll(pending);
        }

        #endregion Implementation of IMessageListener
    }

 

服務端:

    /// <summary>
    /// Server-side message listener built on DotNetty. It binds a TCP server channel,
    /// turns incoming frames into <see cref="TransportMessage"/> instances and raises
    /// <see cref="Received"/> for each of them.
    /// </summary>
    public class DotNettyServerMessageListener : IMessageListener, IDisposable
    {
        #region Field

        private readonly ILogger<DotNettyServerMessageListener> _logger;
        private readonly ITransportMessageDecoder _transportMessageDecoder;
        private readonly ITransportMessageEncoder _transportMessageEncoder;
        private IChannel _channel;

        // Kept so that both groups can be released on shutdown; previously only the
        // loop owning the server channel was shut down and the worker group leaked.
        private IEventLoopGroup _bossGroup;
        private IEventLoopGroup _workerGroup;

        #endregion Field

        #region Constructor

        /// <summary>
        /// Creates the listener.
        /// </summary>
        /// <param name="logger">Logger used for diagnostics.</param>
        /// <param name="codecFactory">Factory providing the message encoder and decoder.</param>
        public DotNettyServerMessageListener(ILogger<DotNettyServerMessageListener> logger, ITransportMessageCodecFactory codecFactory)
        {
            _logger = logger;
            _transportMessageEncoder = codecFactory.GetEncoder();
            _transportMessageDecoder = codecFactory.GetDecoder();
        }

        #endregion Constructor

        #region Implementation of IMessageListener

        /// <summary>
        /// Event raised when a message has been received.
        /// </summary>
        public event ReceivedDelegate Received;

        /// <summary>
        /// Raises the message-received event and waits for every subscriber to finish.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The received message.</param>
        /// <returns>A task that completes once all subscribers have completed.</returns>
        public async Task OnReceived(IMessageSender sender, TransportMessage message)
        {
            // Snapshot to avoid a race with a concurrent unsubscribe.
            var handlers = Received;
            if (handlers == null)
                return;

            // A multicast delegate invocation only yields the last handler's task,
            // so invoke each subscriber explicitly and await them all.
            var subscribers = handlers.GetInvocationList();
            var pending = new Task[subscribers.Length];
            for (var i = 0; i < subscribers.Length; i++)
                pending[i] = ((ReceivedDelegate)subscribers[i])(sender, message);

            await Task.WhenAll(pending);
        }

        #endregion Implementation of IMessageListener

        /// <summary>
        /// Binds the server channel to <paramref name="endPoint"/> and starts listening.
        /// A bind failure is logged (with the exception) and not rethrown; the event
        /// loop groups created for the attempt are released in that case.
        /// </summary>
        /// <param name="endPoint">The end point to listen on.</param>
        /// <returns>A task representing the start operation.</returns>
        public async Task StartAsync(EndPoint endPoint)
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"準備啓動服務主機,監聽地址:{endPoint}。");

            IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);
            IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2
            var bootstrap = new ServerBootstrap();
            bootstrap
            .Channel<TcpServerSocketChannel>()
            .Option(ChannelOption.SoBacklog, 128)
            .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
            .Group(bossGroup, workerGroup)
            .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
            {
                var pipeline = channel.Pipeline;
                // Length-prefixed framing (4-byte length field) in both directions,
                // then decoding to TransportMessage, then dispatch to OnReceived.
                pipeline.AddLast(new LengthFieldPrepender(4));
                pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
                pipeline.AddLast(new ServerHandler(async (context, message) =>
                {
                    var sender = new DotNettyServerMessageSender(_transportMessageEncoder, context);
                    await OnReceived(sender, message);
                }, _logger));
            }));
            try
            {
                _channel = await bootstrap.BindAsync(endPoint);
                _bossGroup = bossGroup;
                _workerGroup = workerGroup;
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"服務主機啓動成功,監聽地址:{endPoint}。");
            }
            catch (Exception exception)
            {
                // Keep the original best-effort contract (no rethrow) but record the
                // cause and do not leak the event loop threads created above.
                _logger.LogError(exception, $"服務主機啓動失敗,監聽地址:{endPoint}。 ");
                await Task.WhenAll(bossGroup.ShutdownGracefullyAsync(), workerGroup.ShutdownGracefullyAsync());
            }
        }

        /// <summary>
        /// Closes the server channel and then shuts down the event loop groups.
        /// Blocks until the shutdown has completed; does nothing if the listener
        /// was never started successfully.
        /// </summary>
        public void CloseAsync()
        {
            Task.Run(async () =>
            {
                // Close the channel first: its close operation needs a running event loop.
                var channel = _channel;
                if (channel != null)
                    await channel.CloseAsync();
                await ReleaseEventLoopsAsync();
            }).Wait();
        }

        #region Implementation of IDisposable

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Task.Run(async () =>
            {
                // _channel stays null when StartAsync was never called or the bind failed.
                var channel = _channel;
                if (channel != null)
                    await channel.DisconnectAsync();
                await ReleaseEventLoopsAsync();
            }).Wait();
        }

        #endregion Implementation of IDisposable

        /// <summary>
        /// Shuts down the boss and worker event loop groups if they are still held.
        /// </summary>
        private async Task ReleaseEventLoopsAsync()
        {
            var bossGroup = _bossGroup;
            var workerGroup = _workerGroup;
            _bossGroup = null;
            _workerGroup = null;

            if (bossGroup != null)
                await bossGroup.ShutdownGracefullyAsync();
            if (workerGroup != null)
                await workerGroup.ShutdownGracefullyAsync();
        }

        #region Help Class

        /// <summary>
        /// Last inbound handler of the server pipeline: hands each decoded
        /// <see cref="TransportMessage"/> to the supplied callback on the thread pool.
        /// </summary>
        private class ServerHandler : ChannelHandlerAdapter
        {
            // Func<..., Task> instead of Action<...>: an async lambda assigned to an
            // Action becomes "async void", whose exceptions cannot be observed.
            private readonly Func<IChannelHandlerContext, TransportMessage, Task> _readAction;
            private readonly ILogger _logger;

            public ServerHandler(Func<IChannelHandlerContext, TransportMessage, Task> readAction, ILogger logger)
            {
                _readAction = readAction;
                _logger = logger;
            }

            #region Overrides of ChannelHandlerAdapter

            public override void ChannelRead(IChannelHandlerContext context, object message)
            {
                // Processing is offloaded from the calling thread; every failure is
                // caught and logged here because nobody awaits this task.
                Task.Run(async () =>
                {
                    try
                    {
                        var transportMessage = (TransportMessage)message;
                        await _readAction(context, transportMessage);
                    }
                    catch (Exception exception)
                    {
                        if (_logger.IsEnabled(LogLevel.Error))
                            _logger.LogError(exception, $"處理來自:{context.Channel.RemoteAddress}的消息時發生了錯誤。");
                    }
                });
            }

            public override void ChannelReadComplete(IChannelHandlerContext context)
            {
                context.Flush();
            }

            public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
            {
                // When the client disconnects on its own the server must close as well,
                // otherwise the socket stays in CLOSE_WAIT and socket resources run out.
                context.CloseAsync();
                if (_logger.IsEnabled(LogLevel.Error))
                    _logger.LogError(exception, $"與服務器:{context.Channel.RemoteAddress}通訊時發送了錯誤。");
            }

            #endregion Overrides of ChannelHandlerAdapter
        }

        #endregion Help Class
    }

 

2. 指令執行器(紅色框)

上圖中只有服務端的實現,服務端接收到指令後會回調執行器中的過程。

客戶端的執行過程需要在業務場景中來實現,根據業務不同,接收到服務端消息後執行的過程也不同。然後通過控制反轉,由程序自動找到該過程。

上層接口:

    /// <summary>
    /// An abstract service executor.
    /// </summary>
    public interface IServiceExecutor
    {
        /// <summary>
        /// Executes the call described by a received message.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The invocation message.</param>
        Task ExecuteAsync(IMessageSender sender, TransportMessage message);
    }

 

實現類,代碼中沒有處理邏輯(服務發現、執行,後續有專題來談),只有輸出打印 "服務提供者接收到消息。" :

    /// <summary>
    /// Placeholder service executor: it only writes a trace log entry when a
    /// message arrives and performs no further processing.
    /// </summary>
    public class HttpServiceExecutor : IServiceExecutor
    {
        #region Field

        private readonly ILogger<HttpServiceExecutor> _logger;

        #endregion Field

        #region Constructor

        /// <summary>
        /// Creates the executor.
        /// </summary>
        /// <param name="logger">Logger used for the trace entry.</param>
        public HttpServiceExecutor(ILogger<HttpServiceExecutor> logger)
        {
            _logger = logger;
        }

        #endregion Constructor

        #region Implementation of IServiceExecutor

        /// <summary>
        /// Executes the call; currently only logs that a message was received.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The invocation message.</param>
        /// <returns>An already completed task.</returns>
        public Task ExecuteAsync(IMessageSender sender, TransportMessage message)
        {
            if (_logger.IsEnabled(LogLevel.Trace))
                _logger.LogTrace("服務提供者接收到消息。");

            // Nothing is awaited here, so return a completed task instead of
            // paying for an async state machine (and triggering warning CS1998).
            return Task.CompletedTask;
        }

        #endregion Implementation of IServiceExecutor
    }

 

3. 消息發送器(藍色框)

將要發送的消息寫入到通訊管道中,服務端和客戶端都有實現。

上層接口:

    /// <summary>
    /// An abstract message sender.
    /// </summary>
    public interface IMessageSender
    {
        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the send operation.</returns>
        Task SendAsync(TransportMessage message);

        /// <summary>
        /// Sends a message and flushes the buffer.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the send operation.</returns>
        Task SendAndFlushAsync(TransportMessage message);
    }

 

實現類基類:

    /// <summary>
    /// Base class for DotNetty-backed message senders; it turns a
    /// <see cref="TransportMessage"/> into a byte buffer using the supplied encoder.
    /// </summary>
    public abstract class DotNettyMessageSender
    {
        private readonly ITransportMessageEncoder _transportMessageEncoder;

        /// <summary>
        /// Stores the encoder used by <see cref="GetByteBuffer"/>.
        /// </summary>
        /// <param name="transportMessageEncoder">Encoder that serializes transport messages.</param>
        protected DotNettyMessageSender(ITransportMessageEncoder transportMessageEncoder)
        {
            _transportMessageEncoder = transportMessageEncoder;
        }

        /// <summary>
        /// Encodes <paramref name="message"/> and wraps the encoded data in a buffer.
        /// </summary>
        /// <param name="message">The message to encode.</param>
        /// <returns>A buffer wrapping the encoded message.</returns>
        protected IByteBuffer GetByteBuffer(TransportMessage message)
            => Unpooled.WrappedBuffer(_transportMessageEncoder.Encode(message));
    }

服務端:

    /// <summary>
    /// Message sender for the DotNetty server side; writes encoded messages
    /// through the channel handler context it was created with.
    /// </summary>
    public class DotNettyServerMessageSender : DotNettyMessageSender, IMessageSender
    {
        private readonly IChannelHandlerContext _context;

        /// <summary>
        /// Creates a sender bound to a channel handler context.
        /// </summary>
        /// <param name="transportMessageEncoder">Encoder that serializes transport messages.</param>
        /// <param name="context">The context the messages are written to.</param>
        public DotNettyServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IChannelHandlerContext context)
            : base(transportMessageEncoder)
        {
            _context = context;
        }

        #region Implementation of IMessageSender

        /// <summary>
        /// Encodes the message and writes it to the context without flushing.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the write.</returns>
        public async Task SendAsync(TransportMessage message)
        {
            await _context.WriteAsync(GetByteBuffer(message));
        }

        /// <summary>
        /// Encodes the message, writes it to the context and flushes.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the write and flush.</returns>
        public async Task SendAndFlushAsync(TransportMessage message)
        {
            await _context.WriteAndFlushAsync(GetByteBuffer(message));
        }

        #endregion Implementation of IMessageSender
    }

客戶端:

/// <summary>
    /// Message sender for the DotNetty client side; writes encoded messages
    /// to the channel it was created with.
    /// </summary>
    public class DotNettyMessageClientSender : DotNettyMessageSender, IMessageSender, IDisposable
    {
        private readonly IChannel _channel;

        /// <summary>
        /// Creates a sender bound to a channel.
        /// </summary>
        /// <param name="transportMessageEncoder">Encoder that serializes transport messages.</param>
        /// <param name="channel">The channel the messages are written to.</param>
        public DotNettyMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IChannel channel)
            : base(transportMessageEncoder)
        {
            _channel = channel;
        }

        #region Implementation of IDisposable

        /// <summary>Disconnects the underlying channel, blocking until the disconnect has finished.</summary>
        public void Dispose()
        {
            Task.Run(() => _channel.DisconnectAsync()).Wait();
        }

        #endregion Implementation of IDisposable

        #region Implementation of IMessageSender

        /// <summary>
        /// Encodes the message, writes it to the channel and flushes.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the write and flush.</returns>
        public async Task SendAsync(TransportMessage message)
        {
            // NOTE(review): unlike the server-side sender, this also flushes;
            // confirm that is intentional before switching to a plain write.
            await SendAndFlushAsync(message);
        }

        /// <summary>
        /// Encodes the message, writes it to the channel and flushes.
        /// </summary>
        /// <param name="message">The message content.</param>
        /// <returns>A task representing the write and flush.</returns>
        public async Task SendAndFlushAsync(TransportMessage message)
        {
            await _channel.WriteAndFlushAsync(GetByteBuffer(message));
        }

        #endregion Implementation of IMessageSender
    }

 

4. 客戶端工廠(綠色框)

爲什麼沒有服務端工廠呢?因爲服務端是由服務端主機(Host)直接創建的,主機直接調用監聽器監聽端口。既然我們重寫了通訊過程,就不能用微軟原有的 WebHost,後續會講到如何搭建自己的主機。

客戶端工廠用來建立客戶端,而後與服務端主機通訊。

上層接口:

    /// <summary>
    /// An abstract transport client factory.
    /// </summary>
    public interface ITransportClientFactory
    {
        /// <summary>
        /// Creates a client.
        /// </summary>
        /// <param name="endPoint">The end point.</param>
        /// <returns>The transport client instance.</returns>
        Task<ITransportClient> CreateClientAsync(EndPoint endPoint);
    }

 

    /// <summary>
    /// An abstract transport client.
    /// </summary>
    public interface ITransportClient
    {
        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="transportMessage">The transport message to send.</param>
        /// <returns>A task representing the send operation.</returns>
        Task SendAsync(TransportMessage transportMessage);
    }

 

實現類:

    /// <summary>
    /// DotNetty-based transport client factory. Clients are created lazily and
    /// cached per end point; a cached entry is removed when its connection attempt
    /// fails or when its channel becomes inactive.
    /// </summary>
    public class DotNettyTransportClientFactory : ITransportClientFactory, IDisposable
    {
        #region Field

        private readonly ITransportMessageEncoder _transportMessageEncoder;
        private readonly ITransportMessageDecoder _transportMessageDecoder;
        private readonly ILogger<DotNettyTransportClientFactory> _logger;
        private readonly IServiceExecutor _serviceExecutor;
        private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>();
        private readonly Bootstrap _bootstrap;

        // Channel attributes used to find the sender, listener and original end point
        // belonging to a channel from inside the channel handler.
        private static readonly AttributeKey<IMessageSender> messageSenderKey = AttributeKey<IMessageSender>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageSender));
        private static readonly AttributeKey<IMessageListener> messageListenerKey = AttributeKey<IMessageListener>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageListener));
        private static readonly AttributeKey<EndPoint> origEndPointKey = AttributeKey<EndPoint>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(EndPoint));

        #endregion Field

        #region Constructor

        /// <summary>
        /// Creates a factory whose clients have no service executor.
        /// </summary>
        /// <param name="codecFactory">Factory providing the message encoder and decoder.</param>
        /// <param name="logger">Logger used for diagnostics.</param>
        public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger)
            : this(codecFactory,  logger, null)
        {
        }

        /// <summary>
        /// Creates a factory and configures the shared client bootstrap.
        /// </summary>
        /// <param name="codecFactory">Factory providing the message encoder and decoder.</param>
        /// <param name="logger">Logger used for diagnostics.</param>
        /// <param name="serviceExecutor">Executor handed to created clients; may be null.</param>
        public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger, IServiceExecutor serviceExecutor)
        {
            _transportMessageEncoder = codecFactory.GetEncoder();
            _transportMessageDecoder = codecFactory.GetDecoder();
            _logger = logger;
            _serviceExecutor = serviceExecutor;
            _bootstrap = GetBootstrap();
            _bootstrap.Handler(new ActionChannelInitializer<ISocketChannel>(c =>
            {
                var pipeline = c.Pipeline;
                // Length-prefixed framing (4-byte length field), decoding to
                // TransportMessage, then dispatch to the channel's listener.
                pipeline.AddLast(new LengthFieldPrepender(4));
                pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
                pipeline.AddLast(new DefaultChannelHandler(this));
            }));
        }

        #endregion Constructor

        #region Implementation of ITransportClientFactory

        /// <summary>
        /// Returns the cached client for <paramref name="endPoint"/>, connecting first
        /// if no client exists yet.
        /// </summary>
        /// <param name="endPoint">The end point.</param>
        /// <returns>The transport client instance.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="endPoint"/> is null.</exception>
        public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint)
        {
            if (endPoint == null)
                throw new ArgumentNullException(nameof(endPoint));

            var key = endPoint;
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"準備爲服務端地址:{key}建立客戶端。");
            try
            {
                return await _clients.GetOrAdd(key
                    , k => new Lazy<Task<ITransportClient>>(() => ConnectAsync(k))).Value;
            }
            catch
            {
                // Drop the faulted entry; otherwise the failed task stays cached and
                // every later call for this end point fails without reconnecting.
                _clients.TryRemove(key, out _);
                throw;
            }
        }

        #endregion Implementation of ITransportClientFactory

        #region Implementation of IDisposable

        /// <summary>Disposes every client whose connection has completed successfully.</summary>
        public void Dispose()
        {
            // NOTE(review): the event loop group created in GetBootstrap is not shut
            // down here — confirm whether it should be released as well.
            foreach (var lazyClient in _clients.Values.Where(i => i.IsValueCreated))
            {
                // lazyClient.Value is the connection Task. Task itself implements
                // IDisposable, so casting it would dispose the task (and throw while
                // it is still running) instead of disposing the client it produced.
                var connectTask = lazyClient.Value;
                if (connectTask.Status != TaskStatus.RanToCompletion)
                    continue;

                (connectTask.Result as IDisposable)?.Dispose();
            }
        }

        #endregion Implementation of IDisposable

        /// <summary>
        /// Connects to <paramref name="endPoint"/> and builds the client together with
        /// the listener and sender stored on the channel.
        /// </summary>
        private async Task<ITransportClient> ConnectAsync(EndPoint endPoint)
        {
            // Connect asynchronously; the returned channel carries the per-connection state.
            var channel = await _bootstrap.ConnectAsync(endPoint);

            var messageListener = new DotNettyClientMessageListener();
            channel.GetAttribute(messageListenerKey).Set(messageListener);

            var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel);
            channel.GetAttribute(messageSenderKey).Set(messageSender);
            channel.GetAttribute(origEndPointKey).Set(endPoint);

            return new DotNettyTransportClient(messageSender, messageListener, _logger, _serviceExecutor);
        }

        /// <summary>
        /// Builds the client bootstrap: TCP socket channel, TCP_NODELAY enabled and
        /// the pooled buffer allocator.
        /// </summary>
        private static Bootstrap GetBootstrap()
        {
            IEventLoopGroup group = new MultithreadEventLoopGroup();

            // The former extra Channel<TcpServerSocketChannel>() call was removed: a
            // client bootstrap must use the client socket channel set below.
            var bootstrap = new Bootstrap();
            bootstrap
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                .Option(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
                .Group(group);

            return bootstrap;
        }

        /// <summary>
        /// Last inbound handler of the client pipeline: forwards decoded messages to
        /// the channel's listener and evicts the cached client when the channel closes.
        /// </summary>
        protected class DefaultChannelHandler : ChannelHandlerAdapter
        {
            private readonly DotNettyTransportClientFactory _factory;

            public DefaultChannelHandler(DotNettyTransportClientFactory factory)
            {
                this._factory = factory;
            }

            #region Overrides of ChannelHandlerAdapter

            public override void ChannelInactive(IChannelHandlerContext context)
            {
                // The attribute is only set after the connect has completed, so it can
                // still be empty here; TryRemove throws on a null key.
                var endPoint = context.Channel.GetAttribute(origEndPointKey).Get();
                if (endPoint != null)
                    _factory._clients.TryRemove(endPoint, out _);
            }

            public override void ChannelRead(IChannelHandlerContext context, object message)
            {
                var transportMessage = message as TransportMessage;
                if (transportMessage == null)
                {
                    // Not something this handler understands; pass it on unchanged.
                    context.FireChannelRead(message);
                    return;
                }

                var messageListener = context.Channel.GetAttribute(messageListenerKey).Get();
                var messageSender = context.Channel.GetAttribute(messageSenderKey).Get();

                // Fire and forget is safe here: DispatchAsync handles its own failures.
                _ = DispatchAsync(messageListener, messageSender, transportMessage);
            }

            #endregion Overrides of ChannelHandlerAdapter

            /// <summary>
            /// Notifies the listener and logs any failure so that it is not lost in an
            /// unobserved task.
            /// </summary>
            private async Task DispatchAsync(IMessageListener messageListener, IMessageSender messageSender, TransportMessage transportMessage)
            {
                try
                {
                    await messageListener.OnReceived(messageSender, transportMessage);
                }
                catch (Exception exception)
                {
                    if (_factory._logger.IsEnabled(LogLevel.Error))
                        _factory._logger.LogError(exception, "處理服務端返回的消息時發生了錯誤。");
                }
            }
        }
    }

 

    /// <summary>
    /// Default transport client: sends messages through an <see cref="IMessageSender"/>
    /// and passes messages reported by an <see cref="IMessageListener"/> to the
    /// optional <see cref="IServiceExecutor"/>.
    /// </summary>
    public class DotNettyTransportClient : ITransportClient, IDisposable
    {
        #region Field

        private readonly IMessageSender _messageSender;
        private readonly IMessageListener _messageListener;
        private readonly ILogger _logger;
        private readonly IServiceExecutor _serviceExecutor;

        #endregion Field

        #region Constructor

        /// <summary>
        /// Creates the client and subscribes to the listener's received event.
        /// </summary>
        /// <param name="messageSender">Sender used for outgoing messages.</param>
        /// <param name="messageListener">Listener reporting incoming messages.</param>
        /// <param name="logger">Logger used for diagnostics.</param>
        /// <param name="serviceExecutor">Executor for incoming messages; may be null.</param>
        public DotNettyTransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger, IServiceExecutor serviceExecutor)
        {
            _messageSender = messageSender;
            _messageListener = messageListener;
            _logger = logger;
            _serviceExecutor = serviceExecutor;
            messageListener.Received += MessageListener_Received;
        }

        #endregion Constructor

        #region Implementation of ITransportClient

        /// <summary>
        /// Sends a message and flushes it.
        /// </summary>
        /// <param name="transportMessage">The transport message to send.</param>
        /// <returns>A task representing the send operation.</returns>
        /// <exception cref="Exception">The send failed; the original error is the inner exception.</exception>
        public async Task SendAsync(TransportMessage transportMessage)
        {
            try
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug("準備發送消息。");

                try
                {
                    await _messageSender.SendAndFlushAsync(transportMessage);
                }
                catch (Exception exception)
                {
                    throw new Exception("與服務端通信時發生了異常。", exception);
                }

                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug("消息發送成功。");
            }
            catch (Exception exception)
            {
                // Pass the exception as the exception argument. The former call
                // LogError(null, exception, "...") bound to the (message, args) overload
                // with a null message, so neither the text nor the exception was logged.
                if (_logger.IsEnabled(LogLevel.Error))
                    _logger.LogError(exception, "消息發送失敗。");
                throw;
            }
        }

        #endregion Implementation of ITransportClient

        #region Implementation of IDisposable

        /// <summary>Unsubscribes from the listener and disposes the sender and listener when they are disposable.</summary>
        public void Dispose()
        {
            // Detach first so no message is dispatched to a client being torn down.
            _messageListener.Received -= MessageListener_Received;

            (_messageSender as IDisposable)?.Dispose();
            (_messageListener as IDisposable)?.Dispose();
        }

        #endregion Implementation of IDisposable

        #region Private Method

        /// <summary>
        /// Handles a message reported by the listener by handing it to the service
        /// executor, if one was supplied.
        /// </summary>
        private async Task MessageListener_Received(IMessageSender sender, TransportMessage message)
        {
            if (_logger.IsEnabled(LogLevel.Trace))
                _logger.LogTrace("服務消費者接收到消息。");

            if (_serviceExecutor != null)
                await _serviceExecutor.ExecuteAsync(sender, message);
        }

        #endregion Private Method
    }

 

5. 序列化工具(白色框)

上節中咱們用MessagePack實現了序列化與反序列化,本節爲通訊,天然離不開消息序列化。

須要繼承自 DotNetty.Transport.Channels.ChannelHandlerAdapter,才能被 netty 調用:

/// <summary>
    /// Inbound channel handler that decodes a received byte buffer into a
    /// <see cref="TransportMessage"/> and passes it to the next handler.
    /// </summary>
    public class TransportMessageChannelHandlerAdapter : ChannelHandlerAdapter
    {
        private readonly ITransportMessageDecoder _transportMessageDecoder;

        /// <summary>
        /// Creates the adapter.
        /// </summary>
        /// <param name="transportMessageDecoder">Decoder that deserializes transport messages.</param>
        public TransportMessageChannelHandlerAdapter(ITransportMessageDecoder transportMessageDecoder)
        {
            _transportMessageDecoder = transportMessageDecoder;
        }

        #region Overrides of ChannelHandlerAdapter

        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            var buffer = (IByteBuffer)message;
            try
            {
                // Copy the readable bytes out so the decoder works on a plain array.
                var data = new byte[buffer.ReadableBytes];
                buffer.ReadBytes(data);
                var transportMessage = _transportMessageDecoder.Decode(data);
                context.FireChannelRead(transportMessage);
            }
            finally
            {
                // Release even when decoding or a downstream handler throws;
                // otherwise the buffer's reference is never returned.
                ReferenceCountUtil.Release(buffer);
            }
        }

        #endregion Overrides of ChannelHandlerAdapter
    }

 

6. 通訊主機(黃色框)

用於啓動通訊監聽端口

內部包含消息監聽器(_serverMessageListener)和消息執行器(_serviceExecutor)。

接口:

/// <summary>
    /// An abstract transport host.
    /// </summary>
public interface ITransportHost : IDisposable
    {
        /// <summary>
        /// Starts the host.
        /// </summary>
        /// <param name="endPoint">The host end point.</param>
        /// <returns>A task representing the start operation.</returns>
        Task StartAsync(EndPoint endPoint);

        /// <summary>
        /// Starts the host.
        /// </summary>
        /// <param name="ip">The IP address.</param>
        /// <param name="port">The port.</param>
        /// <returns>A task representing the start operation.</returns>
        Task StartAsync(string ip, int port);
    }

實現類:

/// <summary>
    /// Transport host: obtains a message listener from the supplied factory and
    /// forwards every received message to the service executor.
    /// </summary>
    public class DotNettyTransportHost : ITransportHost
    {
        #region Field

        private readonly IServiceExecutor _serviceExecutor;

        /// <summary>The executor that received messages are forwarded to.</summary>
        public IServiceExecutor ServiceExecutor { get => _serviceExecutor; }

        private readonly Func<EndPoint, Task<IMessageListener>> _messageListenerFactory;
        private IMessageListener _serverMessageListener;

        #endregion Field

        /// <summary>
        /// Creates the host.
        /// </summary>
        /// <param name="messageListenerFactory">Factory that produces the listener for an end point.</param>
        /// <param name="serviceExecutor">Executor that handles received messages.</param>
        public DotNettyTransportHost(Func<EndPoint, Task<IMessageListener>> messageListenerFactory, IServiceExecutor serviceExecutor)
        {
            _messageListenerFactory = messageListenerFactory;
            _serviceExecutor = serviceExecutor;
        }

        /// <summary>Unsubscribes from the listener and disposes it when it is disposable.</summary>
        public void Dispose()
        {
            var listener = _serverMessageListener;
            if (listener == null)
                return;

            // Detach the handler so the listener no longer references this host.
            listener.Received -= MessageListener_Received;
            (listener as IDisposable)?.Dispose();
        }

        /// <summary>
        /// Starts the host; does nothing when a listener already exists.
        /// </summary>
        /// <param name="endPoint">The host end point.</param>
        /// <returns>A task representing the start operation.</returns>
        public async Task StartAsync(EndPoint endPoint)
        {
            if (_serverMessageListener != null)
                return;
            _serverMessageListener = await _messageListenerFactory(endPoint);
            _serverMessageListener.Received += MessageListener_Received;
        }

        /// <summary>
        /// Starts the host on the given address; does nothing when a listener
        /// already exists.
        /// </summary>
        /// <param name="ip">The IP address, in a form accepted by <see cref="IPAddress.Parse(string)"/>.</param>
        /// <param name="port">The port.</param>
        /// <returns>A task representing the start operation.</returns>
        public async Task StartAsync(string ip, int port)
        {
            // Checked before parsing so an already started host keeps ignoring the
            // arguments, exactly as before.
            if (_serverMessageListener != null)
                return;

            // Single start path: reuse the EndPoint overload instead of duplicating it.
            await StartAsync(new IPEndPoint(IPAddress.Parse(ip), port));
        }

        /// <summary>
        /// Listener callback: forwards the message to the service executor.
        /// </summary>
        /// <param name="sender">The message sender.</param>
        /// <param name="message">The received message.</param>
        /// <returns>A task representing the execution.</returns>
        private async Task MessageListener_Received(IMessageSender sender, TransportMessage message)
        {
            await _serviceExecutor.ExecuteAsync(sender, message);
        }
    }
相關文章
相關標籤/搜索