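Many people search long and hard without finding a solid I/O completion port (IOCP) example, and some are even led astray by misleading ones. I am publishing an example I wrote myself in the hope that it helps those who are still looking. It can serve as study material for beginners and also as the communication module of a large server program. I have pushed its processing speed as far as I could. If you grasp the essence of this example and add an efficient communication protocol, you can use it to build a high-performance communication server.

Before presenting the code, a few words about IOCP. I will not describe the IOCP functions themselves; there is plenty of material online and it all says the same thing. Here I only cover the technical points that need attention.

1. How to manage memory

1) I/O data buffer management

Dynamic allocation is flexible, but it wastes a great deal of system resources. I therefore pre-allocate the maximum amount of memory the server will need and manage it with a linked list. Taking or returning a block never requires a traversal, only mutual exclusion.
Better still, the I/O operation information and the memory block are combined into one structure, which reduces the list-management work.

//I/O operation flag
TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

//I/O operation information
PIOInfo = ^TIOInfo;
TIOInfo = packed record
  Overlapped: TOverlapped; //overlapped structure
  DataBuf: TWSABUF; //I/O data information
  Socket: TSocket;
  Flag: TIOFlag;
  TickCountSend: DWord;
  Next: PIOInfo;
  Prior: PIOInfo;
end;

PUNode = ^TUNode;
TUNode = record
  Next: Pointer;
end;

PIOMem = ^TIOMem;
TIOMem = packed record
  IOInfo: TIOInfo;
  Data: array[1..IO_MEM_SIZE] of Byte;
  //when memory is requested, the address of Data is what gets returned
end;

2) Link data management

A doubly linked list is used, so deleting a node does not cost a traversal.

//Information for each connection
PLink = ^TLink;
TLink = record
  Socket: TSocket;
  RemoteIP: string[30];
  RemotePort: DWord;
  //system tick count when data was last received
  TickCountActive: DWord;
  //information about the thread currently handling this connection
  Worker: PWorker;
  Data: Pointer; //the application layer can set this member so that OnReceive does not have to search for the connection's data area every time
  Section: TRTLCriticalSection;
  Next: PLink;
  Prior: PLink;
end;

2. How to manage threads

When each worker thread is created, OnWorkerThreadCreateEvt is called. This function can return information belonging to that thread, such as a database connection component or an object created for it. Inside OnReceive this value can be reached through the link's Worker member, as Worker^.Data.

//Worker thread information
PWorker = ^TWorker;
TWorker = record
  ID: THandle;
  CompletionPort: THandle;
  Data: Pointer; //value returned by OnWorkerThreadCreateEvt
  //data reflecting the workload
  TickCountLong,
  TickCountActive: DWord;
  ExecCount: Integer;
  //set when the thread has finished
  Finished: THandle;
  Next: PWorker;
end;

The service thread works the same way. See the source for details.

Thread synchronization has long been a headache for programmers. This example avoids locking wherever it can and takes care to prevent deadlocks. With TRTLCriticalSection, a moment of carelessness causes a deadlock; in multithreaded code even a difference of two lines can be disastrous. In this example, the data that needs synchronization is the list of links. The service thread has to work out which connections have been idle too long, worker threads have to handle disconnections, and the application layer needs exclusive access to a link when it sends data on its own initiative. Otherwise one thread would be sending while another handles the disconnect, the two would conflict, and the consequences would be disastrous.

In my stress tests this problem has been solved: the application layer needs no synchronization of its own and can simply send and receive data. Each thread also supports its own database connection.

3. How many worker threads should be created

Many articles say to create N threads for N CPUs; others say N*2+2. I dislike claims made without taking responsibility for them, so this example exposes per-thread statistics that let IOCP newcomers see for themselves.

Test results of the example:

4. Should classes be used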
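Some people say a server should abandon classes altogether because classes carry a high cost. In my view, the cost attributed to classes comes mainly from creating objects dynamically. Accessing a class member is the same as accessing a record member: both need a relative address. If everything is created in advance, there is little difference between the two. This example uses bare functions. The application layer can of course use classes to manage things; it is hard to imagine how much extra work there would be without them.

5. Drawbacks

It cannot send large packets, only packets no larger than a fixed size. For small packets, however, it performs very well.

For lack of time I cannot explain much or comment the code heavily. Anyone who needs the example source can contact me; it is provided free of charge. QQ:48092788
Example source: http://d.download.csdn.net/down/1546336/guestcode

Source of the completion port communication service module:

{******************************************************************************
*  UCode series components and controls                                      *
*  Author: 盧益貴 2003~2009                                                   *
*  All rights reserved. The right to take legal action against any           *
*  unauthorized use or sale is reserved.                                      *
*                                                                              *
*  The UCode series evolved from the XCtrls-YCtrls-ICtrls-NCode series        *
*  QQ:48092788 luyigui.blog.gxsky.com                                         *
******************************************************************************}
{******************************************************************************
  Socket server based on the completion port model
******************************************************************************}
unit UTcpServer;

interface

uses
  Windows, Classes, UClasses, UWinSock2;

const
  //size of each I/O buffer
  IO_MEM_SIZE = 2048;
  //there must be enough memory; adjust as needed
  IO_MEM_MAX_COUNT = 1000 * 10;
  //maximum number of connections
  SOCK_MAX_COUNT = 3000;
  //connection idle limit in seconds; a connection is closed if no client data arrives within this time
  SOCK_IDLE_OVERTIME = 60;

type
  //worker thread information
  PWorker = ^TWorker;
  TWorker = record
    ID: THandle;
    CompletionPort: THandle;
    Data: Pointer;
    //data reflecting the workload
    TickCountLong,
    TickCountActive: DWord;
    ExecCount: Integer;
    //set when the thread has finished
    Finished: THandle;
    Next: PWorker;
  end;

  //information for each connection
  PLink = ^TLink;
  TLink = record
    Socket: TSocket;
    RemoteIP: string[30];
    RemotePort: DWord;
    //system tick count when data was last received
    TickCountActive: DWord;
    //information about the thread currently handling this connection
    Worker: PWorker;
    Data: Pointer;
    Section: TRTLCriticalSection;
    Next: PLink;
    Prior: PLink;
  end;

  TOnLinkIdleOvertimeEvt = procedure(Link: PLink);
  TOnDisconnectEvt = procedure(Link: PLink);
  TOnReceiveEvt = function(Link: PLink; Buf: PByte; Len: Integer): Boolean;
  TOnThreadCreateEvt = function(IsWorkerThread: Boolean): Pointer;

//usage of the link list, in percent
function GetLinkUse(): real;
//memory occupied by the link list
function GetLinkSize(): Integer;
//current number of links
function GetLinkCount(): Integer;
//number of free links
function GetLinkFree(): Integer;
//usage of the I/O memory
function GetIOMemUse(): Real;
//memory occupied by the I/O memory list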
function GetIOMemSize(): Integer;
//number of free I/O memory blocks
function GetIOMemFree(): Integer;
//return an I/O memory block
procedure FreeIOMem(Mem: Pointer);
//get an I/O memory block
function GetIOMem(): Pointer;
//get the work statistics of a worker thread
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
//get the ID of a worker thread
function GetWorkerID(Index: Integer): Integer;
//get the number of worker threads
function GetWorkerCount(): Integer;
//open an IP port and listen on it
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
//stop and close the IP port
function StopTcpServer(): Boolean;
//set the event handler function pointers; call before StartTcpServer
procedure SetEventProc(OnReceive: TOnReceiveEvt;
  OnDisconnect: TOnDisconnectEvt;
  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
  OnServerThreadCreate: TOnThreadCreateEvt;
  OnWorkerThreadCreate: TOnThreadCreateEvt);
//write to the log file
procedure WriteLog(Log: String);
function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
//post a send operation
function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
//broadcast data to the peer of every link
procedure PostBroadcast(Buf: PByte; Len: Integer);
//whether the server is currently open
function IsTcpServerActive(): Boolean;
//time (ms) the service thread spent working since the last query
function GetServerExecLong(): DWord;
//number of times the service thread has run
function GetServerExecCount(): Integer;
//get the local or external IP address
function GetLocalIP(IsIntnetIP: Boolean): String;

implementation

uses
  IniFiles, SysUtils, ActiveX;

var
  ExePath: String = '';

const
  HEAP_NO_SERIALIZE = 1; {no serialization: the heap functions skip their internal locking, so the caller must prevent concurrent access}
  HEAP_GENERATE_EXCEPTIONS = 4; {raise an exception on failure instead of returning nil}
  HEAP_ZERO_MEMORY = 8; {initialize the allocated memory to 0}
  HEAP_REALLOC_IN_PLACE_ONLY = 16; {do not allow the block to be moved when reallocating}
  STATUS_ACCESS_VIOLATION = DWORD($C0000005); {invalid parameters or heap corruption}
  STATUS_NO_MEMORY = DWORD($C0000017); {out of memory}

{===============================================================================
  I/O memory management
================================================================================}
type
  //I/O operation flag
  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

  //I/O operation information
  PIOInfo = ^TIOInfo;
  TIOInfo = packed record
    Overlapped: TOverlapped; //overlapped structure
    DataBuf: TWSABUF; //I/O data information
    Socket: TSocket;
    Flag: TIOFlag;
    TickCountSend: DWord;
    Next: PIOInfo;
    Prior:
      PIOInfo;
  end;

  PUNode = ^TUNode;
  TUNode = record
    Next: Pointer;
  end;

  PIOMem = ^TIOMem;
  TIOMem = packed record
    IOInfo: TIOInfo;
    Data: array[1..IO_MEM_SIZE] of Byte;
  end;

var
  IOMemHead: PIOMem = nil;
  IOMemLast: PIOMem = nil;
  IOMemUse: Integer = 0;
  IOMemSec: TRTLCriticalSection;
  IOMemList: array[1..IO_MEM_MAX_COUNT] of Pointer;

function GetIOMem(): Pointer;
begin
  //there must be enough memory; if the pool runs out, dynamic allocation would not save the server either
  EnterCriticalSection(IOMemSec);
  try
    try
      Result := @(IOMemHead^.Data);
      IOMemHead := PUNode(IOMemHead)^.Next;
      IOMemUse := IOMemUse + 1;
    except
      Result := nil;
      WriteLog('GetIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;

procedure FreeIOMem(Mem: Pointer);
begin
  EnterCriticalSection(IOMemSec);
  try
    try
      //step back from the Data address to the start of the block
      Mem := Pointer(Integer(Mem) - sizeof(TIOInfo));
      PUNode(Mem).Next := nil;
      //if the free list was exhausted, the returned block becomes the new head
      if IOMemHead = nil then
        IOMemHead := Mem
      else
        PUNode(IOMemLast)^.Next := Mem;
      IOMemLast := Mem;
      IOMemUse := IOMemUse - 1;
    except
      WriteLog('FreeIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;

procedure IniIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  InitializeCriticalSection(IOMemSec);
  IOMemHead := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TIOMem));
  IOMemLast := IOMemHead;
  IOMemList[1] := IOMemHead;
  Heap := GetProcessHeap();
  for i := 2 to IO_MEM_MAX_COUNT do
  begin
    PUNode(IOMemLast)^.Next := HeapAlloc(Heap, HEAP_ZERO_MEMORY, sizeof(TIOMem));
    IOMemList[i] := PUNode(IOMemLast)^.Next;
    IOMemLast := PUNode(IOMemLast)^.Next;
  end;
  PUNode(IOMemLast).Next := nil;
end;

function GetIOMemFree(): Integer;
var
  IOMems: PUNode;
begin
  EnterCriticalSection(IOMemSec);
  Result := 0;
  IOMems := PUNode(IOMemHead);
  while IOMems <> nil do
  begin
    Result := Result + 1;
    IOMems := IOMems^.Next;
  end;
  LeaveCriticalSection(IOMemSec);
end;

procedure DeleteIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  //the process heap is shared, so it is freed with serialization left on (flags = 0)
  for i := 1 to IO_MEM_MAX_COUNT do
    HeapFree(Heap, 0, IOMemList[i]);
  IOMemUse := 0;
  DeleteCriticalSection(IOMemSec);
end;

function GetIOMemSize(): Integer;
begin
  Result := IO_MEM_MAX_COUNT * sizeof(TIOMem);
end;

function GetIOMemUse(): Real;
begin
  Result := (IOMemUse * 100) / IO_MEM_MAX_COUNT;
end;

{===============================================================================
  Socket link management
================================================================================}
procedure OnLinkIdleOvertimeDef(Link: PLink);
begin
end;

var
  LinkHead: PLink = nil;
  LinkLast: PLink = nil;
  LinkUse: Integer = 0;
  LinkCount: Integer = 0;
  LinkSec: TRTLCriticalSection;
  LinkList: array[1..SOCK_MAX_COUNT] of PLink;
  OnLinkIdleOvertimeEvt: TOnLinkIdleOvertimeEvt = OnLinkIdleOvertimeDef;
  LinksHead: PLink = nil;
  LinksLast: PLink = nil;

function GetLinkFree(): Integer;
var
  Links: PLink;
begin
  EnterCriticalSection(LinkSec);
  Result := 0;
  Links := LinkHead;
  while Links <> nil do
  begin
    Result := Result + 1;
    Links := Links^.Next;
  end;
  LeaveCriticalSection(LinkSec);
end;

function GetLink(): PLink;
begin
  try
    //there must be enough memory; if the pool runs out, dynamic allocation would not save the server either
    Result := LinkHead;
    LinkHead := LinkHead^.Next;
    LinkUse := LinkUse + 1;
    LinkCount := LinkCount + 1;
    //append the node to the list of active links
    if LinksHead = nil then
    begin
      LinksHead := Result;
      LinksHead^.Next := nil;
      LinksHead^.Prior := nil;
      LinksLast := LinksHead;
    end else
    begin
      Result^.Prior := LinksLast;
      LinksLast^.Next := Result;
      LinksLast := Result;
      LinksLast^.Next := nil;
    end;
    with Result^ do
    begin
      Socket := INVALID_SOCKET;
      RemoteIP := '';
      RemotePort := 0;
      TickCountActive := GetTickCount();
      Worker := nil;
      Data := nil;
    end;
  except
    Result := nil;
    WriteLog('GetLink: error');
  end;
end;

procedure FreeLink(Link: PLink);
begin
  try
    with Link^ do
    begin
      Link^.Worker := nil;
      //unlink the node from the list of active links
      if Link = LinksHead then
      begin
        LinksHead := Next;
        if LinksLast = Link then
          LinksLast := LinksHead
        else
          LinksHead^.Prior := nil;
      end else
      begin
        Prior^.Next := Next;
        if Next <> nil then
          Next^.Prior := Prior;
        if Link = LinksLast then
          LinksLast := Prior;
      end;
      //append the node to the tail of the free list
      Next := nil;
      LinkLast^.Next := Link;
      LinkLast := Link;
      LinkUse := LinkUse - 1;
      LinkCount := LinkCount - 1;
    end;
  except
    WriteLog('FreeLink: error');
  end;
end;

procedure CloseLink(Link: PLink);
begin
  EnterCriticalSection(LinkSec);
  with Link^ do
  begin
    EnterCriticalSection(Section);
    if Socket <> INVALID_SOCKET then
    begin
      try
        CloseSocket(Socket);
      except
        WriteLog('CloseSocket: error');
      end;
      Socket := INVALID_SOCKET;
      FreeLink(Link);
    end;
    LeaveCriticalSection(Link^.Section);
  end;
  LeaveCriticalSection(LinkSec);
end;

procedure CheckLinkLinkIdleOvertime(Data: Pointer);
var
  TickCount: DWord;
  Long: Integer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  try
    TickCount := GetTickCount();
    Link := LinksHead;
    while Link <> nil do
    with Link^ do
    begin
      EnterCriticalSection(Section);
      if Socket <> INVALID_SOCKET then
      begin
        if TickCount > TickCountActive then
          Long := TickCount - TickCountActive
        else
          Long := $FFFFFFFF - TickCountActive + TickCount;
        if SOCK_IDLE_OVERTIME * 1000 < ...

{ Part of the source is missing here. Routines referenced elsewhere in this unit
  but not shown (IniLink, DeleteLink, GetLinkSize, GetLinkUse, GetLinkCount,
  OnDisconnectDef, OnReceiveDef) presumably fall in this gap; the text resumes
  mid-routine below. }

  ... > 0 do
    i := i - 1;
  if not PostSend(Link, IOMem, Len) then
    FreeIOMem(IOMem);
end;

function OnWorkerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  WorkerHead: PWorker = nil;
  WorkerCount: Integer = 0;
  OnDisconnectEvt: TOnDisconnectEvt = OnDisconnectDef;
  OnReceiveEvt: TOnReceiveEvt = OnReceiveDef;
  OnWorkerThreadCreateEvt: TOnThreadCreateEvt = OnWorkerThreadCreateDef;

function GetWorkerCount(): Integer;
begin
  Result := WorkerCount;
end;

function WorkerThread(Worker: PWorker): DWORD; stdcall;
var
  Link: PLink;
  IOInfo: PIOInfo;
  Bytes: DWord;
  CompletionPort: THandle;
begin
  Result := 0;
  CompletionPort := Worker^.CompletionPort;
  with Worker^ do
  begin
    TickCountActive := GetTickCount();
    TickCountLong := 0;
    ExecCount := 0;
  end;
  WriteLog(Format('Worker thread:%d begin', [Worker^.ID]));
  CoInitialize(nil);
  try
    while True do
    begin
      try
        //accumulate the time spent on the previous completion
        with Worker^ do
          TickCountLong := TickCountLong + GetTickCount() - TickCountActive;
        if GetQueuedCompletionStatus(CompletionPort, Bytes, DWORD(Link),
          POverlapped(IOInfo), INFINITE) = False then
        begin
          //the I/O failed: close the link and notify the application layer
          if (Link <> nil) then
          with Link^ do
          begin
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Link^.Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket1:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt1:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
          end;
          if IOInfo <> nil then
            FreeIOMem(IOInfo^.DataBuf.buf);
          WriteLog(Format('GetQueuedCompletionStatus:%d error', [Worker^.ID]));
          continue;
        end;
        with Worker^ do
        begin
          TickCountActive := GetTickCount();
          ExecCount := ExecCount + 1;
        end;
        if (Bytes = 0) then
        begin
          if (Link <> nil) then
          with Link^ do
          begin
            //zero bytes on a link: the peer has closed the connection
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Link^.Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket2:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt2:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
            //the buffer is returned whether the operation was a read or a write
            FreeIOMem(IOInfo^.DataBuf.buf);
            continue;
          end else
          begin
            //zero bytes and no link: the quit packet posted by DestroyWorkerThread
            if IOInfo <> nil then
              FreeIOMem(IOInfo^.DataBuf.buf);
            break;
          end;
        end;
        if IOInfo.Flag = IO_WRITE then
        begin
          //a send has completed: return its buffer
          FreeIOMem(IOInfo^.DataBuf.buf);
          continue;
        end;
        {if IOInfo.Flag = IO_ACCEPT then
        begin
          ......
          continue;
        end;}
        with Link^, IOInfo^.DataBuf do
        begin
          //a receive has completed: hand the data to the application layer
          Link^.Worker := Worker;
          try
            OnReceiveEvt(Link, buf, Bytes);
          except
            WriteLog(Format('OnReceiveEvt:%d error', [Worker^.ID]));
          end;
          Link^.Worker := nil;
          TickCountActive := GetTickCount();
          //reuse the same buffer for the next receive
          if not PostRecv(Link, buf) then
          begin
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket3:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt3:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
            FreeIOMem(buf);
          end;
        end;
      except
        WriteLog(Format('Worker thread:%d error', [Worker^.ID]));
      end;
    end;
  finally
    CoUninitialize();
    WriteLog(Format('Worker thread:%d end', [Worker^.ID]));
    SetEvent(Worker^.Finished);
  end;
end;

procedure CreateWorkerThread(CompletionPort: THandle);
var
  Worker, Workers: PWorker;
  i: Integer;
  SystemInfo: TSystemInfo;
  ThreadHandle: THandle;
begin
  GetSystemInfo(SystemInfo);
  Workers := nil;
  //number of worker threads: CPU count * 2 + 2
  WorkerCount := (SystemInfo.dwNumberOfProcessors * 2 + 2);
  for i := 1 to WorkerCount do
  begin
    Worker := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TWorker));
    if Workers = nil then
    begin
      Workers := Worker;
      WorkerHead := Workers;
    end else
    begin
      Workers^.Next := Worker;
      Workers := Worker;
    end;
    Worker^.CompletionPort := CompletionPort;
    Worker^.Data := OnWorkerThreadCreateEvt(False);
    Worker^.Finished := CreateEvent(nil, True, False, nil);
    ThreadHandle := CreateThread(nil, 0, @WorkerThread, Worker, 0, Worker^.ID);
    if ThreadHandle <> 0 then
      CloseHandle(ThreadHandle);
  end;
  Workers^.Next := nil;
end;

procedure DestroyWorkerThread();
var
  Worker, Save: PWorker;
begin
  WorkerCount := 0;
  //post one quit packet per worker thread
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    PostQueuedCompletionStatus(Worker^.CompletionPort, 0, 0, nil);
    Worker := Worker^.Next;
  end;
  Worker := WorkerHead;
  while
    Worker <> nil do
  begin
    with Worker^ do
    begin
      WaitForSingleObject(Worker^.Finished, INFINITE);
      CloseHandle(Worker^.Finished);
      Save := Worker^.Next;
    end;
    //the process heap is shared, so it is freed with serialization left on (flags = 0)
    HeapFree(GetProcessHeap(), 0, Worker);
    Worker := Save;
  end;
  //do not leave a pointer to freed memory behind
  WorkerHead := nil;
end;

function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  Result := 0;
  while Worker <> nil do
  with Worker^ do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      TickCount := TickCountLong;
      TickCountLong := 0;
      Result := Worker^.ExecCount;
      break;
    end;
    Worker := Worker^.Next;
  end;
end;

function GetWorkerID(Index: Integer): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  while Worker <> nil do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      Count := Worker^.ID;
      break;
    end;
    Worker := Worker^.Next;
  end;
  Result := Count;
end;

{===============================================================================
  Service thread
================================================================================}
function OnServerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  ListenSocket: TSocket = INVALID_SOCKET;
  SocketEvent: THandle = WSA_INVALID_EVENT;
  CompletionPort: THandle = 0;
  Terminated: Boolean = False;
  ServerThreadID: DWORD = 0;
  ServerExecCount: Integer = 0;
  ServerExecLong: DWord = 0;
  OnServerThreadCreateEvt: TOnThreadCreateEvt = OnServerThreadCreateDef;
  ServerFinished: THandle;

function GetServerExecCount(): Integer;
begin
  Result := ServerExecCount;
end;

function GetServerExecLong(): DWord;
begin
  Result := ServerExecLong;
  ServerExecLong := 0;
end;

function ServerThread(Param: Pointer): DWORD; stdcall;
var
  AcceptSocket: TSocket;
  Addr: TSockAddrIn;
  Len: Integer;
  Link: PLink;
  IOMem: Pointer;
  bNodelay: Boolean;
  TickCount: DWord;
  WR: DWord;
begin
  Result := 0;
  CoInitialize(nil);
  WriteLog('Server thread begin');
  TickCount := GetTickCount();
  try
    while not Terminated do
    begin
      try
        ServerExecLong :=
          ServerExecLong + (GetTickCount() - TickCount);
        //wait up to 10 seconds for an incoming connection
        WR := WaitForSingleObject(SocketEvent, 10000);
        ServerExecCount := ServerExecCount + 1;
        TickCount := GetTickCount();
        if (WAIT_TIMEOUT = WR) then
        begin
          //nothing to accept: use the time to check for idle links
          CheckLinkLinkIdleOvertime(Param);
          continue;
        end else
        if (WAIT_FAILED = WR) then
        begin
          continue;
        end else
        begin
          Len := SizeOf(TSockAddrIn);
          AcceptSocket := WSAAccept(ListenSocket, @Addr, @Len, nil, 0);
          if (AcceptSocket = INVALID_SOCKET) then
            continue;
          if LinkCount >= SOCK_MAX_COUNT then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('Link count over');
            end;
            continue;
          end;
          bNodelay := True;
          if SetSockOpt(AcceptSocket, IPPROTO_TCP, TCP_NODELAY,
            PChar(@bNodelay), sizeof(bNodelay)) = SOCKET_ERROR then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('SetSockOpt: error');
            end;
            continue;
          end;
          EnterCriticalSection(LinkSec);
          Link := GetLink();
          with Link^ do
          begin
            EnterCriticalSection(Section);
            RemoteIP := inet_ntoa(Addr.sin_addr);
            //sin_port is in network byte order
            RemotePort := ntohs(Addr.sin_port);
            TickCountActive := GetTickCount();
            Socket := AcceptSocket;
            IOMem := GetIOMem();
            //bind the socket to the completion port with the link as its key, then post the first receive
            if (CreateIoCompletionPort(AcceptSocket, CompletionPort, DWORD(Link), 0) = 0) or
              (not PostRecv(Link, IOMem)) then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog('CreateIoCompletionPort or PostRecv: error');
              end;
              Socket := INVALID_SOCKET;
              FreeLink(Link);
              FreeIOMem(IOMem);
            end;
            LeaveCriticalSection(Section);
          end;
          LeaveCriticalSection(LinkSec);
        end;
      except
        WriteLog('Server thread error');
      end;
    end;
  finally
    CoUninitialize();
    WriteLog('Server thread end');
    SetEvent(ServerFinished);
  end;
end;

function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
var
  NonBlock: Integer;
  bNodelay: Boolean;
  Addr: TSockAddrIn;
  ThreadHandle: THANDLE;
begin
  Result := ListenSocket = INVALID_SOCKET;
  if not Result then
    exit;
  IniIOMem();
  IniLink();
  ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
  begin
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  bNodelay := True;
  NonBlock := 1;
  Addr.sin_family := AF_INET;
  Addr.sin_addr.s_addr := inet_addr(PChar(RemoteIP));
  Addr.sin_port := htons(RemotePort);
  Result := (SetSockOpt(ListenSocket, IPPROTO_TCP, TCP_NODELAY,
      PChar(@bNodelay), sizeof(bNodelay)) <> SOCKET_ERROR) and
    (ioctlsocket(ListenSocket, Integer(FIONBIO), NonBlock) <> SOCKET_ERROR) and
    (Bind(ListenSocket, @Addr, SizeOf(TSockAddrIn)) <> SOCKET_ERROR) and
    (Listen(ListenSocket, SOMAXCONN) <> SOCKET_ERROR);
  if not Result then
  begin
    //close the socket before dropping the handle, otherwise it leaks
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  SocketEvent := CreateEvent(nil, FALSE, FALSE, nil);
  Result := (SocketEvent <> WSA_INVALID_EVENT);
  if (not Result) then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  //signal SocketEvent whenever a connection is waiting to be accepted
  Result := (WSAEventSelect(ListenSocket, SocketEvent, FD_ACCEPT) <> SOCKET_ERROR);
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Result := CompletionPort <> 0;
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  WriteLog('Server Start');
  CreateWorkerThread(CompletionPort);
  ServerFinished := CreateEvent(nil, True, False, nil);
  Result := ServerFinished <> 0;
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  Terminated := False;
  ThreadHandle := CreateThread(nil, 0, @ServerThread, OnServerThreadCreateEvt(False), 0, ServerThreadID);
  if (ThreadHandle = 0) then
  begin
    StopTcpServer();
    exit;
  end;
  CloseHandle(ThreadHandle);
end;

function StopTcpServer(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
    exit;
  WriteLog('Server Stop');
  Terminated :=
    True;
  //wait for the service thread to finish
  if ServerFinished <> 0 then
  begin
    WaitForSingleObject(ServerFinished, INFINITE);
    CloseHandle(ServerFinished);
    ServerFinished := 0;
  end;
  if SocketEvent <> 0 then
    WSACloseEvent(SocketEvent);
  SocketEvent := 0;
  DestroyWorkerThread();
  if ListenSocket <> INVALID_SOCKET then
    CloseSocket(ListenSocket);
  ListenSocket := INVALID_SOCKET;
  if CompletionPort <> 0 then
    CloseHandle(CompletionPort);
  CompletionPort := 0;
  ServerExecCount := 0;
  ServerExecLong := 0;
  DeleteLink();
  DeleteIOMem();
end;

function GetLocalIP(IsIntnetIP: Boolean): String;
type
  TaPInAddr = Array[0..10] of PInAddr;
  PaPInAddr = ^TaPInAddr;
var
  phe: PHostEnt;
  pptr: PaPInAddr;
  Buffer: Array[0..63] of Char;
  I: Integer;
begin
  Result := '0.0.0.0';
  try
    GetHostName(Buffer, SizeOf(Buffer));
    phe := GetHostByName(buffer);
    if phe = nil then
      Exit;
    pPtr := PaPInAddr(phe^.h_addr_list);
    if IsIntnetIP then
    begin
      //take the last address in the list
      I := 0;
      while pPtr^[I] <> nil do
      begin
        Result := inet_ntoa(pptr^[I]^);
        Inc(I);
      end;
    end else
      Result := inet_ntoa(pptr^[0]^);
  except
  end;
end;

procedure SetEventProc(OnReceive: TOnReceiveEvt;
  OnDisconnect: TOnDisconnectEvt;
  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
  OnServerThreadCreate: TOnThreadCreateEvt;
  OnWorkerThreadCreate: TOnThreadCreateEvt);
begin
  OnReceiveEvt := OnReceive;
  OnDisconnectEvt := OnDisconnect;
  OnLinkIdleOvertimeEvt := OnLinkIdleOvertime;
  OnServerThreadCreateEvt := OnServerThreadCreate;
  OnWorkerThreadCreateEvt := OnWorkerThreadCreate;
end;

function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
var
  Flags: DWord;
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
  try
    Flags := 0;
    Bytes := 0;
    //the TIOInfo header sits immediately before the data area
    IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
    with IOInfo^ do
    begin
      ZeroMemory(IOInfo, sizeof(TIOInfo));
      DataBuf.buf := IOMem;
      DataBuf.len := IO_MEM_SIZE;
      Socket := Link^.Socket;
      Flag := IO_READ;
      //a pending overlapped operation counts as success
      Result := (WSARecv(Socket, @DataBuf, 1, @Bytes, @Flags, @Overlapped, nil) <> SOCKET_ERROR) or
        (WSAGetLastError() = ERROR_IO_PENDING);
    end;
  except
    Result := False;
    WriteLog('PostRecv: error');
  end;
end;

function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
var
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
  try
    Bytes := 0;
    IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
    with IOInfo^ do
    begin
      ZeroMemory(IOInfo, sizeof(TIOInfo));
      DataBuf.buf := IOMem;
      DataBuf.len := Len;
      Socket := Link^.Socket;
      Flag := IO_WRITE;
      Result := (WSASend(Socket, @(DataBuf), 1, @Bytes, 0, @(Overlapped), nil) <> SOCKET_ERROR) or
        (WSAGetLastError() = ERROR_IO_PENDING);
    end;
  except
    Result := False;
    WriteLog('PostSend: error');
  end;
end;

procedure PostBroadcast(Buf: PByte; Len: Integer);
var
  IOMem: Pointer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  Link := LinksHead;
  while Link <> nil do
  with Link^ do
  begin
    if Socket <> INVALID_SOCKET then
    begin
      //each send needs its own buffer, which the worker thread returns on completion
      IOMem := GetIOMem();
      CopyMemory(IOMem, Buf, Len);
      if not PostSend(Link, IOMem, Len) then
        FreeIOMem(IOMem);
    end;
    Link := Link^.Next;
  end;
  LeaveCriticalSection(LinkSec);
end;

function IsTcpServerActive(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
end;

{===============================================================================
  Log management
================================================================================}
var
  LogSec: TRTLCriticalSection;
  Inifile: TIniFile;
  LogCount: Integer = 0;
  LogName: String = '';

procedure WriteLog(Log: String);
begin
  EnterCriticalSection(LogSec);
  try
    LogCount := LogCount + 1;
    IniFile.WriteString(LogName, 'Index' + IntToStr(LogCount), DateTimeToStr(Now()) + ':' + Log);
  finally
    LeaveCriticalSection(LogSec);
  end;
end;

{===============================================================================
  Windows Sockets initialization
================================================================================}
var
  WSAData: TWSAData;

procedure Startup;
var
  ErrorCode: Integer;
begin
  //the version word is garbled in the source; $0202 (Winsock 2.2) is an assumption
  ErrorCode := WSAStartup($0202,
    WSAData);
  if ErrorCode <> 0 then
    WriteLog('Window Socket init Error!');
end;

procedure Cleanup;
var
  ErrorCode: Integer;
begin
  ErrorCode := WSACleanup;
  if ErrorCode <> 0 then
    WriteLog('Window Socket cleanup error!');
end;

function GetExePath(): String;
var
  ModuleName: array[0..1024] of char;
begin
  GetModuleFileName(MainInstance, ModuleName, SizeOf(ModuleName));
  Result := ExtractFilePath(ModuleName);
end;

initialization
  LogName := DateTimeToStr(Now());
  InitializeCriticalSection(LogSec);
  ExePath := GetExePath();
  IniFile := TIniFile.Create(ExePath + 'Logs.Ini');
  Startup();

finalization
  Cleanup();
  DeleteCriticalSection(LogSec);
  IniFile.Free;

end.

Source of the main window unit:

unit uMainTcpServerIOCP;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, ExtCtrls, StdCtrls, ComCtrls, UTcpServer, Sockets, Grids;

type
  TfrmMainUTcpServerIOCP = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    edtIP: TEdit;
    edtPort: TEdit;
    btn: TButton;
    Timer1: TTimer;
    Label3: TLabel;
    lbIO: TLabel;
    Label5: TLabel;
    lbIOU: TLabel;
    Label7: TLabel;
    lbL: TLabel;
    Label9: TLabel;
    lbLU: TLabel;
    Label11: TLabel;
    lbLS: TLabel;
    Label13: TLabel;
    lbW: TLabel;
    Info: TStringGrid;
    Label4: TLabel;
    lbWC: TLabel;
    Label8: TLabel;
    lbWU: TLabel;
    Label12: TLabel;
    lbLF: TLabel;
    Label15: TLabel;
    lbLFL: TLabel;
    Label6: TLabel;
    lbIOF: TLabel;
    lbIOFL: TLabel;
    Label16: TLabel;
    Timer2: TTimer;
    procedure btnClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer2Timer(Sender: TObject);
  private
    { Private declarations }
    FTickCount: DWord;
  public
    { Public declarations }
  end;

var
  frmMainUTcpServerIOCP: TfrmMainUTcpServerIOCP;

implementation

{$R *.dfm}

{ TfrmMainUTcpServerIOCP }

procedure TfrmMainUTcpServerIOCP.btnClick(Sender: TObject);
var
  i: Integer;
  C1: Integer;
  C2: DWord;
  DT: TDateTime;
begin
  if btn.Caption = 'Open' then
  begin
    StartTcpServer(edtIP.Text, StrToInt(edtPort.Text));
    if IsTcpServerActive()
    then
    begin
      FTickCount := GetTickCount();
      //one grid row per worker thread, plus the header row
      Info.RowCount := GetWorkerCount() + 1;
      DT := Now();
      for i := 1 to Info.RowCount - 1 do
      begin
        Info.Cells[0, i] := IntToStr(i);
        Info.Cells[1, i] := IntToStr(GetWorkerID(i));
        C1 := GetWorkerExecInfo(i, C2);
        Info.Cells[2, i] := IntToStr(C1);
        Info.Cells[3, i] := '0';
        Info.Cells[4, i] := IntToStr(C2);
        Info.Cells[5, i] := '0';
        Info.Cells[6, i] := DateTimeToStr(DT);
      end;
      Timer1.Enabled := True;
    end;
  end else
  begin
    Timer1.Enabled := False;
    StopTcpServer();
  end;
  if IsTcpServerActive() then
    btn.Caption := 'Close'
  else
    btn.Caption := 'Open';
end;

procedure TfrmMainUTcpServerIOCP.FormCreate(Sender: TObject);
begin
  edtIP.Text := GetLocalIP(False);
  Info.ColCount := 7;
  Info.RowCount := 2;
  Info.ColWidths[0] := 30;
  Info.ColWidths[1] := 30;
  Info.ColWidths[2] := 40;
  Info.ColWidths[3] := 40;
  Info.ColWidths[4] := 30;
  Info.ColWidths[5] := 40;
  Info.ColWidths[6] := 110;
  Info.Cells[0, 0] := 'No.';
  Info.Cells[1, 0] := 'ID';
  Info.Cells[2, 0] := 'Count';
  Info.Cells[3, 0] := 'Ops/s';
  Info.Cells[4, 0] := 'Time';
  Info.Cells[5, 0] := 'Usage';
  Info.Cells[6, 0] := 'Timestamp';
end;

procedure TfrmMainUTcpServerIOCP.Timer1Timer(Sender: TObject);
var
  i: Integer;
  Count1, Count2, Count3, TC, TCC: DWord;
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  TC := GetTickCount();
  TCC := TC - FTickCount;
  if TCC = 0 then
    TCC := $FFFFFFFF;
  lbWC.Caption := IntToStr(GetServerExecCount());
  lbWU.Caption := FloatToStrF(GetServerExecLong() / TCC * 100, ffFixed, 10, 3) + '%';
  for i := 1 to Info.RowCount - 1 do
  begin
    Count1 := GetWorkerExecInfo(i, Count2);
    TC := GetTickCount();
    TCC := TC - FTickCount;
    if TCC = 0 then
      TCC := $FFFFFFFF;
    Count3 := StrToInt(Info.Cells[2, i]);
    //update the row only when the worker has handled something since the last tick
    if Count1 <> Count3 then
    begin
      Info.Cells[2, i] := IntToStr(Count1);
      Info.Cells[3, i] := IntToStr(Count1 - Count3);
      Info.Cells[4, i] := IntToStr(Count2);
      Info.Cells[5, i] := FloatToStrF(Count2 / TCC * 100, ffFixed, 10, 1) + '%';
      Info.Cells[6, i] := DateTimeToStr(Now());
    end;
  end;
  FTickCount := TC;
  lbIO.Caption := IntToStr(GetIOMemSize());
  lbIOU.Caption := FloatToStrF(GetIOMemUse(), ffFixed, 10, 3) + '%';
  Count1 := GetIOMemFree();
  lbIOF.Caption := IntToStr(Count1);
  lbIOFL.Caption := FloatToStrF(Count1 / IO_MEM_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbW.Caption := IntToStr(GetWorkerCount());
  lbL.Caption := IntToStr(GetLinkSize());
  Count1 := GetLinkFree();
  lbLF.Caption := IntToStr(Count1);
  lbLFL.Caption := FloatToStrF(Count1 / SOCK_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbLU.Caption := FloatToStrF(GetLinkUse(), ffFixed, 10, 3) + '%';
  lbLS.Caption := IntToStr(GetLinkCount());
end;

procedure TfrmMainUTcpServerIOCP.FormDestroy(Sender: TObject);
begin
  StopTcpServer();
end;

procedure TfrmMainUTcpServerIOCP.Timer2Timer(Sender: TObject);
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  //the message is 21 bytes long
  PostBroadcast(PByte(PChar('Data from the server!')), 21);
end;

end.
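
The pre-allocated buffer scheme from section 1 (a bookkeeping header stored directly in front of the data area, blocks taken from the head of a free list and returned to its tail) can be tried in isolation. What follows is only a minimal sketch under stated assumptions, not the module's actual code: the names TBlock, TBlockHeader, InitPool, AcquireBlock and ReleaseBlock and the sizes are hypothetical, the pool is a static array instead of HeapAlloc blocks, and the critical section that UTcpServer takes around every pool operation is left out because the sketch runs on one thread. It assumes Free Pascal in Delphi mode or Delphi 2009 and later (for NativeUInt).

```pascal
program PoolSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

const
  BLOCK_SIZE  = 64;  // hypothetical payload size (the article uses IO_MEM_SIZE = 2048)
  BLOCK_COUNT = 4;   // hypothetical pool size

type
  PBlock = ^TBlock;
  // Hypothetical stand-in for TIOInfo: bookkeeping kept in front of the data.
  TBlockHeader = record
    Next: PBlock;    // free-list link
    Tag: Integer;    // placeholder for per-operation information
  end;
  TBlock = record
    Header: TBlockHeader;
    Data: array[0..BLOCK_SIZE - 1] of Byte;  // callers only ever see this address
  end;

var
  Pool: array[0..BLOCK_COUNT - 1] of TBlock;
  FreeHead: PBlock = nil;
  FreeTail: PBlock = nil;
  InUse: Integer = 0;

// Chain all blocks into one free list.
procedure InitPool;
var
  i: Integer;
begin
  for i := 0 to BLOCK_COUNT - 1 do
    if i < BLOCK_COUNT - 1 then
      Pool[i].Header.Next := @Pool[i + 1]
    else
      Pool[i].Header.Next := nil;
  FreeHead := @Pool[0];
  FreeTail := @Pool[BLOCK_COUNT - 1];
  InUse := 0;
end;

// Take the head block without any traversal; returns nil when the pool is empty.
function AcquireBlock: Pointer;
begin
  Result := nil;
  if FreeHead = nil then
    Exit;
  Result := @FreeHead^.Data;
  FreeHead := FreeHead^.Header.Next;
  if FreeHead = nil then
    FreeTail := nil;
  Inc(InUse);
end;

// Step back from the data address to the header and append the block to the tail.
procedure ReleaseBlock(Data: Pointer);
var
  B: PBlock;
begin
  B := PBlock(NativeUInt(Data) - SizeOf(TBlockHeader));
  B^.Header.Next := nil;
  if FreeTail = nil then
    FreeHead := B
  else
    FreeTail^.Header.Next := B;
  FreeTail := B;
  Dec(InUse);
end;

var
  A, B: Pointer;
begin
  InitPool;
  A := AcquireBlock;
  B := AcquireBlock;
  Byte(A^) := 42;  // the caller writes to the data area only
  WriteLn('in use: ', InUse);
  ReleaseBlock(A);
  ReleaseBlock(B);
  WriteLn('in use after release: ', InUse);
end.
```

The sketch casts the pointer through NativeUInt so that the header arithmetic also holds on 64-bit targets; the module's Integer(Mem) cast is only safe for 32-bit builds. In a multithreaded server each of the three routines would need to run under a lock, as GetIOMem and FreeIOMem do.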