文章目录
- CUDA Stream 回调函数示例代码
- 基本概念
- 示例代码
- 代码解释
- 回调函数的特点
- 更复杂的示例:多个回调
- 注意事项
- CUDA Stream 回调函数中使用 MPI 或 NCCL
- 示例程序
- 注意事项
CUDA Stream 回调函数示例代码
CUDA 中的流回调函数(stream callback)是一种在 CUDA 流中插入异步回调的机制,它允许你在流的特定位置插入一个主机端函数调用。回调函数会在流中所有前面的操作都完成后被调用。
基本概念
- 回调函数: 一个在主机上执行的函数,当流中前面的所有操作都完成后被调用
- 异步执行: 回调不会阻塞主机线程
- 执行顺序: 回调函数在流中按照插入顺序执行
示例代码
#include <stdio.h>
#include <cuda_runtime.h>// CUDA核函数
__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] = value;}
}// 回调函数
void CUDART_CB myCallback(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback executed! Status: %s, User data: %d\n",cudaGetErrorString(status), *(int*)userData);
}int main() {const int N = 1024;const int value = 42;int *d_data = nullptr;int userData = 123; // 用户自定义数据// 分配设备内存cudaMalloc(&d_data, N * sizeof(int));// 创建流cudaStream_t stream;cudaStreamCreate(&stream);// 启动核函数dim3 block(256);dim3 grid((N + block.x - 1) / block.x);kernel<<<grid, block, 0, stream>>>(d_data, value, N);// 添加回调函数到流cudaStreamAddCallback(stream, myCallback, &userData, 0);// 可以继续添加其他操作到流kernel<<<grid, block, 0, stream>>>(d_data, value + 1, N);// 等待流完成cudaStreamSynchronize(stream);// 清理资源cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}
代码解释
-
核函数: 简单的核函数,将数组元素设置为指定值。
-
回调函数:
- 必须具有
void CUDART_CB func(cudaStream_t stream, cudaError_t status, void *userData)
的签名 status
参数表示流中前面操作的状态userData
是用户提供的自定义数据
- 必须具有
-
主程序流程:
- 分配设备内存
- 创建CUDA流
- 启动第一个核函数
- 添加回调函数到流中
- 启动第二个核函数
- 同步流以确保所有操作完成
- 释放资源
回调函数的特点
-
执行时机: 回调函数会在流中所有前面的操作完成后执行,但在后续操作开始前执行。
-
线程安全: 回调函数在独立的线程中执行,不是主线程。
-
限制:
- 回调函数中不应调用CUDA API函数
- 不应执行耗时的操作
- 不应抛出异常
-
用户数据: 可以通过
userData
参数传递数据给回调函数,但需要确保在回调执行时数据仍然有效。
更复杂的示例:多个回调
#include <stdio.h>
#include <cuda_runtime.h>__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] += value;}
}void CUDART_CB callback1(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 1: Step %d completed\n", *(int*)userData);
}void CUDART_CB callback2(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 2: Step %d completed\n", *(int*)userData);
}int main() {const int N = 1024;int *d_data = nullptr;int step1 = 1, step2 = 2;cudaMalloc(&d_data, N * sizeof(int));cudaMemset(d_data, 0, N * sizeof(int));cudaStream_t stream;cudaStreamCreate(&stream);dim3 block(256);dim3 grid((N + block.x - 1) / block.x);// 执行序列: 核函数 -> 回调1 -> 核函数 -> 回调2kernel<<<grid, block, 0, stream>>>(d_data, 10, N);cudaStreamAddCallback(stream, callback1, &step1, 0);kernel<<<grid, block, 0, stream>>>(d_data, 20, N);cudaStreamAddCallback(stream, callback2, &step2, 0);cudaStreamSynchronize(stream);// 验证结果int h_data[N];cudaMemcpy(h_data, d_data, N * sizeof(int), cudaMemcpyDeviceToHost);printf("First element: %d (should be 30)\n", h_data[0]);cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}
这个示例展示了如何在流中插入多个回调函数,每个回调在不同的执行阶段被触发。
注意事项
- 回调函数执行时间应尽量短,避免影响整体性能
- 确保回调函数中使用的用户数据在回调执行时仍然有效
- 回调函数中不应进行CUDA API调用
- 回调函数执行顺序与它们在流中的添加顺序一致
回调函数是CUDA流中实现主机-设备交互和任务协调的有力工具,特别适合用于构建复杂的异步执行流水线。
CUDA Stream 回调函数中使用 MPI 或 NCCL
在 CUDA 中,stream 回调函数是在主机端执行的函数,当 stream 中所有前面的操作都完成后会被调用。关于在回调函数中使用 MPI 或 NCCL 的问题:
-
MPI: 可以在回调函数中使用 MPI 函数,但需要注意 MPI 的线程安全性。MPI 需要初始化为
MPI_THREAD_SERIALIZED
或MPI_THREAD_MULTIPLE
级别才能安全地在回调中使用。 -
NCCL: 也可以在回调函数中使用 NCCL 函数,但需要注意 NCCL 通信可能会与 CUDA 操作交错,需要确保正确的同步。
示例程序
下面是一个展示如何在 CUDA stream 回调函数中使用 MPI 和 NCCL 的示例程序:
#include <stdio.h>
#include <mpi.h>
#include <cuda_runtime.h>
#include <nccl.h>#define CUDACHECK(cmd) do { \cudaError_t e = cmd; \if( e != cudaSuccess ) { \printf("CUDA error %s:%d '%s'\n", \__FILE__,__LINE__,cudaGetErrorString(e)); \exit(EXIT_FAILURE); \} \
} while(0)#define NCCLCHECK(cmd) do { \ncclResult_t r = cmd; \if( r != ncclSuccess ) { \printf("NCCL error %s:%d '%s'\n", \__FILE__,__LINE__,ncclGetErrorString(r)); \exit(EXIT_FAILURE); \} \
} while(0)void CUDART_CB myStreamCallback(cudaStream_t stream, cudaError_t status, void *userData) {int *data = (int*)userData;int rank, size;// 获取MPI信息MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);printf("Rank %d: Stream callback executed. Data value: %d\n", rank, *data);// 在这里可以使用MPI函数MPI_Barrier(MPI_COMM_WORLD);// 也可以使用NCCL函数(需要先初始化NCCL)ncclComm_t comm = *(ncclComm_t*)((void**)userData + 1);float *sendbuff, *recvbuff;// 假设这些缓冲区已经在其他地方分配和初始化// NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, ncclFloat, ncclSum, comm, stream));printf("Rank %d: Finished MPI/NCCL operations in callback\n", rank);
}int main(int argc, char* argv[]) {int rank, size;// 初始化MPI,要求线程支持MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);if (provided < MPI_THREAD_SERIALIZED) {printf("MPI thread support insufficient\n");MPI_Abort(MPI_COMM_WORLD, 1);}MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);// 初始化CUDAint dev = rank % 8; // 假设每个进程使用不同的GPUCUDACHECK(cudaSetDevice(dev));// 初始化NCCLncclComm_t comm;ncclUniqueId id;if (rank == 0) ncclGetUniqueId(&id);MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);NCCLCHECK(ncclCommInitRank(&comm, size, id, rank));// 创建CUDA streamcudaStream_t stream;CUDACHECK(cudaStreamCreate(&stream));// 准备一些数据传递给回调函数int *h_data, *d_data;h_data = (int*)malloc(sizeof(int));*h_data = rank * 100;CUDACHECK(cudaMalloc(&d_data, sizeof(int)));CUDACHECK(cudaMemcpyAsync(d_data, h_data, sizeof(int), cudaMemcpyHostToDevice, stream));// 准备用户数据(包含普通数据和NCCL通信器)void *userData[2];userData[0] = h_data;userData[1] = &comm;// 添加回调函数CUDACHECK(cudaStreamAddCallback(stream, myStreamCallback, userData, 0));// 等待stream完成CUDACHECK(cudaStreamSynchronize(stream));// 清理资源NCCLCHECK(ncclCommDestroy(comm));CUDACHECK(cudaStreamDestroy(stream));CUDACHECK(cudaFree(d_data));free(h_data);MPI_Finalize();return 0;
}
注意事项
-
MPI 线程安全: 必须使用
MPI_Init_thread
并确保提供的线程支持级别足够(至少MPI_THREAD_SERIALIZED
)。 -
NCCL 使用: 在回调中使用 NCCL 时需要确保:
- NCCL 通信器已经初始化
- 使用的 CUDA stream 与 NCCL 操作兼容
- 缓冲区已经正确分配和初始化
-
死锁风险: 在回调中进行集体通信操作(如 MPI_Barrier 或 ncclAllReduce)时要小心,确保所有进程都能到达该点。
-
性能考虑: 在回调中进行通信操作可能会影响整体性能,需要仔细评估。
这个示例展示了基本用法,实际应用中需要根据具体需求进行调整。