DotNetty 版 mqtt 開源客戶端 (MqttFx)

1、DotNetty背景介紹html

    某天發現 dotnet  是個好東西,就找了個項目來練練手。因而有了本文的 Mqtt 客戶端   (github:  MqttFx )git

DotNetty是微軟的Azure團隊,使用C#實現的Netty的版本發佈。不但使用了C#和.Net平臺的技術特色,而且保留了Netty原來絕大部分的編程接口。讓咱們在使用時,徹底能夠依照Netty官方的教程來學習和使用DotNetty應用程序。 github

DotNetty同時也是開源的,它的源代碼託管在Github上: https://github.com/azure/dotnetty編程

Netty 的官方文檔 : http://netty.io/wiki/all-documents.html服務器

 

2、Packet網絡

    套件裏是有個 DotNetty.Codecs.Mqtt, 本項目沒有使用。直接寫了一個。ide

FixedHeader:  固定報頭學習

    /// <summary>
    /// 固定報頭
    /// </summary>
    public class FixedHeader
    {
        /// <summary>
        /// 報文類型
        /// </summary>
        public PacketType PacketType { get; set; }
        /// <summary>
        /// 重發標誌
        /// </summary>
        public bool Dup { get; set; }
        /// <summary>
        /// 服務質量等級
        /// </summary>
        public MqttQos Qos { get; set; }
        /// <summary>
        /// 保留標誌
        /// </summary>
        public bool Retain { get; set; }
        /// <summary>
        /// 剩餘長度
        /// </summary>
        public int RemaingLength { internal get; set; }

        public FixedHeader(PacketType packetType)
        {
            PacketType = packetType;
        }

        public FixedHeader(byte signature, int remainingLength)
        {
            PacketType = (PacketType)((signature & 0xf0) >> 4);
            Dup = ((signature & 0x08) >> 3) > 0;
            Qos = (MqttQos)((signature & 0x06) >> 1);
            Retain = (signature & 0x01) > 0;
            RemaingLength = remainingLength;
        }

        public void WriteTo(IByteBuffer buffer)
        {
            var flags = (byte)PacketType << 4;
            flags |= Dup.ToByte() << 3;
            flags |= (byte)Qos << 1;
            flags |= Retain.ToByte();

            buffer.WriteByte((byte)flags);
            buffer.WriteBytes(EncodeLength(RemaingLength));
        }

        static byte[] EncodeLength(int length)
        {
            var result = new List<byte>();
            do
            {
                var digit = (byte)(length % 0x80);
                length /= 0x80;
                if (length > 0)
                    digit |= 0x80;
                result.Add(digit);
            } while (length > 0);

            return result.ToArray();
        }
    }

  

 

Packet:  消息基類編碼

    /// <summary>
    /// 消息基類
    /// </summary>
    public abstract class Packet
    {
        #region FixedHeader

        /// <summary>
        /// 固定報頭
        /// </summary>
        public FixedHeader FixedHeader { protected get; set; }
        /// <summary>
        /// 報文類型
        /// </summary>
        public PacketType PacketType => FixedHeader.PacketType;
        /// <summary>
        /// 重發標誌
        /// </summary>
        public bool Dup => FixedHeader.Dup;
        /// <summary>
        /// 服務質量等級
        /// </summary>
        public MqttQos Qos => FixedHeader.Qos;
        /// <summary>
        /// 保留標誌
        /// </summary>
        public bool Retain => FixedHeader.Retain;
        /// <summary>
        /// 剩餘長度
        /// </summary>
        public int RemaingLength => FixedHeader.RemaingLength;

        #endregion

        public Packet(PacketType packetType) => FixedHeader = new FixedHeader(packetType);

        public virtual void Encode(IByteBuffer buffer) { }

        public virtual void Decode(IByteBuffer buffer) { }
    }

  

PacketWithId: 消息基類(帶ID)spa

    /// <summary>
    /// 消息基類(帶ID)
    /// </summary>
    public abstract class PacketWithId : Packet
    {
        public PacketWithId(PacketType packetType) : base(packetType)
        {
        }

        /// <summary>
        /// 報文標識符
        /// </summary>
        public ushort PacketId { get; set; }

        /// <summary>
        /// EncodePacketIdVariableHeader
        /// </summary>
        /// <param name="buffer"></param>
        public override void Encode(IByteBuffer buffer)
        {
            var buf = Unpooled.Buffer();
            try
            {
                EncodePacketId(buf);

                FixedHeader.RemaingLength = buf.ReadableBytes;
                FixedHeader.WriteTo(buffer);
                buffer.WriteBytes(buf);
                buf = null;
            }
            finally
            {
                buf?.Release();
            }
        }

        /// <summary>
        /// DecodePacketIdVariableHeader
        /// </summary>
        /// <param name="buffer"></param>
        public override void Decode(IByteBuffer buffer)
        {
            int remainingLength = RemaingLength;
            DecodePacketId(buffer, ref remainingLength);
            FixedHeader.RemaingLength = remainingLength;
        }

        protected void EncodePacketId(IByteBuffer buffer)
        {
            if (Qos > MqttQos.AtMostOnce)
            {
                buffer.WriteUnsignedShort(PacketId);
            }
        }

        protected void DecodePacketId(IByteBuffer buffer, ref int remainingLength)
        {
            if (Qos > MqttQos.AtMostOnce)
            {
                PacketId = buffer.ReadUnsignedShort(ref remainingLength);
                if (PacketId == 0)
                    throw new DecoderException("[MQTT-2.3.1-1]");
            }
        }
    }

 

ConnectPacket: 發起鏈接包

    /// <summary>
    /// 發起鏈接
    /// </summary>
    internal sealed class ConnectPacket : Packet
    {
        public ConnectPacket()
            : base(PacketType.CONNECT)
        {
        }

        #region Variable header

        /// <summary>
        /// 協議名
        /// </summary>
        public string ProtocolName { get; } = "MQTT";
        /// <summary>
        /// 協議級別
        /// </summary>
        public byte ProtocolLevel { get; } = 0x04;
        /// <summary>
        /// 保持鏈接 
        /// </summary>
        public short KeepAlive { get; set; }

        #region Connect Flags
        /// <summary>
        /// 用戶名標誌
        /// </summary>
        public bool UsernameFlag { get; set; }
        /// <summary>
        /// 密碼標誌
        /// </summary>
        public bool PasswordFlag { get; set; }
        /// <summary>
        /// 遺囑保留
        /// </summary>
        public bool WillRetain { get; set; }
        /// <summary>
        /// 遺囑QoS
        /// </summary>
        public MqttQos WillQos { get; set; }
        /// <summary>
        /// 遺囑標誌
        /// </summary>
        public bool WillFlag { get; set; }
        /// <summary>
        /// 清理會話
        /// </summary>
        public bool CleanSession { get; set; }
        #endregion

        #endregion

        #region Payload

        /// <summary>
        /// 客戶端標識符 Client Identifier
        /// </summary>
        public string ClientId { get; set; }
        /// <summary>
        /// 遺囑主題 Will Topic
        /// </summary>
        public string WillTopic { get; set; }
        /// <summary>
        /// 遺囑消息 Will Message
        /// </summary>
        public byte[] WillMessage { get; set; }
        /// <summary>
        /// 用戶名 User Name
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        /// 密碼 Password
        /// </summary>
        public string Password { get; set; }

        #endregion

        public override void Encode(IByteBuffer buffer)
        {
            var buf = Unpooled.Buffer();
            try
            {
                //variable header
                buf.WriteString(ProtocolName);        //byte 1 - 8
                buf.WriteByte(ProtocolLevel);         //byte 9

                //connect flags;                      //byte 10
                var flags = UsernameFlag.ToByte() << 7;
                flags |= PasswordFlag.ToByte() << 6;
                flags |= WillRetain.ToByte() << 5;
                flags |= ((byte)WillQos) << 3;
                flags |= WillFlag.ToByte() << 2;
                flags |= CleanSession.ToByte() << 1;
                buf.WriteByte((byte)flags);

                //keep alive
                buf.WriteShort(KeepAlive);            //byte 11 - 12

                //payload
                buf.WriteString(ClientId);
                if (WillFlag)
                {
                    buf.WriteString(WillTopic);
                    buf.WriteBytes(WillMessage);
                }
                if (UsernameFlag && PasswordFlag)
                {
                    buf.WriteString(UserName);
                    buf.WriteString(Password);
                }

                FixedHeader.RemaingLength = buf.ReadableBytes;
                FixedHeader.WriteTo(buffer);
                buffer.WriteBytes(buf);
            }
            finally
            {
                buf?.Release();
                buf = null;
            }
        }
    }

 

鏈接回執: ConnAckPacket

    /// <summary>
    /// 鏈接回執
    /// </summary>
    internal sealed class ConnAckPacket : Packet
    {
        public ConnAckPacket() : base (PacketType.CONNACK)
        {
        }

        /// <summary>
        /// 當前會話
        /// </summary>
        public bool SessionPresent { get; set; }
        /// <summary>
        /// 鏈接返回碼
        /// </summary>
        public ConnectReturnCode ConnectReturnCode { get; set; }

        public override void Decode(IByteBuffer buffer)
        {
            SessionPresent = (buffer.ReadByte() & 0x01) == 1;
            ConnectReturnCode = (ConnectReturnCode)buffer.ReadByte();
        }
    }
View Code

 

剩餘幾個包,,你們看看源碼。

 

3、包解碼編碼 MqttDecoder  MqttEncoder  

 

    粘包拆包問題是處於網絡比較底層的問題,在數據鏈路層、網絡層以及傳輸層都有可能發生。咱們平常的網絡應用開發大都在傳輸層進行,因爲UDP有消息保護邊界,不會發生這個問題。

 

什麼是粘包、拆包?

對於什麼是粘包、拆包問題,我想先舉兩個簡單的應用場景:

  1. 客戶端和服務器創建一個鏈接,客戶端發送一條消息,客戶端關閉與服務端的鏈接。

  2. 客戶端和服務器簡歷一個鏈接,客戶端連續發送兩條消息,客戶端關閉與服務端的鏈接。

對於第一種狀況,服務端的處理流程能夠是這樣的:當客戶端與服務端的鏈接創建成功以後,服務端不斷讀取客戶端發送過來的數據,當客戶端與服務端鏈接斷開以後,服務端知道已經讀完了一條消息,而後進行解碼和後續處理...。對於第二種狀況,若是按照上面相同的處理邏輯來處理,那就有問題了,咱們來看看第二種狀況下客戶端發送的兩條消息遞交到服務端有可能出現的狀況:

第一種狀況:

服務端一共讀到兩個數據包,第一個包包含客戶端發出的第一條消息的完整信息,第二個包包含客戶端發出的第二條消息,那這種狀況比較好處理,服務器只須要簡單的從網絡緩衝區去讀就行了,第一次讀到第一條消息的完整信息,消費完再從網絡緩衝區將第二條完整消息讀出來消費。

沒有發生粘包、拆包示意圖

第二種狀況:

服務端一共就讀到一個數據包,這個數據包包含客戶端發出的兩條消息的完整信息,這個時候基於以前邏輯實現的服務端就蒙了,由於服務端不知道第一條消息從哪兒結束和第二條消息從哪兒開始,這種狀況實際上是發生了TCP粘包。

   TCP粘包示意圖

第三種狀況:

服務端一共收到了兩個數據包,第一個數據包只包含了第一條消息的一部分,第一條消息的後半部分和第二條消息都在第二個數據包中,或者是第一個數據包包含了第一條消息的完整信息和第二條消息的一部分信息,第二個數據包包含了第二條消息的剩下部分,這種狀況實際上是發送了TCP拆,由於發生了一條消息被拆分在兩個包裏面發送了,一樣上面的服務器邏輯對於這種狀況是很差處理的。

TCP拆包示意圖

 

爲何會發生TCP粘包、拆包呢?

發生TCP粘包、拆包主要是因爲下面一些緣由:

  1. 應用程序寫入的數據大於套接字緩衝區大小,這將會發生拆包。

  2. 應用程序寫入數據小於套接字緩衝區大小,網卡將應用屢次寫入的數據發送到網絡上,這將會發生粘包。

  3. 進行MSS(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>MSS的時候將發生拆包。

  4. 接收方法不及時讀取套接字緩衝區數據,這將發生粘包。

  5. ……

 

如何處理粘包、拆包問題?

知道了粘包、拆包問題及根源,那麼如何處理粘包、拆包問題呢?TCP自己是面向流的,做爲網絡服務器,如何從這源源不斷涌來的數據流中拆分出或者合併出有意義的信息呢?一般會有如下一些經常使用的方法:

  1. 使用帶消息頭的協議、消息頭存儲消息開始標識及消息長度信息,服務端獲取消息頭的時候解析出消息長度,而後向後讀取該長度的內容。

  2. 設置定長消息,服務端每次讀取既定長度的內容做爲一條完整消息。

  3. 設置消息邊界,服務端從網絡流中按消息編輯分離出消息內容。

  4. ……

如何基於DotNetty處理粘包、拆包問題?

ChannelPipeline 網絡層數據的流向

ChannelHandler 組件對網絡數據的處理

  1. ByteToMessageDecoder

  2. MessageToMessageDecoder

這兩個組件都實現了ChannelInboundHandler接口,這說明這兩個組件都是用來解碼網絡上過來的數據的。而他們的順序通常是ByteToMessageDecoder位於head channel handler的後面,MessageToMessageDecoder位於ByteToMessageDecoder的後面。DotNetty中,涉及到粘包、拆包的邏輯主要在ByteToMessageDecoder及其實現中。

 

ByteToMessageDecoder

顧名思義、ByteToMessageDecoder是用來將從網絡緩衝區讀取的字節轉換成有意義的消息對象的

當上面一個channel handler傳入的ByteBuf有數據的時候,這裏咱們能夠把in參數當作網絡流,這裏有不斷的數據流入,而咱們要作的就是從這個byte流中分離出message,而後把message添加給out。分開將一下代碼邏輯:

  1. 當out中有Message的時候,直接將out中的內容交給後面的channel handler去處理。

  2. 當用戶邏輯把當前channel handler移除的時候,當即中止對網絡數據的處理。

  3. 記錄當前in中可讀字節數。

  4. decode是抽象方法,交給子類具體實現。

  5. 一樣判斷當前channel handler移除的時候,當即中止對網絡數據的處理。

  6. 若是子類實現沒有分理出任何message的時候,且子類實現也沒有動bytebuf中的數據的時候,這裏直接跳出,等待後續有數據來了再進行處理。

  7. 若是子類實現沒有分理出任何message的時候,且子類實現動了bytebuf中的數據,則繼續循環,直到解析出message或者不在對bytebuf中數據進行處理爲止。

  8. 若是子類實現解析出了message可是又沒有動bytebuf中的數據,那麼是有問題的,拋出異常。

  9. 若是標誌位只解碼一次,則退出。

能夠知道,若是要實現具備處理粘包、拆包功能的子類,及decode實現,必需要遵照上面的規則,咱們以實現處理第一部分的第二種粘包狀況和第三種狀況拆包狀況的服務器邏輯來舉例:

對於粘包狀況的decode須要實現的邏輯對應於將客戶端發送的兩條消息都解析出來分爲兩個message加入out,這樣的話callDecode只須要調用一次decode便可。

對於拆包狀況的decode須要實現的邏輯主要對應於處理第一個數據包的時候第一次調用decode的時候out的size不變,從continue跳出而且因爲不知足繼續可讀而退出循環,處理第二個數據包的時候,對於decode的調用將會產生兩個message放入out,其中兩次進入callDecode上下文中的數據流將會合併爲一個bytebuf和當前channel handler實例關聯,兩次處理完畢即清空這個bytebuf。

 

MqttDecoder :  Mqtt 解碼器

    public sealed class MqttDecoder : ByteToMessageDecoder
    {
        readonly bool _isServer;
        readonly int _maxMessageSize;

        public MqttDecoder(bool isServer, int maxMessageSize)
        {
            _isServer = isServer;
            _maxMessageSize = maxMessageSize;
        }

        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            try
            {
                if (!TryDecodePacket(context, input, out Packet packet))
                    return;

                output.Add(packet);
            }
            catch (DecoderException)
            {
                input.SkipBytes(input.ReadableBytes);
                throw;
            }
        }

        bool TryDecodePacket(IChannelHandlerContext context, IByteBuffer buffer, out Packet packet)
        {
            if (!buffer.IsReadable(2))
            {
                packet = null;
                return false;
            }

            byte signature = buffer.ReadByte();

            if (!TryDecodeRemainingLength(buffer, out int remainingLength) || !buffer.IsReadable(remainingLength))
            {
                packet = null;
                return false;
            }

            //DecodePacketInternal
            var fixedHeader = new FixedHeader(signature, remainingLength);
            switch (fixedHeader.PacketType)
            {
                case PacketType.CONNECT: packet = new ConnectPacket(); break;
                case PacketType.CONNACK: packet = new ConnAckPacket(); break;
                case PacketType.DISCONNECT: packet = new DisconnectPacket(); break;
                case PacketType.PINGREQ: packet = new PingReqPacket(); break;
                case PacketType.PINGRESP: packet = new PingRespPacket(); break;
                case PacketType.PUBACK: packet = new PubAckPacket(); break;
                case PacketType.PUBCOMP: packet = new PubCompPacket(); break;
                case PacketType.PUBLISH: packet = new PublishPacket(); break;
                case PacketType.PUBREC: packet = new PubRecPacket(); break;
                case PacketType.PUBREL: packet = new PubRelPacket(); break;
                case PacketType.SUBSCRIBE: packet = new SubscribePacket(); break;
                case PacketType.SUBACK: packet = new SubAckPacket(); break;
                case PacketType.UNSUBSCRIBE: packet = new UnsubscribePacket(); break;
                case PacketType.UNSUBACK: packet = new UnsubscribePacket(); break;
                default:
                    throw new DecoderException("Unsupported Message Type");
            }
            packet.FixedHeader = fixedHeader;
            packet.Decode(buffer);

            //if (remainingLength > 0)
            //    throw new DecoderException($"Declared remaining length is bigger than packet data size by {remainingLength}.");

            return true;
        }

        bool TryDecodeRemainingLength(IByteBuffer buffer, out int value)
        {
            int readable = buffer.ReadableBytes;

            int result = 0;
            int multiplier = 1;
            byte digit;
            int read = 0;
            do
            {
                if (readable < read + 1)
                {
                    value = default;
                    return false;
                }
                digit = buffer.ReadByte();
                result += (digit & 0x7f) * multiplier;
                multiplier <<= 7;
                read++;
            }
            while ((digit & 0x80) != 0 && read < 4);

            if (read == 4 && (digit & 0x80) != 0)
                throw new DecoderException("Remaining length exceeds 4 bytes in length");

            int completeMessageSize = result + 1 + read;
            if (completeMessageSize > _maxMessageSize)
                throw new DecoderException("Message is too big: " + completeMessageSize);

            value = result;
            return true;
        }

        //static int DecodeRemainingLength(IByteBuffer buffer)
        //{
        //    byte encodedByte;
        //    var multiplier = 1;
        //    var remainingLength = 0;
        //    do
        //    {
        //        encodedByte = buffer.ReadByte();
        //        remainingLength += (encodedByte & 0x7f) * multiplier;
        //        multiplier *= 0x80;
        //    } while ((encodedByte & 0x80) != 0);

        //    return remainingLength;
        //}
    }

 

MqttEncoder:  mqtt 編碼器

    public sealed class MqttEncoder : MessageToMessageEncoder<Packet>
    {
        public static readonly MqttEncoder Instance = new MqttEncoder();

        protected override void Encode(IChannelHandlerContext context, Packet message, List<object> output) => DoEncode(context.Allocator, message, output);

        public static void DoEncode(IByteBufferAllocator bufferAllocator, Packet packet, List<object> output)
        {
            IByteBuffer  buffer = bufferAllocator.Buffer();
            try
            {
                packet.Encode(buffer);
                output.Add(buffer);
                buffer = null;
            }
            finally
            {
                buffer?.SafeRelease();
            }
        }
    }

 

未完待續。。。

相關文章
相關標籤/搜索