Windows 非阻塞或異步 socket

異步與非阻塞區別見個人另一篇文章Socket 同步/異步與阻塞/非阻塞區別ios

  1. select編程

  2. WSAAsyncSelect數組

  3. WSAEventSelect服務器

  4. 重疊(Overlapped)I/O網絡

  5. IOCP:完成端口數據結構

Select

首先要使用ioctlsocket設置爲非阻塞模式。併發

而後啓動線程,線程中不停select。app

WSAAsyncSelect

WSAAsyncSelect模型是Windows下最簡單易用的一種Socket I/O模型。使用這種模型時,Windows會把網絡事件以消息的形勢通知應用程序。此模型提供了讀寫數據能力的異步通知,但不提供異步數據傳送。須要在消息響應函數裏send(通常爲resend)和receive。因爲該模型基於Windows消息機制,必須在應用程序中建立窗口。雖然能夠在開發中,肯定是否顯示該窗口。 異步

WSAEventSelect

一般與WSACreateEvent、WSAResetEvent、WSACloseEvent、WSAWaitForMultileEvents和WSAEnumNetworkEvents一塊兒使用,無需建立窗口。WSAWaitForMultileEvents檢查是否有Event,WSAEnumNetworkEvents枚舉事件類型,FD_READ、FD_WRITE等。socket

函數最多能夠支持WSA_MAXIMUM_WAIT_EVENTS(64)個對象.該函數會等待網絡事件的發生,若是過了指定了時間(dwTimeOut)則返回WSA_WAIT_TIMEOUT,若是在規定的時間內有事件發生,則返回該事件對象的索引(注意:在程序中要想獲得發生的事件的真正索引需得用返回值減去WSA_WAIT_EVENT_0),調用失敗返回WSA_WAIT_FAILED.若是將參數fWaitAll設置成false若是有多個網絡事件發生該函數也只返回一個事件對象索引,而且該事件是在事件句柄數組中最前面的一個.解決方法是循環調用該函數處理後面的受信事件. 

重疊(Overlapped)I/O

它和以前模型不一樣的是,使用重疊模型的應用程序通知緩衝區收發系統直接使用數據,也就是說,若是應用程序投遞了一個10KB大小的緩衝區來接收數據,且數據已經到達套接字,則該數據將直接被拷貝到投遞的緩衝區。以前的模型都是在套接字的緩衝區中,當通知應用程序接收後,在把數據拷貝到程序的緩衝區。

——摘自http://zhoumf1214.blog.163.com/blog/static/5241940201211705318496/

以receive爲例,以前的模型,須要本身從套接字的緩衝區拷貝至程序緩衝區,而重疊IO則是操做系統直接將數據拷貝至程序緩衝區。

重疊模型的核心是一個重疊數據結構。若想以重疊方式使用文件,必須用 FILE_FLAG_OVERLAPPED標誌打開它。

有2種方式實現:

1. 事件

先WaitForSingleObject/WaitForMultipleObjects或WSAWaitForMultipleEvents函數 ,而後調用(WSA)GetOverlappedResult()函數,最後,使用指針偏移定位就能夠準確操做接受到的數據了。

與其餘事件相似,最大個數爲64.

2. 完成例程(非完成端口)

ReadFileEx(),傳遞迴調函數指針。

完成端口

所謂「完成端口",實際是Win32 、Windows NT以及Windows 2000採用的一種I / O構造機制,除套接字句柄以外,實際上還可接受其餘東西(重疊IO好像也能夠)。

使用這種模型以前,首先要建立一個 I / O 完成端口對象(CreateIoCompletionPort),

NumberOfConcurrentThread參數的特殊之處在於,它定義了在一個完成端口上,同時容許執行的線程數量。理想狀況下,咱們但願每一個處理器各自負責一個線程的運行,爲完成端口提供服務,避免過於頻繁的線程「場景」切換。若將該參數設爲 0,代表系統內安裝了多少個處理器,便容許同時運行多少個線程!

1)  建立一個完成端口。第四個參數保持爲 0,指定在完成端口上,每一個處理器一次只容許執行一個工做者線程。

2)  判斷系統內到底安裝了多少個處理器。

3)  建立工做者線程,根據步驟 2 )獲得的處理器信息,在完成端口上,爲已完成的 I / O請求提供服務。在這個簡單的例子中,咱們爲每一個處理器都只建立一個工做者線程。這是因爲事先已預計到,到時不會有任何線程進入「掛起」狀態,形成因爲線程數量的不足,而使處理器空閒的局面(沒有足夠的線程可供執行) 。調用C r e a t e T h r e a d函數時,必須同時提供一個工做者例程,由線程在建立好執行。本節稍後還會詳細討論線程的職責。

4)  準備好一個監聽套接字,在端口 5 1 5 0上監聽進入的鏈接請求。

5) 使用a c c e p t函數,接受進入的鏈接請求。

6)  建立一個數據結構,用於容納「單句柄數據」 ,同時在結構中存入接受的套接字句柄。

7)  調用C r e a t e I o C o m p l e t i o n P o r t,將自a c c e p t返回的新套接字句柄同完成端口關聯到一塊兒。經過完成鍵(C o m p l e t i o n K e y)參數,將單句柄數據結構傳遞給 C r e a t e I o C o m p l e t i o n P o r t。

8)  開始在已接受的鏈接上進行 I / O 操做。在此,咱們但願經過重疊 I / O機制,在新建的套接字上投遞一個或多個異步 W S A R e c v或W S A S e n d請求。這些 I / O 請求完成後,一個工做者線程會爲I / O請求提供服務,同時繼續處理將來的 I / O 請求,稍後便會在步驟 3 )指定的工做者例程中,體驗到這一點。

9)  重複步驟5 ) ~ 8 ),直至服務器停止。

// IOCP_TCPIP_Socket_Server.cpp

#include <WinSock2.h>
#include <Windows.h>
#include <vector>
#include <iostream>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")		// Socket編程需用的動態連接庫
#pragma comment(lib, "Kernel32.lib")	// IOCP須要用到的動態連接庫

/**
 * 結構體名稱:PER_IO_DATA
 * 結構體功能:重疊I/O須要用到的結構體,臨時記錄IO數據
 **/
const int DataBuffSize  = 2 * 1024;
typedef struct
{
	OVERLAPPED overlapped;
	WSABUF databuff;
	char buffer[ DataBuffSize ];
	int BufferLen;
	int operationType;
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;

/**
 * 結構體名稱:PER_HANDLE_DATA
 * 結構體存儲:記錄單個套接字的數據,包括了套接字的變量及套接字的對應的客戶端的地址。
 * 結構體做用:當服務器鏈接上客戶端時,信息存儲到該結構體中,知道客戶端的地址以便於回訪。
 **/
typedef struct
{
	SOCKET socket;
	SOCKADDR_STORAGE ClientAddr;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

// 定義全局變量
const int DefaultPort = 6000;		
vector < PER_HANDLE_DATA* > clientGroup;		// 記錄客戶端的向量組

HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID);
DWORD WINAPI ServerSendThread(LPVOID IpParam);

// 開始主函數
int main()
{
// 加載socket動態連接庫
	WORD wVersionRequested = MAKEWORD(2, 2); // 請求2.2版本的WinSock庫
	WSADATA wsaData;	// 接收Windows Socket的結構信息
	DWORD err = WSAStartup(wVersionRequested, &wsaData);

	if (0 != err){	// 檢查套接字庫是否申請成功
		cerr << "Request Windows Socket Library Error!\n";
		system("pause");
		return -1;
	}
	if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 檢查是否申請了所需版本的套接字庫
		WSACleanup();
		cerr << "Request Windows Socket Version 2.2 Error!\n";
		system("pause");
		return -1;
	}

// 建立IOCP的內核對象
	/**
	 * 須要用到的函數的原型:
	 * HANDLE WINAPI CreateIoCompletionPort(
     *    __in   HANDLE FileHandle,		// 已經打開的文件句柄或者空句柄,通常是客戶端的句柄
     *    __in   HANDLE ExistingCompletionPort,	// 已經存在的IOCP句柄
     *    __in   ULONG_PTR CompletionKey,	// 完成鍵,包含了指定I/O完成包的指定文件
     *    __in   DWORD NumberOfConcurrentThreads // 真正併發同時執行最大線程數,通常推介是CPU核心數*2
     * );
	 **/
	HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (NULL == completionPort){	// 建立IO內核對象失敗
		cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;
		system("pause");
		return -1;
	}

// 建立IOCP線程--線程裏面建立線程池

	// 肯定處理器的核心數量
	SYSTEM_INFO mySysInfo;
	GetSystemInfo(&mySysInfo);

	// 基於處理器的核心數量建立線程
	for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i){
		// 建立服務器工做器線程,並將完成端口傳遞到該線程
		HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);
		if(NULL == ThreadHandle){
			cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;
		system("pause");
			return -1;
		}
		CloseHandle(ThreadHandle);
	}

// 創建流式套接字
	SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0);

// 綁定SOCKET到本機
	SOCKADDR_IN srvAddr;
	srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
	srvAddr.sin_family = AF_INET;
	srvAddr.sin_port = htons(DefaultPort);
	int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));
	if(SOCKET_ERROR == bindResult){
		cerr << "Bind failed. Error:" << GetLastError() << endl;
		system("pause");
		return -1;
	}

// 將SOCKET設置爲監聽模式
	int listenResult = listen(srvSocket, 10);
	if(SOCKET_ERROR == listenResult){
		cerr << "Listen failed. Error: " << GetLastError() << endl;
		system("pause");
		return -1;
	}
	
// 開始處理IO數據
	cout << "本服務器已準備就緒,正在等待客戶端的接入...\n";

	// 建立用於發送數據的線程
	HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);

	while(true){
		PER_HANDLE_DATA * PerHandleData = NULL;
		SOCKADDR_IN saRemote;
		int RemoteLen;
		SOCKET acceptSocket;

		// 接收鏈接,並分配完成端,這兒能夠用AcceptEx()
		RemoteLen = sizeof(saRemote);
		acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);
		if(SOCKET_ERROR == acceptSocket){	// 接收客戶端失敗
			cerr << "Accept Socket Error: " << GetLastError() << endl;
			system("pause");
			return -1;
		}
		
		// 建立用來和套接字關聯的單句柄數據信息結構
		PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));	// 在堆中爲這個PerHandleData申請指定大小的內存
		PerHandleData -> socket = acceptSocket;
		memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);
		clientGroup.push_back(PerHandleData);		// 將單個客戶端數據指針放到客戶端組中

		// 將接受套接字和完成端口關聯
		CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);

		
		// 開始在接受套接字上處理I/O使用重疊I/O機制
		// 在新建的套接字上投遞一個或多個異步
		// WSARecv或WSASend請求,這些I/O請求完成後,工做者線程會爲I/O請求提供服務	
		// 單I/O操做數據(I/O重疊)
		LPPER_IO_OPERATION_DATA PerIoData = NULL;
		PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));
		ZeroMemory(&(PerIoData -> overlapped), sizeof(OVERLAPPED));
		PerIoData->databuff.len = 1024;
		PerIoData->databuff.buf = PerIoData->buffer;
		PerIoData->operationType = 0;	// read

		DWORD RecvBytes;
		DWORD Flags = 0;
		WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
	}

	system("pause");
	return 0;
}

// 開始服務工做線程函數
DWORD WINAPI ServerWorkThread(LPVOID IpParam)
{
	HANDLE CompletionPort = (HANDLE)IpParam;
	DWORD BytesTransferred;
	LPOVERLAPPED IpOverlapped;
	LPPER_HANDLE_DATA PerHandleData = NULL;
	LPPER_IO_DATA PerIoData = NULL;
	DWORD RecvBytes;
	DWORD Flags = 0;
	BOOL bRet = false;

	while(true){
		bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);
		if(bRet == 0){
			cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl;
			return -1;
		}
		PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);
		
		// 檢查在套接字上是否有錯誤發生
		if(0 == BytesTransferred){
			closesocket(PerHandleData->socket);
			GlobalFree(PerHandleData);
			GlobalFree(PerIoData);
			continue;
		}
		
		// 開始數據處理,接收來自客戶端的數據
		WaitForSingleObject(hMutex,INFINITE);
		cout << "A Client says: " << PerIoData->databuff.buf << endl;
		ReleaseMutex(hMutex);

		// 爲下一個重疊調用創建單I/O操做數據
		ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空內存
		PerIoData->databuff.len = 1024;
		PerIoData->databuff.buf = PerIoData->buffer;
		PerIoData->operationType = 0;	// read
		WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
	}

	return 0;
}


// 發送信息的線程執行函數
DWORD WINAPI ServerSendThread(LPVOID IpParam)
{
	while(1){
		char talk[200];
		gets(talk);
		int len;
		for (len = 0; talk[len] != '\0'; ++len){
			// 找出這個字符組的長度
		}
		talk[len] = '\n';
		talk[++len] = '\0';
		printf("I Say:");
		cout << talk;
		WaitForSingleObject(hMutex,INFINITE);
		for(int i = 0; i < clientGroup.size(); ++i){
			send(clientGroup[i]->socket, talk, 200, 0);	// 發送信息
		}
		ReleaseMutex(hMutex); 
	}
	return 0;
}
相關文章
相關標籤/搜索