SuperSocket Code解析

SuperSocket1.6Code解析

Normal Socket

System.Net.Sockets.dll程序集中使用socket類:node

服務器:bootstrap

  1. 建立socket:_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  2. 建立IPIPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
  3. 綁定IP地址: _socket.Bind(_endPoint); //綁定端口
  4. 服務開啓監聽:_socket.Listen(BACKLOG); //開啓監聽,backlog是監聽的最大數列
  5. 開啓監聽線程:建立新的監聽線程,在監聽線程中while調用Socket acceptSocket = _socket.Accept();
    1. 一旦acceptSocket 不爲空,說明有客戶端鏈接成功,保存客戶端socket,並查看該socket的isConnected屬性是否鏈接socket.RemoteEndPoint.ToString();
    2. 一旦鏈接建立接收線程,並啓動線程,在該線程中建立whilewhile (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}來接收客戶端傳來的消息。
  6. BeginReceive()有一個回調函數ReceiveCallBack()經過讀取byte[]buffer
  7. 向客戶端發送信息socket.Send(Encoding.ASCII.GetBytes(text));

receivebuffer默認值8192c#

SocketAsyncEventArgs

異步套接字操做api

  1. 建立IPEndPoint
  2. 建立socketListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  3. 綁定IP地址ListenerSocket.Bind(e);
  4. 開始監聽ListenerSocket.Listen(10);
  5. 建立異步套接字,並綁定異步完成事件Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
  6. 調用socket的AcceptAsync(Args)方法ListenerSocket.AcceptAsync(Args);
  7. 在異步套接字完成事件的回調函數中,建立新的異步套接字用於接收客戶端傳入消息的異步操做。var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。
  8. switch (e.LastOperation)case SocketAsyncOperation.Receive:

Socket.AcceptAsync(SocketAsyncEventArgs) 方法數組

返回:若是 I/O 操做掛起,則爲 true。 操做完成時,將引起 Completed 參數的 e 事件。緩存

若是 I/O 操做同步完成,則爲 false。 將不會引起 Completed 參數的 e 事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e 對象以檢索操做的結果。安全

SuperSocket Architecture

SuperSocket 層次示意圖

![# SuperSocket1.6Code解析服務器

Normal Socket

System.Net.Sockets.dll程序集中使用socket類:session

服務器:併發

  1. 建立socket:_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  2. 建立IPIPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
  3. 綁定IP地址: _socket.Bind(_endPoint); //綁定端口
  4. 服務開啓監聽:_socket.Listen(BACKLOG); //開啓監聽,backlog是監聽的最大數列
  5. 開啓監聽線程:建立新的監聽線程,在監聽線程中while調用Socket acceptSocket = _socket.Accept();
    1. 一旦acceptSocket 不爲空,說明有客戶端鏈接成功,保存客戶端socket,並查看該socket的isConnected屬性是否鏈接socket.RemoteEndPoint.ToString();
    2. 一旦鏈接建立接收線程,並啓動線程,在該線程中建立whilewhile (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}來接收客戶端傳來的消息。
  6. BeginReceive()有一個回調函數ReceiveCallBack()經過讀取byte[]buffer
  7. 向客戶端發送信息socket.Send(Encoding.ASCII.GetBytes(text));

receivebuffer默認值8192

SocketAsyncEventArgs

異步套接字操做

  1. 建立IPEndPoint
  2. 建立socketListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  3. 綁定IP地址ListenerSocket.Bind(e);
  4. 開始監聽ListenerSocket.Listen(10);
  5. 建立異步套接字,並綁定異步完成事件Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
  6. 調用socket的AcceptAsync(Args)方法ListenerSocket.AcceptAsync(Args);
  7. 在異步套接字完成事件的回調函數中,建立新的異步套接字用於接收客戶端傳入消息的異步操做。var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。
  8. switch (e.LastOperation)case SocketAsyncOperation.Receive:

Socket.AcceptAsync(SocketAsyncEventArgs) 方法

返回:若是 I/O 操做掛起,則爲 true。 操做完成時,將引起 Completed 參數的 e 事件。

若是 I/O 操做同步完成,則爲 false。 將不會引起 Completed 參數的 e 事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e 對象以檢索操做的結果。

SuperSocket Architecture

SuperSocket 層次示意圖

![# SuperSocket1.6Code解析

Normal Socket

System.Net.Sockets.dll程序集中使用socket類:

服務器:

  1. 建立socket:_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  2. 建立IPIPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
  3. 綁定IP地址: _socket.Bind(_endPoint); //綁定端口
  4. 服務開啓監聽:_socket.Listen(BACKLOG); //開啓監聽,backlog是監聽的最大數列
  5. 開啓監聽線程:建立新的監聽線程,在監聽線程中while調用Socket acceptSocket = _socket.Accept();
    1. 一旦acceptSocket 不爲空,說明有客戶端鏈接成功,保存客戶端socket,並查看該socket的isConnected屬性是否鏈接socket.RemoteEndPoint.ToString();
    2. 一旦鏈接建立接收線程,並啓動線程,在該線程中建立whilewhile (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}來接收客戶端傳來的消息。
  6. BeginReceive()有一個回調函數ReceiveCallBack()經過讀取byte[]buffer
  7. 向客戶端發送信息socket.Send(Encoding.ASCII.GetBytes(text));

receivebuffer默認值8192

SocketAsyncEventArgs

異步套接字操做

  1. 建立IPEndPoint
  2. 建立socketListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  3. 綁定IP地址ListenerSocket.Bind(e);
  4. 開始監聽ListenerSocket.Listen(10);
  5. 建立異步套接字,並綁定異步完成事件Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
  6. 調用socket的AcceptAsync(Args)方法ListenerSocket.AcceptAsync(Args);
  7. 在異步套接字完成事件的回調函數中,建立新的異步套接字用於接收客戶端傳入消息的異步操做。var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。
  8. switch (e.LastOperation)case SocketAsyncOperation.Receive:

Socket.AcceptAsync(SocketAsyncEventArgs) 方法

返回:若是 I/O 操做掛起,則爲 true。 操做完成時,將引起 Completed 參數的 e 事件。

若是 I/O 操做同步完成,則爲 false。 將不會引起 Completed 參數的 e 事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e 對象以檢索操做的結果。

SuperSocket Architecture

SuperSocket 層次示意圖

SuperSocket Layers

  • Reusable IO Buffer Pool:BufferManager類

SuperSocket 對象模型圖示意圖

SuperSocket Object Model

SuperSocket 請求處理模型示意圖

SuperSocket Request Handling Model

SuperSocket 隔離模型示意圖

SuperSocket Isolation Model

Config

Command Filters

Log/LogFactory

Command Loaders

1579856396698

ReceiveFilterFactory

1579855102888

ReceiveFilter

1579855469740

Connection Filters

1579856311644

SocketBase.dll

ISessionBase

1579318554194

AppSession

1580125323588

1579319140513

對AppServer和SocketSession的包裝

ServerConfig

服務參數配置,在serverbase基類SetUp中建立

/// <summary>
/// Setups with the specified ip and port.
/// </summary>
/// <param name="ip">The ip.</param>
/// <param name="port">The port.</param>
/// <param name="socketServerFactory">The socket server factory.</param>
/// <param name="receiveFilterFactory">The Receive filter factory.</param>
/// <param name="logFactory">The log factory.</param>
/// <param name="connectionFilters">The connection filters.</param>
/// <param name="commandLoaders">The command loaders.</param>
/// <returns>return setup result</returns>
public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null)
{
    return Setup(new ServerConfig
                    {
                        Ip = ip,
                        Port = port
                    },
                    socketServerFactory,
                    receiveFilterFactory,
                    logFactory,
                    connectionFilters,
                    commandLoaders);
}

1578468089417

RootConfig

1578469017093

  • MaxWorkingThreads:最大工做線程數量
  • MaxCompletionPortThreads:線程池中異步 I/O 線程的最大數目。
  • PerformanceDataCollectInterval:性能數據收集間隔

RequestInfo

類圖

1578446037999

  • 基類是RequestInfo,提供了兩個方法Key和Body,Body是模板,由子類肯定具體類型
  • StringRequestInfo,在父類基礎上提供了一個參數,String[] Parameters
  • RequestInfo<TRequestHeader, TRequestBody>:提供了請求頭和請求體類型的模板。
  • 三個接口,key屬性,body屬性,heater屬性

ListenerInfo

1579947882373

監聽節點

ListenerConfig

1578474012112

ReflectCommandLoader

1578472566238

  • ReflectCommandLoader:經過TryLoadCommands方法反射出程序集中的全部命令
/// <summary>
/// Tries to load commands.
/// </summary>
/// <param name="commands">The commands.</param>
/// <returns></returns>
public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
{
    commands = null;
    var commandAssemblies = new List<Assembly>();
    if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
        commandAssemblies.Add(m_AppServer.GetType().Assembly);
    string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
    if (!string.IsNullOrEmpty(commandAssembly))
    {
        OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
        return false;
    }
    if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
    {
        try
        {
            var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());

            if (definedAssemblies.Any())
                commandAssemblies.AddRange(definedAssemblies);
        }
        catch (Exception e)
        {
            OnError(new Exception("Failed to load defined command assemblies!", e));
            return false;
        }
    }
    if (!commandAssemblies.Any())
    {
        commandAssemblies.Add(Assembly.GetEntryAssembly());
    }
    var outputCommands = new List<TCommand>();
    foreach (var assembly in commandAssemblies)
    {
        try
        {
            outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
        }
        catch (Exception exc)
        {
            OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
            return false;
        }
    }
    commands = outputCommands;
    return true;
}
}

StatusInfoCollection

1578475314305

AppServerBase

1580125365801

1578487267303

AppSeverBase<TAppSession,TRequestInfo>

m_CommandContainer:命令容器

m_CommandLoaders

m_ConnectionFilters

m_GlobalCommandFilters

m_Listeners

m_SocketServerFactory:在SetupBas

Facility.dll

PolicyReceiveFilterFactory

1579780070988

PolicyRecieveFilter

1579780715003

Protocol

ReceiveFilterBase

類圖

1578464691790

1578464252406

1578466885687

  • 在SuperSocket.SocketBase.Protocol程序集中
  • IReceiveFilter<TRequestInfo>接口,接收解析接口
    • Filter方法,解析會話請求的信息,參數包括,讀取緩衝,偏移量,長度,是否copy,沒有被解析的長度
    • LeftBufferSize屬性:空餘的緩衝區長度
    • NextReceiveFilter屬性,下一個接收解析器
    • Reset方法,恢復初始化
    • State:解析器狀態,正常和錯誤狀態
  • ArraySegmentEx<T>數段類
    • T爲數組模板
    • Array數組,count:數量,Offset偏移量,From從,To到
  • ArraySegmentList<T>數段列表
    • 實現了一個數組段列表
    • m_PrevSegment:當前的數段
    • m_PrevSegmentIndex,數段所在的index
  • ReceiveFilterBase<TRequestInfo>
    • BufferSegments屬性

SocketEngine.dll

PerformanceMonitor

1580126841482

SocketSession

1579319465778

在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。

  • 設置狀態:AddStateFlag()TryAddStateFlag()RemoveStateFlag(),AddStateFlag:自旋設置m_State狀態,線程安全的
  • m_Client:Socket
  • SessionID:new guid
  • LocalEndPoint:本地Id端
  • RemoteEndPoint:遠程終結點
  • m_SendingQueuePool:實際是SmartPool類的實例,該實例維護者sendingQueue數組
  • m_SendingQueue:從SmarlPool中獲取一個SendingQueue實例。

方法

Initialize()方法:

  • 初始化m_SendingQueuePool和m_SendingQueue

TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsyncSendSync,這個是由子類實現。

AsyncSocketSession

在子類中維護SocketAsyncEventArgs

  • SocketAsyncProxy:維護着SocketAsyncEventArgs
  • m_SocketEventArgSend:發送的SocketAsyncEventArgs實例

在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件

在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBufferSendAsync方法,發送失敗也調用OnSendingCompleted

SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);方法

AsyncStreamSocketSession

SocketFactory

1578467105778

/// <summary>
/// Creates the socket server.
/// </summary>
/// <typeparam name="TRequestInfo">The type of the request info.</typeparam>
/// <param name="appServer">The app server.</param>
/// <param name="listeners">The listeners.</param>
/// <param name="config">The config.</param>
/// <returns></returns>
public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
    where TRequestInfo : IRequestInfo
{
    if (appServer == null)
        throw new ArgumentNullException("appServer");
    if (listeners == null)
        throw new ArgumentNullException("listeners");
    if (config == null)
        throw new ArgumentNullException("config");
    switch(config.Mode)
    {
        case(SocketMode.Tcp):
            return new AsyncSocketServer(appServer, listeners);
        case(SocketMode.Udp):
            return new UdpSocketServer<TRequestInfo>(appServer, listeners);
        default:
            throw new NotSupportedException("Unsupported SocketMode:" + config.Mode);
    }
}

SocketServers

1578487919018

AsyncSocketServer

  • 緩存管理器m_BufferManager
  • 線程安全的SocketAsyncEventArgsProxy棧

構造函數,父類

public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
    : base(appServer, listeners)
{
    var config = appServer.Config;

    uint dummy = 0;
    m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
    m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length];
    //whether enable KeepAlive
    BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0);
    //how long will start first keep alive
    BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy));
    //keep alive interval
    BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2);

    m_SendTimeOut = config.SendTimeOut;
    m_ReceiveBufferSize = config.ReceiveBufferSize;
    m_SendBufferSize = config.SendBufferSize;
}
public override bool Start()
{
    try
    {
        int bufferSize = AppServer.Config.ReceiveBufferSize;

        if (bufferSize <= 0)
            bufferSize = 1024 * 4;

        m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize);

        try
        {
            m_BufferManager.InitBuffer();
        }
        catch (Exception e)
        {
            AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e);
            return false;
        }

        // preallocate pool of SocketAsyncEventArgs objects
        SocketAsyncEventArgs socketEventArg;

        var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);

        for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
        {
            //Pre-allocate a set of reusable SocketAsyncEventArgs
            socketEventArg = new SocketAsyncEventArgs();
            m_BufferManager.SetBuffer(socketEventArg);

            socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
        }

        m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);

        if (!base.Start())
            return false;

        IsRunning = true;
        return true;
    }
    catch (Exception e)
    {
        AppServer.Logger.Error(e);
        return false;
    }
}

SocketAsyncEventArgsProxy

1579316308225

SocketAsyncEventArgs的代理

維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)

IsRecyclable:是否能夠循環使用

OrigOffset:原始偏移量

每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));

UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的

public void Initialize(IAsyncSocketSession socketSession)
{
    SocketEventArgs.UserToken = socketSession;
}

代理模式

img

BootstrapFactory

1580106366845

DefaultBootStrap

1579403530245

引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化

ConfigurationWatcher

1580108736600

SocketListenerBase

1579439987190

TcpAsyncSocketListener

監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接

m_ListrnSocket:監聽Socket

WorkItemFactoryInfoLoader

1580119613894

配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,

Common.dll

BufferManager

1578487794792

此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化

BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。

  • m_buffer:全部的字節緩存
  • m_bufferSize:單個片斷的緩存大小
  • m_currentIndex:當前字節在總緩存中的索引
  • m_freeIndexPool:空閒索引池
  • m_numBytes:緩存片斷的數目

主要提供兩個方法:一個是SetBuffer和FreeBuffer

SetBuffer:

  • 檢查空閒索引棧中是否有值,有值就直接使用空閒索引棧中的值,並將其值從棧中推出,
  • 若是沒有空閒棧的值就先檢查剩餘的緩存是否有一個片斷大小,有的化就設置並改變m_currentIndex索引,沒有返回false

FreeBuffer:

  • 將當前索引添加到空閒索引棧中,並釋放SocketAsyncEventArgs中用的緩存片斷。

ArraySegmentList

1579323401012

方法:

IndexOf:T在全部緩存中的索引

ArraySegmentEx

  • 數組,是保存着全部緩存,T[]
  • 偏移,該片斷在緩存中的位置
  • 數量,該片斷的長度

SendingQueue

1579329295160

維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存

入棧,出戰,開始入棧,開始出棧。

全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護

SendingQueueSourceCreator

實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的

m_SendingQueueSize:發送隊列大小,默認爲5

/// <summary>
/// Creates the specified size.
/// </summary>
/// <param name="size">The size.</param>
/// <param name="poolItems">The pool items.</param>
/// <returns></returns>
public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
{
    var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5
    poolItems = new SendingQueue[size];//size=256
    for (var i = 0; i < size; i++)
    {
        poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量
    }
    return new SmartPoolSource(source, size);
}

SmartPool

1579327943978

其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池

m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)

m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)

m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)

m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。

m_GlobalStack:保存着單個SendingQueuep對象的數組

Initialize():初始化函數,初始化上面的變量

SmartPoolSource

維護全部的發送隊列緩存,並保存sendingQueue的個數

Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5

Count:爲默認值5

Other.dll

SocketAsyncEventArgs

表示異步套接字操做。

設置IP和Port調用流程

  1. 建立ServerConfig實例,RootConfig實例
  2. 設置m_State狀態,線程安全的,經過Interlocked.CompareExchange方法設置
  3. 在setbasic中設置RootConfig,m_Name,Config,設置currentculture,設置線程池參數,設置m_socketfactory,設置textencoding,
  4. 設置logfactory
  5. 在setMedium中設置ReceiveFilterFactory,m_ConnectionFilters,m_CommandLoaders(add ReflectCommandLoader
  6. 在SetupAdvanced中設置BaseSecurity和Certificate,設置listners(ListenerInfo) 設置CommandFilterAttribute,遍歷m_CommandLoaders,訂閱Error,Updated事件,調用Initialize方法,經過TryLoadCommands方法獲取命令集合commands,遍歷命令集合添加命令到discoveredCommands集合中
  7. 遍歷discoveredCommands集合,將其添加到命令容器 m_CommandContainer中,使用Interlocked.Exchange方法保證線程安全
  8. 在SetupFinal中設置ReceiveFilterFactory=new CommandLineReceiveFilterFactory(TextEncoding),設置m_ServerStatus,經過socketfactory得到serverfactory。

start調用流程

  1. 調用SuperSocket.SocketBase.AppServer中start()方法,調用基類AppServerBase的start()方法,該方法中調用socketserver的start方法
  2. 在socketserver的start方法中設置BufferManager,建立SocketAsyncEventArg,並經過buffermanager設置其buffer,並建立SocketAsyncEventArgProxys, SocketAsyncEventArgProxys集合賦值給m_ReadWritePool。調用SocketServer基類中的start
  3. 在socketserver基類的start中建立SendingQueuePool並初始化,實際是初始化隊列池中的sendingqueue隊列;經過遍歷ListenerInfo集合建立TcpAsyncSocketListener監聽者,訂閱監聽者的stop,error,NewClientAccepted事件,並開始監聽Listener.Start,也添加到容器中。
  4. Listener.Start中建立一個監聽Listen_socket和new異步套接字SocketAsyncEventArgs,並訂閱Compeleted事件,啓用socket監聽,並調用AcceptAsync方法,異步完成觸發compeleted事件,調用ProcessAccept方法,原來的方法異步已經觸發從新調用一下AcceptAsync方法,經過函數遞歸實現while,斷定acceptsocket是否正常,觸發NewClientAccepted事件,
  5. 事件觸發AsyncSocketServer 類中的ProcessNewClient方法,從m_ReadWritePool池中取一個空閒的SocketAsyncEventArgProxy,並經過代理,socket建立AsyncSocketSession,並經過socketsession建立Appsession,在建立過程當中作鏈接過濾,初始化app'session,經過receivefactory建立receivefilter,同時初始化socketsession,主要是訂閱SocketAsyncEventArgProxy中的compeleted事件。調用socketsession的start方法
  6. 在socketsession中調用startreceive方法,中調用socket.ReceiveAsync方法,當異步完成時調用socketProxy的SocketEventArgs_Completed方法,該方法調用SocketSession的ProcessReceive方法,在該方法中執行過濾FilterRequest,執行命令,再一次調用startReceive方法,如此不停經過異步直接實現接收循環

send調用流程

在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,

appsession和socketsession完成,

在appsession的InteralSend函數中對sendtimeout進行限制。

在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。

Stop調用流程

先調用stop再調用close

socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs

socket'session的closed,回收全部sendingqqueue到pool中.SuperSocket Layers](LearnSuperSocket.assets/layermodel-1579317752168.jpg)

  • Reusable IO Buffer Pool:BufferManager類

SuperSocket 對象模型圖示意圖

SuperSocket Object Model

SuperSocket 請求處理模型示意圖

SuperSocket Request Handling Model

SuperSocket 隔離模型示意圖

SuperSocket Isolation Model

Config

Command Filters

Log/LogFactory

Command Loaders

1579856396698

ReceiveFilterFactory

1579855102888

ReceiveFilter

1579855469740

Connection Filters

1579856311644

SocketBase.dll

ISessionBase

1579318554194

AppSession

1580125323588

1579319140513

對AppServer和SocketSession的包裝

ServerConfig

服務參數配置,在serverbase基類SetUp中建立

/// <summary>
/// Setups with the specified ip and port.
/// </summary>
/// <param name="ip">The ip.</param>
/// <param name="port">The port.</param>
/// <param name="socketServerFactory">The socket server factory.</param>
/// <param name="receiveFilterFactory">The Receive filter factory.</param>
/// <param name="logFactory">The log factory.</param>
/// <param name="connectionFilters">The connection filters.</param>
/// <param name="commandLoaders">The command loaders.</param>
/// <returns>return setup result</returns>
public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null)
{
    return Setup(new ServerConfig
                    {
                        Ip = ip,
                        Port = port
                    },
                    socketServerFactory,
                    receiveFilterFactory,
                    logFactory,
                    connectionFilters,
                    commandLoaders);
}

1578468089417

RootConfig

1578469017093

  • MaxWorkingThreads:最大工做線程數量
  • MaxCompletionPortThreads:線程池中異步 I/O 線程的最大數目。
  • PerformanceDataCollectInterval:性能數據收集間隔

RequestInfo

類圖

1578446037999

  • 基類是RequestInfo,提供了兩個方法Key和Body,Body是模板,由子類肯定具體類型
  • StringRequestInfo,在父類基礎上提供了一個參數,String[] Parameters
  • RequestInfo<TRequestHeader, TRequestBody>:提供了請求頭和請求體類型的模板。
  • 三個接口,key屬性,body屬性,heater屬性

ListenerInfo

1579947882373

監聽節點

ListenerConfig

1578474012112

ReflectCommandLoader

1578472566238

  • ReflectCommandLoader:經過TryLoadCommands方法反射出程序集中的全部命令
/// <summary>
/// Tries to load commands.
/// </summary>
/// <param name="commands">The commands.</param>
/// <returns></returns>
public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
{
    commands = null;
    var commandAssemblies = new List<Assembly>();
    if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
        commandAssemblies.Add(m_AppServer.GetType().Assembly);
    string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
    if (!string.IsNullOrEmpty(commandAssembly))
    {
        OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
        return false;
    }
    if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
    {
        try
        {
            var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());

            if (definedAssemblies.Any())
                commandAssemblies.AddRange(definedAssemblies);
        }
        catch (Exception e)
        {
            OnError(new Exception("Failed to load defined command assemblies!", e));
            return false;
        }
    }
    if (!commandAssemblies.Any())
    {
        commandAssemblies.Add(Assembly.GetEntryAssembly());
    }
    var outputCommands = new List<TCommand>();
    foreach (var assembly in commandAssemblies)
    {
        try
        {
            outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
        }
        catch (Exception exc)
        {
            OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
            return false;
        }
    }
    commands = outputCommands;
    return true;
}
}

StatusInfoCollection

1578475314305

AppServerBase

1580125365801

1578487267303

AppSeverBase<TAppSession,TRequestInfo>

m_CommandContainer:命令容器

m_CommandLoaders

m_ConnectionFilters

m_GlobalCommandFilters

m_Listeners

m_SocketServerFactory:在SetupBas

Facility.dll

PolicyReceiveFilterFactory

1579780070988

PolicyRecieveFilter

1579780715003

Protocol

ReceiveFilterBase

類圖

1578464691790

1578464252406

1578466885687

  • 在SuperSocket.SocketBase.Protocol程序集中
  • IReceiveFilter<TRequestInfo>接口,接收解析接口
    • Filter方法,解析會話請求的信息,參數包括,讀取緩衝,偏移量,長度,是否copy,沒有被解析的長度
    • LeftBufferSize屬性:空餘的緩衝區長度
    • NextReceiveFilter屬性,下一個接收解析器
    • Reset方法,恢復初始化
    • State:解析器狀態,正常和錯誤狀態
  • ArraySegmentEx<T>數段類
    • T爲數組模板
    • Array數組,count:數量,Offset偏移量,From從,To到
  • ArraySegmentList<T>數段列表
    • 實現了一個數組段列表
    • m_PrevSegment:當前的數段
    • m_PrevSegmentIndex,數段所在的index
  • ReceiveFilterBase<TRequestInfo>
    • BufferSegments屬性

SocketEngine.dll

PerformanceMonitor

1580126841482

SocketSession

1579319465778

在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。

  • 設置狀態:AddStateFlag()TryAddStateFlag()RemoveStateFlag(),AddStateFlag:自旋設置m_State狀態,線程安全的
  • m_Client:Socket
  • SessionID:new guid
  • LocalEndPoint:本地Id端
  • RemoteEndPoint:遠程終結點
  • m_SendingQueuePool:實際是SmartPool類的實例,該實例維護者sendingQueue數組
  • m_SendingQueue:從SmarlPool中獲取一個SendingQueue實例。

方法

Initialize()方法:

  • 初始化m_SendingQueuePool和m_SendingQueue

TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsyncSendSync,這個是由子類實現。

AsyncSocketSession

在子類中維護SocketAsyncEventArgs

  • SocketAsyncProxy:維護着SocketAsyncEventArgs
  • m_SocketEventArgSend:發送的SocketAsyncEventArgs實例

在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件

在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBufferSendAsync方法,發送失敗也調用OnSendingCompleted

SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);方法

AsyncStreamSocketSession

SocketFactory

1578467105778

/// <summary>
/// Creates the socket server.
/// </summary>
/// <typeparam name="TRequestInfo">The type of the request info.</typeparam>
/// <param name="appServer">The app server.</param>
/// <param name="listeners">The listeners.</param>
/// <param name="config">The config.</param>
/// <returns></returns>
public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
    where TRequestInfo : IRequestInfo
{
    if (appServer == null)
        throw new ArgumentNullException("appServer");
    if (listeners == null)
        throw new ArgumentNullException("listeners");
    if (config == null)
        throw new ArgumentNullException("config");
    switch(config.Mode)
    {
        case(SocketMode.Tcp):
            return new AsyncSocketServer(appServer, listeners);
        case(SocketMode.Udp):
            return new UdpSocketServer<TRequestInfo>(appServer, listeners);
        default:
            throw new NotSupportedException("Unsupported SocketMode:" + config.Mode);
    }
}

SocketServers

1578487919018

AsyncSocketServer

  • 緩存管理器m_BufferManager
  • 線程安全的SocketAsyncEventArgsProxy棧

構造函數,父類

public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
    : base(appServer, listeners)
{
    var config = appServer.Config;

    uint dummy = 0;
    m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
    m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length];
    //whether enable KeepAlive
    BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0);
    //how long will start first keep alive
    BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy));
    //keep alive interval
    BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2);

    m_SendTimeOut = config.SendTimeOut;
    m_ReceiveBufferSize = config.ReceiveBufferSize;
    m_SendBufferSize = config.SendBufferSize;
}
public override bool Start()
{
    try
    {
        int bufferSize = AppServer.Config.ReceiveBufferSize;

        if (bufferSize <= 0)
            bufferSize = 1024 * 4;

        m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize);

        try
        {
            m_BufferManager.InitBuffer();
        }
        catch (Exception e)
        {
            AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e);
            return false;
        }

        // preallocate pool of SocketAsyncEventArgs objects
        SocketAsyncEventArgs socketEventArg;

        var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);

        for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
        {
            //Pre-allocate a set of reusable SocketAsyncEventArgs
            socketEventArg = new SocketAsyncEventArgs();
            m_BufferManager.SetBuffer(socketEventArg);

            socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
        }

        m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);

        if (!base.Start())
            return false;

        IsRunning = true;
        return true;
    }
    catch (Exception e)
    {
        AppServer.Logger.Error(e);
        return false;
    }
}

SocketAsyncEventArgsProxy

1579316308225

SocketAsyncEventArgs的代理

維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)

IsRecyclable:是否能夠循環使用

OrigOffset:原始偏移量

每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));

UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的

public void Initialize(IAsyncSocketSession socketSession)
{
    SocketEventArgs.UserToken = socketSession;
}

代理模式

img

BootstrapFactory

1580106366845

DefaultBootStrap

1579403530245

引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化

ConfigurationWatcher

1580108736600

SocketListenerBase

1579439987190

TcpAsyncSocketListener

監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接

m_ListrnSocket:監聽Socket

WorkItemFactoryInfoLoader

1580119613894

配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,

Common.dll

BufferManager

1578487794792

此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化

BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。

  • m_buffer:全部的字節緩存
  • m_bufferSize:單個片斷的緩存大小
  • m_currentIndex:當前字節在總緩存中的索引
  • m_freeIndexPool:空閒索引池
  • m_numBytes:緩存片斷的數目

主要提供兩個方法:一個是SetBuffer和FreeBuffer

SetBuffer:

  • 檢查空閒索引棧中是否有值,有值就直接使用空閒索引棧中的值,並將其值從棧中推出,
  • 若是沒有空閒棧的值就先檢查剩餘的緩存是否有一個片斷大小,有的化就設置並改變m_currentIndex索引,沒有返回false

FreeBuffer:

  • 將當前索引添加到空閒索引棧中,並釋放SocketAsyncEventArgs中用的緩存片斷。

ArraySegmentList

1579323401012

方法:

IndexOf:T在全部緩存中的索引

ArraySegmentEx

  • 數組,是保存着全部緩存,T[]
  • 偏移,該片斷在緩存中的位置
  • 數量,該片斷的長度

SendingQueue

1579329295160

維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存

入棧,出戰,開始入棧,開始出棧。

全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護

SendingQueueSourceCreator

實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的

m_SendingQueueSize:發送隊列大小,默認爲5

/// <summary>
/// Creates the specified size.
/// </summary>
/// <param name="size">The size.</param>
/// <param name="poolItems">The pool items.</param>
/// <returns></returns>
public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
{
    var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5
    poolItems = new SendingQueue[size];//size=256
    for (var i = 0; i < size; i++)
    {
        poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量
    }
    return new SmartPoolSource(source, size);
}

SmartPool

1579327943978

其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池

m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)

m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)

m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)

m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。

m_GlobalStack:保存着單個SendingQueuep對象的數組

Initialize():初始化函數,初始化上面的變量

SmartPoolSource

維護全部的發送隊列緩存,並保存sendingQueue的個數

Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5

Count:爲默認值5

Other.dll

SocketAsyncEventArgs

表示異步套接字操做。

設置IP和Port調用流程

  1. 建立ServerConfig實例,RootConfig實例
  2. 設置m_State狀態,線程安全的,經過Interlocked.CompareExchange方法設置
  3. 在setbasic中設置RootConfig,m_Name,Config,設置currentculture,設置線程池參數,設置m_socketfactory,設置textencoding,
  4. 設置logfactory
  5. 在setMedium中設置ReceiveFilterFactory,m_ConnectionFilters,m_CommandLoaders(add ReflectCommandLoader
  6. 在SetupAdvanced中設置BaseSecurity和Certificate,設置listners(ListenerInfo) 設置CommandFilterAttribute,遍歷m_CommandLoaders,訂閱Error,Updated事件,調用Initialize方法,經過TryLoadCommands方法獲取命令集合commands,遍歷命令集合添加命令到discoveredCommands集合中
  7. 遍歷discoveredCommands集合,將其添加到命令容器 m_CommandContainer中,使用Interlocked.Exchange方法保證線程安全
  8. 在SetupFinal中設置ReceiveFilterFactory=new CommandLineReceiveFilterFactory(TextEncoding),設置m_ServerStatus,經過socketfactory得到serverfactory。

start調用流程

  1. 調用SuperSocket.SocketBase.AppServer中start()方法,調用基類AppServerBase的start()方法,該方法中調用socketserver的start方法
  2. 在socketserver的start方法中設置BufferManager,建立SocketAsyncEventArg,並經過buffermanager設置其buffer,並建立SocketAsyncEventArgProxys, SocketAsyncEventArgProxys集合賦值給m_ReadWritePool。調用SocketServer基類中的start
  3. 在socketserver基類的start中建立SendingQueuePool並初始化,實際是初始化隊列池中的sendingqueue隊列;經過遍歷ListenerInfo集合建立TcpAsyncSocketListener監聽者,訂閱監聽者的stop,error,NewClientAccepted事件,並開始監聽Listener.Start,也添加到容器中。
  4. Listener.Start中建立一個監聽Listen_socket和new異步套接字SocketAsyncEventArgs,並訂閱Compeleted事件,啓用socket監聽,並調用AcceptAsync方法,異步完成觸發compeleted事件,調用ProcessAccept方法,原來的方法異步已經觸發從新調用一下AcceptAsync方法,經過函數遞歸實現while,斷定acceptsocket是否正常,觸發NewClientAccepted事件,
  5. 事件觸發AsyncSocketServer 類中的ProcessNewClient方法,從m_ReadWritePool池中取一個空閒的SocketAsyncEventArgProxy,並經過代理,socket建立AsyncSocketSession,並經過socketsession建立Appsession,在建立過程當中作鏈接過濾,初始化app'session,經過receivefactory建立receivefilter,同時初始化socketsession,主要是訂閱SocketAsyncEventArgProxy中的compeleted事件。調用socketsession的start方法
  6. 在socketsession中調用startreceive方法,中調用socket.ReceiveAsync方法,當異步完成時調用socketProxy的SocketEventArgs_Completed方法,該方法調用SocketSession的ProcessReceive方法,在該方法中執行過濾FilterRequest,執行命令,再一次調用startReceive方法,如此不停經過異步直接實現接收循環

send調用流程

在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,

appsession和socketsession完成,

在appsession的InteralSend函數中對sendtimeout進行限制。

在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。

Stop調用流程

先調用stop再調用close

socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs

socket'session的closed,回收全部sendingqqueue到pool中.SuperSocket Layers](LearnSuperSocket.assets/layermodel-1579317752168.jpg)

  • Reusable IO Buffer Pool:BufferManager類

SuperSocket 對象模型圖示意圖

SuperSocket Object Model

SuperSocket 請求處理模型示意圖

SuperSocket Request Handling Model

SuperSocket 隔離模型示意圖

SuperSocket Isolation Model

Config

Command Filters

Log/LogFactory

Command Loaders

1579856396698

ReceiveFilterFactory

1579855102888

ReceiveFilter

1579855469740

Connection Filters

1579856311644

SocketBase.dll

ISessionBase

1579318554194

AppSession

1580125323588

1579319140513

對AppServer和SocketSession的包裝

ServerConfig

服務參數配置,在serverbase基類SetUp中建立

/// <summary>
/// Setups with the specified ip and port.
/// </summary>
/// <param name="ip">The ip.</param>
/// <param name="port">The port.</param>
/// <param name="socketServerFactory">The socket server factory.</param>
/// <param name="receiveFilterFactory">The Receive filter factory.</param>
/// <param name="logFactory">The log factory.</param>
/// <param name="connectionFilters">The connection filters.</param>
/// <param name="commandLoaders">The command loaders.</param>
/// <returns>return setup result</returns>
public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null)
{
    return Setup(new ServerConfig
                    {
                        Ip = ip,
                        Port = port
                    },
                    socketServerFactory,
                    receiveFilterFactory,
                    logFactory,
                    connectionFilters,
                    commandLoaders);
}

1578468089417

RootConfig

1578469017093

  • MaxWorkingThreads:最大工做線程數量
  • MaxCompletionPortThreads:線程池中異步 I/O 線程的最大數目。
  • PerformanceDataCollectInterval:性能數據收集間隔

RequestInfo

類圖

1578446037999

  • 基類是RequestInfo,提供了兩個方法Key和Body,Body是模板,由子類肯定具體類型
  • StringRequestInfo,在父類基礎上提供了一個參數,String[] Parameters
  • RequestInfo<TRequestHeader, TRequestBody>:提供了請求頭和請求體類型的模板。
  • 三個接口,key屬性,body屬性,heater屬性

ListenerInfo

1579947882373

監聽節點

ListenerConfig

1578474012112

ReflectCommandLoader

1578472566238

  • ReflectCommandLoader:經過TryLoadCommands方法反射出程序集中的全部命令
/// <summary>
/// Tries to load commands.
/// </summary>
/// <param name="commands">The commands.</param>
/// <returns></returns>
public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
{
    commands = null;
    var commandAssemblies = new List<Assembly>();
    if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
        commandAssemblies.Add(m_AppServer.GetType().Assembly);
    string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
    if (!string.IsNullOrEmpty(commandAssembly))
    {
        OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
        return false;
    }
    if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
    {
        try
        {
            var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());

            if (definedAssemblies.Any())
                commandAssemblies.AddRange(definedAssemblies);
        }
        catch (Exception e)
        {
            OnError(new Exception("Failed to load defined command assemblies!", e));
            return false;
        }
    }
    if (!commandAssemblies.Any())
    {
        commandAssemblies.Add(Assembly.GetEntryAssembly());
    }
    var outputCommands = new List<TCommand>();
    foreach (var assembly in commandAssemblies)
    {
        try
        {
            outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
        }
        catch (Exception exc)
        {
            OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
            return false;
        }
    }
    commands = outputCommands;
    return true;
}
}

StatusInfoCollection

1578475314305

AppServerBase

1580125365801

1578487267303

AppSeverBase<TAppSession,TRequestInfo>

m_CommandContainer:命令容器

m_CommandLoaders

m_ConnectionFilters

m_GlobalCommandFilters

m_Listeners

m_SocketServerFactory:在SetupBas

Facility.dll

PolicyReceiveFilterFactory

1579780070988

PolicyRecieveFilter

1579780715003

Protocol

ReceiveFilterBase

類圖

1578464691790

1578464252406

1578466885687

  • 在SuperSocket.SocketBase.Protocol程序集中
  • IReceiveFilter<TRequestInfo>接口,接收解析接口
    • Filter方法,解析會話請求的信息,參數包括,讀取緩衝,偏移量,長度,是否copy,沒有被解析的長度
    • LeftBufferSize屬性:空餘的緩衝區長度
    • NextReceiveFilter屬性,下一個接收解析器
    • Reset方法,恢復初始化
    • State:解析器狀態,正常和錯誤狀態
  • ArraySegmentEx<T>數段類
    • T爲數組模板
    • Array數組,count:數量,Offset偏移量,From從,To到
  • ArraySegmentList<T>數段列表
    • 實現了一個數組段列表
    • m_PrevSegment:當前的數段
    • m_PrevSegmentIndex,數段所在的index
  • ReceiveFilterBase<TRequestInfo>
    • BufferSegments屬性

SocketEngine.dll

PerformanceMonitor

1580126841482

SocketSession

1579319465778

在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。

  • 設置狀態:AddStateFlag()TryAddStateFlag()RemoveStateFlag(),AddStateFlag:自旋設置m_State狀態,線程安全的
  • m_Client:Socket
  • SessionID:new guid
  • LocalEndPoint:本地Id端
  • RemoteEndPoint:遠程終結點
  • m_SendingQueuePool:實際是SmartPool類的實例,該實例維護者sendingQueue數組
  • m_SendingQueue:從SmarlPool中獲取一個SendingQueue實例。

方法

Initialize()方法:

  • 初始化m_SendingQueuePool和m_SendingQueue

TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsyncSendSync,這個是由子類實現。

AsyncSocketSession

在子類中維護SocketAsyncEventArgs

  • SocketAsyncProxy:維護着SocketAsyncEventArgs
  • m_SocketEventArgSend:發送的SocketAsyncEventArgs實例

在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件

在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBufferSendAsync方法,發送失敗也調用OnSendingCompleted

SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);方法

AsyncStreamSocketSession

SocketFactory

1578467105778

/// <summary>
/// Creates the socket server.
/// </summary>
/// <typeparam name="TRequestInfo">The type of the request info.</typeparam>
/// <param name="appServer">The app server.</param>
/// <param name="listeners">The listeners.</param>
/// <param name="config">The config.</param>
/// <returns></returns>
public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
    where TRequestInfo : IRequestInfo
{
    if (appServer == null)
        throw new ArgumentNullException("appServer");
    if (listeners == null)
        throw new ArgumentNullException("listeners");
    if (config == null)
        throw new ArgumentNullException("config");
    switch(config.Mode)
    {
        case(SocketMode.Tcp):
            return new AsyncSocketServer(appServer, listeners);
        case(SocketMode.Udp):
            return new UdpSocketServer<TRequestInfo>(appServer, listeners);
        default:
            throw new NotSupportedException("Unsupported SocketMode:" + config.Mode);
    }
}

SocketServers

1578487919018

AsyncSocketServer

  • 緩存管理器m_BufferManager
  • 線程安全的SocketAsyncEventArgsProxy棧

構造函數,父類

public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
    : base(appServer, listeners)
{
    var config = appServer.Config;

    uint dummy = 0;
    m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
    m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length];
    //whether enable KeepAlive
    BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0);
    //how long will start first keep alive
    BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy));
    //keep alive interval
    BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2);

    m_SendTimeOut = config.SendTimeOut;
    m_ReceiveBufferSize = config.ReceiveBufferSize;
    m_SendBufferSize = config.SendBufferSize;
}
public override bool Start()
{
    try
    {
        int bufferSize = AppServer.Config.ReceiveBufferSize;

        if (bufferSize <= 0)
            bufferSize = 1024 * 4;

        m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize);

        try
        {
            m_BufferManager.InitBuffer();
        }
        catch (Exception e)
        {
            AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e);
            return false;
        }

        // preallocate pool of SocketAsyncEventArgs objects
        SocketAsyncEventArgs socketEventArg;

        var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);

        for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
        {
            //Pre-allocate a set of reusable SocketAsyncEventArgs
            socketEventArg = new SocketAsyncEventArgs();
            m_BufferManager.SetBuffer(socketEventArg);

            socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
        }

        m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);

        if (!base.Start())
            return false;

        IsRunning = true;
        return true;
    }
    catch (Exception e)
    {
        AppServer.Logger.Error(e);
        return false;
    }
}

SocketAsyncEventArgsProxy

1579316308225

SocketAsyncEventArgs的代理

維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)

IsRecyclable:是否能夠循環使用

OrigOffset:原始偏移量

每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));

UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的

public void Initialize(IAsyncSocketSession socketSession)
{
    SocketEventArgs.UserToken = socketSession;
}

代理模式

img

BootstrapFactory

1580106366845

DefaultBootStrap

1579403530245

引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化

ConfigurationWatcher

1580108736600

SocketListenerBase

1579439987190

TcpAsyncSocketListener

監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接

m_ListrnSocket:監聽Socket

WorkItemFactoryInfoLoader

1580119613894

配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,

Common.dll

BufferManager

1578487794792

此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化

BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。

  • m_buffer:全部的字節緩存
  • m_bufferSize:單個片斷的緩存大小
  • m_currentIndex:當前字節在總緩存中的索引
  • m_freeIndexPool:空閒索引池
  • m_numBytes:緩存片斷的數目

主要提供兩個方法:一個是SetBuffer和FreeBuffer

SetBuffer:

  • 檢查空閒索引棧中是否有值,有值就直接使用空閒索引棧中的值,並將其值從棧中推出,
  • 若是沒有空閒棧的值就先檢查剩餘的緩存是否有一個片斷大小,有的化就設置並改變m_currentIndex索引,沒有返回false

FreeBuffer:

  • 將當前索引添加到空閒索引棧中,並釋放SocketAsyncEventArgs中用的緩存片斷。

ArraySegmentList

1579323401012

方法:

IndexOf:T在全部緩存中的索引

ArraySegmentEx

  • 數組,是保存着全部緩存,T[]
  • 偏移,該片斷在緩存中的位置
  • 數量,該片斷的長度

SendingQueue

1579329295160

維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存

入棧,出戰,開始入棧,開始出棧。

全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護

SendingQueueSourceCreator

實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的

m_SendingQueueSize:發送隊列大小,默認爲5

/// <summary>
/// Creates the specified size.
/// </summary>
/// <param name="size">The size.</param>
/// <param name="poolItems">The pool items.</param>
/// <returns></returns>
public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
{
    var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5
    poolItems = new SendingQueue[size];//size=256
    for (var i = 0; i < size; i++)
    {
        poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量
    }
    return new SmartPoolSource(source, size);
}

SmartPool

1579327943978

其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池

m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)

m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)

m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)

m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。

m_GlobalStack:保存着單個SendingQueuep對象的數組

Initialize():初始化函數,初始化上面的變量

SmartPoolSource

維護全部的發送隊列緩存,並保存sendingQueue的個數

Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5

Count:爲默認值5

Other.dll

SocketAsyncEventArgs

表示異步套接字操做。

設置IP和Port調用流程

  1. 建立ServerConfig實例,RootConfig實例
  2. 設置m_State狀態,線程安全的,經過Interlocked.CompareExchange方法設置
  3. 在setbasic中設置RootConfig,m_Name,Config,設置currentculture,設置線程池參數,設置m_socketfactory,設置textencoding,
  4. 設置logfactory
  5. 在setMedium中設置ReceiveFilterFactory,m_ConnectionFilters,m_CommandLoaders(add ReflectCommandLoader
  6. 在SetupAdvanced中設置BaseSecurity和Certificate,設置listners(ListenerInfo) 設置CommandFilterAttribute,遍歷m_CommandLoaders,訂閱Error,Updated事件,調用Initialize方法,經過TryLoadCommands方法獲取命令集合commands,遍歷命令集合添加命令到discoveredCommands集合中
  7. 遍歷discoveredCommands集合,將其添加到命令容器 m_CommandContainer中,使用Interlocked.Exchange方法保證線程安全
  8. 在SetupFinal中設置ReceiveFilterFactory=new CommandLineReceiveFilterFactory(TextEncoding),設置m_ServerStatus,經過socketfactory得到serverfactory。

start調用流程

  1. 調用SuperSocket.SocketBase.AppServer中start()方法,調用基類AppServerBase的start()方法,該方法中調用socketserver的start方法
  2. 在socketserver的start方法中設置BufferManager,建立SocketAsyncEventArg,並經過buffermanager設置其buffer,並建立SocketAsyncEventArgProxys, SocketAsyncEventArgProxys集合賦值給m_ReadWritePool。調用SocketServer基類中的start
  3. 在socketserver基類的start中建立SendingQueuePool並初始化,實際是初始化隊列池中的sendingqueue隊列;經過遍歷ListenerInfo集合建立TcpAsyncSocketListener監聽者,訂閱監聽者的stop,error,NewClientAccepted事件,並開始監聽Listener.Start,也添加到容器中。
  4. Listener.Start中建立一個監聽Listen_socket和new異步套接字SocketAsyncEventArgs,並訂閱Compeleted事件,啓用socket監聽,並調用AcceptAsync方法,異步完成觸發compeleted事件,調用ProcessAccept方法,原來的方法異步已經觸發從新調用一下AcceptAsync方法,經過函數遞歸實現while,斷定acceptsocket是否正常,觸發NewClientAccepted事件,
  5. 事件觸發AsyncSocketServer 類中的ProcessNewClient方法,從m_ReadWritePool池中取一個空閒的SocketAsyncEventArgProxy,並經過代理,socket建立AsyncSocketSession,並經過socketsession建立Appsession,在建立過程當中作鏈接過濾,初始化app'session,經過receivefactory建立receivefilter,同時初始化socketsession,主要是訂閱SocketAsyncEventArgProxy中的compeleted事件。調用socketsession的start方法
  6. 在socketsession中調用startreceive方法,中調用socket.ReceiveAsync方法,當異步完成時調用socketProxy的SocketEventArgs_Completed方法,該方法調用SocketSession的ProcessReceive方法,在該方法中執行過濾FilterRequest,執行命令,再一次調用startReceive方法,如此不停經過異步直接實現接收循環

send調用流程

在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,

appsession和socketsession完成,

在appsession的InteralSend函數中對sendtimeout進行限制。

在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。

Stop調用流程

先調用stop再調用close

socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs

socket'session的closed,回收全部sendingqqueue到pool中.

相關文章
相關標籤/搜索