System.Net.Sockets.dll程序集中使用socket類:node
服務器:bootstrap
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
_socket.Bind(_endPoint);
//綁定端口_socket.Listen(BACKLOG);
//開啓監聽,backlog是監聽的最大數列Socket acceptSocket = _socket.Accept();
socket.RemoteEndPoint.ToString();
while (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}
來接收客戶端傳來的消息。socket.Send(Encoding.ASCII.GetBytes(text));
receivebuffer默認值8192c#
異步套接字操做api
ListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ListenerSocket.Bind(e);
ListenerSocket.Listen(10);
Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
ListenerSocket.AcceptAsync(Args);
var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)
s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。switch (e.LastOperation)case SocketAsyncOperation.Receive:
Socket.AcceptAsync(SocketAsyncEventArgs) 方法數組
返回:若是 I/O 操做掛起,則爲 true
。 操做完成時,將引起 Completed 參數的 e
事件。緩存
若是 I/O 操做同步完成,則爲 false
。 將不會引起 Completed 參數的 e
事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e
對象以檢索操做的結果。安全
![# SuperSocket1.6Code解析服務器
System.Net.Sockets.dll程序集中使用socket類:session
服務器:併發
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
_socket.Bind(_endPoint);
//綁定端口_socket.Listen(BACKLOG);
//開啓監聽,backlog是監聽的最大數列Socket acceptSocket = _socket.Accept();
socket.RemoteEndPoint.ToString();
while (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}
來接收客戶端傳來的消息。socket.Send(Encoding.ASCII.GetBytes(text));
receivebuffer默認值8192
異步套接字操做
ListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ListenerSocket.Bind(e);
ListenerSocket.Listen(10);
Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
ListenerSocket.AcceptAsync(Args);
var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)
s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。switch (e.LastOperation)case SocketAsyncOperation.Receive:
Socket.AcceptAsync(SocketAsyncEventArgs) 方法
返回:若是 I/O 操做掛起,則爲 true
。 操做完成時,將引起 Completed 參數的 e
事件。
若是 I/O 操做同步完成,則爲 false
。 將不會引起 Completed 參數的 e
事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e
對象以檢索操做的結果。
![# SuperSocket1.6Code解析
System.Net.Sockets.dll程序集中使用socket類:
服務器:
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
_socket.Bind(_endPoint);
//綁定端口_socket.Listen(BACKLOG);
//開啓監聽,backlog是監聽的最大數列Socket acceptSocket = _socket.Accept();
socket.RemoteEndPoint.ToString();
while (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}
來接收客戶端傳來的消息。socket.Send(Encoding.ASCII.GetBytes(text));
receivebuffer默認值8192
異步套接字操做
ListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ListenerSocket.Bind(e);
ListenerSocket.Listen(10);
Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
ListenerSocket.AcceptAsync(Args);
var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)
s.ReceiveAsync(args),s接收的socket的,新建一個異步套接字,並傳入ReceiveAsync()方法。switch (e.LastOperation)case SocketAsyncOperation.Receive:
Socket.AcceptAsync(SocketAsyncEventArgs) 方法
返回:若是 I/O 操做掛起,則爲 true
。 操做完成時,將引起 Completed 參數的 e
事件。
若是 I/O 操做同步完成,則爲 false
。 將不會引起 Completed 參數的 e
事件,而且可能在方法調用返回後當即檢查做爲參數傳遞的 e
對象以檢索操做的結果。
對AppServer和SocketSession的包裝
服務參數配置,在serverbase基類SetUp中建立
/// <summary> /// Setups with the specified ip and port. /// </summary> /// <param name="ip">The ip.</param> /// <param name="port">The port.</param> /// <param name="socketServerFactory">The socket server factory.</param> /// <param name="receiveFilterFactory">The Receive filter factory.</param> /// <param name="logFactory">The log factory.</param> /// <param name="connectionFilters">The connection filters.</param> /// <param name="commandLoaders">The command loaders.</param> /// <returns>return setup result</returns> public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null) { return Setup(new ServerConfig { Ip = ip, Port = port }, socketServerFactory, receiveFilterFactory, logFactory, connectionFilters, commandLoaders); }
類圖
監聽節點
/// <summary> /// Tries to load commands. /// </summary> /// <param name="commands">The commands.</param> /// <returns></returns> public override bool TryLoadCommands(out IEnumerable<TCommand> commands) { commands = null; var commandAssemblies = new List<Assembly>(); if (m_AppServer.GetType().Assembly != this.GetType().Assembly) commandAssemblies.Add(m_AppServer.GetType().Assembly); string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly"); if (!string.IsNullOrEmpty(commandAssembly)) { OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!"); return false; } if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any()) { try { var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray()); if (definedAssemblies.Any()) commandAssemblies.AddRange(definedAssemblies); } catch (Exception e) { OnError(new Exception("Failed to load defined command assemblies!", e)); return false; } } if (!commandAssemblies.Any()) { commandAssemblies.Add(Assembly.GetEntryAssembly()); } var outputCommands = new List<TCommand>(); foreach (var assembly in commandAssemblies) { try { outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>()); } catch (Exception exc) { OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc)); return false; } } commands = outputCommands; return true; } }
m_CommandContainer:命令容器
m_CommandLoaders
m_ConnectionFilters
m_GlobalCommandFilters
m_Listeners
m_SocketServerFactory:在SetupBas
ReceiveFilterBase
類圖
在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。
方法
Initialize()方法:
TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsync或SendSync,這個是由子類實現。
在子類中維護SocketAsyncEventArgs
在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件
在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBuffer和SendAsync方法,發送失敗也調用OnSendingCompleted
SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
方法
/// <summary> /// Creates the socket server. /// </summary> /// <typeparam name="TRequestInfo">The type of the request info.</typeparam> /// <param name="appServer">The app server.</param> /// <param name="listeners">The listeners.</param> /// <param name="config">The config.</param> /// <returns></returns> public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config) where TRequestInfo : IRequestInfo { if (appServer == null) throw new ArgumentNullException("appServer"); if (listeners == null) throw new ArgumentNullException("listeners"); if (config == null) throw new ArgumentNullException("config"); switch(config.Mode) { case(SocketMode.Tcp): return new AsyncSocketServer(appServer, listeners); case(SocketMode.Udp): return new UdpSocketServer<TRequestInfo>(appServer, listeners); default: throw new NotSupportedException("Unsupported SocketMode:" + config.Mode); } }
構造函數,父類
public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners) : base(appServer, listeners) { var config = appServer.Config; uint dummy = 0; m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3]; m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length]; //whether enable KeepAlive BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0); //how long will start first keep alive BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy)); //keep alive interval BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2); m_SendTimeOut = config.SendTimeOut; m_ReceiveBufferSize = config.ReceiveBufferSize; m_SendBufferSize = config.SendBufferSize; }
public override bool Start() { try { int bufferSize = AppServer.Config.ReceiveBufferSize; if (bufferSize <= 0) bufferSize = 1024 * 4; m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize); try { m_BufferManager.InitBuffer(); } catch (Exception e) { AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e); return false; } // preallocate pool of SocketAsyncEventArgs objects SocketAsyncEventArgs socketEventArg; var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber); for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++) { //Pre-allocate a set of reusable SocketAsyncEventArgs socketEventArg = new SocketAsyncEventArgs(); m_BufferManager.SetBuffer(socketEventArg); socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg)); } m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList); if (!base.Start()) return false; IsRunning = true; return true; } catch (Exception e) { AppServer.Logger.Error(e); return false; } }
SocketAsyncEventArgs的代理
維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)
IsRecyclable:是否能夠循環使用
OrigOffset:原始偏移量
每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()和AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));
UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的
public void Initialize(IAsyncSocketSession socketSession) { SocketEventArgs.UserToken = socketSession; }
代理模式
引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化
監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接
m_ListrnSocket:監聽Socket
配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,
此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化。
BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。
主要提供兩個方法:一個是SetBuffer和FreeBuffer
SetBuffer:
FreeBuffer:
方法:
IndexOf:T在全部緩存中的索引
維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存
入棧,出戰,開始入棧,開始出棧。
全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護
實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的
m_SendingQueueSize:發送隊列大小,默認爲5
/// <summary> /// Creates the specified size. /// </summary> /// <param name="size">The size.</param> /// <param name="poolItems">The pool items.</param> /// <returns></returns> public ISmartPoolSource Create(int size, out SendingQueue[] poolItems) { var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5 poolItems = new SendingQueue[size];//size=256 for (var i = 0; i < size; i++) { poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量 } return new SmartPoolSource(source, size); }
其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池
m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)
m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)
m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)
m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。
m_GlobalStack:保存着單個SendingQueuep對象的數組
Initialize():初始化函數,初始化上面的變量
維護全部的發送隊列緩存,並保存sendingQueue的個數
Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5
,
Count:爲默認值5
表示異步套接字操做。
在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,
appsession和socketsession完成,
在appsession的InteralSend函數中對sendtimeout進行限制。
在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。
先調用stop再調用close
socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs
socket'session的closed,回收全部sendingqqueue到pool中.SuperSocket Layers](LearnSuperSocket.assets/layermodel-1579317752168.jpg)
對AppServer和SocketSession的包裝
服務參數配置,在serverbase基類SetUp中建立
/// <summary> /// Setups with the specified ip and port. /// </summary> /// <param name="ip">The ip.</param> /// <param name="port">The port.</param> /// <param name="socketServerFactory">The socket server factory.</param> /// <param name="receiveFilterFactory">The Receive filter factory.</param> /// <param name="logFactory">The log factory.</param> /// <param name="connectionFilters">The connection filters.</param> /// <param name="commandLoaders">The command loaders.</param> /// <returns>return setup result</returns> public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null) { return Setup(new ServerConfig { Ip = ip, Port = port }, socketServerFactory, receiveFilterFactory, logFactory, connectionFilters, commandLoaders); }
類圖
監聽節點
/// <summary> /// Tries to load commands. /// </summary> /// <param name="commands">The commands.</param> /// <returns></returns> public override bool TryLoadCommands(out IEnumerable<TCommand> commands) { commands = null; var commandAssemblies = new List<Assembly>(); if (m_AppServer.GetType().Assembly != this.GetType().Assembly) commandAssemblies.Add(m_AppServer.GetType().Assembly); string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly"); if (!string.IsNullOrEmpty(commandAssembly)) { OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!"); return false; } if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any()) { try { var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray()); if (definedAssemblies.Any()) commandAssemblies.AddRange(definedAssemblies); } catch (Exception e) { OnError(new Exception("Failed to load defined command assemblies!", e)); return false; } } if (!commandAssemblies.Any()) { commandAssemblies.Add(Assembly.GetEntryAssembly()); } var outputCommands = new List<TCommand>(); foreach (var assembly in commandAssemblies) { try { outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>()); } catch (Exception exc) { OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc)); return false; } } commands = outputCommands; return true; } }
m_CommandContainer:命令容器
m_CommandLoaders
m_ConnectionFilters
m_GlobalCommandFilters
m_Listeners
m_SocketServerFactory:在SetupBas
ReceiveFilterBase
類圖
在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。
方法
Initialize()方法:
TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsync或SendSync,這個是由子類實現。
在子類中維護SocketAsyncEventArgs
在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件
在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBuffer和SendAsync方法,發送失敗也調用OnSendingCompleted
SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
方法
/// <summary> /// Creates the socket server. /// </summary> /// <typeparam name="TRequestInfo">The type of the request info.</typeparam> /// <param name="appServer">The app server.</param> /// <param name="listeners">The listeners.</param> /// <param name="config">The config.</param> /// <returns></returns> public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config) where TRequestInfo : IRequestInfo { if (appServer == null) throw new ArgumentNullException("appServer"); if (listeners == null) throw new ArgumentNullException("listeners"); if (config == null) throw new ArgumentNullException("config"); switch(config.Mode) { case(SocketMode.Tcp): return new AsyncSocketServer(appServer, listeners); case(SocketMode.Udp): return new UdpSocketServer<TRequestInfo>(appServer, listeners); default: throw new NotSupportedException("Unsupported SocketMode:" + config.Mode); } }
構造函數,父類
public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners) : base(appServer, listeners) { var config = appServer.Config; uint dummy = 0; m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3]; m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length]; //whether enable KeepAlive BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0); //how long will start first keep alive BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy)); //keep alive interval BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2); m_SendTimeOut = config.SendTimeOut; m_ReceiveBufferSize = config.ReceiveBufferSize; m_SendBufferSize = config.SendBufferSize; }
public override bool Start() { try { int bufferSize = AppServer.Config.ReceiveBufferSize; if (bufferSize <= 0) bufferSize = 1024 * 4; m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize); try { m_BufferManager.InitBuffer(); } catch (Exception e) { AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e); return false; } // preallocate pool of SocketAsyncEventArgs objects SocketAsyncEventArgs socketEventArg; var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber); for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++) { //Pre-allocate a set of reusable SocketAsyncEventArgs socketEventArg = new SocketAsyncEventArgs(); m_BufferManager.SetBuffer(socketEventArg); socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg)); } m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList); if (!base.Start()) return false; IsRunning = true; return true; } catch (Exception e) { AppServer.Logger.Error(e); return false; } }
SocketAsyncEventArgs的代理
維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)
IsRecyclable:是否能夠循環使用
OrigOffset:原始偏移量
每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()和AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));
UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的
public void Initialize(IAsyncSocketSession socketSession) { SocketEventArgs.UserToken = socketSession; }
代理模式
引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化
監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接
m_ListrnSocket:監聽Socket
配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,
此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化。
BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。
主要提供兩個方法:一個是SetBuffer和FreeBuffer
SetBuffer:
FreeBuffer:
方法:
IndexOf:T在全部緩存中的索引
維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存
入棧,出戰,開始入棧,開始出棧。
全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護
實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的
m_SendingQueueSize:發送隊列大小,默認爲5
/// <summary> /// Creates the specified size. /// </summary> /// <param name="size">The size.</param> /// <param name="poolItems">The pool items.</param> /// <returns></returns> public ISmartPoolSource Create(int size, out SendingQueue[] poolItems) { var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5 poolItems = new SendingQueue[size];//size=256 for (var i = 0; i < size; i++) { poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量 } return new SmartPoolSource(source, size); }
其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池
m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)
m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)
m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)
m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。
m_GlobalStack:保存着單個SendingQueuep對象的數組
Initialize():初始化函數,初始化上面的變量
維護全部的發送隊列緩存,並保存sendingQueue的個數
Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5
,
Count:爲默認值5
表示異步套接字操做。
在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,
appsession和socketsession完成,
在appsession的InteralSend函數中對sendtimeout進行限制。
在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。
先調用stop再調用close
socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs
socket'session的closed,回收全部sendingqqueue到pool中.SuperSocket Layers](LearnSuperSocket.assets/layermodel-1579317752168.jpg)
對AppServer和SocketSession的包裝
服務參數配置,在serverbase基類SetUp中建立
/// <summary> /// Setups with the specified ip and port. /// </summary> /// <param name="ip">The ip.</param> /// <param name="port">The port.</param> /// <param name="socketServerFactory">The socket server factory.</param> /// <param name="receiveFilterFactory">The Receive filter factory.</param> /// <param name="logFactory">The log factory.</param> /// <param name="connectionFilters">The connection filters.</param> /// <param name="commandLoaders">The command loaders.</param> /// <returns>return setup result</returns> public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null) { return Setup(new ServerConfig { Ip = ip, Port = port }, socketServerFactory, receiveFilterFactory, logFactory, connectionFilters, commandLoaders); }
類圖
監聽節點
/// <summary> /// Tries to load commands. /// </summary> /// <param name="commands">The commands.</param> /// <returns></returns> public override bool TryLoadCommands(out IEnumerable<TCommand> commands) { commands = null; var commandAssemblies = new List<Assembly>(); if (m_AppServer.GetType().Assembly != this.GetType().Assembly) commandAssemblies.Add(m_AppServer.GetType().Assembly); string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly"); if (!string.IsNullOrEmpty(commandAssembly)) { OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!"); return false; } if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any()) { try { var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray()); if (definedAssemblies.Any()) commandAssemblies.AddRange(definedAssemblies); } catch (Exception e) { OnError(new Exception("Failed to load defined command assemblies!", e)); return false; } } if (!commandAssemblies.Any()) { commandAssemblies.Add(Assembly.GetEntryAssembly()); } var outputCommands = new List<TCommand>(); foreach (var assembly in commandAssemblies) { try { outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>()); } catch (Exception exc) { OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc)); return false; } } commands = outputCommands; return true; } }
m_CommandContainer:命令容器
m_CommandLoaders
m_ConnectionFilters
m_GlobalCommandFilters
m_Listeners
m_SocketServerFactory:在SetupBas
ReceiveFilterBase
類圖
在初始化裏對AppSession產生依賴,同時維護Socket和SmartPool(SendingQueue[]),由於維護着socket因此發送接收數據都是經過這個類。
方法
Initialize()方法:
TrySend()方法:參數:IList<ArraySegment<byte>> segments:將segments壓入sendingqueue隊列並調用StartSend最終是調用SendAsync或SendSync,這個是由子類實現。
在子類中維護SocketAsyncEventArgs
在初始化中若是同步發送就使用m_SocketEventArgSend,並OnSendingCompleted方法綁定其Completed事件
在SendAsync()方法中將SendingQueue實例給m_SocketEventArgSend的UserToken屬性,並調用m_SocketEventArgSend的SetBuffer和SendAsync方法,發送失敗也調用OnSendingCompleted
SocketAsyncProxy中的Completed事件中調用ProcessReceive方法,再調用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
方法
/// <summary> /// Creates the socket server. /// </summary> /// <typeparam name="TRequestInfo">The type of the request info.</typeparam> /// <param name="appServer">The app server.</param> /// <param name="listeners">The listeners.</param> /// <param name="config">The config.</param> /// <returns></returns> public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config) where TRequestInfo : IRequestInfo { if (appServer == null) throw new ArgumentNullException("appServer"); if (listeners == null) throw new ArgumentNullException("listeners"); if (config == null) throw new ArgumentNullException("config"); switch(config.Mode) { case(SocketMode.Tcp): return new AsyncSocketServer(appServer, listeners); case(SocketMode.Udp): return new UdpSocketServer<TRequestInfo>(appServer, listeners); default: throw new NotSupportedException("Unsupported SocketMode:" + config.Mode); } }
構造函數,父類
public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners) : base(appServer, listeners) { var config = appServer.Config; uint dummy = 0; m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3]; m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length]; //whether enable KeepAlive BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0); //how long will start first keep alive BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy)); //keep alive interval BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2); m_SendTimeOut = config.SendTimeOut; m_ReceiveBufferSize = config.ReceiveBufferSize; m_SendBufferSize = config.SendBufferSize; }
public override bool Start() { try { int bufferSize = AppServer.Config.ReceiveBufferSize; if (bufferSize <= 0) bufferSize = 1024 * 4; m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize); try { m_BufferManager.InitBuffer(); } catch (Exception e) { AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e); return false; } // preallocate pool of SocketAsyncEventArgs objects SocketAsyncEventArgs socketEventArg; var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber); for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++) { //Pre-allocate a set of reusable SocketAsyncEventArgs socketEventArg = new SocketAsyncEventArgs(); m_BufferManager.SetBuffer(socketEventArg); socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg)); } m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList); if (!base.Start()) return false; IsRunning = true; return true; } catch (Exception e) { AppServer.Logger.Error(e); return false; } }
SocketAsyncEventArgs的代理
維護着一個SocketAsyncEventArgs對象,並訂閱了該對象的Completed事件(異步完成事件)
IsRecyclable:是否能夠循環使用
OrigOffset:原始偏移量
每當異步完成的時候調用SocketAsyncEventArgs實例中的UserToken屬性,該屬性實際上保存着SocketSession實例,並調用SocketSession的ProcessReceive()和AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));
UserToken屬性是在SocketAsyncEventArgsProxy的初始化方法中定義的
public void Initialize(IAsyncSocketSession socketSession) { SocketEventArgs.UserToken = socketSession; }
代理模式
引導配置文件並經過配置實例化各個server和factory,在CreateWorkItemInstance方法經過Activator.CreateInstance(serviceType)實例化
監聽類,由三個事件:監聽錯誤,監聽中止,新的客戶端鏈接
m_ListrnSocket:監聽Socket
配置文件載入 LoadResult,載入配置的connectionFilter,logfactory,commandloaderfactory,將appserver轉化成IworkItem接口,
此類建立一個大緩衝區,該緩衝區能夠分配給每一個套接字I / O操做使用,並分配給SocketAsyncEventArgs對象。 這使得bufffer能夠輕鬆地重用,而且能夠防止堆內存碎片化。
BufferManager類上公開的操做不是線程安全的。我以爲這個類不須要線程安全,由於每一個socket得到數據基本不會併發執行。
主要提供兩個方法:一個是SetBuffer和FreeBuffer
SetBuffer:
FreeBuffer:
方法:
IndexOf:T在全部緩存中的索引
維護ArraySegment<byte>[] globalQueue, globalQueue中包含着全部全部緩存
入棧,出戰,開始入棧,開始出棧。
全部的發送隊列內存片組成一個大的arraysegment,由SendingQueueSourceCreator建立,並由SmartPool維護
實際就是SmartPoolSourceCreator,發送隊列建立者,默認有5個發送隊列,其實每一個鏈接一個發送隊列,這邊的全部sendingQueue組數是由SmartPool維護的
m_SendingQueueSize:發送隊列大小,默認爲5
/// <summary> /// Creates the specified size. /// </summary> /// <param name="size">The size.</param> /// <param name="poolItems">The pool items.</param> /// <returns></returns> public ISmartPoolSource Create(int size, out SendingQueue[] poolItems) { var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5 poolItems = new SendingQueue[size];//size=256 for (var i = 0; i < size; i++) { poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是全部的隊列緩存,發送隊列偏移量和發送隊列容量 } return new SmartPoolSource(source, size); }
其中維護了一個T(實際是SendingQueue)線程安全棧(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池
m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)
m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)
m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)
m_ItemsSource:保存着SmartPoolSource[]對象,該對象其實是全部的sendingqueue緩存。
m_GlobalStack:保存着單個SendingQueuep對象的數組
Initialize():初始化函數,初始化上面的變量
維護全部的發送隊列緩存,並保存sendingQueue的個數
Source:是object類型,其實是ArraySegment<byte>[],其實是全部的sendingqueue的緩存,大小爲size*sendingqueuesize=256*5
,
Count:爲默認值5
表示異步套接字操做。
在訂閱了NewRequestReceived事件以後,該事件會有兩個參數,一個是appsession,一個是requestinfo,
appsession和socketsession完成,
在appsession的InteralSend函數中對sendtimeout進行限制。
在socketsession中將消息壓入消息棧對消息進行校驗,最終是經過socket.send和socket.sendasync兩個方法將消息發送。
先調用stop再調用close
socketserver的stop,釋放m_ReadWritePool中全部SocketAsyncEventArgs,全部listener的stop,釋放其SocketAsyncEventArgs
socket'session的closed,回收全部sendingqqueue到pool中.