高性能的 socket 通信服務器(完成端口模型--IOCP)

不少人費盡心思,都沒有找到一個完美的 I/O CP 例程,甚至被一些例程誤導。現將本人編寫的例程公佈出來,希望能讓那些苦苦尋覓的人有所收穫。本例程可以作爲初學者的學習之用,也可以作爲大型服務程序的通信模塊。其處理速度可以說優化到了極點。如果理解了本例程的精髓,再加上一個高效的通信協議,你完全可以用它來構建一個高性能的通信服務器。
  
在公佈代碼前,先談談I/O CP。對I/O CP的函數不多作說明了,網上很多,都一樣。在此本人僅說一些技術上要注意的問題。
  
1、如何管理內存  
一、IO數據緩衝管理  
動態分配內存,是一種靈活的方式。但對於系統資源浪費是巨大的。所以本人採用的是預先分配服務器最大需要的內存,用鏈表來管理。任何時候分配、交還都不需要遍歷,僅需要互斥而已。
更巧妙的是,將IO發送信息和內存塊有機地結合在一起,減少了鏈表的管理工作。
  
//IO操做標誌  
TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);  
//IO操做信息  
PIOInfo =^ TIOInfo;  
TIOInfo = packed record  
Overlapped: TOverlapped; //重疊結構  
DataBuf: TWSABUF; //IO數據信息  
Socket: TSocket;  
Flag: TIOFlag;  
TickCountSend: DWord;  
Next: PIOInfo;  
Prior: PIOInfo;  
end;  
  
PUNode =^ TUNode;  
TUNode = record  
Next: Pointer;  
end;  
  
PIOMem =^ TIOMem;  
TIOMem = packed record  
IOInfo: TIOInfo;  
Data: array[1..IO_MEM_SIZE] of Byte;  
//申請內存的時候,返回的是Data的地址  
end;  
  
二、鏈路數據管理  
採用雙向鏈表結構,減小刪除節點時遍歷消耗的時間  
  
//每一個鏈接的信息  
PLink =^ TLink;  
TLink = record  
Socket: TSocket;  
RemoteIP: string[30];  
RemotePort: DWord;  
//最後收到數據時的系統節拍  
TickCountActive: DWord;  
//處理該鏈接的當前線程的信息  
Worker: PWorker;  
Data: Pointer; //應用層能夠設置這個成員,當OnReceive的時候,就不要每次遍歷每一個鏈接對應的數據區了  
Section: TRTLCriticalSection;  
Next: PLink;  
Prior: PLink;  
end;  
  
2、如何管理線程  
每一個工作線程創建的時候,調用:OnWorkerThreadCreateEvt,該函數可以返回這個線程對應的信息,比如爲該線程創建的數據庫連接控件或對應的類等,在OnReceive中可以從Link的Worker訪問該成員Worker^.Data。
  
//工做線程信息  
PWorker =^ TWorker;  
TWorker = record  
ID: THandle;  
CompletionPort: THandle;  
Data: Pointer; //調用OnWorkerThreadCreateEvt返回的值  
//用於反應工做狀況的數據  
TickCountLong,  
TickCountActive: DWord;  
ExecCount: Integer;  
//線程完成後設置  
Finished: THandle;  
Next: PWorker;  
end;  
  
同理,服務線程也具有同樣的特點。詳見源碼。
  
關於線程同步,一直是衆多程序員頭疼的問題。在本例程中,儘可能避免了過多的互斥,並有效地防止了死鎖現象。用RTLCriticalSection,稍微不注意,就會造成死鎖的災難。哪怕是兩行代碼的差別,對多線程而言都是災難性的。在本例程中,需要做數據同步的地方是鏈路鏈表的維護。服務線程需要計算哪一個連接空閒超時了,工作線程需要處理斷線情況,應用層主動發送數據時需要對該鏈路獨佔,否則一個在發送,一個在處理斷線故障,就會發生衝突,導致災難性後果。
  
在本人的壓力測試中,已經有效的解決了這個問題,應用層部分不須要作什麼同步工做,能夠安心的收發數據了。同時每一個線程都支持了數據庫鏈接。  
  
3、到底要建立多少個工做線程合適  
不少文章說,有N個CPU就建立N個線程,也有說N*2+2。最不喜歡說話不負責任的人了,本例程可讓剛入門 I/O CP 的人對它有更深刻的瞭解。  
例程測試結果:  
  
4、該不應使用類  
有人說,拋棄一切類,對於服務器而言,會爲類付出很多代價。從我的觀點看,爲類付出代價的,主要是動態創建的原因。其實,類成員訪問和結構成員訪問一樣,都需要相對地址。如果都是預先創建的,二者沒有多大的差別。本例程採用純函數(不使用類)的方式,當然在應用層可以採用類來管理;很難想象,如果沒有類,需要多做多少工作。
  
5、缺點  
不能發大數據包,只能發不超過固定長度(IO_MEM_SIZE 字節)的數據包。但對於小數據包而言,它將是優秀的。
  
時間緣由,不能作太多的解釋和對代碼作太多的註釋,須要例程源碼的能夠和本人聯繫,免費提供。QQ:48092788  
  
例程源碼:  
http://d.download.csdn.net/down/1546336/guestcode  
  
完成端口通信服務模塊源碼:  
{******************************************************************************  
* UCode 系列組件、控件 *  
* 做者:盧益貴 2003~2009 *  
* 版權全部 任何未經受權的使用和銷售,均保留追究法律責任的權力 *  
* *  
* UCode 系列由XCtrls-YCtrls-ICtrls-NCode系列演變而來 *  
* QQ:48092788 luyigui.blog.gxsky.com *  
******************************************************************************}  
{******************************************************************************  
完成端口模型的socket服務器  
******************************************************************************}  
unit UTcpServer;  
interface  
uses  
Windows, Classes, UClasses, UWinSock2;  
const
//Size in bytes of the data area of each I/O buffer
IO_MEM_SIZE = 2048;
//Number of I/O buffers preallocated for the pool; must be large enough for the expected load
IO_MEM_MAX_COUNT = 1000 * 10;
//Maximum number of simultaneous connections
SOCK_MAX_COUNT = 3000;
//Connection idle timeout; multiplied by 1000 and compared against a GetTickCount difference,
//so the unit is seconds. A link that received nothing for longer than this is closed.
SOCK_IDLE_OVERTIME = 60;
type  
//Per-worker-thread bookkeeping record; records are chained through Next
PWorker =^ TWorker;
TWorker = record
ID: THandle; //Receives the thread identifier written by CreateThread
CompletionPort: THandle; //Completion port this worker waits on
Data: Pointer; //Value returned by OnWorkerThreadCreateEvt for this worker
//Statistics describing how busy the worker is
TickCountLong, //Accumulated ticks between dequeuing a packet and waiting again
TickCountActive: DWord; //Tick count taken when the last packet was dequeued
ExecCount: Integer; //Number of packets dequeued so far
//Event signalled by the thread just before it returns
Finished: THandle;
Next: PWorker;
end;
//State kept for each client connection
PLink =^ TLink;
TLink = record
Socket: TSocket; //INVALID_SOCKET while the link is unused or already closed
RemoteIP: string[30];
RemotePort: DWord; //Stored directly from sin_port by the accept code, without a byte-order conversion
//Tick count taken when data was last received on this link
TickCountActive: DWord;
//Worker currently running a callback for this link (nil otherwise)
Worker: PWorker;
Data: Pointer; //Free for the application layer, e.g. to cache per-connection state
Section: TRTLCriticalSection; //Guards this link's socket state
Next: PLink; //Shared by the free list and the active list
Prior: PLink; //Only maintained while the link is in the active list
end;
//Callback signatures installed through SetEventProc.
//All of them are plain procedures/functions (not methods of object).
TOnLinkIdleOvertimeEvt = procedure(Link: PLink);
TOnDisconnectEvt = procedure(Link: PLink);
//Buf/Len describe the received bytes; the worker thread ignores the Boolean result
TOnReceiveEvt = function(Link: PLink; Buf: PByte; Len: Integer): Boolean;
//The returned pointer is stored as the thread's user data
TOnThreadCreateEvt = function(IsWorkerThread: Boolean): Pointer;
//Percentage (0..100) of the link pool currently in use
function GetLinkUse(): real;
//Memory occupied by the link pool, in bytes
function GetLinkSize(): Integer;
//Number of currently active links
function GetLinkCount(): Integer;
//Number of links in the free list
function GetLinkFree(): Integer;
//Percentage (0..100) of the I/O buffer pool currently in use
function GetIOMemUse(): Real;
//Memory occupied by the I/O buffer pool, in bytes
function GetIOMemSize(): Integer;
//Number of I/O buffers in the free list
function GetIOMemFree(): Integer;
//Returns an I/O buffer (the pointer obtained from GetIOMem) to the pool
procedure FreeIOMem(Mem: Pointer);
//Takes an I/O buffer from the pool; the result points at its data area
function GetIOMem(): Pointer;
//Returns a worker's packet count; TickCount receives its accumulated busy ticks (1-based Index)
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
//Returns the thread ID of a worker (1-based Index)
function GetWorkerID(Index: Integer): Integer;
//Number of worker threads
function GetWorkerCount(): Integer;
//Binds to the given IP/port and starts listening
//NOTE(review): despite the "Remote" prefix these are the local address and port passed to Bind
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
//Stops the server and closes the listening socket
function StopTcpServer(): Boolean;
//Installs the event callbacks; call before StartTcpServer
procedure SetEventProc(OnReceive: TOnReceiveEvt;
OnDisconnect: TOnDisconnectEvt;
OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
OnServerThreadCreate: TOnThreadCreateEvt;
OnWorkerThreadCreate: TOnThreadCreateEvt);
//Appends a line to the log file
procedure WriteLog(Log: String);
//Posts an overlapped receive on the link using the given pool buffer
function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
//Posts an overlapped send of Len bytes from the given pool buffer
function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
//Sends the same data to every active link
procedure PostBroadcast(Buf: PByte; Len: Integer);
//True while the listening socket is open
function IsTcpServerActive(): Boolean;
//Accumulated busy time of the server thread in ms; reading resets it to 0
function GetServerExecLong(): DWord;
//Number of wake-ups of the server thread
function GetServerExecCount(): Integer;
//Returns a local IP address of this host as a string
function GetLocalIP(IsIntnetIP: Boolean): String;
implementation  
uses  
IniFiles, SysUtils, ActiveX;  
var
//Directory of the executable (with trailing path delimiter); set in the initialization section
ExePath: String = '';
const
//Win32 heap flags and status codes declared locally
HEAP_NO_SERIALIZE = 1; {Disables the heap's internal locking for the call; the caller must then guarantee exclusive access}
HEAP_GENERATE_EXCEPTIONS = 4; {Raise an exception on allocation failure instead of returning nil}
HEAP_ZERO_MEMORY = 8; {Initialise the allocated memory to 0}
HEAP_REALLOC_IN_PLACE_ONLY = 16; {Reallocation must not move the block}
STATUS_ACCESS_VIOLATION = DWORD($C0000005); {Access violation}
STATUS_NO_MEMORY = DWORD($C0000017); {Out of memory}
{===============================================================================  
IO內存管理  
================================================================================}  
type
//Kind of overlapped operation a TIOInfo header describes
TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);
//Per-operation header stored directly in front of each buffer's data area
PIOInfo =^ TIOInfo;
TIOInfo = packed record
Overlapped: TOverlapped; //Overlapped structure; first field, so the POverlapped handed back on completion can be cast to PIOInfo
DataBuf: TWSABUF; //Buffer descriptor (pointer + length) passed to WSARecv/WSASend
Socket: TSocket;
Flag: TIOFlag;
TickCountSend: DWord;
Next: PIOInfo;
Prior: PIOInfo;
end;

//Minimal singly-linked node; pool blocks are cast to PUNode while they sit in the
//free list, so the link pointer overlays the first bytes of the block
PUNode =^ TUNode;
TUNode = record
Next: Pointer;
end;

//One pool block: header followed by the data area.
//GetIOMem hands out the address of Data; the header is found again by
//subtracting sizeof(TIOInfo), which is why both records are packed.
PIOMem =^ TIOMem;
TIOMem = packed record
IOInfo: TIOInfo;
Data: array[1..IO_MEM_SIZE] of Byte;
end;
var
//Free list of I/O blocks: blocks are taken from the head and returned at the tail
IOMemHead: PIOMem = nil;
IOMemLast: PIOMem = nil;
//Number of blocks currently handed out
IOMemUse: Integer = 0;
//Guards the free list and IOMemUse
IOMemSec: TRTLCriticalSection;
//Every block ever allocated, so DeleteIOMem can release them regardless of list state
IOMemList: array[1..IO_MEM_MAX_COUNT] of Pointer;
//Takes one block from the head of the I/O buffer pool.
//Returns a pointer to the block's data area (IO_MEM_SIZE bytes), or nil when the
//pool is exhausted. The buffer must be given back with FreeIOMem.
function GetIOMem(): Pointer;
begin
  Result := nil;
  EnterCriticalSection(IOMemSec);
  try
    try
      //Check for exhaustion explicitly instead of provoking an access
      //violation on a nil head and relying on the except block.
      if IOMemHead = nil then
      begin
        WriteLog('GetIOMem: error');
        Exit;
      end;
      Result := @(IOMemHead^.Data);
      //While a block is in the free list its first bytes hold the next pointer
      IOMemHead := PUNode(IOMemHead)^.Next;
      IOMemUse := IOMemUse + 1;
    except
      Result := nil;
      WriteLog('GetIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;
//Returns a buffer obtained from GetIOMem to the tail of the pool.
//Mem is the data pointer handed out by GetIOMem; nil is ignored.
procedure FreeIOMem(Mem: Pointer);
var
  Node: Pointer;
begin
  if Mem = nil then
    Exit;
  EnterCriticalSection(IOMemSec);
  try
    try
      //Step back from the data area to the start of the block
      Node := Pointer(PAnsiChar(Mem) - SizeOf(TIOInfo));
      PUNode(Node)^.Next := nil;
      if IOMemHead = nil then
        //The free list ran empty: IOMemLast is stale and points at a block that
        //is in use, so it must not be written to. Restart the list here.
        IOMemHead := Node
      else
        PUNode(IOMemLast)^.Next := Node;
      IOMemLast := Node;
      IOMemUse := IOMemUse - 1;
    except
      WriteLog('FreeIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;
//Creates the pool lock and preallocates IO_MEM_MAX_COUNT zero-filled blocks,
//chaining them into the free list and recording each one in IOMemList.
//If the heap runs out of memory the pool is simply left smaller.
procedure IniIOMem();
var
  i: Integer;
  Heap: THandle;
  Node: Pointer;
begin
  InitializeCriticalSection(IOMemSec);
  Heap := GetProcessHeap();
  IOMemHead := nil;
  IOMemLast := nil;
  IOMemUse := 0;
  //Clear the table first so DeleteIOMem never sees pointers from an earlier run
  for i := 1 to IO_MEM_MAX_COUNT do
    IOMemList[i] := nil;
  for i := 1 to IO_MEM_MAX_COUNT do
  begin
    Node := HeapAlloc(Heap, HEAP_ZERO_MEMORY, SizeOf(TIOMem));
    if Node = nil then
    begin
      WriteLog('IniIOMem: out of memory');
      Break;
    end;
    IOMemList[i] := Node;
    if IOMemHead = nil then
      IOMemHead := Node
    else
      PUNode(IOMemLast)^.Next := Node;
    //The block is zero-filled, so the tail's Next is already nil
    IOMemLast := Node;
  end;
end;
//Counts the blocks currently in the free list by walking it under the pool lock.
function GetIOMemFree(): Integer;
var
  IOMems: PUNode;
begin
  Result := 0;
  EnterCriticalSection(IOMemSec);
  try
    IOMems := PUNode(IOMemHead);
    //The "<>" operator had been stripped from the published source
    while IOMems <> nil do
    begin
      Result := Result + 1;
      IOMems := IOMems^.Next;
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;
//Releases every block recorded in IOMemList, resets the pool state and
//destroys the pool lock. Must only be called after IniIOMem.
procedure DeleteIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  for i := 1 to IO_MEM_MAX_COUNT do
  begin
    //The process heap is shared with the rest of the process, so it must not
    //be accessed with HEAP_NO_SERIALIZE; use the default (serialized) call.
    if IOMemList[i] <> nil then
      HeapFree(Heap, 0, IOMemList[i]);
    IOMemList[i] := nil;
  end;
  //Do not leave list pointers dangling into freed memory
  IOMemHead := nil;
  IOMemLast := nil;
  IOMemUse := 0;
  DeleteCriticalSection(IOMemSec);
end;
//Total size in bytes of a fully allocated I/O buffer pool (header + data per block).
function GetIOMemSize(): Integer;
begin
  Result := SizeOf(TIOMem) * IO_MEM_MAX_COUNT;
end;
//Share of the I/O buffer pool that is currently handed out, as a percentage.
//IOMemUse is read without taking the pool lock.
function GetIOMemUse(): Real;
var
  Scaled: Integer;
begin
  Scaled := IOMemUse * 100;
  Result := Scaled / IO_MEM_MAX_COUNT;
end;
{===============================================================================  
Socket鏈路管理  
================================================================================}  
//Default idle-timeout callback: does nothing. It is the initial value of OnLinkIdleOvertimeEvt.
procedure OnLinkIdleOvertimeDef(Link: PLink);
begin
end;
var
//Free list of links: GetLink takes from LinkHead, FreeLink appends at LinkLast
LinkHead: PLink = nil;
LinkLast: PLink = nil;
//Both counters are incremented by GetLink and decremented by FreeLink
LinkUse: Integer = 0;
LinkCount: Integer = 0;
//Guards both link lists and the counters
LinkSec: TRTLCriticalSection;
//NOTE(review): not referenced in the visible code; presumably the table of all allocated links
LinkList: array[1..SOCK_MAX_COUNT] of PLink;
//Callback for idle links; defaults to the no-op handler
OnLinkIdleOvertimeEvt: TOnLinkIdleOvertimeEvt = OnLinkIdleOvertimeDef;
//Doubly linked list of links that are in use (head and tail)
LinksHead: PLink = nil;
LinksLast: PLink = nil;
//Counts the links currently in the free list by walking it under LinkSec.
function GetLinkFree(): Integer;
var
  Links: PLink;
begin
  Result := 0;
  EnterCriticalSection(LinkSec);
  try
    Links := LinkHead;
    //The "<>" operator had been stripped from the published source
    while Links <> nil do
    begin
      Result := Result + 1;
      Links := Links^.Next;
    end;
  finally
    LeaveCriticalSection(LinkSec);
  end;
end;
//Moves one link from the free list to the tail of the active list and resets
//its per-connection fields. Returns nil when no free link is left.
//Takes no lock itself; the caller is expected to hold LinkSec.
function GetLink(): PLink;
begin
  Result := nil;
  try
    //Check for exhaustion explicitly instead of provoking an access
    //violation on a nil head and relying on the except block.
    if LinkHead = nil then
    begin
      WriteLog('GetLink: error');
      Exit;
    end;
    Result := LinkHead;
    LinkHead := LinkHead^.Next;
    LinkUse := LinkUse + 1;
    LinkCount := LinkCount + 1;
    //Append to the active list
    Result^.Next := nil;
    if LinksHead = nil then
    begin
      Result^.Prior := nil;
      LinksHead := Result;
    end else
    begin
      Result^.Prior := LinksLast;
      LinksLast^.Next := Result;
    end;
    LinksLast := Result;
    //Fresh connection state
    Result^.Socket := INVALID_SOCKET;
    Result^.RemoteIP := '';
    Result^.RemotePort := 0;
    Result^.TickCountActive := GetTickCount();
    Result^.Worker := nil;
    Result^.Data := nil;
  except
    Result := nil;
    WriteLog('GetLink: error');
  end;
end;
//Unlinks Link from the active list and appends it to the free list.
//Takes no lock itself; the caller is expected to hold LinkSec.
procedure FreeLink(Link: PLink);
begin
  if Link = nil then
    Exit;
  try
    Link^.Worker := nil;
    //Remove from the doubly linked active list
    if Link = LinksHead then
    begin
      LinksHead := Link^.Next;
      if LinksLast = Link then
        LinksLast := LinksHead
      else
        LinksHead^.Prior := nil;
    end else
    begin
      Link^.Prior^.Next := Link^.Next;
      if Link^.Next <> nil then
        Link^.Next^.Prior := Link^.Prior;
      if Link = LinksLast then
        LinksLast := Link^.Prior;
    end;
    //Append to the free list
    Link^.Next := nil;
    if LinkHead = nil then
      //The free list ran empty: LinkLast is stale and points at a link that is
      //in the active list. Writing its Next would splice the freed link into
      //the active list, so restart the free list instead.
      LinkHead := Link
    else
      LinkLast^.Next := Link;
    LinkLast := Link;
    LinkUse := LinkUse - 1;
    LinkCount := LinkCount - 1;
  except
    WriteLog('FreeLink: error');
  end;
end;
//Closes the link's socket (if still open) and returns the link to the free list.
//Takes LinkSec and then the link's own Section; does not fire OnDisconnectEvt.
procedure CloseLink(Link: PLink);
begin
  if Link = nil then
    Exit;
  EnterCriticalSection(LinkSec);
  try
    EnterCriticalSection(Link^.Section);
    try
      //The "<>" operator had been stripped from the published source
      if Link^.Socket <> INVALID_SOCKET then
      begin
        try
          CloseSocket(Link^.Socket);
        except
          WriteLog('CloseSocket: error');
        end;
        Link^.Socket := INVALID_SOCKET;
        FreeLink(Link);
      end;
    finally
      LeaveCriticalSection(Link^.Section);
    end;
  finally
    LeaveCriticalSection(LinkSec);
  end;
end;
//NOTE(review): the published source lost everything between the "<" of the idle
//comparison and the ">" of a later "while i > 0 do" (HTML tag stripping).
//Everything below marked RECONSTRUCTED is a best-effort replacement modelled on
//the surviving code (the IO pool routines and the callers of these routines);
//compare against the author's original unit before relying on it.

//Walks the active list and closes every link that has been idle for more than
//SOCK_IDLE_OVERTIME seconds. Called by the server thread when its wait times out.
procedure CheckLinkLinkIdleOvertime(Data: Pointer);
var
  TickCount: DWord;
  Long: Integer;
  Link, Save: PLink;
begin
  EnterCriticalSection(LinkSec);
  try
    TickCount := GetTickCount();
    Link := LinksHead;
    while Link <> nil do
    begin
      //RECONSTRUCTED: remember the successor first, FreeLink clears Link^.Next
      Save := Link^.Next;
      EnterCriticalSection(Link^.Section);
      try
        if Link^.Socket <> INVALID_SOCKET then
        begin
          if TickCount > Link^.TickCountActive then
            Long := TickCount - Link^.TickCountActive
          else
            Long := $FFFFFFFF - Link^.TickCountActive + TickCount;
          if SOCK_IDLE_OVERTIME * 1000 < Long then
          begin
            //RECONSTRUCTED from here to the end of the procedure
            try
              CloseSocket(Link^.Socket);
            except
              WriteLog('CheckLinkLinkIdleOvertime: CloseSocket error');
            end;
            Link^.Socket := INVALID_SOCKET;
            try
              OnLinkIdleOvertimeEvt(Link);
            except
              WriteLog('OnLinkIdleOvertimeEvt: error');
            end;
            FreeLink(Link);
          end;
        end;
      finally
        LeaveCriticalSection(Link^.Section);
      end;
      Link := Save;
    end;
  finally
    LeaveCriticalSection(LinkSec);
  end;
end;

//RECONSTRUCTED: creates the link lock and preallocates SOCK_MAX_COUNT links
//(called by StartTcpServer).
procedure IniLink();
var
  i: Integer;
  Heap: THandle;
  Node: PLink;
begin
  InitializeCriticalSection(LinkSec);
  Heap := GetProcessHeap();
  LinkHead := nil;
  LinkLast := nil;
  LinksHead := nil;
  LinksLast := nil;
  LinkUse := 0;
  LinkCount := 0;
  for i := 1 to SOCK_MAX_COUNT do
  begin
    Node := HeapAlloc(Heap, HEAP_ZERO_MEMORY, SizeOf(TLink));
    LinkList[i] := Node;
    if Node = nil then
      Continue;
    Node^.Socket := INVALID_SOCKET;
    InitializeCriticalSection(Node^.Section);
    if LinkHead = nil then
      LinkHead := Node
    else
      LinkLast^.Next := Node;
    LinkLast := Node;
  end;
end;

//RECONSTRUCTED: closes any socket still open, releases every link and destroys
//the link lock (called by StartTcpServer on failure and by StopTcpServer).
procedure DeleteLink();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  for i := 1 to SOCK_MAX_COUNT do
    if LinkList[i] <> nil then
    begin
      if LinkList[i]^.Socket <> INVALID_SOCKET then
        CloseSocket(LinkList[i]^.Socket);
      DeleteCriticalSection(LinkList[i]^.Section);
      HeapFree(Heap, 0, LinkList[i]);
      LinkList[i] := nil;
    end;
  LinkHead := nil;
  LinkLast := nil;
  LinksHead := nil;
  LinksLast := nil;
  LinkUse := 0;
  LinkCount := 0;
  DeleteCriticalSection(LinkSec);
end;

//RECONSTRUCTED: percentage of the link pool in use (declared in the interface).
function GetLinkUse(): real;
begin
  Result := (LinkUse * 100) / SOCK_MAX_COUNT;
end;

//RECONSTRUCTED: size in bytes of the link pool (declared in the interface).
function GetLinkSize(): Integer;
begin
  Result := SOCK_MAX_COUNT * SizeOf(TLink);
end;

//RECONSTRUCTED: number of active links (declared in the interface).
function GetLinkCount(): Integer;
begin
  Result := LinkCount;
end;

{===============================================================================
Worker threads
================================================================================}
//RECONSTRUCTED: default disconnect callback, referenced as the initial value of
//OnDisconnectEvt. Does nothing.
procedure OnDisconnectDef(Link: PLink);
begin
end;

//Default receive callback, referenced as the initial value of OnReceiveEvt.
//Only the tail (the countdown loop body and the PostSend/FreeIOMem pair)
//survived; the head is RECONSTRUCTED as "echo the data back to the sender".
function OnReceiveDef(Link: PLink; Buf: PByte; Len: Integer): Boolean;
var
  IOMem: Pointer;
  i: Integer;
begin
  Result := True;
  IOMem := GetIOMem();
  if IOMem = nil then
    Exit;
  CopyMemory(IOMem, Buf, Len);
  //NOTE(review): the start value of this countdown was lost with the text
  i := 10000;
  while i > 0 do
    i := i - 1;
  if not PostSend(Link, IOMem, Len) then
    FreeIOMem(IOMem);
end;
//Default worker-thread creation callback: supplies no per-thread data (nil).
function OnWorkerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
Result := nil;
end;
var
//Head of the singly linked list of worker records
WorkerHead: PWorker = nil;
//Number of worker threads; set by CreateWorkerThread, cleared by DestroyWorkerThread
WorkerCount: Integer = 0;
//Installed callbacks, initialised to the default handlers
OnDisconnectEvt: TOnDisconnectEvt = OnDisconnectDef;
OnReceiveEvt: TOnReceiveEvt = OnReceiveDef;
OnWorkerThreadCreateEvt: TOnThreadCreateEvt = OnWorkerThreadCreateDef;
//Returns the current value of WorkerCount (0 once the workers have been destroyed).
function GetWorkerCount(): Integer;
begin
Result := WorkerCount;
end;
//Shared disconnect path of the worker thread: closes the link's socket if it is
//still open, fires OnDisconnectEvt and returns the link to the free list.
//Tag ('1', '2' or '3') only selects the wording of the log messages.
procedure WorkerDropLink(Worker: PWorker; Link: PLink; const Tag: String);
begin
  EnterCriticalSection(LinkSec);
  EnterCriticalSection(Link^.Section);
  try
    if Link^.Socket <> INVALID_SOCKET then
    begin
      try
        CloseSocket(Link^.Socket);
      except
        WriteLog(Format('CloseSocket' + Tag + ':%d error', [Worker^.ID]));
      end;
      Link^.Socket := INVALID_SOCKET;
      Link^.Worker := Worker;
      try
        OnDisconnectEvt(Link);
      except
        WriteLog(Format('OnDisconnectEvt' + Tag + ':%d error', [Worker^.ID]));
      end;
      Link^.Worker := nil;
      FreeLink(Link);
    end;
  finally
    LeaveCriticalSection(Link^.Section);
    LeaveCriticalSection(LinkSec);
  end;
end;

//Thread procedure of one IOCP worker.
//Dequeues completion packets from the worker's completion port until it
//receives the shutdown packet (0 bytes, nil key), then signals Worker^.Finished.
//  - failed operation or 0 bytes on a link: the link is dropped
//  - completed send: the buffer goes back to the pool
//  - completed receive: OnReceiveEvt is called and a new receive is posted on
//    the same buffer
function WorkerThread(Worker: PWorker): DWORD; stdcall;
var
  Link: PLink;
  IOInfo: PIOInfo;
  Bytes: DWord;
  CompletionPort: THandle;
  Payload: Pointer;
begin
  Result := 0;
  CompletionPort := Worker^.CompletionPort;
  Worker^.TickCountActive := GetTickCount();
  Worker^.TickCountLong := 0;
  Worker^.ExecCount := 0;
  WriteLog(Format('Worker thread:%d begin', [Worker^.ID]));
  CoInitialize(nil);
  try
    while True do
    begin
      try
        //Account the time spent on the previous packet
        Worker^.TickCountLong := Worker^.TickCountLong + GetTickCount() - Worker^.TickCountActive;

        //Reset the outputs: when no packet is dequeued the key and byte count
        //are not written, and the previous iteration's Link must not be reused.
        Link := nil;
        IOInfo := nil;
        Bytes := 0;
        if not GetQueuedCompletionStatus(CompletionPort, Bytes, DWORD(Link), POverlapped(IOInfo), INFINITE) then
        begin
          //Only a dequeued (failed) operation identifies a link
          if (IOInfo <> nil) and (Link <> nil) then
            WorkerDropLink(Worker, Link, '1');
          if IOInfo <> nil then
            FreeIOMem(IOInfo^.DataBuf.buf);
          WriteLog(Format('GetQueuedCompletionStatus:%d error', [Worker^.ID]));
          Continue;
        end;

        Worker^.TickCountActive := GetTickCount();
        Worker^.ExecCount := Worker^.ExecCount + 1;

        if Bytes = 0 then
        begin
          if Link <> nil then
          begin
            //0 bytes on a link: the peer closed the connection
            WorkerDropLink(Worker, Link, '2');
            if IOInfo <> nil then
              FreeIOMem(IOInfo^.DataBuf.buf);
            Continue;
          end;
          //0 bytes and no key: shutdown packet posted by DestroyWorkerThread
          if IOInfo <> nil then
            FreeIOMem(IOInfo^.DataBuf.buf);
          Break;
        end;

        if (IOInfo = nil) or (Link = nil) then
          Continue;

        Payload := IOInfo^.DataBuf.buf;
        if IOInfo^.Flag = IO_WRITE then
        begin
          //Send finished: recycle its buffer
          FreeIOMem(Payload);
          Continue;
        end;

        {if IOInfo.Flag = IO_ACCEPT then
        begin
        ......
        continue;
        end;}

        //Receive finished: hand the data to the application, then re-arm
        Link^.Worker := Worker;
        try
          OnReceiveEvt(Link, Payload, Bytes);
        except
          WriteLog(Format('OnReceiveEvt:%d error', [Worker^.ID]));
        end;
        Link^.Worker := nil;
        Link^.TickCountActive := GetTickCount();
        if not PostRecv(Link, Payload) then
        begin
          WorkerDropLink(Worker, Link, '3');
          FreeIOMem(Payload);
        end;
      except
        WriteLog(Format('Worker thread:%d error', [Worker^.ID]));
      end;
    end;
  finally
    CoUninitialize();
    WriteLog(Format('Worker thread:%d end', [Worker^.ID]));
    SetEvent(Worker^.Finished);
  end;
end;
//Creates (processors * 2 + 2) worker records and starts one thread for each,
//all waiting on CompletionPort. The records are chained from WorkerHead.
procedure CreateWorkerThread(CompletionPort: THandle);
var
  Worker, Tail: PWorker;
  i, Wanted: Integer;
  SystemInfo: TSystemInfo;
  ThreadHandle: THandle;
begin
  GetSystemInfo(SystemInfo);
  Tail := nil;
  WorkerHead := nil;
  WorkerCount := 0;
  Wanted := SystemInfo.dwNumberOfProcessors * 2 + 2;
  for i := 1 to Wanted do
  begin
    Worker := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, SizeOf(TWorker));
    if Worker = nil then
    begin
      WriteLog('CreateWorkerThread: out of memory');
      Break;
    end;
    //Zero-filled memory: Worker^.Next is already nil
    if Tail = nil then
      WorkerHead := Worker
    else
      Tail^.Next := Worker;
    Tail := Worker;
    WorkerCount := WorkerCount + 1;

    Worker^.CompletionPort := CompletionPort;
    Worker^.Data := OnWorkerThreadCreateEvt(False);
    Worker^.Finished := CreateEvent(nil, True, False, nil);
    ThreadHandle := CreateThread(nil, 0, @WorkerThread, Worker, 0, Worker^.ID);
    if ThreadHandle <> 0 then
      CloseHandle(ThreadHandle)
    else
    begin
      //No thread will ever signal this event; signal it now so that
      //DestroyWorkerThread does not wait forever.
      WriteLog('CreateWorkerThread: CreateThread error');
      SetEvent(Worker^.Finished);
    end;
  end;
end;
//Stops all worker threads: posts one shutdown packet per worker, then waits
//for each thread's Finished event and releases its record.
procedure DestroyWorkerThread();
var
  Worker, Save: PWorker;
begin
  WorkerCount := 0;
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    //0 bytes / nil key / nil overlapped is the worker's signal to exit
    PostQueuedCompletionStatus(Worker^.CompletionPort, 0, 0, nil);
    Worker := Worker^.Next;
  end;
  Worker := WorkerHead;
  //Detach the list first so nobody walks records that are being freed
  WorkerHead := nil;
  while Worker <> nil do
  begin
    Save := Worker^.Next;
    WaitForSingleObject(Worker^.Finished, INFINITE);
    CloseHandle(Worker^.Finished);
    //Process heap: must not be accessed with HEAP_NO_SERIALIZE
    HeapFree(GetProcessHeap(), 0, Worker);
    Worker := Save;
  end;
end;
//Reports the statistics of the Index-th worker (1-based).
//Returns the worker's packet count and stores its accumulated busy ticks in
//TickCount, resetting that accumulator. Returns 0 and TickCount = 0 when there
//is no such worker. The counters are read without synchronisation.
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Result := 0;
  TickCount := 0;
  Count := 0;
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      TickCount := Worker^.TickCountLong;
      Worker^.TickCountLong := 0;
      Result := Worker^.ExecCount;
      Break;
    end;
    Worker := Worker^.Next;
  end;
end;
//Returns the thread ID of the Index-th worker (1-based), or 0 when there is no
//such worker. (The original returned its loop counter in that case.)
function GetWorkerID(Index: Integer): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Result := 0;
  Count := 0;
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      Result := Worker^.ID;
      Break;
    end;
    Worker := Worker^.Next;
  end;
end;
{===============================================================================  
服務線程  
================================================================================}  
//Default server-thread creation callback: supplies no per-thread data (nil).
function OnServerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
Result := nil;
end;
var
//Listening socket; INVALID_SOCKET means "server not running"
ListenSocket: TSocket = INVALID_SOCKET;
//Event associated with FD_ACCEPT on the listening socket
SocketEvent: THandle = WSA_INVALID_EVENT;
//Completion port shared by all accepted sockets and worker threads
CompletionPort: THandle = 0;
//Set to True to ask the server thread to leave its loop
Terminated: Boolean = False;
ServerThreadID: DWORD = 0;
//Server thread statistics: wake-up count and accumulated busy ticks
ServerExecCount: Integer = 0;
ServerExecLong: DWord = 0;
OnServerThreadCreateEvt: TOnThreadCreateEvt = OnServerThreadCreateDef;
//Event signalled by the server thread just before it returns
ServerFinished: THandle;
//Returns how many times the server thread has come back from its wait.
function GetServerExecCount(): Integer;
begin
Result := ServerExecCount;
end;
//Returns the server thread's accumulated busy ticks and resets the accumulator.
//The read-and-reset is not synchronised with the server thread.
function GetServerExecLong(): DWord;
begin
Result := ServerExecLong;
ServerExecLong := 0;
end;
  
//Thread procedure of the accept/housekeeping thread.
//Waits up to 10 s for the FD_ACCEPT event. On timeout it runs the idle-link
//check; on a signal it accepts one connection, binds it to the completion port
//and posts the first receive. Runs until Terminated is set, then signals
//ServerFinished. Param is the value returned by OnServerThreadCreateEvt.
function ServerThread(Param: Pointer): DWORD; stdcall;
var
  AcceptSocket: TSocket;
  Addr: TSockAddrIn;
  Len: Integer;
  Link: PLink;
  IOMem: Pointer;
  bNodelay: Boolean;
  TickCount: DWord;
  WR: DWord;
begin
  Result := 0;
  CoInitialize(nil);
  WriteLog('Server thread begin');
  TickCount := GetTickCount();
  try
    while not Terminated do
    begin
      try
        ServerExecLong := ServerExecLong + (GetTickCount() - TickCount);
        WR := WaitForSingleObject(SocketEvent, 10000);

        ServerExecCount := ServerExecCount + 1;
        TickCount := GetTickCount();

        if WR = WAIT_TIMEOUT then
        begin
          CheckLinkLinkIdleOvertime(Param);
          Continue;
        end;
        if WR = WAIT_FAILED then
          Continue;

        Len := SizeOf(TSockAddrIn);
        AcceptSocket := WSAAccept(ListenSocket, @Addr, @Len, nil, 0);
        if AcceptSocket = INVALID_SOCKET then
          Continue;
        if LinkCount >= SOCK_MAX_COUNT then
        begin
          try
            CloseSocket(AcceptSocket);
          except
            WriteLog('Link count over');
          end;
          Continue;
        end;

        bNodelay := True;
        if SetSockOpt(AcceptSocket, IPPROTO_TCP, TCP_NODELAY,
          PChar(@bNodelay), sizeof(bNodelay)) = SOCKET_ERROR then
        begin
          try
            CloseSocket(AcceptSocket);
          except
            WriteLog('SetSockOpt: error');
          end;
          Continue;
        end;

        EnterCriticalSection(LinkSec);
        try
          Link := GetLink();
          IOMem := nil;
          if Link <> nil then
            IOMem := GetIOMem();
          if (Link = nil) or (IOMem = nil) then
          begin
            //A pool is exhausted. The original dereferenced a nil Link here
            //while holding LinkSec, which left the lock held forever.
            WriteLog('Server thread: no free link or IO buffer');
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('CloseSocket: error');
            end;
            if Link <> nil then
              FreeLink(Link);
            Continue;
          end;

          EnterCriticalSection(Link^.Section);
          try
            Link^.RemoteIP := inet_ntoa(Addr.sin_addr);
            //NOTE(review): sin_port is stored as received (network byte order)
            Link^.RemotePort := Addr.sin_port;
            Link^.TickCountActive := GetTickCount();
            Link^.Socket := AcceptSocket;
            //The link pointer is the completion key the workers get back
            if (CreateIoCompletionPort(AcceptSocket, CompletionPort, DWORD(Link), 0) = 0) or
              (not PostRecv(Link, IOMem)) then
            begin
              try
                CloseSocket(Link^.Socket);
              except
                WriteLog('CreateIoCompletionPort or PostRecv: error');
              end;
              Link^.Socket := INVALID_SOCKET;
              FreeLink(Link);
              FreeIOMem(IOMem);
            end;
          finally
            LeaveCriticalSection(Link^.Section);
          end;
        finally
          LeaveCriticalSection(LinkSec);
        end;
      except
        WriteLog('Server thread error');
      end;
    end;
  finally
    CoUninitialize();
    WriteLog('Server thread end');
    SetEvent(ServerFinished);
  end;
end;
//Starts the server: allocates the pools, opens a non-blocking listening socket
//bound to RemoteIP:RemotePort (the local address to listen on), creates the
//completion port, the worker threads and the server thread.
//Returns True on success. Returns False if the server is already running or if
//any step fails; in the latter case everything created so far is released.
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
var
  NonBlock: Integer;
  bNodelay: Boolean;
  Addr: TSockAddrIn;
  ThreadHandle: THANDLE;
  WorkersStarted: Boolean;

  //Undoes whatever has been set up so far
  procedure Rollback;
  begin
    if WorkersStarted then
      DestroyWorkerThread();
    WorkersStarted := False;
    if CompletionPort <> 0 then
      CloseHandle(CompletionPort);
    CompletionPort := 0;
    if ListenSocket <> INVALID_SOCKET then
      CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    if (SocketEvent <> WSA_INVALID_EVENT) and (SocketEvent <> 0) then
      WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
  end;

begin
  Result := ListenSocket = INVALID_SOCKET;
  if not Result then
    Exit;
  WorkersStarted := False;
  IniIOMem();
  IniLink();

  ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
  begin
    Rollback;
    Exit;
  end;

  bNodelay := True;
  NonBlock := 1;
  FillChar(Addr, SizeOf(Addr), 0);
  Addr.sin_family := AF_INET;
  Addr.sin_addr.s_addr := inet_addr(PChar(RemoteIP));
  Addr.sin_port := htons(RemotePort);
  Result := (SetSockOpt(ListenSocket, IPPROTO_TCP, TCP_NODELAY, PChar(@bNodelay), sizeof(bNodelay)) <> SOCKET_ERROR) and
    (ioctlsocket(ListenSocket, Integer(FIONBIO), NonBlock) <> SOCKET_ERROR) and
    (Bind(ListenSocket, @Addr, SizeOf(TSockAddrIn)) <> SOCKET_ERROR) and
    (Listen(ListenSocket, SOMAXCONN) <> SOCKET_ERROR);
  if not Result then
  begin
    //Rollback also closes the socket, which the original leaked here
    Rollback;
    Exit;
  end;

  SocketEvent := CreateEvent(nil, FALSE, FALSE, nil);
  Result := SocketEvent <> WSA_INVALID_EVENT;
  if not Result then
  begin
    Rollback;
    Exit;
  end;

  Result := WSAEventSelect(ListenSocket, SocketEvent, FD_ACCEPT) <> SOCKET_ERROR;
  if not Result then
  begin
    Rollback;
    Exit;
  end;

  CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Result := CompletionPort <> 0;
  if not Result then
  begin
    Rollback;
    Exit;
  end;

  WriteLog('Server Start');
  CreateWorkerThread(CompletionPort);
  WorkersStarted := True;

  ServerFinished := CreateEvent(nil, True, False, nil);
  Result := ServerFinished <> 0;
  if not Result then
  begin
    Rollback;
    Exit;
  end;

  Terminated := False;
  ThreadHandle := CreateThread(nil, 0, @ServerThread, OnServerThreadCreateEvt(False), 0, ServerThreadID);
  Result := ThreadHandle <> 0;
  if not Result then
  begin
    //No server thread exists to signal ServerFinished, so waiting on it (as
    //StopTcpServer does) would never return. Tear down directly instead.
    CloseHandle(ServerFinished);
    ServerFinished := 0;
    Rollback;
    Exit;
  end;
  CloseHandle(ThreadHandle);
end;
//Stops the server: ends the server thread, then the workers, closes the
//listening socket and the completion port and releases both pools.
//Returns False when the server was not running.
function StopTcpServer(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
    Exit;
  WriteLog('Server Stop');
  Terminated := True;
  if ServerFinished <> 0 then
  begin
    //Wake the server thread so it sees Terminated at once instead of after
    //its 10 s wait; the listening socket is non-blocking, so the accept it
    //attempts returns immediately.
    if SocketEvent <> 0 then
      SetEvent(SocketEvent);
    WaitForSingleObject(ServerFinished, INFINITE);
    CloseHandle(ServerFinished);
    ServerFinished := 0;
  end;
  if SocketEvent <> 0 then
    WSACloseEvent(SocketEvent);
  SocketEvent := 0;
  DestroyWorkerThread();
  if ListenSocket <> INVALID_SOCKET then
    CloseSocket(ListenSocket);
  ListenSocket := INVALID_SOCKET;
  if CompletionPort <> 0 then
    CloseHandle(CompletionPort);
  CompletionPort := 0;
  ServerExecCount := 0;
  ServerExecLong := 0;
  DeleteLink();
  DeleteIOMem();
end;
//Resolves this host's own name and returns one of its IPv4 addresses in dotted
//notation: the last address of the list when IsIntnetIP is True, otherwise the
//first. Returns '0.0.0.0' when the lookup fails.
function GetLocalIP(IsIntnetIP: Boolean): String;
type
  TaPInAddr = Array[0..10] of PInAddr;
  PaPInAddr = ^TaPInAddr;
var
  phe: PHostEnt;
  pptr: PaPInAddr;
  Buffer: Array[0..63] of Char;
  I: Integer;
begin
  Result := '0.0.0.0';
  try
    if GetHostName(Buffer, SizeOf(Buffer)) <> 0 then
      Exit;
    phe := GetHostByName(Buffer);
    if phe = nil then
      Exit;
    pptr := PaPInAddr(phe^.h_addr_list);
    if (pptr = nil) or (pptr^[0] = nil) then
      Exit;
    if IsIntnetIP then
    begin
      //Walk to the last entry of the nil-terminated list, staying inside the
      //bounds of the overlay array type
      I := 0;
      while (I <= High(TaPInAddr)) and (pptr^[I] <> nil) do
      begin
        Result := inet_ntoa(pptr^[I]^);
        Inc(I);
      end;
    end else
      Result := inet_ntoa(pptr^[0]^);
  except
    //Best effort: on any failure keep whatever Result holds so far
  end;
end;
//Installs the application callbacks. Call before StartTcpServer.
//A nil argument leaves the currently installed handler (initially the built-in
//default) in place, so the threads never call through a nil pointer.
procedure SetEventProc(OnReceive: TOnReceiveEvt;
  OnDisconnect: TOnDisconnectEvt;
  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
  OnServerThreadCreate: TOnThreadCreateEvt;
  OnWorkerThreadCreate: TOnThreadCreateEvt);
begin
  if Assigned(OnReceive) then
    OnReceiveEvt := OnReceive;
  if Assigned(OnDisconnect) then
    OnDisconnectEvt := OnDisconnect;
  if Assigned(OnLinkIdleOvertime) then
    OnLinkIdleOvertimeEvt := OnLinkIdleOvertime;
  if Assigned(OnServerThreadCreate) then
    OnServerThreadCreateEvt := OnServerThreadCreate;
  if Assigned(OnWorkerThreadCreate) then
    OnWorkerThreadCreateEvt := OnWorkerThreadCreate;
end;
//Posts an overlapped receive of up to IO_MEM_SIZE bytes on the link's socket.
//IOMem must be a data pointer obtained from GetIOMem; its header is
//re-initialised here. Returns True when the receive completed or is pending.
//On False the caller still owns IOMem.
function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
var
  Flags: DWord;
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := (Link <> nil) and (IOMem <> nil) and (Link^.Socket <> INVALID_SOCKET);
  if not Result then
    Exit;
  try
    Flags := 0;
    Bytes := 0;
    //The header sits directly in front of the data area
    IOInfo := PIOInfo(PAnsiChar(IOMem) - SizeOf(TIOInfo));
    ZeroMemory(IOInfo, SizeOf(TIOInfo));
    IOInfo^.DataBuf.buf := IOMem;
    IOInfo^.DataBuf.len := IO_MEM_SIZE;
    IOInfo^.Socket := Link^.Socket;
    IOInfo^.Flag := IO_READ;
    Result := (WSARecv(IOInfo^.Socket, @IOInfo^.DataBuf, 1, @Bytes, @Flags, @IOInfo^.Overlapped, nil) <> SOCKET_ERROR) or
      (WSAGetLastError() = ERROR_IO_PENDING);
  except
    Result := False;
    WriteLog('PostRecv: error');
  end;
end;
//Posts an overlapped send of Len bytes from a pool buffer on the link's socket.
//IOMem must be a data pointer obtained from GetIOMem and Len must be within
//1..IO_MEM_SIZE (a zero-byte completion is treated as a disconnect by the
//workers). Returns True when the send completed or is pending; the worker
//thread then returns the buffer to the pool. On False the caller still owns it.
function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
var
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := (Link <> nil) and (IOMem <> nil) and
    (Len > 0) and (Len <= IO_MEM_SIZE) and
    (Link^.Socket <> INVALID_SOCKET);
  if not Result then
    Exit;
  try
    Bytes := 0;
    //The header sits directly in front of the data area
    IOInfo := PIOInfo(PAnsiChar(IOMem) - SizeOf(TIOInfo));
    ZeroMemory(IOInfo, SizeOf(TIOInfo));
    IOInfo^.DataBuf.buf := IOMem;
    IOInfo^.DataBuf.len := Len;
    IOInfo^.Socket := Link^.Socket;
    IOInfo^.Flag := IO_WRITE;
    Result := (WSASend(IOInfo^.Socket, @(IOInfo^.DataBuf), 1, @Bytes, 0, @(IOInfo^.Overlapped), nil) <> SOCKET_ERROR) or
      (WSAGetLastError() = ERROR_IO_PENDING);
  except
    Result := False;
    WriteLog('PostSend: error');
  end;
end;
//Sends a copy of Buf[0..Len-1] to every link whose socket is open.
//Len must be within 1..IO_MEM_SIZE; otherwise nothing is sent. The broadcast
//stops early if the I/O buffer pool runs empty.
procedure PostBroadcast(Buf: PByte; Len: Integer);
var
  IOMem: Pointer;
  Link: PLink;
begin
  if (Buf = nil) or (Len <= 0) or (Len > IO_MEM_SIZE) then
    Exit;
  EnterCriticalSection(LinkSec);
  try
    Link := LinksHead;
    while Link <> nil do
    begin
      if Link^.Socket <> INVALID_SOCKET then
      begin
        IOMem := GetIOMem();
        //Without this check a nil buffer raised inside CopyMemory and the
        //exception left LinkSec locked
        if IOMem = nil then
          Break;
        CopyMemory(IOMem, Buf, Len);
        if not PostSend(Link, IOMem, Len) then
          FreeIOMem(IOMem);
      end;
      Link := Link^.Next;
    end;
  finally
    LeaveCriticalSection(LinkSec);
  end;
end;
//True while the listening socket is open, i.e. between a successful
//StartTcpServer and StopTcpServer.
function IsTcpServerActive(): Boolean;
begin
  //The "<>" operator had been stripped from the published source
  Result := ListenSocket <> INVALID_SOCKET;
end;
{===============================================================================  
日誌管理  
================================================================================}  
var
//Serialises WriteLog
LogSec: TRTLCriticalSection;
//Log file (Logs.Ini next to the executable), created in the initialization section
Inifile: TIniFile;
//Running index of the entries written in this session
LogCount: Integer = 0;
//INI section name for this session: the start date/time as text
LogName: String = '';
//Appends one entry to the log file under the session's section.
//The key is 'Index<n>' with a running number, the value is the current
//date/time followed by ':' and the message. Serialised through LogSec.
procedure WriteLog(Log: String);
var
  Key, Line: String;
begin
  EnterCriticalSection(LogSec);
  try
    Inc(LogCount);
    Key := 'Index' + IntToStr(LogCount);
    Line := DateTimeToStr(Now()) + ':' + Log;
    IniFile.WriteString(LogName, Key, Line);
  finally
    LeaveCriticalSection(LogSec);
  end;
end;
{===============================================================================  
初始化Window Socket  
================================================================================}  
var
  //Filled in by WSAStartup
  WSAData: TWSAData;

//Initialises Windows Sockets for the process; logs a message on failure.
procedure Startup;
var
  ErrorCode: Integer;
begin
  //NOTE(review): the version literal was destroyed in the published source
  //(blog template placeholders were spliced into the call; only a trailing
  //"01" survived). Version 2.2 is requested here because the unit uses
  //Winsock 2 functions (WSASocket, WSARecv, WSASend, WSAEventSelect).
  ErrorCode := WSAStartup($0202, WSAData);
  if ErrorCode <> 0 then
    WriteLog('Window Socket init Error!');
end;
//Releases Windows Sockets for the process; logs a message on failure.
procedure Cleanup;
var
  ErrorCode: Integer;
begin
  ErrorCode := WSACleanup;
  //The "<>" operator had been stripped from the published source
  if ErrorCode <> 0 then
    WriteLog('Window Socket cleanup error!');
end;
//Returns the directory of the main module, including the trailing delimiter.
function GetExePath(): String;
var
  ModuleName: array[0..1024] of char;
begin
  //The size argument is a character count, not a byte count; Length() is
  //correct for both ANSI and Unicode Char.
  GetModuleFileName(MainInstance, ModuleName, Length(ModuleName));
  Result := ExtractFilePath(ModuleName);
end;
initialization
  //The log must be usable before Startup, which logs on failure
  LogName := DateTimeToStr(Now());
  InitializeCriticalSection(LogSec);
  ExePath := GetExePath();
  IniFile := TIniFile.Create(ExePath + 'Logs.Ini');
  Startup();
finalization
  //Cleanup may still log, so the log file is released last
  Cleanup();
  DeleteCriticalSection(LogSec);
  //Free instead of Destroy: safe even if the object was never created
  IniFile.Free;

end.
  
主窗口單元源碼:  
unit uMainTcpServerIOCP;  
interface  
uses  
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,  
Dialogs, ExtCtrls, StdCtrls, ComCtrls, UTcpServer, Sockets, Grids;  
type
//Main form of the demo: starts/stops the server and shows pool and worker
//statistics. The component fields below correspond to the form's .dfm file.
TfrmMainUTcpServerIOCP = class(TForm)
Label1: TLabel;
Label2: TLabel;
edtIP: TEdit;
edtPort: TEdit;
btn: TButton;
Timer1: TTimer;
Label3: TLabel;
lbIO: TLabel;
Label5: TLabel;
lbIOU: TLabel;
Label7: TLabel;
lbL: TLabel;
Label9: TLabel;
lbLU: TLabel;
Label11: TLabel;
lbLS: TLabel;
Label13: TLabel;
lbW: TLabel;
Info: TStringGrid;
Label4: TLabel;
lbWC: TLabel;
Label8: TLabel;
lbWU: TLabel;
Label12: TLabel;
lbLF: TLabel;
Label15: TLabel;
lbLFL: TLabel;
Label6: TLabel;
lbIOF: TLabel;
lbIOFL: TLabel;
Label16: TLabel;
Timer2: TTimer;
procedure btnClick(Sender: TObject);
procedure FormCreate(Sender: TObject);
procedure Timer1Timer(Sender: TObject);
procedure FormDestroy(Sender: TObject);
procedure Timer2Timer(Sender: TObject);
private
{ Private declarations }
//Tick count of the previous statistics refresh (set in btnClick and Timer1Timer)
FTickCount: DWord;
public
{ Public declarations }
end;
var  
frmMainUTcpServerIOCP: TfrmMainUTcpServerIOCP;  
implementation  
{$R *.dfm}  
{ TfrmMainUTcpServerIOCP }  
//Toggles the server. When the caption reads 'Open' the server is started on the
//address/port from the edit boxes and the worker grid is filled with one row
//per worker; otherwise the server is stopped. The caption is then updated to
//reflect the actual server state.
procedure TfrmMainUTcpServerIOCP.btnClick(Sender: TObject);
var
  Row, LastRow: Integer;
  ExecCount: Integer;
  ExecTicks: DWord;
  Stamp: TDateTime;
  Opening: Boolean;
begin
  Opening := btn.Caption = 'Open';
  if not Opening then
  begin
    Timer1.Enabled := False;
    StopTcpServer();
  end else
  begin
    StartTcpServer(edtIP.Text, StrToInt(edtPort.Text));
    if IsTcpServerActive() then
    begin
      FTickCount := GetTickCount();
      Info.RowCount := GetWorkerCount() + 1;
      Stamp := Now();
      LastRow := Info.RowCount - 1;
      Row := 1;
      while Row <= LastRow do
      begin
        Info.Cells[0, Row] := IntToStr(Row);
        Info.Cells[1, Row] := IntToStr(GetWorkerID(Row));
        ExecCount := GetWorkerExecInfo(Row, ExecTicks);
        Info.Cells[2, Row] := IntToStr(ExecCount);
        Info.Cells[3, Row] := '0';
        Info.Cells[4, Row] := IntToStr(ExecTicks);
        Info.Cells[5, Row] := '0';
        Info.Cells[6, Row] := DateTimeToStr(Stamp);
        Inc(Row);
      end;
      Timer1.Enabled := True;
    end;
  end;

  if IsTcpServerActive() then
    btn.Caption := 'Close'
  else
    btn.Caption := 'Open';
end;
//Pre-fills the IP edit box with the first local address and lays out the
//seven-column worker statistics grid (widths and header captions).
procedure TfrmMainUTcpServerIOCP.FormCreate(Sender: TObject);
const
  ColumnWidths: array[0..6] of Integer = (30, 30, 40, 40, 30, 40, 110);
  ColumnTitles: array[0..6] of String =
    ('序號', 'ID', '計數', '次/S', '時長', '使用率', '時間');
var
  Col: Integer;
begin
  edtIP.Text := GetLocalIP(False);
  Info.ColCount := 7;
  Info.RowCount := 2;
  for Col := Low(ColumnWidths) to High(ColumnWidths) do
    Info.ColWidths[Col] := ColumnWidths[Col];
  for Col := Low(ColumnTitles) to High(ColumnTitles) do
    Info.Cells[Col, 0] := ColumnTitles[Col];
end;
//Periodic statistics refresh: server thread load, per-worker counters (only
//rows whose packet count changed are rewritten) and pool usage labels.
//Disables itself when the server is not running.
procedure TfrmMainUTcpServerIOCP.Timer1Timer(Sender: TObject);
var
  i: Integer;
  Count1, Count2, Count3, TC, TCC: DWord;
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  //Elapsed ticks since the previous refresh; a zero interval is replaced by
  //the maximum so the divisions below cannot fail
  TC := GetTickCount();
  TCC := TC - FTickCount;
  if TCC = 0 then
    TCC := $FFFFFFFF;
  lbWC.Caption := IntToStr(GetServerExecCount());
  lbWU.Caption := FloatToStrF(GetServerExecLong() / TCC * 100, ffFixed, 10, 3) + '%';
  for i := 1 to Info.RowCount - 1 do
  begin
    Count2 := 0;
    Count1 := GetWorkerExecInfo(i, Count2);
    TC := GetTickCount();
    TCC := TC - FTickCount;
    if TCC = 0 then
      TCC := $FFFFFFFF;

    Count3 := StrToInt(Info.Cells[2, i]);
    //The "<>" operator had been stripped from the published source
    if Count1 <> Count3 then
    begin
      Info.Cells[2, i] := IntToStr(Count1);
      Info.Cells[3, i] := IntToStr(Count1 - Count3);
      Info.Cells[4, i] := IntToStr(Count2);
      Info.Cells[5, i] := FloatToStrF(Count2 / TCC * 100, ffFixed, 10, 1) + '%';
      Info.Cells[6, i] := DateTimeToStr(Now());
    end;
  end;
  FTickCount := TC;
  lbIO.Caption := IntToStr(GetIOMemSize());
  lbIOU.Caption := FloatToStrF(GetIOMemUse(), ffFixed, 10, 3) + '%';
  Count1 := GetIOMemFree();
  lbIOF.Caption := IntToStr(Count1);
  lbIOFL.Caption := FloatToStrF(Count1 / IO_MEM_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbW.Caption := IntToStr(GetWorkerCount());
  lbL.Caption := IntToStr(GetLinkSize());
  Count1 := GetLinkFree();
  lbLF.Caption := IntToStr(Count1);
  lbLFL.Caption := FloatToStrF(Count1 / SOCK_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbLU.Caption := FloatToStrF(GetLinkUse(), ffFixed, 10, 3) + '%';
  lbLS.Caption := IntToStr(GetLinkCount());
end;
//Stops the server when the form goes away (StopTcpServer returns at once if it is not running).
procedure TfrmMainUTcpServerIOCP.FormDestroy(Sender: TObject);
begin
StopTcpServer();
end;
//Periodically broadcasts a fixed test message to all connected clients.
procedure TfrmMainUTcpServerIOCP.Timer2Timer(Sender: TObject);
var
  Msg: AnsiString;
begin
  if not IsTcpServerActive() then
  begin
    //NOTE(review): this disables Timer1, not Timer2, exactly as in the
    //original; confirm which timer was intended.
    Timer1.Enabled := False;
    exit;
  end;
  Msg := '這是來自服務器的數據!';
  //Send the actual byte length of the message instead of a hard-coded 21
  PostBroadcast(PByte(PAnsiChar(Msg)), Length(Msg));
end;
end.
相關文章
相關標籤/搜索