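Preface
IOCP stands for Input/Output Completion Ports. This article uses the abbreviation IOCP throughout.
IOCP is the most complex I/O model we have looked at so far, but it is also the one that usually delivers the best system performance. When a network application has to manage a large number of socket I/O requests, there may be no other real choice.
In this article we walk through an official IOCP demo program to show how IOCP is used. Because the model is complex, we concentrate on usage and do not dig into the internal implementation; see the official documentation for more detail.
The official sample lives at: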
https://github.com/microsoft/Windows-classic-samples/tree/master/Samples/Win7Samples/netds/winsock/iocp/serverex
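I find the official demo code hard to read (both its formatting and a number of trivial details that could be left out), so a trimmed-down version of the code is attached at the end of this article for readers who want it.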
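API basics
We rely on the following data structures and APIs:
- the OVERLAPPED structure
- WSAEvent
- CriticalSection
- CreateThread
All of them were covered in the earlier article of this WinSock I/O model series, "WinSock I/O Model: OVERLAPPED I/O Model", so they are not repeated here.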
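CreateIoCompletionPort
CreateIoCompletionPort either creates a new IOCP handle or associates an existing socket handle with an IOCP that has already been created.
HANDLE WINAPI CreateIoCompletionPort(
    _In_     HANDLE    FileHandle,
    _In_opt_ HANDLE    ExistingCompletionPort,
    _In_     ULONG_PTR CompletionKey,
    _In_     DWORD     NumberOfConcurrentThreads
);
- FileHandle: the file handle to associate with ExistingCompletionPort (any file handle, not only a socket handle). The handle must support overlapped I/O; for a socket, that means creating it with the WSA_FLAG_OVERLAPPED flag. To create a new IOCP handle, pass INVALID_HANDLE_VALUE.
- ExistingCompletionPort: NULL, or an IOCP instance previously created with CreateIoCompletionPort. To create a new IOCP handle, pass NULL; the return value is then the newly created IOCP instance. To associate an IOCP instance with a FileHandle, pass the existing IOCP instance; on success the function returns ExistingCompletionPort. In both cases the function returns NULL on failure.
- CompletionKey: a per-handle value (typically a pointer to a data structure) associated with FileHandle. It is included in every I/O completion packet for that handle (the packet is explained below).
- NumberOfConcurrentThreads: the maximum number of threads the system allows to process I/O completion packets for this port concurrently. It is only used when a new IOCP instance is created and is ignored otherwise. If it is 0, the system allows as many concurrently running threads as there are processors. The function does not create any threads itself.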
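GetQueuedCompletionStatus
GetQueuedCompletionStatus retrieves an I/O completion packet from the given IOCP instance.
I/O completion packet: loosely speaking, once an IOCP instance has been created, the system gives it an internal queue. The queue holds information about every completed asynchronous operation on the file handles associated with that IOCP. Each entry in the queue is called an I/O completion packet.
This API removes packets from that queue. Because it is a queue, several threads can wait on the same IOCP instance at the same time and no two of them will receive the same I/O completion packet.
There is also a more advanced function, GetQueuedCompletionStatusEx. We do not use it here, so we set it aside.
BOOL GetQueuedCompletionStatus(
    HANDLE       CompletionPort,
    LPDWORD      lpNumberOfBytesTransferred,
    PULONG_PTR   lpCompletionKey,
    LPOVERLAPPED *lpOverlapped,
    DWORD        dwMilliseconds
);
- CompletionPort: the IOCP instance.
- lpNumberOfBytesTransferred: the number of bytes transferred by the completed operation. For a send, it is the number of bytes successfully sent; the same applies to a read.
- lpCompletionKey: when we associated a FileHandle with the IOCP instance we supplied a CompletionKey. When an operation on that FileHandle completes and we dequeue its I/O completion packet with GetQueuedCompletionStatus, this parameter receives that same CompletionKey.
- lpOverlapped: receives the OVERLAPPED structure that was supplied when the operation was submitted. As we said earlier, an OVERLAPPED structure works like the ID of an asynchronous operation: we pass one in when we start the operation, and when the operation completes the operating system hands the same structure back, which tells us which of our operations has finished. There are a few tricks to using this structure, explained later.
- dwMilliseconds: a timeout. If no I/O completion packet is available within that time, the function returns FALSE with *lpOverlapped set to NULL, and GetLastError returns WAIT_TIMEOUT. In the example we pass INFINITE so that the call blocks until at least one operation completes.
Return value:
When the function dequeues an I/O completion packet for a successful operation, it returns TRUE, and lpNumberOfBytesTransferred, lpOverlapped, and lpCompletionKey are filled in with the data belonging to that packet.
When the function fails, it returns FALSE. The output parameters then fall into one of these cases:
- *lpOverlapped is NULL: no completion information was dequeued from the IOCP instance. lpNumberOfBytesTransferred and lpCompletionKey hold no valid information either.
- *lpOverlapped is not NULL: a packet was dequeued, but the operation it describes failed. lpNumberOfBytesTransferred, lpOverlapped, and lpCompletionKey describe the failed operation. Call GetLastError for the detailed error.
When the function returns FALSE, *lpOverlapped is NULL, and GetLastError returns ERROR_ABANDONED_WAIT_0, the IOCP instance has been closed.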
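The return-value cases above can be folded into one small decision function. The following is only a sketch: it is plain C with no Windows headers, so the two error codes are defined locally as stand-ins (258 and 735 are the values of WAIT_TIMEOUT and ERROR_ABANDONED_WAIT_0), and the names classify_gqcs and gqcs_outcome are made up for this illustration.

```c
#include <stddef.h>

/* Stand-ins for the Win32 constants, so the sketch builds without <windows.h>. */
#define SKETCH_WAIT_TIMEOUT           258UL  /* value of WAIT_TIMEOUT */
#define SKETCH_ERROR_ABANDONED_WAIT_0 735UL  /* value of ERROR_ABANDONED_WAIT_0 */

typedef enum {
    GQCS_PACKET_OK,      /* returned TRUE: packet for a successful operation */
    GQCS_PACKET_FAILED,  /* returned FALSE, overlapped != NULL: packet for a failed operation */
    GQCS_TIMED_OUT,      /* returned FALSE, overlapped == NULL, WAIT_TIMEOUT */
    GQCS_PORT_CLOSED,    /* returned FALSE, overlapped == NULL, ERROR_ABANDONED_WAIT_0 */
    GQCS_OTHER_ERROR     /* returned FALSE, overlapped == NULL, anything else */
} gqcs_outcome;

/*
 * ok         : the BOOL returned by GetQueuedCompletionStatus
 * overlapped : the pointer it stored through lpOverlapped
 * last_error : the value of GetLastError() read right after the call
 */
gqcs_outcome classify_gqcs(int ok, const void *overlapped, unsigned long last_error)
{
    if (ok) {
        return GQCS_PACKET_OK;
    }
    if (overlapped != NULL) {
        return GQCS_PACKET_FAILED;
    }
    if (last_error == SKETCH_WAIT_TIMEOUT) {
        return GQCS_TIMED_OUT;
    }
    if (last_error == SKETCH_ERROR_ABANDONED_WAIT_0) {
        return GQCS_PORT_CLOSED;
    }
    return GQCS_OTHER_ERROR;
}
```

In a real worker loop the three arguments would come straight from the GetQueuedCompletionStatus call and an immediate GetLastError; only GQCS_PACKET_OK and GQCS_PACKET_FAILED carry a completion key and an OVERLAPPED pointer that are safe to use.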
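HasOverlappedIoCompleted
HasOverlappedIoCompleted is a macro that checks whether one particular overlapped operation has completed. It inspects a single OVERLAPPED structure; it does not query the IOCP instance as a whole.
void HasOverlappedIoCompleted(
    lpOverlapped
);
lpOverlapped: a pointer to the OVERLAPPED structure that was supplied when the asynchronous operation was started.
Only use this macro on an operation that was actually reported as pending, that is, when the call that started it failed with ERROR_IO_PENDING. Do not use it in any other situation.
We now know how to create an IOCP instance and how to be notified when an asynchronous operation completes. Next we look at how to submit an asynchronous operation.
Note that we only look at how these APIs are used together with IOCP and do not cover the other modes they support.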
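To see why the macro is only meaningful for an operation that really went pending, it helps to model what it checks. The sketch below is portable C and does not include the Windows headers: sketch_overlapped is a stand-in that keeps only the Internal field, SKETCH_STATUS_PENDING carries the value of STATUS_PENDING (0x103), and the macro body follows the shape of the SDK definition as I understand it. Treat it as a model, not as the SDK source.

```c
#include <stdint.h>

#define SKETCH_STATUS_PENDING 0x00000103UL  /* value of STATUS_PENDING */

/* Stand-in for OVERLAPPED: only the field the macro looks at. */
typedef struct {
    uintptr_t Internal;  /* the system keeps the operation's status code here */
} sketch_overlapped;

/* "Completed" simply means "the status is no longer STATUS_PENDING". */
#define SKETCH_HAS_IO_COMPLETED(ov) \
    (((unsigned long)(ov)->Internal) != SKETCH_STATUS_PENDING)
```

One consequence of this definition: a zero-initialized structure that was never handed to an I/O call has Internal equal to 0, so it also reads as "completed". That is why the macro says nothing useful unless the operation was started and reported ERROR_IO_PENDING.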
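AcceptEx
AcceptEx accepts a new connection.
BOOL AcceptEx(
    SOCKET       sListenSocket,
    SOCKET       sAcceptSocket,
    PVOID        lpOutputBuffer,
    DWORD        dwReceiveDataLength,
    DWORD        dwLocalAddressLength,
    DWORD        dwRemoteAddressLength,
    LPDWORD      lpdwBytesReceived,
    LPOVERLAPPED lpOverlapped
);
- sAcceptSocket: unlike accept, AcceptEx accepts the connection asynchronously, so we have to create a socket handle before the call; the accepted connection is attached to that socket.
- lpOutputBuffer: while accepting the connection, AcceptEx can also return the local and remote addresses of the new socket and receive a first block of data. The received data is written at the start of the buffer and the address data follows it.
- dwReceiveDataLength: the number of bytes reserved for that first block of data. If it is 0, no data is received and the call completes as soon as a connection arrives; lpOutputBuffer is then used only for the local and remote addresses.
- dwLocalAddressLength, dwRemoteAddressLength: the number of bytes to reserve in lpOutputBuffer for the local and the remote address. Each value must be at least 16 bytes more than the maximum address length of the transport protocol in use, and cannot be 0.
- lpdwBytesReceived: receives the length of the first block of data. It is only valid when AcceptEx completes immediately; if the call reports ERROR_IO_PENDING, this value is meaningless.
- lpOverlapped: the OVERLAPPED structure associated with this asynchronous accept.
Return value:
- If the call completes immediately, it returns TRUE.
- Otherwise it returns FALSE, and WSAGetLastError gives the specific error. If WSAGetLastError returns ERROR_IO_PENDING, the accept was submitted successfully and is still in progress.
It is worth mentioning that, according to the official documentation, a program can establish a connection more quickly with AcceptEx than with accept.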
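The buffer arithmetic behind dwReceiveDataLength, dwLocalAddressLength and dwRemoteAddressLength is easy to get wrong, so here is a small sketch of it in plain C. The helper names are invented for this illustration. The numbers in the usage note assume an 8192-byte buffer and a 128-byte maximum address, which is what the demo gets from MAX_BUFF_SIZE and sizeof(SOCKADDR_STORAGE).

```c
#include <assert.h>
#include <stddef.h>

/* AcceptEx wants 16 bytes more than the largest address of the protocol in use. */
#define ACCEPTEX_ADDR_PADDING 16u

/* Bytes to reserve for ONE address (pass this as dwLocalAddressLength
 * and again as dwRemoteAddressLength). */
size_t acceptex_addr_slot(size_t max_addr_len)
{
    return max_addr_len + ACCEPTEX_ADDR_PADDING;
}

/* Largest dwReceiveDataLength that still leaves room for both address slots. */
size_t acceptex_receive_length(size_t buffer_size, size_t max_addr_len)
{
    size_t reserved = 2 * acceptex_addr_slot(max_addr_len);
    assert(buffer_size >= reserved);  /* the buffer must at least hold the two addresses */
    return buffer_size - reserved;
}
```

With the demo's values, acceptex_addr_slot(128) is 144 and acceptex_receive_length(8192, 128) is 7904, which matches the expression MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16)) used in the sample code.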
WSARecv
WSARecv receives data from a connected socket.
int WSAAPI WSARecv(
    SOCKET                             s,
    LPWSABUF                           lpBuffers,
    DWORD                              dwBufferCount,
    LPDWORD                            lpNumberOfBytesRecvd,
    LPDWORD                            lpFlags,
    LPWSAOVERLAPPED                    lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
The lpOverlapped parameter plays the same role as the lpOverlapped parameter of AcceptEx.
lpBuffers points to an array of buffers that will hold the received data, and dwBufferCount is the number of buffers in that array.
lpNumberOfBytesRecvd: if the read completes immediately, this receives the number of bytes read. If the operation does not complete immediately and is pending instead, the value is meaningless.
lpCompletionRoutine: we do not use this parameter in the example, so we pass NULL. We use GetQueuedCompletionStatus to be notified asynchronously when the receive completes.
WSASend
WSASend sends data on a connected socket.
int WSAAPI WSASend(
    SOCKET                             s,
    LPWSABUF                           lpBuffers,
    DWORD                              dwBufferCount,
    LPDWORD                            lpNumberOfBytesSent,
    DWORD                              dwFlags,
    LPWSAOVERLAPPED                    lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
This function is almost identical to WSARecv, so we do not go through it again.
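Implementation outline
- Create a socket to act as the listening socket.
- Create an IOCP instance and associate the server socket with it.
- Submit an asynchronous accept with AcceptEx.
- Create several worker threads. Each one blocks in GetQueuedCompletionStatus waiting for completion notifications (I/O completion packets) and handles them as they arrive.
- The main thread blocks until the server exits and the IOCP instance is closed.
Described this way the flow sounds very simple, but it hides a great many details. Here is the design of our IOCP server in more depth: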
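- First, right after creating the server socket we create the IOCP instance (with CreateIoCompletionPort) and associate the server socket with it (again with CreateIoCompletionPort). When we make the association we have to supply a CompletionKey. The structure behind that key must carry enough information for us to act on when a completion notification for the server socket arrives.
Here is the structure the sample code uses as the completion key:
typedef struct _PER_SOCKET_CONTEXT {
    SOCKET                      Socket;
    LPFN_ACCEPTEX               fnAcceptEx;
    PPER_IO_CONTEXT             pIOContext;
    struct _PER_SOCKET_CONTEXT *pCtxtBack;
    struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;
Socket: when an operation completes we need to know which socket it belongs to. GetQueuedCompletionStatus does not return that information, so we have to store it ourselves.
fnAcceptEx: this field exists because of a peculiarity of AcceptEx. The demo does not call AcceptEx directly; it first obtains the function pointer at run time through WSAIoctl with SIO_GET_EXTENSION_FUNCTION_POINTER. The pointer is obtained through a specific server socket and kept with it, so with several server sockets you end up with several pointers. That is why the field has to be stored.
pIOContext: holds the data for the OVERLAPPED structure used by asynchronous operations on this socket. (We describe this structure in more detail below.)
pCtxtBack and pCtxtForward: these really are optional. If you track your _PER_SOCKET_CONTEXT structures some other way, you do not need these two fields at all.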
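- Once the server socket is bound to the IOCP, we start other threads that call GetQueuedCompletionStatus to handle completed operations. The points to weigh here are how many threads to use, and whether those threads should block indefinitely or poll with a timeout. These are decisions the reader has to think through for their own workload.
- With the server socket associated with the IOCP instance and the worker threads in place, how do we get the server socket to start accepting connections? With accept? No. accept blocks, so we use AcceptEx to accept connections asynchronously. How do we do that?
To use AcceptEx, the important point is that we first need an OVERLAPPED structure. Could we just create a bare OVERLAPPED instance and use it? That would not be wrong, but none of the IOCP implementations I have read do it that way (I have read two besides Microsoft's official demo, one of which is libuv).
They all wrap the OVERLAPPED structure inside another structure. The demo's structure looks like this:
typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED           Overlapped;
    char                    Buffer[MAX_BUFF_SIZE];
    WSABUF                  wsabuf;
    int                     nTotalBytes;
    int                     nSentBytes;
    IO_OPERATION            IOOperation;
    SOCKET                  SocketAccept;
    struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;
Note that _PER_SOCKET_CONTEXT (our completion key) refers to this _PER_IO_CONTEXT through its pIOContext pointer.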
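Overlapped: this field obviously has to be there.
IOOperation: the type of the current asynchronous operation. Its type is IO_OPERATION, with the values accept, read, and write.
SocketAccept: if the current operation is an accept, this field stores the newly accepted socket.
wsabuf: the structure we pass to WSARecv or WSASend when we submit a read or a write.
Buffer: where the data actually lives. A WSABUF only contains a pointer to a buffer and the length of that buffer. Given the way the demo is designed, the pointer in wsabuf always points at Buffer. Data being sent or received is stored here.
nTotalBytes, nSentBytes: the total number of bytes that have to be sent and the number of bytes sent so far.
pIOContextForward: this field exists because a socket is associated with one _PER_SOCKET_CONTEXT, and a _PER_SOCKET_CONTEXT refers to only one _PER_IO_CONTEXT (that is, one OVERLAPPED structure). How do we handle several asynchronous operations on the same socket? That takes several _PER_IO_CONTEXT instances, and this linked list is what holds them.
The one thing worth noting here is that the OVERLAPPED structure is the first field of _PER_IO_CONTEXT. The benefit is that when GetQueuedCompletionStatus hands back the lpOverlapped of a completed operation, we can cast that pointer straight to a _PER_IO_CONTEXT pointer and find out which I/O operation it was. The _PER_SOCKET_CONTEXT structure comes back from GetQueuedCompletionStatus as lpCompletionKey, so at that point we have the whole context of the socket.
With this design one _PER_IO_CONTEXT corresponds to one asynchronous operation. If a socket has several operations in flight, it needs several _PER_IO_CONTEXT structures.
In real-world use, several aspects of how the demo designs this structure deserve a second look.
With all of this in place, using WSARecv and WSASend is no longer difficult.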
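The first-member cast described above can be tried out in isolation. The sketch below is portable C with made-up stand-in types (demo_overlapped, demo_io_context, and both helper functions are names invented here, not part of the demo or the Windows SDK). The second helper shows the offset-based variant, which is the same idea as the Windows CONTAINING_RECORD macro and keeps working even if the embedded structure is not the first member.

```c
#include <stddef.h>

/* Stand-in for WSAOVERLAPPED; its contents do not matter for the trick. */
typedef struct {
    void *hEvent;
} demo_overlapped;

typedef enum { DEMO_OP_ACCEPT, DEMO_OP_READ, DEMO_OP_WRITE } demo_op;

/* Stand-in for PER_IO_CONTEXT: the overlapped structure comes first. */
typedef struct {
    demo_overlapped overlapped;  /* must stay the first member for the plain cast */
    demo_op         op;
    char            buffer[64];
} demo_io_context;

/* Plain cast: valid only because offsetof(demo_io_context, overlapped) is 0. */
demo_io_context *io_context_from_overlapped(demo_overlapped *ov)
{
    return (demo_io_context *)ov;
}

/* Offset-based recovery: works wherever the member sits in the structure. */
demo_io_context *io_context_from_member(demo_overlapped *ov)
{
    return (demo_io_context *)((char *)ov - offsetof(demo_io_context, overlapped));
}
```

Given a context ctx, passing &ctx.overlapped to either helper yields &ctx again, so a worker that only receives the overlapped pointer can still read ctx.op and ctx.buffer. The plain cast is shorter; the offset form costs nothing extra and does not silently break if someone later reorders the fields.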
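One detail of the write path is worth isolating: a completed send may cover only part of the data, which is what nTotalBytes and nSentBytes keep track of. The following sketch shows just that bookkeeping in plain C. The type and function names are invented for the illustration and no socket call is made.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    size_t total_bytes;  /* plays the role of nTotalBytes */
    size_t sent_bytes;   /* plays the role of nSentBytes  */
} send_progress;

/*
 * Record that a send completion reported just_sent bytes.
 * Returns the number of bytes still to be sent. When the result is
 * non-zero, the next send starts at (buffer + p->sent_bytes) and its
 * length is the returned value.
 */
size_t send_progress_advance(send_progress *p, size_t just_sent)
{
    assert(just_sent <= p->total_bytes - p->sent_bytes);
    p->sent_bytes += just_sent;
    return p->total_bytes - p->sent_bytes;
}
```

For a 100-byte message, a first completion of 60 bytes leaves 40 to send from offset 60, and a second completion of 40 bytes leaves 0. At that point the demo switches the context back to a read and posts the next WSARecv.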
Example
There is a fair amount of code, so take your time reading it.
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF
// ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
// PARTICULAR PURPOSE.
//
// Copyright (C) Microsoft Corporation. All Rights Reserved.
//#pragma warning (disable:4127)
#pragma comment(lib,"ws2_32.lib")

#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>
#include <stdio.h>
#include <stdlib.h>
#include <strsafe.h>

#define DEFAULT_PORT "5001"
#define MAX_BUFF_SIZE 8192
#define MAX_WORKER_THREAD 16

#define xmalloc(s) HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, (s))
#define xfree(p) HeapFree(GetProcessHeap(), 0, (p))

typedef enum _IO_OPERATION {
    ClientIoAccept,
    ClientIoRead,
    ClientIoWrite
} IO_OPERATION, *PIO_OPERATION;

typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED Overlapped;
    char Buffer[MAX_BUFF_SIZE];
    WSABUF wsabuf;
    int nTotalBytes;
    int nSentBytes;
    IO_OPERATION IOOperation;
    SOCKET SocketAccept;
    struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;

// Used as the completion key.
// Each socket has one _PER_SOCKET_CONTEXT structure.
// The socket's asynchronous operations are described by pIOContext, which is the
// head of a linked list, so pIOContext should be thought of as a dynamic array.
typedef struct _PER_SOCKET_CONTEXT {
    SOCKET Socket;
    LPFN_ACCEPTEX fnAcceptEx;
    PPER_IO_CONTEXT pIOContext;
    struct _PER_SOCKET_CONTEXT *pCtxtBack;
    struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;

BOOL CreateListenSocket(void);
BOOL CreateAcceptSocket(BOOL fUpdateIOCP);
DWORD WINAPI WorkerThread(LPVOID WorkContext);

PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET s, IO_OPERATION ClientIo, BOOL bAddToList);

PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET s, IO_OPERATION ClientIO);
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful);
VOID CtxtListFree(void);
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext);
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext);

BOOL g_bEndServer = FALSE;
BOOL g_bRestart = TRUE;
HANDLE g_hIOCP = INVALID_HANDLE_VALUE;
SOCKET g_sdListen = INVALID_SOCKET;
HANDLE g_ThreadHandles[MAX_WORKER_THREAD];
WSAEVENT g_hCleanupEvent[1];
PPER_SOCKET_CONTEXT g_pCtxtListenSocket = NULL;
PPER_SOCKET_CONTEXT g_pCtxtList = NULL;
CRITICAL_SECTION g_CriticalSection;

int myprintf(const char *lpFormat, ...);

int main(void) {
    SYSTEM_INFO systemInfo;
    WSADATA wsaData;
    DWORD dwThreadCount = 0;
    int nRet = 0;
    HANDLE hThread;
    DWORD dwThreadId;

    g_ThreadHandles[0] = (HANDLE)WSA_INVALID_EVENT;
    for (int i = 0; i < MAX_WORKER_THREAD; i++) {
        g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
    }

    GetSystemInfo(&systemInfo);
    dwThreadCount = systemInfo.dwNumberOfProcessors * 2;
    // g_ThreadHandles has room for MAX_WORKER_THREAD handles, so clamp the count.
    if (dwThreadCount > MAX_WORKER_THREAD)
        dwThreadCount = MAX_WORKER_THREAD;

    if (WSA_INVALID_EVENT == (g_hCleanupEvent[0] = WSACreateEvent())) {
        myprintf("WSACreateEvent() failed: %d\n", WSAGetLastError());
        return 1;
    }

    if ((nRet = WSAStartup(0x202, &wsaData)) != 0) {
        myprintf("WSAStartup() failed: %d\n", nRet);
        if (g_hCleanupEvent[0] != WSA_INVALID_EVENT) {
            WSACloseEvent(g_hCleanupEvent[0]);
            g_hCleanupEvent[0] = WSA_INVALID_EVENT;
        }
        return 1;
    }

    InitializeCriticalSection(&g_CriticalSection);

    while (g_bRestart) {
        g_bRestart = FALSE;
        g_bEndServer = FALSE;
        WSAResetEvent(g_hCleanupEvent[0]);

        // Create the IOCP instance.
        g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (g_hIOCP == NULL) {
            myprintf("CreateIoCompletionPort() failed to create I/O completion port: %d\n", GetLastError());
            goto done;
        }

        // Start the worker threads that handle completion notifications.
        for (DWORD dwCPU = 0; dwCPU < dwThreadCount; dwCPU++) {
            // Create worker threads to service the overlapped I/O requests. The decision
            // to create 2 worker threads per CPU in the system is a heuristic. The thread
            // handles are kept in g_ThreadHandles so that shutdown can wait on them.
            hThread = CreateThread(NULL, 0, WorkerThread, g_hIOCP, 0, &dwThreadId);
            if (hThread == NULL) {
                myprintf("CreateThread() failed to create worker thread: %d\n", GetLastError());
                goto done;
            }
            g_ThreadHandles[dwCPU] = hThread;
            hThread = INVALID_HANDLE_VALUE;
        }

        if (!CreateListenSocket())
            goto done;

        // Submit the first accept.
        if (!CreateAcceptSocket(TRUE))
            goto done;

        // Block the main thread until the server is asked to exit.
        WSAWaitForMultipleEvents(1, g_hCleanupEvent, TRUE, WSA_INFINITE, FALSE);

done:
        // Clean up when the server exits.
        g_bEndServer = TRUE;

        // Cause worker threads to exit.
        // The workers call GetQueuedCompletionStatus with a timeout of INFINITE,
        // so we post one I/O completion packet per worker by hand. Each worker
        // dequeues one of these packets and exits instead of blocking forever.
        if (g_hIOCP) {
            for (DWORD i = 0; i < dwThreadCount; i++) {
                PostQueuedCompletionStatus(g_hIOCP, 0, 0, NULL);
            }
        }

        // Make sure worker threads exit.
        if (WAIT_OBJECT_0 != WaitForMultipleObjects(dwThreadCount, g_ThreadHandles, TRUE, 1000)) {
            myprintf("WaitForMultipleObjects() failed: %d\n", GetLastError());
        } else {
            for (DWORD i = 0; i < dwThreadCount; i++) {
                if (g_ThreadHandles[i] != INVALID_HANDLE_VALUE)
                    CloseHandle(g_ThreadHandles[i]);
                g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
            }
        }

        if (g_sdListen != INVALID_SOCKET) {
            closesocket(g_sdListen);
            g_sdListen = INVALID_SOCKET;
        }

        if (g_pCtxtListenSocket) {
            // If the server socket still has an operation in flight, wait for it
            // to complete before cleaning up.
            while (!HasOverlappedIoCompleted((LPOVERLAPPED)&g_pCtxtListenSocket->pIOContext->Overlapped))
                Sleep(0);

            if (g_pCtxtListenSocket->pIOContext->SocketAccept != INVALID_SOCKET)
                closesocket(g_pCtxtListenSocket->pIOContext->SocketAccept);
            g_pCtxtListenSocket->pIOContext->SocketAccept = INVALID_SOCKET;

            if (g_pCtxtListenSocket->pIOContext)
                xfree(g_pCtxtListenSocket->pIOContext);
            if (g_pCtxtListenSocket)
                xfree(g_pCtxtListenSocket);
            g_pCtxtListenSocket = NULL;
        }

        CtxtListFree();

        if (g_hIOCP) {
            CloseHandle(g_hIOCP);
            g_hIOCP = NULL;
        }
    } // while (g_bRestart)

    DeleteCriticalSection(&g_CriticalSection);
    if (g_hCleanupEvent[0] != WSA_INVALID_EVENT) {
        WSACloseEvent(g_hCleanupEvent[0]);
        g_hCleanupEvent[0] = WSA_INVALID_EVENT;
    }
    WSACleanup();
    return 0;
} // main

SOCKET CreateSocket(void) {
    int nRet = 0;
    int nZero = 0;
    SOCKET sdSocket = INVALID_SOCKET;

    sdSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sdSocket == INVALID_SOCKET) {
        myprintf("WSASocket(sdSocket) failed: %d\n", WSAGetLastError());
        return (sdSocket);
    }

    //
    // Disable send buffering on the socket. Setting SO_SNDBUF
    // to 0 causes winsock to stop buffering sends and perform
    // sends directly from our buffers, thereby save one memory copy.
    //
    // However, this does prevent the socket from ever filling the
    // send pipeline. This can lead to packets being sent that are
    // not full (i.e. the overhead of the IP and TCP headers is
    // great compared to the amount of data being carried).
    //
    // Disabling the send buffer has less serious repercussions
    // than disabling the receive buffer.
    //
    nZero = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero));
    if (nRet == SOCKET_ERROR) {
        myprintf("setsockopt(SNDBUF) failed: %d\n", WSAGetLastError());
        return (sdSocket);
    }

    //
    // Don't disable receive buffering. This will cause poor network
    // performance since if no receive is posted and no receive buffers,
    // the TCP stack will set the window size to zero and the peer will
    // no longer be allowed to send data.
    //

    //
    // Do not set a linger value...especially don't set it to an abortive
    // close. If you set abortive close and there happens to be a bit of
    // data remaining to be transfered (or data that has not been
    // acknowledged by the peer), the connection will be forcefully reset
    // and will lead to a loss of data (i.e. the peer won't get the last
    // bit of data). This is BAD. If you are worried about malicious
    // clients connecting and then not sending or receiving, the server
    // should maintain a timer on each connection. If after some point,
    // the server deems a connection is "stale" it can then set linger
    // to be abortive and close the connection.
    //

    /*
    LINGER lingerStruct;
    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_LINGER,
                      (char *)&lingerStruct, sizeof(lingerStruct));
    if( nRet == SOCKET_ERROR ) {
        myprintf("setsockopt(SO_LINGER) failed: %d\n", WSAGetLastError());
        return(sdSocket);
    }
    */

    return (sdSocket);
}

BOOL CreateListenSocket(void) {
    int nRet = 0;
    LINGER lingerStruct;
    struct addrinfo hints = {0};
    struct addrinfo *addrlocal = NULL;

    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_IP;

    if (getaddrinfo(NULL, DEFAULT_PORT, &hints, &addrlocal) != 0) {
        myprintf("getaddrinfo() failed with error %d\n", WSAGetLastError());
        return FALSE;
    }

    if (addrlocal == NULL) {
        myprintf("getaddrinfo() failed to resolve/convert the interface\n");
        return FALSE;
    }

    g_sdListen = CreateSocket();
    if (g_sdListen == INVALID_SOCKET) {
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = bind(g_sdListen, addrlocal->ai_addr, (int)addrlocal->ai_addrlen);
    if (nRet == SOCKET_ERROR) {
        myprintf("bind() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = listen(g_sdListen, 5);
    if (nRet == SOCKET_ERROR) {
        myprintf("listen() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    freeaddrinfo(addrlocal);
    return TRUE;
}

//
// Create a socket and invoke AcceptEx. Only the original call to to this
// function needs to be added to the IOCP.
//
// If the expected behaviour of connecting client applications is to NOT
// send data right away, then only posting one AcceptEx can cause connection
// attempts to be refused if a client connects without sending some initial
// data (notice that the associated iocpclient does not operate this way
// but instead makes a connection and starts sending data write away).
// This is because the IOCP packet does not get delivered without the initial
// data (as implemented in this sample) thus preventing the worker thread
// from posting another AcceptEx and eventually the backlog value set in
// listen() will be exceeded if clients continue to try to connect.
//
// One technique to address this situation is to simply cause AcceptEx
// to return right away upon accepting a connection without returning any
// data. This can be done by setting dwReceiveDataLength=0 when calling AcceptEx.
//
// Another technique to address this situation is to post multiple calls
// to AcceptEx. Posting multiple calls to AcceptEx is similar in concept to
// increasing the backlog value in listen(), though posting AcceptEx is
// dynamic (i.e. during the course of running your application you can adjust
// the number of AcceptEx calls you post). It is important however to keep
// your backlog value in listen() high in your server to ensure that the
// stack can accept connections even if your application does not get enough
// CPU cycles to repost another AcceptEx under stress conditions.
//
// This sample implements neither of these techniques and is therefore
// susceptible to the behaviour described above.
//
BOOL CreateAcceptSocket(BOOL fUpdateIOCP) {
    int nRet = 0;
    BOOL bAccepted = FALSE;
    DWORD dwRecvNumBytes = 0;
    DWORD bytes = 0;
    GUID acceptex_guid = WSAID_ACCEPTEX;

    // The context for listening socket uses the SockAccept member to store the
    // socket for client connection.
    if (fUpdateIOCP) {
        g_pCtxtListenSocket = UpdateCompletionPort(g_sdListen, ClientIoAccept, FALSE);
        if (g_pCtxtListenSocket == NULL) {
            myprintf("failed to update listen socket to IOCP\n");
            return FALSE;
        }

        // Obtain the AcceptEx function pointer at run time
        // and store it on the corresponding socket context.
        nRet = WSAIoctl(g_sdListen,
                        SIO_GET_EXTENSION_FUNCTION_POINTER,
                        &acceptex_guid,
                        sizeof(acceptex_guid),
                        &g_pCtxtListenSocket->fnAcceptEx,
                        sizeof(g_pCtxtListenSocket->fnAcceptEx),
                        &bytes,
                        NULL,
                        NULL);
        if (nRet == SOCKET_ERROR) {
            myprintf("failed to load AcceptEx: %d\n", WSAGetLastError());
            return FALSE;
        }
    }

    g_pCtxtListenSocket->pIOContext->SocketAccept = CreateSocket();
    if (g_pCtxtListenSocket->pIOContext->SocketAccept == INVALID_SOCKET) {
        myprintf("failed to create new accept socket\n");
        return FALSE;
    }

    // Submit the accept.
    // Here we ask for a first block of data to be received from the new
    // socket together with the connection itself.
    bAccepted = g_pCtxtListenSocket->fnAcceptEx(g_sdListen,
                                                g_pCtxtListenSocket->pIOContext->SocketAccept,
                                                (LPVOID)(g_pCtxtListenSocket->pIOContext->Buffer),
                                                MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16)),
                                                sizeof(SOCKADDR_STORAGE) + 16,
                                                sizeof(SOCKADDR_STORAGE) + 16,
                                                &dwRecvNumBytes,
                                                (LPOVERLAPPED)&(g_pCtxtListenSocket->pIOContext->Overlapped));
    // AcceptEx returns a BOOL: FALSE means failure unless the error is ERROR_IO_PENDING.
    if (!bAccepted && (ERROR_IO_PENDING != WSAGetLastError())) {
        myprintf("AcceptEx() failed: %d\n", WSAGetLastError());
        return FALSE;
    }

    return TRUE;
}

DWORD WINAPI WorkerThread(LPVOID WorkThreadContext) {
    HANDLE hIOCP = (HANDLE)WorkThreadContext;
    BOOL bSuccess = FALSE;
    int nRet = 0;
    LPWSAOVERLAPPED lpOverlapped = NULL;
    PPER_SOCKET_CONTEXT lpPerSocketContext = NULL;
    PPER_SOCKET_CONTEXT lpAcceptSocketContext = NULL;
    PPER_IO_CONTEXT lpIOContext = NULL;
    WSABUF buffRecv;
    WSABUF buffSend;
    DWORD dwRecvNumBytes = 0;
    DWORD dwSendNumBytes = 0;
    DWORD dwFlags = 0;
    DWORD dwIoSize = 0;
    HRESULT hRet;

    while (TRUE) {
        // Block until a completion notification arrives.
        // If there is none, keep waiting.
        bSuccess = GetQueuedCompletionStatus(hIOCP,
                                             &dwIoSize,
                                             (PDWORD_PTR)&lpPerSocketContext,
                                             (LPOVERLAPPED *)&lpOverlapped,
                                             INFINITE);
        if (!bSuccess)
            myprintf("GetQueuedCompletionStatus() failed: %d\n", GetLastError());

        // When the server shuts down, the packet posted with
        // PostQueuedCompletionStatus (completion key 0) lands here,
        // and this worker thread can exit normally.
        if (lpPerSocketContext == NULL) {
            return 0;
        }

        if (g_bEndServer) {
            return 0;
        }

        lpIOContext = (PPER_IO_CONTEXT)lpOverlapped;

        //
        // We should never skip the loop and not post another AcceptEx if the current
        // completion packet is for previous AcceptEx
        //
        if (lpIOContext->IOOperation != ClientIoAccept) {
            if (!bSuccess || (bSuccess && (0 == dwIoSize))) {
                CloseClient(lpPerSocketContext, FALSE);
                continue;
            }
        }

        //
        // determine what type of IO packet has completed by checking the PER_IO_CONTEXT
        // associated with this socket. This will determine what action to take.
        //
        switch (lpIOContext->IOOperation) {
        case ClientIoAccept:
            //
            // When the AcceptEx function returns, the socket sAcceptSocket is
            // in the default state for a connected socket. The socket sAcceptSocket
            // does not inherit the properties of the socket associated with
            // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on
            // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT
            // option, specifying sAcceptSocket as the socket handle and sListenSocket
            // as the option value.
            //
            nRet = setsockopt(lpPerSocketContext->pIOContext->SocketAccept,
                              SOL_SOCKET,
                              SO_UPDATE_ACCEPT_CONTEXT,
                              (char *)&g_sdListen,
                              sizeof(g_sdListen));
            if (nRet == SOCKET_ERROR) {
                //
                // just warn user here.
                //
                myprintf("setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed to update accept socket\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            lpAcceptSocketContext = UpdateCompletionPort(lpPerSocketContext->pIOContext->SocketAccept,
                                                         ClientIoAccept, TRUE);
            if (lpAcceptSocketContext == NULL) {
                //
                // just warn user here.
                //
                myprintf("failed to update accept socket to IOCP\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            if (dwIoSize) {
                lpAcceptSocketContext->pIOContext->IOOperation = ClientIoWrite;
                lpAcceptSocketContext->pIOContext->nTotalBytes = dwIoSize;
                lpAcceptSocketContext->pIOContext->nSentBytes = 0;
                lpAcceptSocketContext->pIOContext->wsabuf.len = dwIoSize;
                hRet = StringCbCopyNA(lpAcceptSocketContext->pIOContext->Buffer,
                                      MAX_BUFF_SIZE,
                                      lpPerSocketContext->pIOContext->Buffer,
                                      sizeof(lpPerSocketContext->pIOContext->Buffer));
                lpAcceptSocketContext->pIOContext->wsabuf.buf = lpAcceptSocketContext->pIOContext->Buffer;

                nRet = WSASend(lpPerSocketContext->pIOContext->SocketAccept,
                               &lpAcceptSocketContext->pIOContext->wsabuf, 1,
                               &dwSendNumBytes,
                               0,
                               &(lpAcceptSocketContext->pIOContext->Overlapped), NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) AcceptEx completed (%d bytes), Send posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {
                //
                // AcceptEx completes but doesn't read any data so we need to post
                // an outstanding overlapped read.
                //
                lpAcceptSocketContext->pIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpAcceptSocketContext->pIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(lpAcceptSocketContext->Socket,
                               &buffRecv, 1,
                               &dwRecvNumBytes,
                               &dwFlags,
                               &lpAcceptSocketContext->pIOContext->Overlapped, NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                }
            }

            //
            // Time to post another outstanding AcceptEx
            //
            if (!CreateAcceptSocket(FALSE)) {
                myprintf("Please shut down and reboot the server.\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return (0);
            }
            break;

        case ClientIoRead:
            //
            // a read operation has completed, post a write operation to echo the
            // data back to the client using the same data buffer.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nTotalBytes = dwIoSize;
            lpIOContext->nSentBytes = 0;
            lpIOContext->wsabuf.len = dwIoSize;
            dwFlags = 0;
            nRet = WSASend(lpPerSocketContext->Socket,
                           &lpIOContext->wsabuf, 1,
                           &dwSendNumBytes,
                           dwFlags,
                           &(lpIOContext->Overlapped), NULL);
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                myprintf("WSASend() failed: %d\n", WSAGetLastError());
                CloseClient(lpPerSocketContext, FALSE);
            } else {
                myprintf("WorkerThread %d: Socket(%d) Recv completed (%d bytes), Send posted\n",
                         GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
            }
            break;

        case ClientIoWrite:
            //
            // a write operation has completed, determine if all the data intended to be
            // sent actually was sent.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nSentBytes += dwIoSize;
            dwFlags = 0;
            if (lpIOContext->nSentBytes < lpIOContext->nTotalBytes) {
                //
                // the previous write operation didn't send all the data,
                // post another send to complete the operation
                //
                buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
                buffSend.len = lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
                nRet = WSASend(lpPerSocketContext->Socket,
                               &buffSend, 1,
                               &dwSendNumBytes,
                               dwFlags,
                               &(lpIOContext->Overlapped), NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send partially completed (%d bytes), Send posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {
                //
                // previous write operation completed for this socket, post another recv
                //
                lpIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(lpPerSocketContext->Socket,
                               &buffRecv, 1,
                               &dwRecvNumBytes,
                               &dwFlags,
                               &lpIOContext->Overlapped, NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send completed (%d bytes), Recv posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            }
            break;
        } // switch
    } // while
    return 0;
}

//
// Allocate a context structures for the socket and add the socket to the IOCP.
// Additionally, add the context structure to the global list of context structures.
//
PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo, BOOL bAddToList) {
    PPER_SOCKET_CONTEXT lpPerSocketContext;

    lpPerSocketContext = CtxtAllocate(sd, ClientIo);
    if (lpPerSocketContext == NULL)
        return NULL;

    g_hIOCP = CreateIoCompletionPort((HANDLE)sd, g_hIOCP, (DWORD_PTR)lpPerSocketContext, 0);
    if (g_hIOCP == NULL) {
        myprintf("CreateIoCompletionPort() failed: %d\n", GetLastError());
        if (lpPerSocketContext->pIOContext)
            xfree(lpPerSocketContext->pIOContext);
        xfree(lpPerSocketContext);
        return NULL;
    }

    //
    // The listening socket context (bAddToList is FALSE) is not added to the list.
    // All other socket contexts are added to the list.
    //
    if (bAddToList)
        CtxtListAddTo(lpPerSocketContext);

    myprintf("UpdateCompletionPort: Socket(%d) added to IOCP\n", lpPerSocketContext->Socket);
    return lpPerSocketContext;
}

//
// Close down a connection with a client. This involves closing the socket (when
// initiated as a result of a CTRL-C the socket closure is not graceful). Additionally,
// any context data associated with that socket is free'd.
//
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful) {
    EnterCriticalSection(&g_CriticalSection);

    if (lpPerSocketContext) {
        myprintf("CloseClient: Socket(%d) connection closing (graceful=%s)\n",
                 lpPerSocketContext->Socket, (bGraceful ? "TRUE" : "FALSE"));
        if (!bGraceful) {
            //
            // force the subsequent closesocket to be abortive.
            //
            LINGER lingerStruct;
            lingerStruct.l_onoff = 1;
            lingerStruct.l_linger = 0;
            setsockopt(lpPerSocketContext->Socket, SOL_SOCKET, SO_LINGER,
                       (char *)&lingerStruct, sizeof(lingerStruct));
        }
        if (lpPerSocketContext->pIOContext->SocketAccept != INVALID_SOCKET) {
            closesocket(lpPerSocketContext->pIOContext->SocketAccept);
            lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;
        }

        closesocket(lpPerSocketContext->Socket);
        lpPerSocketContext->Socket = INVALID_SOCKET;
        CtxtListDeleteFrom(lpPerSocketContext);
        lpPerSocketContext = NULL;
    } else {
        myprintf("CloseClient: lpPerSocketContext is NULL\n");
    }

    LeaveCriticalSection(&g_CriticalSection);
    return;
}

//
// Allocate a socket context for the new connection.
//
PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {
    PPER_SOCKET_CONTEXT lpPerSocketContext;

    EnterCriticalSection(&g_CriticalSection);

    lpPerSocketContext = (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));
    if (lpPerSocketContext) {
        lpPerSocketContext->pIOContext = (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));
        if (lpPerSocketContext->pIOContext) {
            lpPerSocketContext->Socket = sd;
            lpPerSocketContext->pCtxtBack = NULL;
            lpPerSocketContext->pCtxtForward = NULL;

            lpPerSocketContext->pIOContext->Overlapped.Internal = 0;
            lpPerSocketContext->pIOContext->Overlapped.InternalHigh = 0;
            lpPerSocketContext->pIOContext->Overlapped.Offset = 0;
            lpPerSocketContext->pIOContext->Overlapped.OffsetHigh = 0;
            lpPerSocketContext->pIOContext->Overlapped.hEvent = NULL;
            lpPerSocketContext->pIOContext->IOOperation = ClientIO;
            lpPerSocketContext->pIOContext->pIOContextForward = NULL;
            lpPerSocketContext->pIOContext->nTotalBytes = 0;
            lpPerSocketContext->pIOContext->nSentBytes = 0;
            lpPerSocketContext->pIOContext->wsabuf.buf = lpPerSocketContext->pIOContext->Buffer;
            lpPerSocketContext->pIOContext->wsabuf.len = sizeof(lpPerSocketContext->pIOContext->Buffer);
            lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;

            ZeroMemory(lpPerSocketContext->pIOContext->wsabuf.buf,
                       lpPerSocketContext->pIOContext->wsabuf.len);
        } else {
            myprintf("HeapAlloc() PER_IO_CONTEXT failed: %d\n", GetLastError());
            xfree(lpPerSocketContext);
            // Do not hand the freed block back to the caller.
            lpPerSocketContext = NULL;
        }
    } else {
        // Fall through so that the critical section is released before returning NULL.
        myprintf("HeapAlloc() PER_SOCKET_CONTEXT failed: %d\n", GetLastError());
    }

    LeaveCriticalSection(&g_CriticalSection);
    return (lpPerSocketContext);
}

//
// Add a client connection context structure to the global list of context structures.
//
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext) {
    PPER_SOCKET_CONTEXT pTemp;

    EnterCriticalSection(&g_CriticalSection);

    if (g_pCtxtList == NULL) {
        //
        // add the first node to the linked list
        //
        lpPerSocketContext->pCtxtBack = NULL;
        lpPerSocketContext->pCtxtForward = NULL;
        g_pCtxtList = lpPerSocketContext;
    } else {
        //
        // add node to head of list
        //
        pTemp = g_pCtxtList;
        g_pCtxtList = lpPerSocketContext;
        lpPerSocketContext->pCtxtBack = pTemp;
        lpPerSocketContext->pCtxtForward = NULL;
        pTemp->pCtxtForward = lpPerSocketContext;
    }

    LeaveCriticalSection(&g_CriticalSection);
    return;
}

//
// Remove a client context structure from the global list of context structures.
//
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext) {
    PPER_SOCKET_CONTEXT pBack;
    PPER_SOCKET_CONTEXT pForward;
    PPER_IO_CONTEXT pNextIO = NULL;
    PPER_IO_CONTEXT pTempIO = NULL;

    EnterCriticalSection(&g_CriticalSection);

    if (lpPerSocketContext) {
        pBack = lpPerSocketContext->pCtxtBack;
        pForward = lpPerSocketContext->pCtxtForward;

        // CtxtListAddTo makes g_pCtxtList point at the newest node, whose
        // pCtxtForward is NULL; older nodes are reached through pCtxtBack.
        if (pBack == NULL && pForward == NULL) {
            //
            // This is the only node in the list to delete
            //
            g_pCtxtList = NULL;
        } else if (pBack == NULL && pForward != NULL) {
            //
            // This is the oldest node (the far end of the list); the head does not change
            //
            pForward->pCtxtBack = NULL;
        } else if (pBack != NULL && pForward == NULL) {
            //
            // This is the head node (the newest); the next older node becomes the head
            //
            pBack->pCtxtForward = NULL;
            g_pCtxtList = pBack;
        } else if (pBack && pForward) {
            //
            // Neither start node nor end node in the list
            //
            pBack->pCtxtForward = pForward;
            pForward->pCtxtBack = pBack;
        }

        //
        // Free all i/o context structures per socket
        //
        pTempIO = (PPER_IO_CONTEXT)(lpPerSocketContext->pIOContext);
        do {
            pNextIO = (PPER_IO_CONTEXT)(pTempIO->pIOContextForward);
            if (pTempIO) {
                //
                // The overlapped structure is safe to free when only the posted i/o has
                // completed. Here we only need to test those posted but not yet received
                // by PQCS in the shutdown process.
                //
                if (g_bEndServer)
                    while (!HasOverlappedIoCompleted((LPOVERLAPPED)pTempIO))
                        Sleep(0);
                xfree(pTempIO);
                pTempIO = NULL;
            }
            pTempIO = pNextIO;
        } while (pNextIO);

        xfree(lpPerSocketContext);
        lpPerSocketContext = NULL;
    } else {
        myprintf("CtxtListDeleteFrom: lpPerSocketContext is NULL\n");
    }

    LeaveCriticalSection(&g_CriticalSection);
    return;
}

//
// Free all context structure in the global list of context structures.
//
VOID CtxtListFree(void) {
    PPER_SOCKET_CONTEXT pTemp1, pTemp2;

    EnterCriticalSection(&g_CriticalSection);

    pTemp1 = g_pCtxtList;
    while (pTemp1) {
        pTemp2 = pTemp1->pCtxtBack;
        CloseClient(pTemp1, FALSE);
        pTemp1 = pTemp2;
    }

    LeaveCriticalSection(&g_CriticalSection);
    return;
}

int myprintf(const char *lpFormat, ...) {
    char cBuffer[512];
    va_list arglist;
    HANDLE hOut = NULL;
    HRESULT hRet;
    DWORD dwWritten = 0;

    ZeroMemory(cBuffer, sizeof(cBuffer));

    va_start(arglist, lpFormat);
    hRet = StringCchVPrintfA(cBuffer, 512, lpFormat, arglist);
    va_end(arglist);

    // A truncated message is still NUL-terminated, so it is printed as well.
    if (SUCCEEDED(hRet) || hRet == STRSAFE_E_INSUFFICIENT_BUFFER) {
        hOut = GetStdHandle(STD_OUTPUT_HANDLE);
        if (hOut != INVALID_HANDLE_VALUE)
            WriteConsoleA(hOut, cBuffer, lstrlenA(cBuffer), &dwWritten, NULL);
    }

    // Number of characters actually written to the console.
    return (int)dwWritten;
}
END!!!