Tcp粘包處理

Tcp傳輸的是數據流,不能保證每次收到數據包與發送的數據包徹底一致。好比發送了兩個消息abc和deg收到的多是ab和cdef。緩存

爲了解決這個問題須要在消息中加上能標識,以拆分出發送的原始消息。ide

在此使用了簡單的方式,在消息前加上4字節的包長度,收到消息後查看是否完整,若不完整則等到下一次收到數據再處理,直到接收到完整的數據包。spa

負載包 = 4bytes(消息包長度) + nbytes(消息包內容)。code

下面是完整的實現:blog

PacketBufferManager負責拆包及數據包緩存的處理,爲泛型類TPacket爲消息包的類型。接口

收到到的消息包通常會有必定的結構好比Json、Protobuf等,所以抽象了IPacketFactory接口,能夠自定義消息的反序列化,若消息包就以字節數據的方式處理可用示例中的BytePacketFactory。ip

    public interface IPacketFactory<out TPacket>
    {
        TPacket ReadPacket(Stream stream, int count);
    }
    public class BytePacketFactory : IPacketFactory<byte[]>
    {
        public IKeyInfo KeyInfo { get; set; }

        public byte[] ReadPacket(Stream stream, int count)
        {
            var bytes = new byte[count];
            var unused = stream.Read(bytes, 0, count);
            return bytes;
        }
    }
    public class PacketBufferManager<TPacket>
    {
        private const int MaxPacketLength = 1024 * 1024 * 1024;
        private readonly ArraySegmentStream _bufferStream = new ArraySegmentStream();
        private readonly IPacketFactory<TPacket> _packetFactory;
        private List<ArraySegment<byte>> _datas = new List<ArraySegment<byte>>();

        public PacketBufferManager(IPacketFactory<TPacket> packetFactory)
        {
            _packetFactory = packetFactory ?? throw new ArgumentNullException(nameof(packetFactory));
        }

        public TPacket[] ReadPackets(byte[] data, int offset, int count)
        {
            var temp = _datas.ToList();
            temp.Add(new ArraySegment<byte>(data, offset, count));

            var totalCount = ArraySegmentStream.GetLeftCount(temp.ToArray(), 0, 0);
            if (totalCount < 4)
            {
                var currentBytes = data.Skip(offset).Take(count).ToArray();
                temp[temp.Count - 1] = new ArraySegment<byte>(currentBytes);
                _datas = temp;
                return null;
            }

            _bufferStream.Reset(temp.ToArray());
            var packets = new List<TPacket>();
            while (true)
            {
                var lengthBytes = new byte[4];
                var savePosition = _bufferStream.Position;
                var readLength = _bufferStream.Read(lengthBytes, 0, 4);
                _bufferStream.Position = savePosition;
                if (readLength < 4)
                {
                    var currentBytes = data
                        .Skip(offset + _bufferStream.SegmentPosition)
                        .Take(count - _bufferStream.SegmentPosition)
                        .ToArray();
                    temp = temp.Skip(_bufferStream.SegmentIndex).ToList();
                    temp[temp.Count - 1] = new ArraySegment<byte>(currentBytes);
                    _datas = temp;
                    return packets.ToArray();
                }

                var packetLength = BitConverter.ToInt32(lengthBytes, 0);
                if (packetLength > MaxPacketLength)
                {
                    throw new InvalidDataException("packet excced max length");
                }

                var leftCount = _bufferStream.Length - _bufferStream.Position;
                if (leftCount < packetLength + 4) //no enough bytes
                {
                    var currentBytes = data
                        .Skip(offset + _bufferStream.SegmentPosition)
                        .Take(count - _bufferStream.SegmentPosition)
                        .ToArray();
                    temp = temp.Skip(_bufferStream.SegmentIndex).ToList();
                    temp[temp.Count - 1] = new ArraySegment<byte>(currentBytes);
                    _datas = temp;
                    return packets.ToArray();
                }

                _bufferStream.Read(lengthBytes, 0, 4);
                var pb = _packetFactory.ReadPacket(_bufferStream, packetLength);
                packets.Add(pb);

                if (_bufferStream.Length == _bufferStream.Position) //all byte read
                {
                    _datas.Clear();
                    return packets.ToArray();
                }
                //var usedDataLength = _bufferStream.Position;
            }
        }
    }
    public class ArraySegmentStream : Stream
    {
        public int SegmentIndex { get; private set; }
        public int SegmentPosition { get; private set; }
        public ArraySegment<byte>[] Datas { get; private set; }

        public ArraySegmentStream() { }

        public ArraySegmentStream(ArraySegment<byte>[] datas)
        {
            Reset(datas);
        }

        public void Reset(ArraySegment<byte>[] datas)
        {
            Datas = datas;
            SegmentIndex = 0;
            SegmentPosition = 0;
        }

        public override void Flush()
        {
            //throw new NotImplementedException();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            var leftTotalCount = GetLeftCount();
            if (leftTotalCount < count)
                throw new IOException("no enough buffer data");

            var bufferIndex = 0;
            for (var i = SegmentIndex; i < Datas.Length; i++)
            {
                var leftCount = count - bufferIndex;
                var segment = Datas[SegmentIndex];
                var currentSegmentLeftCount = segment.Count - SegmentPosition;
                var readCount = Math.Min(leftCount, currentSegmentLeftCount);
                Array.Copy(segment.Array, segment.Offset + SegmentPosition,
                    buffer, offset + bufferIndex, readCount);

                SegmentPosition += readCount;
                bufferIndex += readCount;

                if (SegmentPosition == segment.Count)
                {
                    //move to next segment
                    SegmentPosition = 0;
                    SegmentIndex++;
                }
                if (bufferIndex != count) continue;

                break;
            }
            return bufferIndex;
        }

        private int GetLeftCount()
        {
            return GetLeftCount(Datas, SegmentIndex, SegmentPosition);
        }

        public static int GetLeftCount(ArraySegment<byte>[] segments, int currentSegmentIndex, int currentSegmentPosition)
        {
            var count = 0;
            var isCurrent = true;
            for (var i = currentSegmentIndex; i < segments.Length; i++)
            {
                count += isCurrent
                    ? segments[i].Count - currentSegmentPosition
                    : segments[i].Count;
                isCurrent = false;
            }
            return count;
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        public override bool CanRead { get; } = true;
        public override bool CanSeek { get; } = false;
        public override bool CanWrite { get; } = false;
        public override long Length => GetLeftCount(Datas, 0, 0);
        public override long Position
        {
            get => Length - GetLeftCount();
            set
            {
                if (value + 1 > Length) throw new InvalidOperationException("position out of range");

                var position = 0;
                for (int segmentIndex = 0; segmentIndex < Datas.Length; segmentIndex++)
                {
                    if (position + Datas[segmentIndex].Count - 1 < value)
                    {
                        position += Datas[segmentIndex].Count; //next segment first element position
                    }
                    else
                    {
                        SegmentIndex = segmentIndex;
                        SegmentPosition = (int)value - position;
                        return;
                    }
                }
                throw new IndexOutOfRangeException();
            }
        }
    }
            //用法示例
            var bufferManager = new PacketBufferManager<byte[]>(new BytePacketFactory());
            var readBufferSize = 8192;
            var readBuffer = new byte[readBufferSize];
            while (true)
            {
                try
                {
                    var receivedLength = networkStream.Read(readBuffer, 0, readBufferSize);
                    //接收到字節流後由bufferManager處理,若是無完整的消息包則返回null, 可能會返回多個包
                    var packets = bufferManager.ReadPackets(readBuffer, 0, receivedLength);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    break;
                }
            }
相關文章
相關標籤/搜索