文章目录
- 自动混合精度示例
- 典型的混合精度训练
- 处理未缩放梯度
- 梯度裁剪
- 处理缩放梯度
- 梯度累积
- 梯度惩罚
- 处理多个模型、损失函数和优化器
- 多 GPU 工作环境下的注意事项
- 单进程中的DataParallel
- 分布式数据并行:每个进程对应一个GPU
- 每个进程使用多块GPU的DistributedDataParallel
- 自动混合精度与自定义自动求导函数
- 支持多输入或自动转换操作的功能
- 需要特定 `dtype` 的函数
- 自动微分机制
- 自动微分如何编码历史记录
- 保存的张量
- 不可微函数的梯度计算
- 局部禁用梯度计算
- 设置 `requires_grad`
- 梯度模式
- 默认模式(梯度模式)
- 无梯度模式
- 推理模式
- 评估模式 (`nn.Module.eval()`)
- 自动求导中的原地操作
- 原地操作的正确性检查
- 多线程自动求导
- CPU 并发处理
- 非确定性行为
- 图保留机制
- Autograd节点的线程安全性
- C++钩子的线程安全性问题
- 复数自动求导
- 什么是复导数?
- 维廷格微积分登场...
- Wirtinger微积分在优化中有何作用?
- PyTorch如何计算共轭Wirtinger导数?
- 一、关于 Diff Match Patch
- 相关链接资源
- 关键功能特性
- 二、参考文档
- 三、多语言支持
- 四、算法实现
- 五、Python 实现
- 1、Hello World示例
- 2、测试说明
- 如何为复杂函数编写自定义导数公式?
- 跨域函数如何处理?
- 张量保存的钩子函数
- 为保存的张量注册钩子
- 为保存的张量注册默认钩子
- 反向钩子执行机制
- 特定钩子何时会被触发
- 不同钩子的触发顺序
- 特殊钩子
- 当张量被原地修改时 Tensor hooks 的行为表现
- 广播语义
- 通用语义
- 原地操作语义
- 向后兼容性
- CPU线程与TorchScript推理
- 构建选项
- 运行时 API
- 调整线程数量
- CUDA 语义
- Ampere(及后续)设备上的 TensorFloat-32 (TF32)
- FP16 GEMM中的降低精度计算
- BF16 GEMM中的降低精度缩减
- FP16 GEMM 中的全 FP16 累加
- 异步执行
- CUDA 流
- 反向传播的流语义
- BC 说明:在默认流上使用梯度
- 内存管理
- 使用 `PYTORCH_CUDA_ALLOC_CONF` 优化内存占用
- 为CUDA使用自定义内存分配器
- 在同一程序中混合使用不同的CUDA系统分配器
- cuBLAS 工作空间
- cuFFT 计划缓存
- 即时编译
- 最佳实践
- 设备无关代码
- 使用固定内存缓冲区
- 使用 nn.parallel.DistributedDataParallel 替代 multiprocessing 或 nn.DataParallel
- CUDA 图
- 为什么选择CUDA Graphs?
- PyTorch API
- 约束条件
- 非约束条件
- 全网捕获
- 部分网络捕获
- 与 torch.cuda.amp 配合使用
- 多流使用方式
- 与 DistributedDataParallel 配合使用
- NCCL < 2.9.6 版本
- NCCL >= 2.9.6
- 图内存管理
- 跨捕获共享内存
- 分布式数据并行
- 示例
- 内部设计
- 实现
- 进程组
- 分布式数据并行 (DistributedDataParallel)
- TorchDynamo DDPOptimizer(分布式数据并行优化器)
- 扩展 PyTorch
- 添加新运算符
- 扩展 `torch.autograd`
- 何时使用自定义函数
- 何时不应使用
- 使用方法
- 示例
- 合并或分离 `forward()` 与 `setup_context()`
- 前向模式自动微分
- `torch.func` 转换和/或 `torch.vmap()`
- 扩展 `torch.nn`
- 添加 `Module`
- 扩展 `torch` Python API
- 扩展 `torch` 实现类 `Tensor` 类型
- 继承 `torch.Tensor` 类
- 扩展 `torch` 的 `Tensor` 包装类型
- 对定义了`__torch_function__`的多种类型进行操作
- PyTorch API 覆盖测试范围
- 扩展 `torch` 原生 API
- `__torch_dispatch__` 调用约定
- 通过模式扩展所有 `torch` API
- 使用 autograd.Function 扩展 torch.func
- 基础用法
- 示例1:autograd.Function调用外部系统
- 示例2:autograd.Function自定义梯度规则
- 限制与注意事项
- `torch.vmap()` 支持
- 自动生成vmap规则
- 定义 vmap 静态方法
- `torch.func.jvp()` 支持
- 常见问题解答
- 我的模型报错"cuda runtime error(2): out of memory"
- 我的GPU内存未正确释放
- 我的内存不足异常处理程序无法分配内存
- 我的数据加载工作进程返回相同的随机数
- 我的循环神经网络在数据并行环境下无法正常工作
- FSDP 说明文档
- FSDP 预取机制的细节
- 通信负载大小
- FSDP缓冲区大小
- 英特尔 GPU 入门指南
- 硬件要求
- 英特尔数据中心 GPU
- 英特尔客户端 GPU
- 软件前提条件
- 安装
- 二进制文件
- 从源码构建
- 检查 Intel GPU 的可用性
- 最小代码改动
- 示例
- 推理示例
- 使用FP32进行推理
- 使用AMP进行推理
- 使用 `torch.compile` 进行推理
- 训练示例
- 使用 FP32 进行训练
- 使用 AMP 进行训练
- 使用 `torch.compile` 进行训练
- 梯度检验机制
- 符号约定与背景信息
- 默认反向模式梯度检查行为
- 实数到实数函数
- 默认实数输入数值求导方法
- 默认实数输入的解析评估
- 复数到实数的函数映射
- 默认复数输入数值评估
- 默认复杂输入解析评估
- 处理复杂输出的函数
- 快速反向模式梯度检查
- 实函数到实函数的快速梯度检验
- 复数到实数函数的快速梯度检验
- 快速复数输入数值评估方法
- 快速复数输入解析评估
- 为何不使用复数uuu
- 支持复数输出的快速梯度检验
- Gradgradcheck 实现
- HIP (ROCm) 语义说明
- HIP 接口复用 CUDA 接口
- 检查 HIP 支持
- ROCm 上的 TensorFloat-32 (TF32)
- 内存管理
- hipBLAS 工作空间
- hipFFT/rocFFT 计划缓存
- torch.distributed 后端支持
- C++中CUDA API到HIP API的映射
- 请参阅CUDA语义文档
- 启用内核断言
- 大规模部署特性
- 全集群算子性能分析
- API 使用日志记录
- 为保存的TorchScript模型附加元数据
- 构建环境注意事项
- 常见扩展点
- LibTorch 稳定ABI
第二部分见: https://blog.csdn.net/lovechris00/article/details/147689171
自动混合精度示例
https://pytorch.org/docs/stable/notes/amp_examples.html
通常来说,"自动混合精度训练"指的是同时使用 torch.autocast
和 torch.amp.GradScaler
进行训练。
torch.autocast
实例能够为选定区域启用自动类型转换功能。自动类型转换会智能选择操作的计算精度,在保持准确性的同时提升性能。
torch.amp.GradScaler
实例则帮助便捷地执行梯度缩放步骤。梯度缩放通过最小化梯度下溢(underflow)问题,可以改善使用 float16
梯度(在CUDA和XPU设备上默认使用)的神经网络收敛性,具体原理参见此处。
torch.autocast
和 torch.amp.GradScaler
是模块化设计的。在以下示例中,每个组件都按照其官方文档建议的方式使用。
(本文示例仅作演示用途,可运行的完整教程请参阅自动混合精度实践指南)
典型的混合精度训练
# Creates model and optimizer in default precision
model = Net().cuda()
optimizer = optim.SGD(model.parameters(),
...)# Creates a GradScaler once at the beginning of training.
scaler = GradScaler()for epoch in epochs:for input, target in data:optimizer.zero_grad()# Runs the forward pass with autocasting.with autocast(device_type='cuda', dtype=torch.float16):output = model(input)loss = loss_fn(output, target)# Scales loss. Calls backward() on scaled loss to create scaled gradients.# Backward passes under autocast are not recommended.# Backward ops run in the same dtype autocast chose for corresponding forward ops.scaler.scale(loss).backward()# scaler.step() first unscales the gradients of the optimizer's assigned params.# If these gradients do not contain infs or NaNs, optimizer.step() is then called, # otherwise, optimizer.step() is skipped.scaler.step(optimizer)# Updates the scale for next iteration.scaler.update()
处理未缩放梯度
通过scaler.scale(loss).backward()
生成的所有梯度都经过缩放。若您需要在backward()
和scaler.step(optimizer)
之间修改或检查参数的.grad
属性,应当先对其进行反缩放处理。例如,梯度裁剪操作会调整一组梯度,使其全局范数(参见torch.nn.utils.clip_grad_norm_()
)或最大幅度(参见torch.nn.utils.clip_grad_value_()
)小于等于用户设定的阈值。如果尝试在未反缩放的情况下进行裁剪,梯度的范数/最大幅度也会保持缩放状态,这将导致您设定的阈值(本应针对未缩放梯度)失效。
scaler.unscale_(optimizer)
方法可对optimizer
所分配参数的梯度进行反缩放。如果您的模型包含其他被分配给不同优化器(例如optimizer2
)的参数,可以单独调用scaler.unscale_(optimizer2)
来反缩放这些参数的梯度。
梯度裁剪
在裁剪之前调用 scaler.unscale_(optimizer)
可以像往常一样对未缩放的梯度进行裁剪:
scaler = GradScaler()for epoch in epochs:for input, target in data:optimizer.zero_grad()with autocast(device_type='cuda', dtype=torch.float16):output = model(input)loss = loss_fn(output, target)scaler.scale(loss).backward()# Unscales the gradients of optimizer's assigned params in-placescaler.unscale_(optimizer)# Since the gradients of optimizer's assigned params are unscaled, clips as usual:torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)# optimizer's gradients are already unscaled, so scaler.step does not unscale them, # although it still skips optimizer.step() if the gradients contain infs or NaNs.scaler.step(optimizer)# Updates the scale for next iteration.scaler.update()
scaler
记录显示当前迭代中已对优化器调用了 scaler.unscale_(optimizer)
,因此 scaler.step(optimizer)
知道无需在(内部)调用 optimizer.step()
之前重复反缩放梯度。
警告:每个优化器在每次 step
调用期间,仅应在所有关联参数的梯度累积完成后调用一次 unscale_
。若在两次 step
之间对同一优化器重复调用 unscale_
,将触发 RuntimeError。
处理缩放梯度
梯度累积
梯度累积会在一个有效批次(大小为 batch_per_iter * iters_to_accumulate
,如果是分布式训练则乘以 * num_procs
)上累加梯度。缩放因子应针对有效批次进行校准,这意味着:
- 在有效批次粒度上进行 inf/NaN 检查
- 如果发现梯度包含 inf/NaN 则跳过该步骤
- 缩放因子的更新也应在有效批次粒度上完成
此外,在累积某个有效批次的梯度时,梯度应保持缩放状态,且缩放因子必须保持不变。如果在累积完成前对梯度取消缩放(或改变了缩放因子),下一次反向传播会将缩放后的梯度与未缩放的梯度(或以不同因子缩放的梯度)相加,导致无法恢复累积的未缩放梯度。因此,step
操作必须应用这些条件。
若需对梯度执行 unscale_
操作(例如为了实现未缩放梯度的裁剪),请在调用 step
前立即调用 unscale_
,且确保所有待执行 step
的(已缩放)梯度已完成累积。另外,仅在完整处理完一个有效批次并调用 step
的迭代结束时,才调用 update
。
scaler = GradScaler()for epoch in epochs:for i, (input, target) in enumerate(data):with autocast(device_type='cuda', dtype=torch.float16):output = model(input)loss = loss_fn(output, target)loss = loss / iters_to_accumulate# Accumulates scaled gradients.scaler.scale(loss).backward()if (i + 1) % iters_to_accumulate == 0:# may unscale_ here if desired (e.g., to allow clipping unscaled gradients)scaler.step(optimizer)scaler.update()optimizer.zero_grad()
梯度惩罚
常见的梯度惩罚实现会使用 torch.autograd.grad()
生成梯度,将这些梯度组合成惩罚值,然后将惩罚值添加到损失函数中。
以下是一个未进行梯度缩放或自动混合精度的普通 L2 惩罚示例:
for epoch in epochs:for input, target in data:optimizer.zero_grad()output = model(input)loss = loss_fn(output, target)# Creates gradientsgrad_params = torch.autograd.grad(outputs=loss,inputs=model.parameters(), create_graph=True)# Computes the penalty term and adds it to the lossgrad_norm = 0for grad in grad_params:grad_norm += grad.pow(2).sum()grad_norm = grad_norm.sqrt()loss = loss + grad_normloss.backward()# clip gradients here, if desiredoptimizer.step()
为了实现带有梯度缩放的梯度惩罚,传递给 torch.autograd.grad()
的 outputs
张量需要进行缩放处理。这样得到的梯度也会被缩放,因此在合并生成惩罚值之前需要先进行反缩放操作。
此外,惩罚项的计算属于前向传播过程,因此应当放在 autocast
上下文环境中执行。
以下是相同 L2 惩罚项的实现示例:
scaler = GradScaler()for epoch in epochs:for input, target in data:optimizer.zero_grad()with autocast(device_type='cuda', dtype=torch.float16):output = model(input)loss = loss_fn(output, target)# Scales the loss for autograd.grad's backward pass, producing scaled_grad_paramsscaled_grad_params = torch.autograd.grad(outputs=scaler.scale(loss), inputs=model.parameters(), create_graph=True)# Creates unscaled grad_params before computing the penalty. scaled_grad_params are # not owned by any optimizer, so ordinary division is used instead of scaler.unscale_:inv_scale = 1./scaler.get_scale()grad_params = [p * inv_scale for p in scaled_grad_params]# Computes the penalty term and adds it to the losswith autocast(device_type='cuda', dtype=torch.float16):grad_norm = 0for grad in grad_params:grad_norm += grad.pow(2).sum()grad_norm = grad_norm.sqrt()loss = loss + grad_norm# Applies scaling to the backward call as usual.# Accumulates leaf gradients that are correctly scaled.scaler.scale(loss).backward()# may unscale_ here if desired (e.g., to allow clipping unscaled gradients)# step() and update() proceed as usual.scaler.step(optimizer)scaler.update()
处理多个模型、损失函数和优化器
如果网络包含多个损失函数,必须对每个损失函数单独调用 scaler.scale
。
如果网络包含多个优化器,可以单独对任意优化器调用 scaler.unscale_
,但必须对每个优化器单独调用 scaler.step
。
需要注意的是,scaler.update
只需在所有优化器完成当前迭代的 step
操作后调用一次。
scaler = torch.amp.GradScaler()for epoch in epochs:for input, target in data:optimizer0.zero_grad()optimizer1.zero_grad()with autocast(device_type='cuda', dtype=torch.float16):output0 = model0(input)output1 = model1(input)loss0 = loss_fn(2 * output0 + 3 * output1, target)loss1 = loss_fn(3 * output0 - 5 * output1, target)# (retain_graph here is unrelated to amp, it's present because in this# example, both backward() calls share some sections of graph.)scaler.scale(loss0).backward(retain_graph=True)scaler.scale(loss1).backward()# You can choose which optimizers receive explicit unscaling, if you # want to inspect or modify the gradients of the params they own.scaler.unscale_(optimizer0)scaler.step(optimizer0)scaler.step(optimizer1)scaler.update()
每个优化器都会检查其梯度是否存在无穷大/NaN值,并独立决定是否跳过当前步骤。这可能导致一个优化器跳过步骤,而另一个优化器继续执行。由于跳步情况很少发生(每几百次迭代才出现一次),因此不会影响模型收敛。如果您在给多优化器模型添加梯度缩放后观察到收敛效果不佳,请提交错误报告。
多 GPU 工作环境下的注意事项
此处描述的问题仅影响 autocast
功能,GradScaler
的使用方式保持不变。
单进程中的DataParallel
即使 torch.nn.DataParallel
会生成线程在每个设备上运行前向传播,自动转换状态也会在每个线程中传递,因此以下代码可以正常工作:
model = MyModel()
dp_model = nn.DataParallel(model)# Sets autocast in the main thread
with autocast(device_type='cuda', dtype=torch.float16):# dp_model's internal threads will autocast.output = dp_model(input)# loss_fn also autocastloss = loss_fn(output)
分布式数据并行:每个进程对应一个GPU
torch.nn.parallel.DistributedDataParallel
的文档建议,每个进程使用一个GPU以获得最佳性能。在这种情况下,DistributedDataParallel
不会在内部生成线程,因此autocast
和GradScaler
的使用不受影响。
每个进程使用多块GPU的DistributedDataParallel
torch.nn.parallel.DistributedDataParallel
可能会像 torch.nn.DataParallel
那样,为每个设备启动一个侧线程来运行前向传播。解决方法相同:在模型的forward
方法中应用autocast,确保侧线程中启用了该功能。
自动混合精度与自定义自动求导函数
如果您的网络使用了自定义自动求导函数(torch.autograd.Function
的子类),在以下情况下需要进行修改以确保与自动混合精度兼容:
- 函数接收多个浮点类型的张量输入
- 函数内部包含任何可自动混合精度的操作(参见自动混合精度操作参考)
- 函数需要特定的
dtype
(例如,当函数封装了仅针对特定dtype
编译的CUDA扩展时)
对于所有情况,如果您正在导入该函数且无法修改其定义,一个安全的备选方案是:在出现错误的使用点禁用自动混合精度,并强制使用float32
(或所需dtype
)执行。
with autocast(device_type='cuda', dtype=torch.float16):...with autocast(device_type='cuda', dtype=torch.float16, enabled=False):output = imported_function(input1.float(), input2.float())
如果你是函数的作者(或能修改其定义),更好的解决方案是使用 torch.amp.custom_fwd()
和 torch.amp.custom_bwd()
装饰器,如下文相关案例所示。
支持多输入或自动转换操作的功能
对 forward
和 backward
分别应用 custom_fwd
和 custom_bwd
(无需参数)。这能确保 forward
在当前自动转换状态下执行,而 backward
则与 forward
保持相同的自动转换状态(可避免类型不匹配错误):
class MyMM(torch.autograd.Function):@staticmethod@custom_fwddef forward(ctx, a, b):ctx.save_for_backward(a, b)return a.mm(b)@staticmethod@custom_bwddef backward(ctx, grad):a, b = ctx.saved_tensorsreturn grad.mm(b.t()), a.t().mm(grad)
现在可以随时随地调用 MyMM
,无需禁用自动类型转换或手动转换输入:
mymm = MyMM.applywith autocast(device_type='cuda', dtype=torch.float16):output = mymm(input1, input2)
需要特定 dtype
的函数
考虑一个需要 torch.float32
输入的自定义函数。
对 forward
应用 custom_fwd(device_type='cuda', cast_inputs=torch.float32)
,
对 backward
应用 custom_bwd(device_type='cuda')
。
如果 forward
在启用自动混合精度(autocast)的区域中运行,这些装饰器会将浮点张量输入转换为参数 device_type 指定的设备(本例中为 CUDA)上的 float32
类型,并在 forward
和 backward
期间局部禁用自动混合精度。
class MyFloat32Func(torch.autograd.Function):@staticmethod@custom_fwd(device_type='cuda', cast_inputs=torch.float32)def forward(ctx, input):ctx.save_for_backward(input)...return fwd_output@staticmethod@custom_bwd(device_type='cuda')def backward(ctx, grad):...
现在可以随处调用 MyFloat32Func
,无需手动禁用自动类型转换或转换输入参数:
func = MyFloat32Func.applywith autocast(device_type='cuda', dtype=torch.float16):# func will run in float32, regardless of the surrounding autocast stateoutput = func(input)
自动微分机制
https://pytorch.org/docs/stable/notes/autograd.html
本说明将概述自动微分(autograd)的工作原理及其操作记录方式。虽然并非必须完全理解这些内容,但我们建议您熟悉它,因为这将帮助您编写更高效、更简洁的程序,并有助于调试工作。
自动微分如何编码历史记录
Autograd 是一个反向自动微分系统。从概念上讲,当您执行运算时,autograd 会记录一个运算图,保存所有创建数据的操作,形成一个有向无环图。这个图的叶子节点是输入张量,根节点是输出张量。通过从根节点到叶子节点追踪这个图,您可以利用链式法则自动计算梯度。
在内部,autograd 将这个图表示为 Function
对象(实际上是表达式)组成的图,可以通过 apply()
方法来计算图的求值结果。在计算前向传播时,autograd 会同时执行请求的计算,并构建一个表示梯度计算函数的图(每个 torch.Tensor
的 .grad_fn
属性就是这个图的入口点)。前向传播完成后,我们会在反向传播中评估这个图以计算梯度。
需要注意的是,这个图在每次迭代时都会从头开始重新创建。正是这一点使得我们可以使用任意的 Python 控制流语句,在每次迭代时改变图的整体形状和大小。您不需要在启动训练前编码所有可能的路径——运行的内容就是您要微分的对象。
保存的张量
某些操作需要在正向传播过程中保存中间结果,以便执行反向传播。例如,函数 x↦x² 会保存输入 x 来计算梯度。
当定义自定义 Python Function
时,你可以使用 save_for_backward()
在正向传播期间保存张量,并通过 saved_tensors
在反向传播期间检索它们。更多信息请参阅 扩展 PyTorch。
对于 PyTorch 内置的操作(如 torch.pow()
),系统会根据需要自动保存张量。出于学习或调试目的,你可以通过查找以 _saved
前缀开头的属性,来探索特定 grad_fn
保存了哪些张量。
x = torch.randn(5, requires_grad=True)
y = x.pow(2)
print(x.equal(y.grad_fn._saved_self)) # True
print(x is y.grad_fn._saved_self) # True
在前面的代码中,y.grad_fn._saved_self
指向与 x 相同的 Tensor 对象。
但情况并非总是如此。例如:
x = torch.randn(5, requires_grad=True)
y = x.exp()
print(y.equal(y.grad_fn._saved_result)) # True
print(y is y.grad_fn._saved_result) # False
在底层实现中,为了防止引用循环,PyTorch 会在保存时对张量进行打包操作,并在读取时将其解包为另一个张量。此时,通过访问 y.grad_fn._saved_result
获取的张量与 y
属于不同的张量对象(但它们仍共享相同的存储空间)。
一个张量是否会被打包成不同的张量对象,取决于它是否是其自身 grad_fn 的输出——这是一个可能变动的实现细节,用户不应依赖该行为。
您可以通过保存张量的钩子函数来控制 PyTorch 的打包/解包行为。
不可微函数的梯度计算
使用自动微分进行梯度计算时,仅当每个基本函数都可微时才有效。但实际应用中许多函数并不满足这一特性(例如 relu
或 sqrt
在 0
点处)。为了降低不可微函数的影响,我们按以下优先级规则定义基本操作的梯度:
1、若函数当前点可微且梯度存在,则直接采用该梯度。
2、若函数(至少局部)是凸的,采用最小范数的次梯度(即最速下降方向)。
3、若函数(至少局部)是凹的,考虑 -f(x) 并采用最小范数的超梯度(应用上一条规则)。
4、若函数有定义,通过连续性定义当前点的梯度(注意此处可能得到 inf
,例如 sqrt(0)
)。若存在多个可能值,则任选其一。
5、若函数无定义(例如 sqrt(-1)
、log(-1)
或输入为 NaN
时的大多数函数),则梯度值可任意选取(也可能报错但不保证)。多数函数会使用 NaN
作为梯度,但出于性能考虑,某些函数会采用其他值(如 log(-1)
)。
6、若函数是非确定性映射(即不符合数学函数定义),则标记为不可微。这将导致在非 no_grad
环境下对需要梯度的张量使用时,反向传播阶段会报错。
局部禁用梯度计算
Python 提供了多种机制来局部禁用梯度计算:
要禁用整个代码块的梯度计算,可以使用上下文管理器,例如无梯度模式(no-grad mode)和推理模式(inference mode)。对于更细粒度地排除子图参与梯度计算,可以设置张量的 requires_grad
字段。
下文除了讨论上述机制外,还会介绍评估模式(nn.Module.eval()
)。虽然该方法并非用于禁用梯度计算,但由于其名称,常被与前三种机制混淆。
设置 requires_grad
requires_grad
是一个标志位,默认值为 false,除非被包装在 nn.Parameter
中。它允许精细控制哪些子图参与梯度计算,并在前向传播和反向传播中均生效:
- 前向传播期间:只有当操作的至少一个输入张量需要梯度时,该操作才会被记录到反向计算图中。
- 反向传播期间(调用
.backward()
):只有requires_grad=True
的叶子张量才会将梯度累积到它们的.grad
字段中。
需特别注意:虽然每个张量都有此标志位,但设置它仅对叶子张量有意义(即没有 grad_fn
的张量,例如 nn.Module
的参数)。非叶子张量(具有 grad_fn
的张量)已关联反向计算图,其梯度会作为中间结果用于计算需要梯度的叶子张量。因此,所有非叶子张量会自动设置为 require_grad=True
。
设置 requires_grad
是控制模型哪些部分参与梯度计算的主要方式。例如,在模型微调时冻结预训练模型的部分参数:
1、冻结参数:对不希望更新的参数直接调用 .requires_grad_(False)
。如上所述,使用这些参数作为输入的计算不会被记录在前向传播中,因此它们不会出现在反向计算图中,自然也不会在反向传播时更新 .grad
字段。
2、模块级设置:由于此操作非常常见,requires_grad
也可以通过 nn.Module.requires_grad_()
在模块级别设置。对模块调用此方法时,会作用于该模块的所有参数(默认情况下这些参数的 requires_grad=True
)。
梯度模式
除了设置 requires_grad
外,还可以通过 Python 选择三种梯度模式,这些模式会影响 PyTorch 内部 autograd 处理计算的方式:默认模式(梯度模式)、无梯度模式和推理模式。所有模式都可以通过上下文管理器和装饰器进行切换。
模式 | 不记录操作到反向计算图中 | 跳过额外的 autograd 跟踪开销 | 在该模式下创建的张量后续可用于梯度模式 | 示例场景 |
---|---|---|---|---|
默认模式 | ✓ | 前向传播 | ||
无梯度模式 | ✓ | ✓ | 优化器参数更新 | |
推理模式 | ✓ | ✓ | 数据处理、模型评估 |
默认模式(梯度模式)
“默认模式"是指当没有启用其他模式(如无梯度模式和推理模式)时,我们隐式处于的模式。为了与"无梯度模式"形成对比,默认模式有时也被称为"梯度模式”。
关于默认模式最重要的一点是:这是唯一能让requires_grad
生效的模式。在其他两种模式下,requires_grad
都会被强制设为False
。
无梯度模式
在无梯度模式下,计算过程会表现得好像所有输入都不需要梯度。换句话说,即使存在require_grad=True
的输入,无梯度模式下的计算也不会被记录到反向图中。
当你需要执行不应被autograd记录的操作,但又希望稍后在梯度模式下使用这些计算的输出时,可以启用无梯度模式。这个上下文管理器能方便地禁用代码块或函数的梯度计算,而无需临时将张量设置为requires_grad=False
后再改回True
。
例如,在编写优化器时,无梯度模式可能非常有用:执行训练更新时,你希望就地更新参数而不被autograd记录。同时你还打算在下一个前向传播中使用更新后的参数进行梯度模式下的计算。
torch.nn.init中的实现也依赖无梯度模式来初始化参数,从而避免就地更新初始化参数时被autograd跟踪。
推理模式
推理模式是无梯度模式的极端版本。与无梯度模式类似,推理模式下的计算不会被记录到反向图中,但启用推理模式能让PyTorch进一步加速模型运行。这种运行时优化带来一个缺点:在推理模式下创建的张量,退出该模式后将无法用于需要被autograd记录的计算中。
当满足以下两个条件时,建议启用推理模式:
1、执行的计算不需要与autograd交互
2、不计划将推理模式下创建的张量用于后续需要被autograd记录的任何计算
推荐在不需要autograd追踪的代码部分尝试推理模式(例如数据处理和模型评估)。如果该模式能直接适用于你的使用场景,这将带来免费的性能提升。若启用后出现错误,请检查是否在退出推理模式后,将推理模式下创建的张量用于了需要被autograd记录的计算。如果确实无法避免这种情况,可以随时切换回无梯度模式。
有关推理模式的详细说明,请参阅:推理模式
推理模式的实现细节参见:RFC-0011-InferenceMode
评估模式 (nn.Module.eval()
)
评估模式并非局部禁用梯度计算的机制。此处之所以提及,是因为它有时会被误认为是这样的机制。
从功能上讲,module.eval()
(或等效的module.train(False)
)与无梯度模式和推理模式完全正交。model.eval()
如何影响模型,完全取决于模型中使用的具体模块以及这些模块是否定义了训练模式下的特定行为。
如果模型依赖于某些模块(例如 torch.nn.Dropout
和 torch.nn.BatchNorm2d
),而这些模块的行为可能因训练模式而异(例如为了避免在验证数据上更新批归一化的运行统计量),则需要自行调用 model.eval()
和 model.train()
。
建议在训练时始终使用 model.train()
,在评估模型(验证/测试)时始终使用 model.eval()
,即使不确定模型是否存在训练模式下的特定行为。因为所使用的模块可能会更新,从而在训练和评估模式下表现出不同的行为。
自动求导中的原地操作
在自动求导系统中支持原地操作是一个复杂的问题,大多数情况下我们建议避免使用。自动求导通过积极的缓冲区释放和重用机制实现了高效运行,真正能通过原地操作显著降低内存占用的场景非常罕见。除非面临严重的内存压力,否则您可能永远不需要使用原地操作。
限制原地操作适用性的主要原因有两点:
1、原地操作可能会覆盖计算梯度所需的值。
2、每个原地操作都需要重写计算图实现。非原地版本只需分配新对象并保留旧图的引用,而原地操作需要将所有输入的创建者更改为代表该操作的Function
。当多个张量共享同一存储空间时(例如通过索引或转置创建),这一过程会变得棘手。如果被修改输入的存储空间被其他任何Tensor
引用,原地函数将报错。
原地操作的正确性检查
每个张量都维护一个版本计数器,每当它在任何操作中被标记为"脏数据"时,该计数器就会递增。当Function保存任何张量用于反向传播时,其所属Tensor的版本计数器也会被保存。一旦访问self.saved_tensors
,系统就会检查当前版本号,如果该值大于保存的值,就会引发错误。这样可以确保:如果您使用了原地操作函数且没有看到任何错误,就能确定计算得到的梯度是正确的。
多线程自动求导
自动求导引擎负责运行所有必要的反向操作以完成反向传播计算。本节将详细介绍如何在线程环境中高效利用该引擎。(此内容仅适用于 PyTorch 1.6+ 版本,早期版本的行为有所不同。)
用户可以通过多线程代码(例如 Hogwild 训练)来训练模型,且不会阻塞并发的反向计算。示例代码如下:
# Define a train function to be used in different threads
def train_fn():x = torch.ones(5, 5, requires_grad=True)# forwardy = (x + 3) * (x + 4) * 0.5# backwardy.sum().backward()# potential optimizer update# User write their own threading code to drive the train_fn
threads = []
for _ in range(10):p = threading.Thread(target=train_fn, args=())p.start()threads.append(p)for p in threads:p.join()
请注意以下用户需要了解的行为特性:
CPU 并发处理
当你在 CPU 上通过 Python 或 C++ API 在多个线程中运行 backward()
或 grad()
时,预期会看到额外的并发效果,而不是在执行过程中按特定顺序串行化所有反向传播调用(这是 PyTorch 1.6 版本之前的行为)。
非确定性行为
当从多个线程并发调用backward()
且存在共享输入时(例如Hogwild CPU训练场景),可能会出现非确定性结果。这是因为参数会自动在线程间共享,导致多个线程在梯度累积过程中可能同时访问并尝试累加同一个.grad
属性。从技术上讲,这种操作并不安全,可能引发竞态条件,最终导致计算结果无效。
开发含共享参数的多线程模型时,开发者必须充分考虑线程模型的设计,并充分理解上述潜在问题。
作为替代方案,可以使用函数式API torch.autograd.grad()
来计算梯度,而非直接调用backward()
,从而避免非确定性行为的发生。
图保留机制
当自动微分计算图的部分内容在线程间共享时(例如,先在单线程中运行前向传播的第一部分,然后在多线程中运行第二部分),图的第一部分会被共享。这种情况下,不同线程对同一计算图执行grad()
或backward()
时,可能会出现一个线程正在动态销毁计算图,而另一个线程因此崩溃的问题。Autograd会向用户报错——类似于未设置retain_graph=True
时两次调用backward()
的情况,并提示用户应当使用retain_graph=True
参数。
Autograd节点的线程安全性
由于Autograd允许调用线程驱动其反向执行以实现潜在并行化,我们必须确保CPU上共享部分或整个GraphTask的并行backward()
调用具有线程安全性。
自定义Python的autograd.Function
由于GIL的存在自动具备线程安全性。对于内置的C++ Autograd节点(如AccumulateGrad、CopySlices)和自定义的autograd::Function
,Autograd引擎会使用线程互斥锁来确保可能涉及状态读写操作的autograd节点的线程安全。
C++钩子的线程安全性问题
Autograd依赖于用户自行编写线程安全的C++钩子。若需在多线程环境中正确应用钩子,您需要编写适当的线程锁定代码来确保钩子的线程安全性。
复数自动求导
简而言之:
- 当你使用PyTorch对任意定义域和/或值域为复数的函数f(z)进行微分时,梯度计算会假设该函数是一个更大的实值损失函数g(input)=L的一部分。计算得到的梯度是∂L/∂z*(注意z的共轭),其负方向正是梯度下降算法中所使用的最陡下降方向。因此,现有优化器可以直接用于复数参数。
- 这一约定与TensorFlow的复数微分约定一致,但不同于JAX(后者计算∂L/∂z)。
- 如果你的函数是从实数到实数,但内部使用了复数运算,此处的约定无关紧要:你得到的结果与仅使用实数运算实现时完全相同。
若想了解数学细节,或想知道如何在PyTorch中定义复数导数,请继续阅读。
什么是复导数?
复可微性的数学定义采用了导数的极限定义,并将其推广到复数运算中。考虑一个函数 f: ℂ → ℂ,
f ( z = x + y j ) = u ( x , y ) + v ( x , y ) j f(z=x+yj)=u(x,y)+v(x,y)j f(z=x+yj)=u(x,y)+v(x,y)j
其中,u 和 v 是二元实值函数,j 是虚数单位。
根据导数的定义,我们可以写出:
f ′ ( z ) = lim h → 0 , h ∈ C f ( z + h ) − f ( z ) h f^′(z) = \lim_{h \to 0, h \in C} \frac{f(z+h) - f(z)}{h} f′(z)=h→0,h∈Climhf(z+h)−f(z)
为了使该极限存在,不仅 u 和 v 必须是实可微的,而且 f 还必须满足 柯西-黎曼方程。
换句话说,对于实部和虚部步长(h)计算的极限必须相等。这是一个更为严格的条件。
复可微函数通常被称为全纯函数。它们具有良好的性质,具备实可微函数的所有优点,但在优化领域几乎没有实际用途。对于优化问题,研究界通常只使用实值目标函数,因为复数不属于任何有序域,因此复数值的损失函数没有实际意义。
此外,现实中也没有任何有意义的实值目标函数能满足柯西-黎曼方程。因此,全纯函数的理论无法用于优化,大多数人转而使用 Wirtinger 微积分。
维廷格微积分登场…
我们拥有如此优秀的复可微理论和全纯函数理论,却因为许多常用函数并非全纯而无法使用。数学家们该怎么办呢?维廷格观察到,即使函数f(z)不是全纯的,也可以将其重写为二元函数f(z,z*),后者总是全纯的。这是因为z的实部和虚部都可以用z和z*表示:
R e ( z ) = z + z ∗ 2 Re(z)= \frac{z+z^∗}{2} Re(z)=2z+z∗
I m ( z ) & = z − z ∗ 2 Im(z) \&= \frac {z - z^*}{2} Im(z)&=2z−z∗
维廷格微积分建议转而研究 f ( z , z ∗ ) f(z,z^*) f(z,z∗),只要f是实可微的,这个函数就保证是全纯的(另一种理解方式是坐标系变换,从f(x,y)转换到f(z,z*))。这个函数具有偏导数∂/∂z和∂/∂z*。我们可以通过链式法则建立这些偏导数与z的实部和虚部偏导数之间的关系。
从上述方程中,我们得到:
这正是你在维基百科上能找到的维廷格微积分的经典定义。
这一转变带来了许多美妙的结果:
- 首先,柯西-黎曼方程简化为∂f/∂z* = 0(也就是说,函数f可以完全用z表示,无需参考z*)。
- 另一个重要(且有些反直觉)的结果是,当我们对实值损失进行优化时,变量更新应采取的步骤由∂Loss/∂z*给出(而不是∂Loss/∂z),这一点我们稍后会看到。
更多阅读资料,请查看:https://arxiv.org/pdf/0906.4835.pdf
Wirtinger微积分在优化中有何作用?
音频等领域的研究人员通常使用梯度下降法来优化具有复变量的实值损失函数。通常,他们会将实部和虚部视为可单独更新的独立通道。对于步长α/2和损失函数L,我们可以在ℝ²空间中写出以下方程:
这些方程如何转换到复数空间ℂ?
这里发生了非常有趣的现象:Wirtinger微积分告诉我们,可以将上述复变量更新公式简化为仅涉及共轭Wirtinger导数∂L/∂z*,从而直接得到优化过程中采用的步长。
由于共轭Wirtinger导数能准确给出实值损失函数所需的更新步长,PyTorch在对实值损失函数进行微分时,会直接返回这个导数。
PyTorch如何计算共轭Wirtinger导数?
通常,我们的导数公式将grad_output作为输入,表示我们已经计算过的Vector-Jacobian乘积,即∂L/∂s*,其中L是整个计算过程的损失函数(产生实数损失),s是我们函数的输出。这里的目标是计算∂L/∂z*,其中z是函数的输入。事实证明,在实数损失的情况下,我们仅需要计算∂L/∂s*,尽管链式法则表明我们还需要访问∂L/∂s。如果你想跳过这个推导过程,可以直接查看本节最后一个公式,然后跳到下一节。
让我们继续处理定义为f(z) = f(x+yj) = u(x,y) + v(x,y)j的函数f: ℂ → ℂ。如上所述,autograd的梯度约定围绕实数损失函数的优化展开,因此假设f是更大的实数损失函数g的一部分。使用链式法则,我们可以写出:
本文翻译整理自:https://github.com/google/diff-match-patch
一、关于 Diff Match Patch
Diff Match Patch 是一个支持多语言的高性能文本处理库,提供强大的算法来实现纯文本同步所需的操作。
相关链接资源
- github : https://github.com/google/diff-match-patch
- 官网:https://neil.fraser.name/software/diff_match_patch/
- 官方文档:https://github.com/google/diff-match-patch/wiki
- Paper : https://neil.fraser.name/writing/diff/myers.pdf
- Demo/在线试用:
- Diff 演示:https://neil.fraser.name/software/diff_match_patch/demos/diff.html
- Match 演示:https://neil.fraser.name/software/diff_match_patch/demos/match.html
- Patch 演示:https://neil.fraser.name/software/diff_match_patch/demos/patch.html
- Support : https://groups.google.com/forum/#!forum/diff-match-patch
- License : Apache License 2.0
关键功能特性
1、Diff:
- 比较两个纯文本块并高效返回差异列表
- 查看详情:https://neil.fraser.name/software/diff_match_patch/demos/diff.html
2、Match:
- 在纯文本块中模糊匹配搜索字符串,同时考虑准确性和位置权重
- 查看详情:https://neil.fraser.name/software/diff_match_patch/demos/match.html
3、Patch:
- 将补丁列表应用到纯文本,即使基础文本不匹配也会尽力应用
- 查看详情:https://neil.fraser.name/software/diff_match_patch/demos/patch.html
该库最初于2006年为Google Docs开发,现支持C++、C#、Dart、Java、JavaScript、Lua、Objective C和Python等多种语言。
二、参考文档
- API - 所有语言通用的API
- 行或词差异 - 更简洁的差异展示
- 纯文本与结构化内容 - 处理XML等数据的方法
- Unidiff - 补丁序列化格式
三、多语言支持
虽然各语言版本使用相同API,但存在一些语言特定说明:
- C++
- C#
- Dart
- Java
- JavaScript
- Lua
- Objective-C
- Python
标准化速度测试可查看https://docs.google.com/spreadsheets/d/1zpZccuBpjMZTvL1nGDMKJc7rWL_m_drF4XKOJvB27Kc/edit#gid=0比较各语言性能差异。
四、算法实现
本库采用Myer’s差异算法,该算法被公认为最佳通用差异算法。算法外围还实现了预处理加速和后处理优化,显著提升性能和输出质量。
同时实现了Bitap匹配算法作为灵活匹配和补丁策略的核心。
五、Python 实现
所有语言版本都使用相同API,以下是Python版本的特别说明。
Python存在两个不兼容版本:Python 2和Python 3。请选择与解释器兼容的版本,Python 2的运行速度略快于Python 3。可通过python --version
命令查看已安装版本。
1、Hello World示例
以下是Python中最简单的差异处理示例:
import diff_match_patch as dmp_moduledmp = dmp_module.diff_match_patch()
diff = dmp.diff_main("Hello World.", "Goodbye World.")
# Result: [(-1, "Hell"), (1, "G"), (0, "o"), (1, "odbye"), (0, " World.")]
dmp.diff_cleanupSemantic(diff)
# Result: [(-1, "Hello"), (1, "Goodbye"), (0, " World.")]
print(diff)
进入python2
或python3
目录,将上述程序保存为hello.py
后执行python hello.py
。
2、测试说明
单元测试可在python/tests
目录下执行python diff_match_patch_test.py
运行,应通过20多个测试组且零失败。
差异速度测试可通过python speedtest.py
执行,耗时约20秒。
伊织 xAI 2025-04-27(日)
现在,利用Wirtinger导数的定义,我们可以写出:
这里需要注意的是,由于u和v是实函数,且根据我们假设f是实数损失函数的一部分,L也是实数,因此有:
(2) (∂L/∂s)* = ∂L/∂s*
即,∂L/∂s等于grad_output的共轭。
通过解上述方程求∂L/∂u和∂L/∂v,我们得到:
(3) ∂L/∂u = ∂L/∂s + ∂L/∂s*
∂L/∂v = 1j * (∂L/∂s - ∂L/∂s*)
将(3)代入(1),我们得到:
∂L/∂z* = (∂L/∂s + ∂L/∂s*) * (∂u/∂z*) + 1j * (∂L/∂s - ∂L/∂s*) * (∂v/∂z*)
= ∂L/∂s * (∂u/∂z* + ∂v/∂z* j) + ∂L/∂s* * (∂u/∂z* - ∂v/∂z* j)
= ∂L/∂s * ∂(u + vj)/∂z* + ∂L/∂s* * ∂(u + vj)/∂z
= ∂L/∂s * ∂s/∂z* + ∂L/∂s* * ∂s*/∂z*
利用(2),我们得到:
(4) ∂L/∂z* = (∂L/∂s*)* * (∂s/∂z*) + ∂L/∂s* * (∂s/∂z)*
= (grad_output)* * (∂s/∂z*) + grad_output * (∂s/∂z)*
最后一个公式对于编写自定义梯度非常重要,因为它将我们的导数公式分解为一个更简单且易于手动计算的表达式。
如何为复杂函数编写自定义导数公式?
上述方框中的方程给出了所有复杂函数导数的通用公式。但我们仍需计算 ∂s/∂z 和 ∂s/∂z*。有两种方法可以实现:
- 第一种方法是直接使用Wirtinger导数的定义,通过∂s/∂x和∂s/∂y(可按常规方式计算)来求解∂s/∂z和∂s/∂z*。
- 第二种方法是采用变量替换技巧,将f(z)改写为二元函数f(z, z*),并将z和z视为独立变量来计算共轭Wirtinger导数。这种方法通常更简便:例如当函数是解析函数时,只会用到z(此时∂s/∂z为零)。
以函数f(z = x + yj) = c * z = c * (x + yj)为例(其中c∈ℝ):
使用第一种方法计算Wirtinger导数:
∂s/∂z = 1/2 * (∂s/∂x - ∂s/∂y j) = 1/2 * (c - (c * 1j) * 1j) = c ∂s/∂z* = 1/2 * (∂s/∂x + ∂s/∂y j) = 1/2 * (c + (c * 1j) * 1j) = 0
根据公式(4),当grad_output = 1.0(PyTorch中对标量输出调用backward()
时的默认梯度输出值)时,可得:
∂L/∂z* = 1 * 0 + 1 * c = c
使用第二种方法计算Wirtinger导数,直接得到:
∂s/∂z = ∂(cz)/∂z = c
∂s/∂z = ∂(cz)/∂z = 0
再次应用公式(4)可得∂L/∂z* = c。显然第二种方法计算量更小,更适合快速求解。
(注:所有数学公式和代码块保持原样,术语如"Wirtinger导数"、"PyTorch"等未翻译,符合核心翻译原则)
跨域函数如何处理?
有些函数会将复数输入映射到实数输出,或者反过来。这些函数构成了(4)的特殊情况,我们可以通过链式法则推导出:
对于函数 f: ℂ → ℝ,得到:
∂L/∂z∗ = 2 * grad_output * ∂s/∂z∗
∂L/∂z∗ = 2 * grad_output * ∂s/∂z∗
- 对于函数 f: ℝ → ℂ,得到:
∂L/∂z∗ = 2 * Re(grad_output∗ * ∂s/∂z∗)
∂L/∂z∗ = 2 * Re(grad_output∗ * ∂s/∂z∗)
张量保存的钩子函数
您可以通过定义一对 pack_hook
/ unpack_hook
钩子函数来控制已保存张量的打包/解包方式。pack_hook
函数应接收一个张量作为唯一参数,但可以返回任意 Python 对象(例如另一个张量、元组,甚至是包含文件名的字符串)。unpack_hook
函数以 pack_hook
的输出作为唯一参数,并应返回一个用于反向传播的张量。unpack_hook
返回的张量只需与输入 pack_hook
的张量内容相同即可,特别地,所有与自动微分相关的元数据都可以忽略,因为在解包过程中会被覆盖。
以下是一个钩子函数对的示例:
class SelfDeletingTempFile():def __init__(self):self.name = os.path.join(tmp_dir, str(uuid.uuid4()))def __del__(self):os.remove(self.name)def pack_hook(tensor):temp_file = SelfDeletingTempFile()torch.save(tensor, temp_file.name)return temp_filedef unpack_hook(temp_file):return torch.load(temp_file.name)
请注意,unpack_hook
不应删除临时文件,因为它可能会被多次调用:只要返回的 SelfDeletingTempFile 对象存在,临时文件就应该保持有效。在上面的示例中,我们通过在不再需要时(即 SelfDeletingTempFile 对象被删除时)关闭临时文件来防止文件泄漏。
注意:我们保证 pack_hook
只会被调用一次,但 unpack_hook
可能会根据反向传播的需要被多次调用,并且我们期望它每次返回相同的数据。
警告:禁止对任何函数的输入执行原地操作,因为这可能导致意外的副作用。如果修改了 pack hook 的输入,PyTorch 会抛出错误,但不会捕获修改 unpack hook 输入的情况。
为保存的张量注册钩子
您可以通过在 SavedTensor
对象上调用 register_hooks()
方法来注册一对钩子。这些对象会作为 grad_fn
的属性暴露出来,并以 _raw_saved_
前缀开头。
x = torch.randn(5, requires_grad=True)
y = x.pow(2)
y.grad_fn._raw_saved_self.register_hooks(pack_hook, unpack_hook)
当注册张量对时,会立即调用 pack_hook
方法。
每次需要访问已保存的张量时(无论是通过 y.grad_fn._saved_self
还是在反向传播过程中),都会调用 unpack_hook
方法。
警告:如果在保存的张量被释放后(即调用 backward 之后)仍持有对 SavedTensor
的引用,则禁止调用其 register_hooks()
方法。
大多数情况下 PyTorch 会抛出错误,但在某些情况下可能无法检测到,从而导致未定义行为。
为保存的张量注册默认钩子
或者,你可以使用上下文管理器 saved_tensors_hooks
来注册一对钩子,这些钩子将应用于在该上下文中创建的所有保存的张量。
示例:
# Only save on disk tensors that have size >= 1000
SAVE_ON_DISK_THRESHOLD = 1000def pack_hook(x):if x.numel() < SAVE_ON_DISK_THRESHOLD:return xtemp_file = SelfDeletingTempFile()torch.save(tensor, temp_file.name)return temp_filedef unpack_hook(tensor_or_sctf):if isinstance(tensor_or_sctf, torch.Tensor):return tensor_or_sctfreturn torch.load(tensor_or_sctf.name)class Model(nn.Module):def forward(self, x):with torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook):# ... compute outputoutput = xreturn outputmodel = Model()
net = nn.DataParallel(model)
使用此上下文管理器定义的钩子是线程局部的。
因此,以下代码不会产生预期效果,因为这些钩子不会通过DataParallel传递。
# Example what NOT to donet = nn.DataParallel(model)
with torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook):output = net(input)
请注意,使用这些钩子会禁用所有旨在减少张量对象创建的优化措施。例如:
with torch.autograd.graph.saved_tensors_hooks(lambda x: x, lambda x: x):x = torch.randn(5, requires_grad=True)y = x * x
在没有钩子的情况下,x
、y.grad_fn._saved_self
和 y.grad_fn._saved_other
都指向同一个张量对象。
使用钩子后,PyTorch 会将 x 打包和解包成两个新的张量对象,
这些新对象与原始 x 共享相同的存储空间(不执行复制操作)。
反向钩子执行机制
本节将讨论不同类型钩子的触发条件与非触发场景,并阐述它们的执行顺序。涵盖的钩子类型包括:
1、通过 torch.Tensor.register_hook()
注册到 Tensor 的反向钩子
2、通过 torch.Tensor.register_post_accumulate_grad_hook()
注册到 Tensor 的梯度累加后钩子
3、通过 torch.autograd.graph.Node.register_hook()
注册到 Node 的后置钩子
4、通过 torch.autograd.graph.Node.register_prehook()
注册到 Node 的前置钩子
特定钩子何时会被触发
通过 torch.Tensor.register_hook()
注册到张量的钩子,会在计算该张量的梯度时执行。(注意:这并不要求执行张量的 grad_fn。例如,如果张量作为 torch.autograd.grad()
的 inputs
参数的一部分传递,该张量的 grad_fn 可能不会执行,但注册到该张量的钩子始终会执行。)
通过 torch.Tensor.register_post_accumulate_grad_hook()
注册到张量的钩子,会在该张量的梯度累积完成后执行,即张量的 grad 字段已被设置。而通过 torch.Tensor.register_hook()
注册的钩子在梯度计算过程中运行,通过 torch.Tensor.register_post_accumulate_grad_hook()
注册的钩子仅在反向传播结束时由 autograd 更新张量的 grad 字段后触发。因此,后累积梯度钩子只能注册到叶子张量。在非叶子张量上通过 torch.Tensor.register_post_accumulate_grad_hook()
注册钩子会报错,即使调用 backward(retain_graph=True) 也是如此。
通过 torch.autograd.graph.Node.register_hook()
或 torch.autograd.graph.Node.register_prehook()
注册到 torch.autograd.graph.Node
的钩子,仅在注册的 Node 被执行时才会触发。
特定 Node 是否执行可能取决于反向传播是通过 torch.autograd.grad()
还是 torch.autograd.backward()
调用的。具体来说,当你对传递给 torch.autograd.grad()
或 torch.autograd.backward()
的 inputs
参数中的张量对应的 Node 注册钩子时,应注意这些差异。
如果使用 torch.autograd.backward()
,无论是否指定了 inputs
参数,上述所有钩子都会执行。这是因为 .backward() 会执行所有 Node,即使它们对应于作为输入指定的张量。(注意:这种对作为 inputs
传递的张量对应的额外 Node 的执行通常是不必要的,但仍会执行。此行为可能会更改,不应依赖它。)
另一方面,如果使用 torch.autograd.grad()
,注册到传递给 input
的张量对应的 Node 的反向钩子可能不会执行,因为这些 Node 不会被执行,除非有其他输入依赖于该 Node 的梯度结果。
不同钩子的触发顺序
事件发生的顺序如下:
1、注册到 Tensor 的钩子首先执行
2、如果 Node 被执行,则注册到 Node 的 pre-hooks 执行
3、对于设置了 retain_grad
的 Tensor,其 .grad
字段会被更新
4、Node 执行(需遵守上述规则)
5、对于已累积 .grad
的叶子 Tensor,执行 post-accumulate-grad 钩子
6、如果 Node 被执行,则注册到 Node 的 post-hooks 执行
若同一 Tensor 或 Node 上注册了多个同类型钩子,
它们将按注册顺序依次执行。
后执行的钩子可以观察到先前钩子对梯度所做的修改。
特殊钩子
torch.autograd.graph.register_multi_grad_hook()
是通过注册到张量的钩子实现的。每个独立的张量钩子会按照上述定义的张量钩子顺序触发,当最后一个张量梯度计算完成后,注册的多梯度钩子就会被调用。
torch.nn.modules.module.register_module_full_backward_hook()
则是通过注册到节点的钩子实现的。在前向计算过程中,钩子会被注册到与模块输入输出对应的 grad_fn 上。由于一个模块可能接收多个输入并返回多个输出,系统会先在模块输入前应用一个虚拟的自定义 autograd Function,并在模块输出前对前向计算的返回结果进行处理,以确保这些张量共享同一个 grad_fn,从而能够将我们的钩子附加到该节点上。
当张量被原地修改时 Tensor hooks 的行为表现
通常情况下,注册在张量上的钩子会接收到输出相对于该张量的梯度,此时张量的取值是反向计算时的当前值。
然而,如果你先给张量注册钩子,然后对该张量进行原地修改,那么:
- 在原地修改前注册的钩子同样会接收到输出相对于该张量的梯度
- 但此时张量的取值会采用原地修改前的值
若你希望保持第一种情况的行为特性,应在完成所有原地修改操作后再给张量注册钩子。
示例说明:
t = torch.tensor(1., requires_grad=True).sin()
t.cos_()
t.register_hook(fn)
t.backward()
此外,了解以下底层机制会很有帮助:当钩子注册到张量时,它们实际上会永久绑定到该张量的grad_fn。因此,如果该张量随后被原地修改,即使张量现在有了新的grad_fn,之前注册的钩子仍会与旧的grad_fn保持关联。例如,当自动微分引擎在计算图中执行到该张量的旧grad_fn时,这些钩子仍会被触发。
广播语义
许多PyTorch操作支持NumPy的广播语义。详情请参阅https://numpy.org/doc/stable/user/basics.broadcasting.html。
简而言之,如果一个PyTorch操作支持广播,那么它的张量参数可以自动扩展为相同大小(无需复制数据)。
通用语义
两个张量在满足以下规则时可进行"广播"操作:
- 每个张量至少有一个维度
- 从尾部维度开始迭代维度大小时,维度大小必须满足以下条件之一:
- 相等
- 其中一个为1
- 其中一个维度不存在
例如:
>>> x=torch.empty(5,7,3)
>>> y=torch.empty(5,7,3)
# same shapes are always broadcastable (i.e. the above rules always hold)>>> x=torch.empty((0,))
>>> y=torch.empty(2,2)
# x and y are not broadcastable, because x does not have at least 1 dimension# can line up trailing dimensions
>>> x=torch.empty(5,3,4,1)
>>> y=torch.empty( 3,1,1)
# x and y are broadcastable.
# 1st trailing dimension: both have size 1
# 2nd trailing dimension: y has size 1
# 3rd trailing dimension: x size == y size
# 4th trailing dimension: y dimension doesn't exist# but:
>>> x=torch.empty(5,2,4,1)
>>> y=torch.empty( 3,1,1)
# x and y are not broadcastable, because in the 3rd trailing dimension 2 != 3
如果两个张量 x
和 y
是"可广播的",则结果张量的大小按以下方式计算:
- 如果
x
和y
的维度数不相等,则在维度较少的张量前面补 1,使它们的维度长度相等。 - 然后,对于每个维度大小,结果维度大小是
x
和y
在该维度上的最大值。
例如:
# can line up trailing dimensions to make reading easier
>>> x=torch.empty(5,1,4,1)
>>> y=torch.empty( 3,1,1)
>>> (x+y).size()
torch.Size([5, 3, 4, 1])# but not necessary:
>>> x=torch.empty(1)
>>> y=torch.empty(3,1,7)
>>> (x+y).size()
torch.Size([3, 1, 7])>>> x=torch.empty(5,2,4,1)
>>> y=torch.empty(3,1,1)
>>> (x+y).size()
RuntimeError: The size of tensor a (2) must match the size of tensor b (3) at non-singleton dimension 1
原地操作语义
一个复杂之处在于,原地操作不允许参与广播的原地张量改变形状。
例如:
>>> x=torch.empty(5,3,4,1)
>>> y=torch.empty(3,1,1)
>>> (x.add_(y)).size()
torch.Size([5, 3, 4, 1])# but:
>>> x=torch.empty(1,3,1)
>>> y=torch.empty(3,1,7)
>>> (x.add_(y)).size()
RuntimeError: The expanded size of the tensor (1) must match the existing size (7) at non-singleton dimension 2.
向后兼容性
PyTorch 的早期版本允许某些逐点函数在不同形状的张量上执行,只要每个张量的元素数量相同即可。这种情况下,逐点操作会通过将每个张量视为一维形式来完成。现在 PyTorch 已支持广播机制,这种"一维"逐点操作行为被视为已弃用,当张量不可广播但元素数量相同时,会生成 Python 警告。
需要注意的是,广播机制的引入可能导致向后不兼容的情况,即当两个张量形状不同但可广播且元素数量相同时。
例如:
>>> torch.add(torch.ones(4,1), torch.randn(4))
之前会生成一个大小为 torch.Size([4,1])
的张量,但现在会生成大小为 torch.Size([4,4])
的张量。
为了帮助识别代码中可能因广播机制引入的向后不兼容情况,你可以将 torch.utils.backcompat.broadcast_warning.enabled
设为 True,这样在出现此类情况时会生成 Python 警告。
例如:
>>> torch.utils.backcompat.broadcast_warning.enabled=True
>>> torch.add(torch.ones(4,1), torch.ones(4))
__main__:1: UserWarning: self and other do not have the same shape, but are broadcastable, and have the same number of elements.
Changing behavior in a backwards incompatible manner to broadcasting rather than viewing as 1-dimensional.
CPU线程与TorchScript推理
PyTorch支持在TorchScript模型推理过程中使用多个CPU线程。下图展示了一个典型应用中可能存在的不同层级并行机制:
一个或多个推理线程会在给定输入上执行模型的前向传播。每个推理线程会调用JIT解释器,该解释器会逐个内联执行模型的操作。模型可以通过fork
TorchScript原语来启动异步任务。同时分叉多个操作会产生并行执行的任务。fork
操作符会返回一个Future
对象,可用于后续同步操作,例如:
@torch.jit.script
def compute_z(x):return torch.mm(x, self.w_z)@torch.jit.script
def forward(x):# launch compute_z asynchronously:fut = torch.jit._fork(compute_z, x)# execute the next operation in parallel to compute_z:y = torch.mm(x, self.w_y)# wait for the result of compute_z:z = torch.jit._wait(fut)return y + z
PyTorch 使用单个线程池来处理操作间并行(inter-op parallelism),该线程池由应用程序进程内派生的所有推理任务共享。
除了操作间并行外,PyTorch 还可以在操作内部(intra-op parallelism)利用多线程。这在许多场景下非常有用,包括对大张量进行逐元素操作、卷积运算、GEMM(通用矩阵乘法)、嵌入查找等操作。
构建选项
PyTorch 使用内部 ATen 库来实现算子操作。此外,PyTorch 还可以通过集成外部库(如 MKL 和 MKL-DNN)来加速 CPU 计算。
ATen、MKL 和 MKL-DNN 支持操作内并行,并依赖以下并行库实现该功能:
- OpenMP —— 一种标准(通常随编译器提供),被广泛用于外部库;
- TBB —— 新型并行库,专为基于任务的并行和并发环境优化。
OpenMP 因其易用性和对基于循环的并行等特性的支持,长期以来被众多库广泛采用。
TBB 在外部库中使用较少,但它针对并发环境进行了优化。PyTorch 的 TBB 后端能确保应用中所有算子共享同一个进程内独立线程池。
根据具体场景,开发者可选择更适合自身应用的并行库。
PyTorch 允许在构建时通过以下选项指定 ATen 及其他库使用的并行后端:
库 | 构建选项 | 可选值 | 说明 |
---|---|---|---|
ATen | ATEN_THREADING | OMP (默认), TBB | |
MKL | MKL_THREADING | (同上) | 需启用 BLAS=MKL 使用 MKL |
MKL-DNN | MKLDNN_CPU_RUNTIME | (同上) | 需启用 USE_MKLDNN=1 使用 MKL-DNN |
建议不要在同一个构建中混用 OpenMP 和 TBB。
上表中所有 TBB
选项需设置 USE_TBB=1
(默认关闭)。OpenMP 并行需单独设置 USE_OPENMP=1
(默认开启)。
运行时 API
以下API用于控制线程设置:
并行类型 | 设置项 | 备注 |
---|---|---|
操作间并行 | at::set_num_interop_threads , at::get_num_interop_threads (C++) set_num_interop_threads , get_num_interop_threads (Python, torch 模块) | 默认线程数:CPU核心数 |
操作内并行 | at::set_num_threads , at::get_num_threads (C++) set_num_threads , get_num_threads (Python, torch 模块) 环境变量: OMP_NUM_THREADS 和 MKL_NUM_THREADS |
对于操作内并行设置,at::set_num_threads
和torch.set_num_threads
的优先级始终高于环境变量,其中MKL_NUM_THREADS
变量的优先级高于OMP_NUM_THREADS
。
调整线程数量
以下简单脚本展示了矩阵乘法运行时间如何随线程数量变化:
import timeit
runtimes = []
threads = [1] + [t for t in range(2, 49, 2)]
for t in threads:torch.set_num_threads(t)r = timeit.timeit(setup = "import torch; x = torch.randn(1024, 1024); y = torch.randn(1024, 1024)", stmt="torch.mm(x, y)", number=100)runtimes.append(r)
# ... plotting (threads, runtimes) ...
在一台配备24个物理CPU核心(Xeon E5-2680处理器,基于MKL和OpenMP构建)的系统上运行该脚本,得到以下运行时间结果:
调整内部操作线程和跨操作线程数量时需考虑以下因素:
- 选择线程数量时需避免过度订阅(使用过多线程会导致性能下降)。例如,在使用大型应用线程池或高度依赖跨操作并行的应用中,可以考虑禁用内部操作并行(例如通过调用
set_num_threads(1)
); - 典型应用中可能需要在延迟(处理推理请求的时间)和吞吐量(单位时间内完成的工作量)之间进行权衡。调整线程数量是优化这种权衡的有效手段。例如,在延迟敏感型应用中,可以增加内部操作线程数以加速单个请求处理。但需注意,操作的并行实现可能带来额外开销,这会增加单个请求的工作量从而降低整体吞吐量。
警告:OpenMP不保证应用会使用单一进程内内部操作线程池。相反,两个不同的应用线程或跨操作线程可能使用不同的OpenMP线程池执行内部操作。这可能导致应用使用大量线程。在OpenMP环境下,需要特别注意线程数量调优以避免多线程应用中的过度订阅问题。
注意:预编译的PyTorch版本默认启用OpenMP支持。
注意:parallel_info
工具可打印线程配置信息用于调试。在Python中也可通过调用torch.__config__.parallel_info()
获取类似输出。
CUDA 语义
https://pytorch.org/docs/stable/notes/cuda.html
torch.cuda
用于设置和运行 CUDA 操作。它会跟踪当前选定的 GPU,默认情况下所有分配的 CUDA 张量都会在该设备上创建。可以通过 torch.cuda.device
上下文管理器来更改所选设备。
不过,一旦张量分配完成后,无论当前选定哪个设备,都可以对其进行操作,结果始终会放在与张量相同的设备上。
默认情况下不允许跨 GPU 操作,除了 copy_()
和其他具有复制功能的方法,如 to()
和 cuda()
。除非启用了点对点内存访问,否则尝试在不同设备上的张量上启动操作都会引发错误。
下面是一个展示此行为的小示例:
cuda = torch.device('cuda') # Default CUDA device
cuda0 = torch.device('cuda:0')
cuda2 = torch.device('cuda:2') # GPU 2 (these are 0-indexed)x = torch.tensor([1., 2.], device=cuda0)
# x.device is device(type='cuda', index=0)
y = torch.tensor([1., 2.]).cuda()
# y.device is device(type='cuda', index=0)with torch.cuda.device(1):# allocates a tensor on GPU 1a = torch.tensor([1., 2.], device=cuda)# transfers a tensor from CPU to GPU 1b = torch.tensor([1., 2.]).cuda()# a.device and b.device are device(type='cuda', index=1)# You can also use ``Tensor.to`` to transfer a tensor:b2 = torch.tensor([1., 2.]).to(device=cuda)# b.device and b2.device are device(type='cuda', index=1)c = a + b# c.device is device(type='cuda', index=1)z = x + y# z.device is device(type='cuda', index=0)# even within a context, you can specify the device# (or give a GPU index to the .cuda call)d = torch.randn(2, device=cuda2)e = torch.randn(2).to(cuda2)f = torch.randn(2).cuda(cuda2)# d.device, e.device, and f.device are all device(type='cuda', index=2)
Ampere(及后续)设备上的 TensorFloat-32 (TF32)
从 PyTorch 1.7 开始,新增了一个名为 allow_tf32
的标志。该标志在 PyTorch 1.7 至 1.11 版本中默认为 True
,而在 PyTorch 1.12 及更高版本中默认为 False
。此标志控制 PyTorch 是否允许在内部使用 TensorFloat-32 (TF32) 张量核心(自 Ampere 架构起在 NVIDIA GPU 上提供)来计算矩阵乘法(包括批量矩阵乘法)和卷积运算。
TF32 张量核心的设计目标是通过将输入数据舍入为 10 位尾数,并以 FP32 精度累加结果(同时保持 FP32 的动态范围),从而在 torch.float32
张量的矩阵乘法和卷积运算上实现更佳性能。
矩阵乘法和卷积运算的控制是独立的,其对应标志可通过以下方式访问:
# The flag below controls whether to allow TF32 on matmul. This flag defaults to False
# in PyTorch 1.12 and later.
torch.backends.cuda.matmul.allow_tf32 = True# The flag below controls whether to allow TF32 on cuDNN. This flag defaults to True.
torch.backends.cudnn.allow_tf32 = True
可以通过 set_float_32_matmul_precision()
更广泛地设置矩阵乘法的精度(不仅限于 CUDA)。
请注意,除了矩阵乘法和卷积运算本身外,内部使用矩阵乘法或卷积的函数和 nn 模块也会受到影响。这些包括 nn.Linear、nn.Conv*、cdist、tensordot、仿射网格和网格采样、自适应对数 softmax、GRU 和 LSTM。
要了解精度和速度的情况,请参阅下面的示例代码和基准测试数据(基于 A100):
a_full = torch.randn(10240, 10240, dtype=torch.double, device='cuda')
b_full = torch.randn(10240, 10240, dtype=torch.double, device='cuda')
ab_full = a_full @ b_full
mean = ab_full.abs().mean() # 80.7277a = a_full.float()
b = b_full.float()# Do matmul at TF32 mode.
torch.backends.cuda.matmul.allow_tf32 = True
ab_tf32 = a @ b # takes 0.016s on GA100
error = (ab_tf32 - ab_full).abs().max() # 0.1747
relative_error = error / mean # 0.0022# Do matmul with TF32 disabled.
torch.backends.cuda.matmul.allow_tf32 = False
ab_fp32 = a @ b # takes 0.11s on GA100
error = (ab_fp32 - ab_full).abs().max() # 0.0031
relative_error = error / mean # 0.000039
从上述示例可以看出,启用TF32后,A100上的速度提升了约7倍,但与双精度相比的相对误差大约增加了2个数量级。需要注意的是,TF32与单精度速度的具体比例取决于硬件代际,因为不同代际或不同型号之间可能存在以下特性差异:
- 内存带宽与计算能力的比例
- TF32与FP32矩阵乘法吞吐量的比例
如果需要完整的FP32精度,用户可以通过以下方式禁用TF32:
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
要在 C++ 中关闭 TF32 标志,可以执行以下操作:
at::globalContext().setAllowTF32CuBLAS(false);
at::globalContext().setAllowTF32CuDNN(false);
有关TF32的更多信息,请参阅:
- TensorFloat-32
- CUDA 11
- Ampere架构
FP16 GEMM中的降低精度计算
(不同于专为硬件设计的全FP16累加方案——该方案在FP16累加时比FP32累加具有更高吞吐量,详见全FP16累加章节)
fp16 GEMM运算可能会采用部分中间结果的降低精度计算(例如使用fp16而非fp32)。这种选择性降低精度的方法能在特定工作负载(尤其是k维度较大的情况)和GPU架构上实现更高性能,但代价是数值精度降低和潜在的溢出风险。
V100显卡上的基准测试数据示例如下:
[--------------------------- bench_gemm_transformer --------------------------][ m , k , n ] | allow_fp16_reduc=True | allow_fp16_reduc=False
1 threads: --------------------------------------------------------------------[4096, 4048, 4096] | 1634.6 | 1639.8[4096, 4056, 4096] | 1670.8 | 1661.9[4096, 4080, 4096] | 1664.2 | 1658.3[4096, 4096, 4096] | 1639.4 | 1651.0[4096, 4104, 4096] | 1677.4 | 1674.9[4096, 4128, 4096] | 1655.7 | 1646.0[4096, 4144, 4096] | 1796.8 | 2519.6[4096, 5096, 4096] | 2094.6 | 3190.0[4096, 5104, 4096] | 2144.0 | 2663.5[4096, 5112, 4096] | 2149.1 | 2766.9[4096, 5120, 4096] | 2142.8 | 2631.0[4096, 9728, 4096] | 3875.1 | 5779.8[4096, 16384, 4096] | 6182.9 | 9656.5
(times in microseconds).
如果需要完全精度缩减,用户可以通过以下方式禁用 fp16 GEMM 中的缩减精度缩减:
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False
要在 C++ 中切换降低精度缩减标志,可以这样做
at::globalContext().setAllowFP16ReductionCuBLAS(false);
BF16 GEMM中的降低精度缩减
BFloat16 GEMM也存在类似的标志(如上所述)。
请注意,对于BF16,此开关默认设置为True。如果在您的工作负载中观察到数值不稳定性,可以将其设为False。
如果不希望使用降低精度缩减,用户可以通过以下方式在bf16 GEMM中禁用该功能:
torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
要在 C++ 中切换降低精度的缩减标志,可以这样做
at::globalContext().setAllowBF16ReductionCuBLAS(true);
FP16 GEMM 中的全 FP16 累加
某些 GPU 在执行 全部 FP16 GEMM 累加时性能会提升,但代价是数值精度降低和溢出概率增加。
请注意,此设置仅对计算能力 7.0(Volta)或更高版本的 GPU 有效。
可通过以下方式启用该行为:
torch.backends.cuda.matmul.allow_fp16_accumulation = True
在 C++ 中切换降低精度缩减标志,可以通过以下方式实现:
at::globalContext().setAllowFP16AccumulationCuBLAS(true);
异步执行
默认情况下,GPU操作是异步的。当调用使用GPU的函数时,操作会被加入队列到特定设备,但不一定会立即执行。这让我们可以并行执行更多计算,包括CPU或其他GPU上的操作。
通常来说,异步计算对调用者是不可见的,因为:(1) 每个设备会按照操作入队顺序执行;(2) PyTorch在CPU与GPU之间或两个GPU之间复制数据时,会自动执行必要的同步。因此,计算过程的表现就像所有操作都是同步执行的。
你可以通过设置环境变量CUDA_LAUNCH_BLOCKING=1
来强制同步计算。这在GPU发生错误时很有用(异步执行时,这类错误要到操作实际执行后才会报告,因此堆栈追踪不会显示错误请求的位置)。
异步计算的一个影响是:未同步的时间测量是不准确的。要获得精确测量,应该在测量前调用torch.cuda.synchronize()
,或者使用torch.cuda.Event
来记录时间,如下所示:
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()# Run some things hereend_event.record()
torch.cuda.synchronize() # Wait for the events to be recorded!
elapsed_time_ms = start_event.elapsed_time(end_event)
作为例外情况,某些函数如 to()
和 copy_()
提供了显式的 non_blocking
参数,允许调用者在不需要同步时绕过该机制。
另一个例外是 CUDA 流,具体说明如下。
CUDA 流
CUDA 流是属于特定设备的线性执行序列。通常您不需要显式创建流:默认情况下,每个设备会使用自己的"默认"流。
同一流内的操作会按照创建顺序串行执行,但不同流中的操作可以以任意相对顺序并发执行,除非使用了显式同步函数(例如 synchronize()
或 wait_stream()
)。例如,以下代码是错误的:
cuda = torch.device('cuda')
s = torch.cuda.Stream() # Create a new stream.
A = torch.empty((100, 100), device=cuda).normal_(0.0, 1.0)
with torch.cuda.stream(s):# sum() may start execution before normal_() finishes!B = torch.sum(A)
当"当前流"是默认流时,PyTorch会自动执行必要的数据同步操作(如上文所述)。但在使用非默认流时,用户需自行确保正确的同步。以下是修正后的示例代码:
cuda = torch.device('cuda')
s = torch.cuda.Stream() # Create a new stream.
A = torch.empty((100, 100), device=cuda).normal_(0.0, 1.0)
s.wait_stream(torch.cuda.default_stream(cuda)) # NEW!
with torch.cuda.stream(s):B = torch.sum(A)
A.record_stream(s) # NEW!
新增了两项功能。调用 torch.cuda.Stream.wait_stream()
可以确保在侧流上开始运行 sum(A)
之前,normal_()
的执行已经完成。torch.Tensor.record_stream()
(详见文档)则确保在 sum(A)
完成之前不会释放张量 A 的内存。你也可以稍后通过 torch.cuda.default_stream(cuda).wait_stream(s)
手动等待流(注意立即等待没有意义,这会阻止该流与默认流上的其他工作并行执行)。关于何时使用这两种方法,请参阅 torch.Tensor.record_stream()
的文档说明。
需要注意的是,即使没有读取依赖关系(如此例所示),这种同步操作也是必要的。
cuda = torch.device('cuda')
s = torch.cuda.Stream() # Create a new stream.
A = torch.empty((100, 100), device=cuda)
s.wait_stream(torch.cuda.default_stream(cuda)) # STILL REQUIRED!
with torch.cuda.stream(s):A.normal_(0.0, 1.0)A.record_stream(s)
尽管对 s
的计算不会读取 A
的内容且 A
没有其他用途,但仍然需要进行同步操作。这是因为 A
可能对应由 CUDA 缓存分配器重新分配的内存,其中可能包含来自旧(已释放)内存的待处理操作。
反向传播的流语义
每个反向CUDA操作都运行在其对应前向操作所使用的同一流上。如果前向传播在不同流上并行运行独立操作,这种设计有助于反向传播利用相同的并行性。
反向调用相对于周围操作的流语义与其他任何调用相同。反向传播会插入内部同步以确保这一点,即使反向操作如上一段所述运行在多个流上。更具体地说,当调用以下函数时:
autograd.backward
、autograd.grad
或
tensor.backward
,并可选地提供CUDA张量作为初始梯度(例如autograd.backward(..., grad_tensors=initial_grads)
、autograd.grad(..., grad_outputs=initial_grads)
或
tensor.backward(..., gradient=initial_grad)
),以下三个操作:
1、可选地填充初始梯度
2、调用反向传播
3、使用梯度
它们之间的流语义关系与任何操作组相同。
s = torch.cuda.Stream()# Safe, grads are used in the same stream context as backward()
with torch.cuda.stream(s):loss.backward()use grads# Unsafe
with torch.cuda.stream(s):loss.backward()
use grads# Safe, with synchronization
with torch.cuda.stream(s):loss.backward()
torch.cuda.current_stream().wait_stream(s)
use grads# Safe, populating initial grad and invoking backward are in the same stream context
with torch.cuda.stream(s):loss.backward(gradient=torch.ones_like(loss))# Unsafe, populating initial_grad and invoking backward are in different stream contexts, # without synchronization
initial_grad = torch.ones_like(loss)
with torch.cuda.stream(s):loss.backward(gradient=initial_grad)# Safe, with synchronization
initial_grad = torch.ones_like(loss)
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):initial_grad.record_stream(s)loss.backward(gradient=initial_grad)
BC 说明:在默认流上使用梯度
在 PyTorch 的早期版本(1.9 及之前)中,自动梯度引擎总是将默认流与所有反向操作同步,因此以下模式:
with torch.cuda.stream(s):loss.backward()
use grads
只要use grads
发生在默认流上就是安全的。
在当前版本的PyTorch中,这种模式不再安全。如果backward()
和use grads
处于不同的流上下文中,你必须同步这些流:
with torch.cuda.stream(s):loss.backward()
torch.cuda.current_stream().wait_stream(s)
use grads
即使启用了 use grads
选项,也会使用默认流。
内存管理
PyTorch 使用缓存内存分配器来加速内存分配。这种机制无需设备同步即可实现快速内存释放。但分配器管理的未使用内存仍会在 nvidia-smi
中显示为已占用。您可以通过 memory_allocated()
和 max_memory_allocated()
监控张量占用的内存,使用 memory_reserved()
和 max_memory_reserved()
监控缓存分配器管理的总内存量。调用 empty_cache()
会释放 PyTorch 中所有未使用的缓存内存,供其他 GPU 应用程序使用。但已分配给张量的 GPU 内存不会被释放,因此不会增加 PyTorch 可用的 GPU 内存总量。
要深入了解 CUDA 内存随时间的使用情况,理解 CUDA 内存使用 文档介绍了捕获和可视化内存使用轨迹的工具。
对于高级用户,我们通过 memory_stats()
提供更全面的内存基准测试功能。此外,通过 memory_snapshot()
可以捕获内存分配器状态的完整快照,帮助您理解代码产生的底层内存分配模式。
使用 PYTORCH_CUDA_ALLOC_CONF
优化内存占用
缓存分配器的使用可能会干扰 cuda-memcheck
等内存检查工具。若需通过 cuda-memcheck
调试内存错误,可在环境中设置 PYTORCH_NO_CUDA_MEMORY_CACHING=1
来禁用缓存。
缓存分配器的行为可通过环境变量 PYTORCH_CUDA_ALLOC_CONF
控制,其格式为:
PYTORCH_CUDA_ALLOC_CONF=<选项>:<值>,<选项2>:<值2>...
可用选项包括:
backend
:选择底层分配器实现。当前有效选项为:native
:使用 PyTorch 原生实现cudaMallocAsync
:使用 CUDA 内置异步分配器(需 CUDA 11.4 或更高版本)
默认值为native
。此设置对进程使用的所有设备生效,无法按设备单独指定。
max_split_size_mb
:阻止原生分配器拆分超过此大小(MB)的内存块,可减少碎片化并帮助临界工作负载避免内存耗尽。性能开销从"零"到"显著"不等,具体取决于分配模式。默认值为无限制(允许拆分所有块)。建议通过memory_stats()
和memory_summary()
方法进行调优。此选项应作为因"内存不足"而中止且显示大量非活动拆分块的工作负载的最后手段。仅对backend:native
有效。roundup_power2_divisions
:将请求的分配大小舍入到最近的 2 次幂分频值,以提升内存块利用率。原生 CUDACachingAllocator 默认以 512 字节块大小的倍数向上取整,这对小尺寸效果良好,但对相邻大分配可能效率低下(导致块复用率降低)。此选项支持将分配大小舍入到最近的 2 次幂分频值。例如:1200 的分配大小在 4 分频时会舍入为 1280(介于 1024-2048 之间的分频值为 1024/1280/1536/1792)。可指定单一全局值或键值对数组(如[256:1,512:2,1024:4,>:8]
表示:<256MB 用 1 分频,256-512MB 用 2 分频等)。仅对backend:native
有效。max_non_split_rounding_mb
:允许非拆分块更大范围的复用(例如 1024MB 缓存块可复用于 512MB 请求)。默认仅允许 20MB 舍入范围(512MB 块只能服务 512-532MB 请求),设为 1024 时将扩展至 512-1536MB 范围,减少 cudaMalloc 调用延迟。garbage_collection_threshold
:主动回收未使用的 GPU 内存以避免触发昂贵的同步全回收操作(release_cached_blocks),这对延迟敏感的 GPU 应用(如服务器)尤为重要。设置阈值(如 0.8)后,当 GPU 内存使用超过阈值(总内存的 80%)时将启动回收,优先释放老旧未用块。阈值需在 0.0-1.0 之间。仅对backend:native
有效。expandable_segments
(实验性,默认 False):设为 True 时,分配器会创建可扩展的 CUDA 分配段,适用于分配大小频繁变化的场景(如动态批量大小)。传统方式对大分配(>2MB)直接调用 cudaMalloc,而此模式改为创建可扩展段(每流一个),通过动态扩展减少内存碎片。当批量从 N 增至 N+1 时,新分配会整齐填入扩展段,避免产生大量不可用内存碎片。pinned_use_cuda_host_register
:布尔标志,决定是否使用 CUDA API 的 cudaHostRegister 函数(而非默认的 cudaHostAlloc)分配固定内存。设为 True 时,先通过常规 malloc 分配内存,再调用 cudaHostRegister 前映射内存页,可减少锁定时长。pinned_num_register_threads
:仅在 pinned_use_cuda_host_register=True 时有效。默认使用单线程映射内存页,此选项允许使用多线程并行化页映射操作以缩短固定内存分配时间。基准测试建议值为 8。pinned_use_background_threads
:布尔标志,启用后台线程处理事件,避免快速分配路径中的事件查询/处理慢路径。默认禁用。
注意:CUDA 内存管理 API 报告的某些统计信息仅适用于 backend:native
,对 backend:cudaMallocAsync
无意义。详见各函数的文档说明。
为CUDA使用自定义内存分配器
在C/C++中可以将分配器定义为简单的函数,并将其编译为共享库。以下代码展示了一个基础分配器,仅用于追踪所有内存操作。
#include <sys/types.h>
#include <cuda_runtime_api.h>
#include <iostream>
// Compile with g++ alloc.cc -o alloc.so -I/usr/local/cuda/include -shared -fPIC
extern "C" {
void* my_malloc(ssize_t size, int device, cudaStream_t stream) {void *ptr;cudaMalloc(&ptr, size);std::cout<<"alloc "<<ptr<<size<<std::endl;return ptr;
}void my_free(void* ptr, ssize_t size, int device, cudaStream_t stream) {std::cout<<"free "<<ptr<< " "<<stream<<std::endl;cudaFree(ptr);
}
}
在 Python 中可以通过 torch.cuda.memory.CUDAPluggableAllocator
使用该功能。
用户需要自行提供 .so 文件的路径,以及符合上述签名的 alloc/free 函数名称。
import torch# Load the allocator
new_alloc = torch.cuda.memory.CUDAPluggableAllocator('alloc.so', 'my_malloc', 'my_free')
# Swap the current allocator
torch.cuda.memory.change_current_allocator(new_alloc)
# This will allocate memory in the device using the new allocator
b = torch.zeros(10, device='cuda')
import torch# Do an initial memory allocator
b = torch.zeros(10, device='cuda')
# Load the allocator
new_alloc = torch.cuda.memory.CUDAPluggableAllocator('alloc.so', 'my_malloc', 'my_free')
# This will error since the current allocator was already instantiated
torch.cuda.memory.change_current_allocator(new_alloc)
在同一程序中混合使用不同的CUDA系统分配器
根据您的使用场景,change_current_allocator()
可能并非理想选择,因为它会替换整个程序的CUDA分配器(类似于设置PYTORCH_CUDA_ALLOC_CONF=backend:cudaMallocAsync
)。例如,如果替换的分配器不具备缓存机制,您将失去PyTorch的CUDACachingAllocator的所有优势。作为替代方案,您可以通过torch.cuda.MemPool
选择性地标记PyTorch代码区域来使用自定义分配器。这样可以在同一个PyTorch程序中同时使用多个CUDA系统分配器,同时保留CUDACachingAllocator的大部分优势(如缓存功能)。
通过torch.cuda.MemPool
,您可以利用支持以下特性的自定义分配器:
- 使用
ncclMemAlloc
分配器为all-reduce操作分配输出缓冲区,可启用NVLink Switch Reductions(NVLS)。这能减少GPU资源(SM和复制引擎)上重叠计算与通信内核的竞争,特别适用于张量并行工作负载。 - 对于基于Grace CPU的系统,使用
cuMemCreate
为all-gather操作分配主机输出缓冲区,并指定CU_MEM_LOCATION_TYPE_HOST_NUMA
,可启用基于扩展GPU内存(EGM)的从源GPU到目标CPU的内存传输。由于传输通过NVLink进行(而非带宽受限的网卡链路),这会加速all-gather操作,进而加快模型检查点保存速度。 - 如果您正在构建模型,且不希望初期考虑内存密集型模块(如嵌入表)的最佳内存位置,或者某个模块对性能不敏感且无法放入GPU,可以直接使用
cudaMallocManaged
以CPU为首选位置分配该模块,先让模型运行起来。
注意:虽然cudaMallocManaged
通过CUDA统一虚拟内存(UVM)提供了便捷的自动内存管理,但不建议用于深度学习工作负载。对于能放入GPU内存的深度学习任务,显式内存放置方案始终优于UVM,因为前者没有页面错误且访问模式可预测。当GPU内存饱和时,UVM必须执行代价高昂的双重传输:先将页面驱逐到CPU,再载入新页面。
以下代码展示了封装在torch.cuda.memory.CUDAPluggableAllocator
中的ncclMemAlloc
实现。
import osimport torch
import torch.distributed as dist
from torch.cuda.memory import CUDAPluggableAllocator
from torch.distributed.distributed_c10d import _get_default_group
from torch.utils import cpp_extension# create allocator
nccl_allocator_source = """
#include <nccl.h>
#include <iostream>
extern "C" {void* nccl_alloc_plug(size_t size, int device, void* stream) {std::cout << "Using ncclMemAlloc" << std::endl;void* ptr;ncclResult_t err = ncclMemAlloc(&ptr, size);return ptr;}void nccl_free_plug(void* ptr, size_t size, int device, void* stream) {std::cout << "Using ncclMemFree" << std::endl;ncclResult_t err = ncclMemFree(ptr);
}}
"""
nccl_allocator_libname = "nccl_allocator"
nccl_allocator = torch.utils.cpp_extension.load_inline(name=nccl_allocator_libname, cpp_sources=nccl_allocator_source, with_cuda=True, extra_ldflags=["-lnccl"], verbose=True, is_python_module=False, build_directory="./", )allocator = CUDAPluggableAllocator(f"./{nccl_allocator_libname}.so", "nccl_alloc_plug", "nccl_free_plug"
).allocator()# setup distributed
rank = int(os.getenv("RANK"))
local_rank = int(os.getenv("LOCAL_RANK"))
world_size = int(os.getenv("WORLD_SIZE"))
torch.cuda.set_device(local_rank)
dist.init_process_group(backend="nccl")
device = torch.device(f"cuda:{local_rank}")
default_pg = _get_default_group()
backend = default_pg._get_backend(device)# Note: for convenience, ProcessGroupNCCL backend provides
# the ncclMemAlloc allocator as backend.mem_allocator
allocator = backend.mem_allocator
现在你可以通过将这个分配器传递给 torch.cuda.MemPool
来定义一个新的内存池:
pool = torch.cuda.MemPool(allocator)
然后可以使用 torch.cuda.use_mem_pool
上下文管理器将该内存池用于张量分配:
with torch.cuda.use_mem_pool(pool):# tensor gets allocated with ncclMemAlloc passed in the pooltensor = torch.arange(1024 * 1024 * 2, device=device)print(f"tensor ptr on rank {rank} is {hex(tensor.data_ptr())}")# register user buffers using ncclCommRegister (called under the hood)
backend.register_mem_pool(pool)# Collective uses Zero Copy NVLS
dist.all_reduce(tensor[0:4])
torch.cuda.synchronize()
print(tensor[0:4])
注意上面示例中register_mem_pool
的用法。这是NVLS规约操作所需的额外步骤,用户需要将缓冲区注册到NCCL。用户可以通过类似的deregister_mem_pool
调用来注销缓冲区。
要回收内存,用户首先需要确保没有操作正在使用该内存池。当所有张量都不再持有该内存池的引用时,在删除内存池时会自动调用empty_cache()
,从而将所有内存返还给系统。
del tensor, del pool
以下 torch.cuda.MemPool.use_count()
和 torch.cuda.MemPool.snapshot()
API 可用于调试目的:
pool = torch.cuda.MemPool(allocator)# pool's use count should be 1 at this point as MemPool object
# holds a reference
assert pool.use_count() == 1nelem_1mb = 1024 * 1024 // 4with torch.cuda.use_mem_pool(pool):out_0 = torch.randn(nelem_1mb, device="cuda")# pool's use count should be 2 at this point as use_mem_pool# holds a referenceassert pool.use_count() == 2# pool's use count should be back to 1 at this point as use_mem_pool
# released its reference
assert pool.use_count() == 1with torch.cuda.use_mem_pool(pool):# pool should have 1 segment since we made a small allocation (1 MB)# above and so the CUDACachingAllocator packed it into a 2 MB bufferassert len(pool.snapshot()) == 1out_1 = torch.randn(nelem_1mb, device="cuda")# pool should still have 1 segment since we made another small allocation# (1 MB) that got packed into the existing 2 MB bufferassert len(pool.snapshot()) == 1out_2 = torch.randn(nelem_1mb, device="cuda")# pool now should have 2 segments since the CUDACachingAllocator had# to make a new 2 MB buffer to accomodate out_2assert len(pool.snapshot()) == 2
注意:
torch.cuda.MemPool
会持有内存池的引用。当使用torch.cuda.use_mem_pool
上下文管理器时,它也会获取对内存池的另一个引用。在退出上下文管理器时,它会释放该引用。此后,理想情况下应仅剩张量持有对内存池的引用。当张量释放其引用后,内存池的使用计数将为1,表示只有torch.cuda.MemPool
对象仍持有引用。只有到那时,在调用内存池的析构函数del
时,内存池持有的内存才能被释放回系统。torch.cuda.MemPool
目前不支持 CUDACachingAllocator 的expandable_segments
模式。- NCCL 对缓冲区有特定要求,以确保其与 NVLS 归约操作兼容。在动态工作负载中,这些要求可能会被破坏。例如,CUDACachingAllocator 发送给 NCCL 的缓冲区可能会被分割,从而导致未正确对齐。在这种情况下,NCCL 可以使用备用算法代替 NVLS。
- 像
ncclMemAlloc
这样的分配器由于对齐要求(CU_MULTICAST_GRANULARITY_RECOMMENDED
、CU_MULTICAST_GRANULARITY_MINIMUM
),可能会分配比请求更多的内存,从而导致工作负载耗尽内存。
cuBLAS 工作空间
对于每个 cuBLAS 句柄与 CUDA 流的组合,当该组合执行需要工作空间的 cuBLAS 内核时,系统会分配一个 cuBLAS 工作空间。为了避免重复分配,这些工作空间不会被释放,除非显式调用 torch._C._cuda_clearCublasWorkspaces()
方法。
每个分配的工作空间大小可通过环境变量 CUBLAS_WORKSPACE_CONFIG
指定,格式为 :[SIZE]:[COUNT]
。例如,默认配置 CUBLAS_WORKSPACE_CONFIG=:4096:2:16:8
表示总空间大小为 2 * 4096 + 8 * 16 KiB
。若需强制禁用 cuBLAS 工作空间,可设置为 CUBLAS_WORKSPACE_CONFIG=:0:0
。
cuFFT 计划缓存
针对每个 CUDA 设备,系统会使用一个 LRU 缓存来存储 cuFFT 计划,以加速在相同几何形状和配置的 CUDA 张量上重复运行 FFT 方法(例如 torch.fft.fft()
)。由于某些 cuFFT 计划可能会分配 GPU 内存,这些缓存设有最大容量限制。
您可以通过以下 API 控制和查询当前设备的缓存属性:
torch.backends.cuda.cufft_plan_cache.max_size
提供缓存的容量(在 CUDA 10 及更新版本中默认为 4096,旧版 CUDA 中默认为 1023)。直接设置该值可修改容量。torch.backends.cuda.cufft_plan_cache.size
提供当前缓存中驻留的计划数量。torch.backends.cuda.cufft_plan_cache.clear()
用于清空缓存。
要控制和查询非默认设备的计划缓存,您可以通过 torch.device
对象或设备索引来访问 torch.backends.cuda.cufft_plan_cache
对象,并获取上述任一属性。例如,要将设备 1
的缓存容量设置为 10,可执行:
torch.backends.cuda.cufft_plan_cache[1].max_size = 10
。
即时编译
PyTorch 会对某些操作(如 torch.special.zeta
)在 CUDA 张量上执行时进行即时编译。这种编译可能非常耗时(根据硬件和软件配置,最长可达数秒),并且单个算子可能触发多次编译——因为许多 PyTorch 算子实际上会从多个内核中选择执行,每个内核都需要根据输入类型编译一次。该编译过程每个进程仅发生一次,若启用内核缓存则全局仅需编译一次。
默认情况下,PyTorch 会在以下路径创建内核缓存:
- 若定义了
XDG_CACHE_HOME
环境变量:$XDG_CACHE_HOME/torch/kernels
- 未定义时:
$HOME/.cache/torch/kernels
(注意:Windows 系统暂不支持内核缓存功能)
可通过两个环境变量直接控制缓存行为:
- 设置
USE_PYTORCH_KERNEL_CACHE=0
将禁用缓存 - 设置
PYTORCH_KERNEL_CACHE_PATH
可指定自定义缓存路径替代默认位置
最佳实践
设备无关代码
由于PyTorch的结构特性,你可能需要显式编写设备无关(CPU或GPU)的代码。例如,创建一个新张量作为循环神经网络的初始隐藏状态时就需要这种处理。
第一步是确定是否应该使用GPU。常见的做法是利用Python的argparse
模块读取用户参数,并通过结合is_available()
方法设置一个禁用CUDA的标志。在下述代码中,args.device
会生成一个torch.device
对象,该对象可用于将张量移至CPU或CUDA。
import argparse
import torchparser = argparse.ArgumentParser(description='PyTorch Example')
parser.add_argument('--disable-cuda', action='store_true', help='Disable CUDA')
args = parser.parse_args()
args.device = None
if not args.disable_cuda and torch.cuda.is_available():args.device = torch.device('cuda')
else:args.device = torch.device('cpu')
注意:在评估特定环境中CUDA的可用性时(通过is_available()
),PyTorch的默认行为是调用CUDA Runtime API方法cudaGetDeviceCount。由于该调用会反过来初始化CUDA Driver API(通过cuInit),如果尚未初始化,那么后续运行过is_available()
的进程fork将会因CUDA初始化错误而失败。
您可以在导入执行is_available()
的PyTorch模块之前(或直接执行该函数之前),在环境中设置PYTORCH_NVML_BASED_CUDA_CHECK=1
,以指示is_available()
尝试基于NVML的评估(nvmlDeviceGetCount_v2)。如果基于NVML的评估成功(即NVML的发现/初始化未失败),is_available()
调用将不会影响后续的进程fork。
如果NVML发现/初始化失败,is_available()
将回退到标准的CUDA Runtime API评估,此时上述fork限制将适用。
需要注意的是,基于NVML的CUDA可用性评估提供的保证比默认的CUDA Runtime API方法(要求CUDA初始化成功)更弱。在某些情况下,基于NVML的检查可能成功,但后续的CUDA初始化会失败。
现在我们有了args.device
,可以用它在目标设备上创建张量。
x = torch.empty((8, 42), device=args.device)
net = Network().to(device=args.device)
这可用于多种场景来生成与设备无关的代码。以下是使用数据加载器时的示例:
cuda0 = torch.device('cuda:0') # CUDA GPU 0
for i, x in enumerate(train_loader):x = x.to(cuda0)
在系统上使用多个GPU时,可以通过CUDA_VISIBLE_DEVICES
环境变量来控制PyTorch可用的GPU设备。如前所述,要手动指定张量创建在哪个GPU上,最佳实践是使用torch.cuda.device
上下文管理器。
print("Outside device is 0") # On device 0 (default in most scenarios)
with torch.cuda.device(1):print("Inside device is 1") # On device 1
print("Outside device is still 0") # On device 0
如果你有一个张量,并希望在同一设备上创建相同类型的新张量,可以使用 torch.Tensor.new_*
方法(参见 torch.Tensor
)。
虽然前面提到的 torch.*
工厂函数(创建操作)依赖于当前 GPU 上下文和传入的属性参数,但 torch.Tensor.new_*
方法会保留张量的设备和其他属性。
在编写需要在前向传播过程中内部创建新张量的模块时,这是推荐的做法。
cuda = torch.device('cuda')
x_cpu = torch.empty(2)
x_gpu = torch.empty(2, device=cuda)
x_cpu_long = torch.empty(2, dtype=torch.int64)y_cpu = x_cpu.new_full([3, 2], fill_value=0.3)
print(y_cpu)tensor([[ 0.3000, 0.3000], [ 0.3000, 0.3000], [ 0.3000, 0.3000]])y_gpu = x_gpu.new_full([3, 2], fill_value=-5)
print(y_gpu)tensor([[-5.0000, -5.0000], [-5.0000, -5.0000], [-5.0000, -5.0000]], device='cuda:0')y_cpu_long = x_cpu_long.new_tensor([[1, 2, 3]])
print(y_cpu_long)tensor([[ 1, 2, 3]])
如果你想创建一个与另一个张量类型和大小相同的张量,并用全1或全0填充,可以使用便捷的辅助函数 ones_like()
或 zeros_like()
(这些函数还会保留张量的 torch.device
和 torch.dtype
属性)。
x_cpu = torch.empty(2, 3)
x_gpu = torch.empty(2, 3)y_cpu = torch.ones_like(x_cpu)
y_gpu = torch.zeros_like(x_gpu)
使用固定内存缓冲区
警告:这是高级技巧。过度使用固定内存可能导致内存不足时出现严重问题,且需注意固定操作通常开销较大。
当数据从固定(页锁定)内存传输时,主机到GPU的复制速度会显著提升。CPU张量和存储对象提供了 pin_memory()
方法,该方法会返回数据存放在固定内存区域的对象副本。
此外,固定张量或存储后,可启用异步GPU复制。只需在调用 to()
或 cuda()
时附加参数 non_blocking=True
,即可实现数据传输与计算的重叠执行。
通过向 DataLoader
构造函数传递 pin_memory=True
参数,可使返回的批次数据直接存放在固定内存中。
使用 nn.parallel.DistributedDataParallel 替代 multiprocessing 或 nn.DataParallel
对于涉及批量输入和多 GPU 的大多数用例,默认应使用 DistributedDataParallel
来利用多个 GPU。
使用 CUDA 模型配合 multiprocessing
存在重大注意事项:除非严格满足数据处理要求,否则程序很可能出现错误或未定义行为。
建议使用 DistributedDataParallel
而非 DataParallel
进行多 GPU 训练,即使只有单个节点。
DistributedDataParallel
与 DataParallel
的区别在于:前者采用多进程方式为每个 GPU 创建独立进程,而后者使用多线程。通过多进程机制,每个 GPU 拥有专属进程,从而避免了 Python 解释器 GIL 带来的性能开销。
若使用 DistributedDataParallel
,可通过 torch.distributed.launch 工具启动程序,详见第三方后端。
CUDA 图
CUDA 图是对 CUDA 流及其依赖流所执行工作(主要是内核及其参数)的记录。关于 CUDA 底层 API 的基本原理和详细信息,请参阅 CUDA 图入门指南 和 CUDA C 编程指南中的图章节。
PyTorch 支持通过流捕获来构建 CUDA 图,该功能将 CUDA 流置于捕获模式。在捕获流中提交的 CUDA 工作实际上不会在 GPU 上运行,而是会被记录到图中。
捕获完成后,可以启动该图来多次运行 GPU 工作。每次重放都会使用相同的参数执行相同的内核。对于指针参数,这意味着使用相同的内存地址。通过在每次重放前用新数据(例如来自新批次的数据)填充输入内存,可以在新数据上重新运行相同的工作。
为什么选择CUDA Graphs?
重放图(graph replay)通过牺牲典型即时执行(eager execution)的动态灵活性,换取了显著降低的CPU开销。由于图的参数和内核是固定的,重放过程会跳过所有参数设置和内核调度的层级,包括Python、C++和CUDA驱动的开销。在底层实现中,重放通过单次调用cudaGraphLaunch将整个图的工作提交给GPU。重放中的内核在GPU上的执行也会稍快一些,但消除CPU开销才是主要优势。
如果您的整个或部分网络符合图安全条件(通常意味着静态形状和静态控制流,但需参见其他限制条件),并且您怀疑其运行时至少部分受限于CPU性能,那么应该尝试使用CUDA Graphs。
PyTorch API
警告:此API处于测试阶段,未来版本可能会有变更。
PyTorch通过原始的torch.cuda.CUDAGraph
类以及两个便捷封装器torch.cuda.graph
和torch.cuda.make_graphed_callables
来暴露图计算功能。
torch.cuda.graph
是一个简单通用的上下文管理器,用于捕获其上下文中的CUDA操作。在捕获前,需要通过运行几次即时模式的迭代来预热待捕获的工作负载。预热必须在侧流(side stream)上进行。由于图在每次重放时都会读写相同的内存地址,因此必须保持对输入输出数据张量的长期引用。要在新输入数据上运行图,需将新数据复制到捕获的输入张量中,重放图计算,然后从捕获的输出张量中读取新结果。示例:
g = torch.cuda.CUDAGraph()# Placeholder input used for capture
static_input = torch.empty((5,), device="cuda")# Warmup before capture
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):for _ in range(3):static_output = static_input * 2
torch.cuda.current_stream().wait_stream(s)# Captures the graph
# To allow capture, automatically sets a side stream as the current stream in the context
with torch.cuda.graph(g):static_output = static_input * 2# Fills the graph's input memory with new data to compute on static_input.copy_(torch.full((5,), 3, device="cuda"))
g.replay()
# static_output holds the results
print(static_output) # full of 3 * 2 = 6# Fills the graph's input memory with more data to compute on static_input.copy_(torch.full((5,), 4, device="cuda"))
g.replay()
print(static_output) # full of 4 * 2 = 8
请参考以下实际应用和高级模式:
全网络捕获、与 torch.cuda.amp 配合使用 以及 多流捕获
make_graphed_callables
功能更为复杂。
该方法支持 Python 函数和 torch.nn.Module
模块,会为每个传入的函数或模块分别创建前向传播和后向传播的计算图。具体实现可参阅:
部分网络捕获
约束条件
一组操作如果满足以下所有约束条件,则被视为可捕获的。
这些约束适用于 torch.cuda.graph
上下文中的所有工作,以及传递给 torch.cuda.make_graphed_callables()
的任何可调用对象的前向和反向传播过程中的所有工作。
违反以下任何一条都可能导致运行时错误:
- 捕获操作必须在非默认流上进行。(仅在使用原始方法
CUDAGraph.capture_begin
和CUDAGraph.capture_end
时需要注意,graph
和make_graphed_callables()
会自动为你设置一个侧流。) - 禁止执行会同步 CPU 和 GPU 的操作(例如
.item()
调用)。 - 允许执行 CUDA 随机数生成(RNG)操作。如果在图中使用多个
torch.Generator
实例,必须在图捕获前通过CUDAGraph.register_generator_state
注册这些实例。避免在捕获过程中使用Generator.get_state
和Generator.set_state
,而应使用Generator.graphsafe_set_state
和Generator.graphsafe_get_state
来安全地管理生成器状态,以确保在 CUDA 图上下文中正确执行 RNG 操作和生成器管理。
违反以下任何一条可能导致静默数值错误或未定义行为:
- 在同一进程中,同一时间只能进行一次捕获操作。
- 捕获过程中,该进程(任何线程)不得运行非捕获的 CUDA 工作。
- CPU 工作不会被捕获。如果捕获的操作包含 CPU 工作,重放时会忽略这部分工作。
- 每次重放都会读取和写入相同的(虚拟)内存地址。
- 禁止基于 CPU 或 GPU 数据的动态控制流。
- 禁止使用动态形状。图假设捕获的操作序列中的每个张量在每次重放时具有相同的大小和布局。
- 允许在捕获中使用多流,但存在限制条件。
非约束条件
- 一旦捕获完成,该计算图可以在任何流上重放执行。
全网捕获
如果您的整个网络流量可被捕获,您就能捕获并重放完整的网络交互过程:
***
N, D_in, H, D_out = 640, 4096, 2048, 1024
model = torch.nn.Sequential(torch.nn.Linear(D_in, H), torch.nn.Dropout(p=0.2), torch.nn.Linear(H, D_out), torch.nn.Dropout(p=0.1)).cuda()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)# Placeholders used for capture
static_input = torch.randn(N, D_in, device='cuda')
static_target = torch.randn(N, D_out, device='cuda')# warmup
# Uses static_input and static_target here for convenience, # but in a real setting, because the warmup includes optimizer.step()
# you must use a few batches of real data.
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):for i in range(3):optimizer.zero_grad(set_to_none=True)y_pred = model(static_input)loss = loss_fn(y_pred, static_target)loss.backward()optimizer.step()
torch.cuda.current_stream().wait_stream(s)# capture
g = torch.cuda.CUDAGraph()
# Sets grads to None before capture, so backward() will create
# .grad attributes with allocations from the graph's private pool
optimizer.zero_grad(set_to_none=True)
with torch.cuda.graph(g):static_y_pred = model(static_input)static_loss = loss_fn(static_y_pred, static_target)static_loss.backward()optimizer.step()real_inputs = [torch.rand_like(static_input) for _ in range(10)]
real_targets = [torch.rand_like(static_target) for _ in range(10)]for data, target in zip(real_inputs, real_targets):# Fills the graph's input memory with new data to compute on static_input.copy_(data)static_target.copy_(target)# replay() includes forward, backward, and step.# You don't even need to call optimizer.zero_grad() between iterations# because the captured backward refills static .grad tensors in place.g.replay()# Params have been updated. static_y_pred, static_loss, and .grad# attributes hold values from computing on this iteration's data.
部分网络捕获
如果您的网络中有部分内容不适合进行捕获(例如由于动态控制流、动态形状、CPU同步或关键CPU端逻辑),可以对这些不安全部分保持即时执行模式,同时使用torch.cuda.make_graphed_callables()
仅对可安全捕获的部分进行图化处理。
默认情况下,make_graphed_callables()
返回的可调用对象支持自动微分,可以直接在训练循环中替代原始函数或nn.Module
。
make_graphed_callables()
内部会创建CUDAGraph
对象,执行预热迭代,并根据需要维护静态输入输出。因此(与torch.cuda.graph
不同),您无需手动处理这些操作。
在以下示例中,数据相关的动态控制流导致网络无法端到端捕获,但通过make_graphed_callables()
,我们依然可以将可图化的部分捕获并运行:
N, D_in, H, D_out = 640, 4096, 2048, 1024module1 = torch.nn.Linear(D_in, H).cuda()
module2 = torch.nn.Linear(H, D_out).cuda()
module3 = torch.nn.Linear(H, D_out).cuda()loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.SGD( chain(module1.parameters(), module2.parameters(), module3.parameters()), lr=0.1)# Sample inputs used for capture
# requires_grad state of sample inputs must match
# requires_grad state of real inputs each callable will see.
x = torch.randn(N, D_in, device='cuda')
h = torch.randn(N, H, device='cuda', requires_grad=True)module1 = torch.cuda.make_graphed_callables(module1, (x,))
module2 = torch.cuda.make_graphed_callables(module2, (h,))
module3 = torch.cuda.make_graphed_callables(module3, (h,))real_inputs = [torch.rand_like(x) for _ in range(10)]
real_targets = [torch.randn(N, D_out, device="cuda") for _ in range(10)]for data, target in zip(real_inputs, real_targets):optimizer.zero_grad(set_to_none=True)tmp = module1(data) # forward ops run as a graphif tmp.sum().item() > 0:tmp = module2(tmp) # forward ops run as a graphelse:tmp = module3(tmp) # forward ops run as a graphloss = loss_fn(tmp, target)# module2's or module3's (whichever was chosen) backward ops, # as well as module1's backward ops, run as graphsloss.backward()optimizer.step()
与 torch.cuda.amp 配合使用
对于常规优化器,GradScaler.step
会同步 CPU 与 GPU 的操作,这在捕获过程中是被禁止的。为避免错误,可以采用以下两种方案:
1、使用部分网络捕获功能
2、若前向传播、损失计算和反向传播是捕获安全的,则捕获这三个阶段但不捕获优化器步骤
# warmup
# In a real setting, use a few batches of real data.
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):for i in range(3):optimizer.zero_grad(set_to_none=True)with torch.cuda.amp.autocast():y_pred = model(static_input)loss = loss_fn(y_pred, static_target)scaler.scale(loss).backward()scaler.step(optimizer)scaler.update()torch.cuda.current_stream().wait_stream(s)# capture
g = torch.cuda.CUDAGraph()
optimizer.zero_grad(set_to_none=True)
with torch.cuda.graph(g):with torch.cuda.amp.autocast():static_y_pred = model(static_input)static_loss = loss_fn(static_y_pred, static_target)scaler.scale(static_loss).backward()# don't capture scaler.step(optimizer) or scaler.update()real_inputs = [torch.rand_like(static_input) for _ in range(10)]
real_targets = [torch.rand_like(static_target) for _ in range(10)]for data, target in zip(real_inputs, real_targets):static_input.copy_(data)static_target.copy_(target)g.replay()# Runs scaler.step and scaler.update eagerlyscaler.step(optimizer)scaler.update()
多流使用方式
捕获模式会自动传播到与捕获流同步的所有流。在捕获过程中,您可以通过向不同流发起调用来展现并行性,但整个流依赖关系图必须在捕获开始后从初始捕获流分支出,并在捕获结束前重新汇合到初始流。
with torch.cuda.graph(g):# at context manager entrance, torch.cuda.current_stream()# is the initial capturing stream# INCORRECT (does not branch out from or rejoin initial stream)with torch.cuda.stream(s):cuda_work()# CORRECT:# branches out from initial streams.wait_stream(torch.cuda.current_stream())with torch.cuda.stream(s):cuda_work()# rejoins initial stream before capture endstorch.cuda.current_stream().wait_stream(s)
注意:为避免使用 nsight systems 或 nvprof 查看回放的高级用户产生混淆:
与即时执行不同,在捕获过程中,图会将非平凡的流 DAG 视为提示而非命令。在回放期间,图可能会将独立操作重新组织到不同的流上,或以不同的顺序将它们加入队列(同时仍遵循原始 DAG 的整体依赖关系)。
与 DistributedDataParallel 配合使用
NCCL < 2.9.6 版本
NCCL 2.9.6 之前的版本不支持在计算图中捕获集合通信操作。此时必须使用部分网络捕获方案,该方案会将所有reduce操作推迟到反向计算图的非图化部分执行。
在使用DDP包装网络之前,需先对可图化的网络部分调用 make_graphed_callables()
方法。
(注:保留所有代码块、链接和术语的原始格式,技术术语如NCCL、DDP、allreduces等未翻译,被动语态已转换为主动语态,长句进行了合理拆分)
NCCL >= 2.9.6
NCCL 2.9.6 或更高版本支持在计算图中执行集合操作。采用完整反向传播捕获的方法是可行的,但需要三个设置步骤:
1、禁用 DDP 的内部异步错误处理:
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
torch.distributed.init_process_group(...)
2、在实现完全反向捕获之前,必须在侧流上下文中构建DDP:
with torch.cuda.stream(s):model = DistributedDataParallel(model)
3、在开始捕获之前,您的预热过程必须至少运行11次启用DDP的即时模式迭代。
图内存管理
每次重放时,捕获的计算图都会操作相同的虚拟地址。如果PyTorch释放了这些内存,后续重放可能会触发非法内存访问。如果PyTorch将这些内存重新分配给新的张量,重放过程可能会破坏这些张量所持有的值。因此,计算图使用的虚拟地址必须在多次重放期间被保留。PyTorch缓存分配器通过检测捕获过程,并从图专属的内存池中分配内存来实现这一目标。该私有内存池会一直存活,直到其对应的CUDAGraph
对象及捕获期间创建的所有张量超出作用域。
私有内存池由系统自动维护。默认情况下,分配器会为每次捕获创建独立的私有内存池。这种保守策略确保了多个计算图的重放过程不会互相破坏数据值,但有时可能会造成不必要的内存浪费。
跨捕获共享内存
为了节省私有内存池中的内存占用,torch.cuda.graph
和 torch.cuda.make_graphed_callables()
提供了让不同捕获共享同一私有内存池的可选功能。当满足以下条件时,多个计算图共享私有内存池是安全的:
1、这些计算图总是按照捕获时的顺序回放
2、这些计算图永远不会并发回放
torch.cuda.graph
的 pool
参数用于指定使用特定的私有内存池,通过该参数可以实现跨计算图的内存共享,如下所示:
g1 = torch.cuda.CUDAGraph()
g2 = torch.cuda.CUDAGraph()# (create static inputs for g1 and g2, run warmups of their workloads...)# Captures g1
with torch.cuda.graph(g1):static_out_1 = g1_workload(static_in_1)# Captures g2, hinting that g2 may share a memory pool with g1
with torch.cuda.graph(g2, pool=g1.pool()):static_out_2 = g2_workload(static_in_2)static_in_1.copy_(real_data_1)
static_in_2.copy_(real_data_2)
g1.replay()
g2.replay()
使用 torch.cuda.make_graphed_callables()
时,若需对多个可调用对象进行图捕获且已知它们总是按相同顺序运行(且永不并发),则应按实际工作负载中的执行顺序将它们作为元组传入。此时 make_graphed_callables()
会使用共享的私有内存池来捕获它们的计算图。
若实际工作负载中可调用对象的执行顺序可能变化或存在并发执行的情况,则禁止将它们作为元组一次性传入 make_graphed_callables()
。此时必须为每个可调用对象单独调用 make_graphed_callables()
。
分布式数据并行
https://pytorch.org/docs/stable/notes/ddp.html
警告:torch.nn.parallel.DistributedDataParallel
的实现会随时间演进。本文档基于 v1.4 版本的状态编写。
torch.nn.parallel.DistributedDataParallel
(DDP) 透明地执行分布式数据并行训练。本文将说明其工作原理并揭示实现细节。
示例
让我们从一个简单的 torch.nn.parallel.DistributedDataParallel
示例开始。该示例使用 torch.nn.Linear
作为本地模型,用 DDP 进行封装,然后在 DDP 模型上执行一次前向传播、一次反向传播以及优化器步骤。完成后,本地模型的参数将被更新,且所有不同进程上的模型应保持完全一致。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDPdef example(rank, world_size):# create default process groupdist.init_process_group("gloo", rank=rank, world_size=world_size)# create local modelmodel = nn.Linear(10, 10).to(rank)# construct DDP modelddp_model = DDP(model, device_ids=[rank])# define loss function and optimizerloss_fn = nn.MSELoss()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)# forward passoutputs = ddp_model(torch.randn(20, 10).to(rank))labels = torch.randn(20, 10).to(rank)# backward passloss_fn(outputs, labels).backward()# update parametersoptimizer.step()def main():world_size = 2mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)if __name__=="__main__":# Environment variables which need to be# set when using c10d's default "env"# initialization mode.os.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "29500"main()
DDP 可与 TorchDynamo 协同工作。当与 TorchDynamo 配合使用时,需在模型编译前应用 DDP 模型封装器,这样 torchdynamo 就能基于 DDP 的桶大小应用 DDPOptimizer
(图中断优化)。(更多信息请参阅 TorchDynamo DDPOptimizer)
ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)
内部设计
本节通过深入剖析每次迭代中的每个步骤,揭示 torch.nn.parallel.DistributedDataParallel
的底层工作原理。
- 前置条件:DDP 依赖 c10d 的
ProcessGroup
进行通信。因此应用程序必须在构建 DDP 前先创建ProcessGroup
实例。 - 构建阶段:DDP 构造函数接收本地模块的引用,并将 rank 0 进程的
state_dict()
广播给组内所有其他进程,确保所有模型副本从完全相同的状态开始。
接着,每个 DDP 进程会创建一个本地Reducer
,该对象将在反向传播阶段负责梯度同步。为了提高通信效率,Reducer
会将参数梯度组织到多个桶中,并逐个桶进行规约操作。
桶大小可通过 DDP 构造函数的 bucket_cap_mb 参数配置。
从参数梯度到桶的映射关系在构建时根据桶大小限制和参数尺寸确定。
模型参数大致按照给定模型Model.parameters()
的逆序分配到桶中,这是因为 DDP 预期梯度在反向传播期间会按近似该顺序就绪。
下图展示了示例场景,注意grad0
和grad1
位于bucket1
,其余两个梯度则在bucket0
中。
当然这个假设并非总是成立,当出现偏差时可能影响 DDP 的反向传播速度,因为Reducer
无法在最早时机启动通信。
除分桶外,Reducer
在构建时还会为每个参数注册自动求导钩子,这些钩子将在反向传播期间梯度就绪时触发。 - 前向传播:DDP 将输入传递给本地模型后,若
find_unused_parameters
设为True
,还会分析本地模型的输出。
此模式允许在模型子图上运行反向传播,DDP 通过从模型输出遍历自动求导图来识别参与反向传播的参数,并将未使用的参数标记为"就绪可规约"。
在反向传播阶段,Reducer
仅会等待未就绪的参数,但仍会规约所有桶。
当前将参数梯度标记为就绪并不能让 DDP 跳过桶处理,但能避免 DDP 在反向传播时永久等待不存在的梯度。
注意遍历自动求导图会引入额外开销,因此应仅在必要时启用find_unused_parameters
。 - 反向传播:直接在损失
Tensor
上调用backward()
函数(此过程不受 DDP 控制),DDP 通过构建时注册的自动求导钩子触发梯度同步。
当某个梯度就绪时,其对应梯度累加器上的 DDP 钩子就会触发,DDP 随即将该参数梯度标记为"就绪可规约"。
当某个桶内所有梯度就绪时,Reducer
会异步启动该桶的allreduce
操作以计算所有进程的梯度均值。
当所有桶就绪后,Reducer
将阻塞等待所有allreduce
操作完成。
完成后,平均梯度会写入所有参数的param.grad
字段,因此反向传播后不同 DDP 进程中对应参数的 grad 字段应当相同。 - 优化器步骤:从优化器视角看,它是在优化本地模型。所有 DDP 进程上的模型副本能保持同步,因为它们起始状态相同,且每次迭代都获得相同的平均梯度。
注意:DDP 要求所有进程上的 Reducer
实例必须按完全相同的顺序调用 allreduce
,这是通过始终按桶索引顺序(而非实际桶就绪顺序)执行 allreduce
来实现的。跨进程的 allreduce
顺序失配会导致错误结果或 DDP 反向传播挂起。
实现
以下是 DDP 实现组件的关键点。堆叠图展示了代码的结构。
进程组
- ProcessGroup.hpp:
包含所有进程组实现的抽象API。c10d
库默认提供了3种实现,分别是ProcessGroupGloo、ProcessGroupNCCL和ProcessGroupMPI。
DistributedDataParallel
在初始化时使用ProcessGroup::broadcast()
将模型状态从rank 0进程广播到其他进程,并使用ProcessGroup::allreduce()
对梯度进行求和。 - Store.hpp:
为进程组实例提供会合服务,帮助它们相互发现。
分布式数据并行 (DistributedDataParallel)
- distributed.py: 这是DDP的Python入口点。它实现了
nn.parallel.DistributedDataParallel
模块的初始化步骤和forward
函数,这些函数会调用C++库。其中的_sync_param
函数在单个DDP进程处理多个设备时执行进程内参数同步,同时还会将模型缓冲区从rank 0进程广播到所有其他进程。进程间的参数同步发生在Reducer.cpp
中。 - comm.h: 实现了合并广播辅助函数,该函数在初始化期间用于广播模型状态,并在前向传播之前同步模型缓冲区。
- reducer.h: 提供了反向传播中梯度同步的核心实现。它包含三个入口函数:
Reducer
: 构造函数在distributed.py
中被调用,用于将Reducer::autograd_hook()
注册到梯度累加器。autograd_hook()
函数会在梯度准备就绪时由自动微分引擎调用。prepare_for_backward()
在distributed.py
的DDP前向传播结束时调用。当DDP构造函数中find_unused_parameters
设置为True
时,它会遍历自动微分图以查找未使用的参数。
TorchDynamo DDPOptimizer(分布式数据并行优化器)
DDP(分布式数据并行)的性能优势源于在反向传播过程中将AllReduce集合操作与计算任务重叠执行。
然而,当使用AotAutograd配合TorchDynamo编译完整的前向和反向计算图时,这种重叠会被破坏——因为AllReduce操作由自动求导钩子在优化后的整个反向计算完成后才触发。
TorchDynamo的DDPOptimizer通过以下方式解决该问题:在反向传播期间,按照DDP的AllReduce桶(bucket)逻辑边界将前向计算图分割。
注意:核心目标是在反向传播阶段拆分计算图,而最简单的实现方式是先拆分前向图,然后对每个分段分别调用AotAutograd和编译。
这使得DDP的AllReduce钩子能在反向传播的各个分段之间触发,从而让通信操作与计算任务实现重叠执行。
更深入的技术解析和实验结果请参阅这篇博客文章,或查看torch/_dynamo/optimizations/distributed.py的文档与代码。
调试DDPOptimizer时,可通过设置TORCH_LOGS='ddp_graphs'
获取完整的计算图转储。
若只需日志而不需要计算图,可在TORCH_LOGS
中添加dynamo
、distributed
或dist_ddp
(用于获取桶边界的基本信息)。
如需禁用DDPOptimizer,可设置torch._dynamo.config.optimize_ddp=False
。
注意:即使不启用DDPOptimizer,DDP与TorchDynamo仍能正确协同工作,但会导致性能下降。
扩展 PyTorch
https://pytorch.org/docs/stable/notes/extending.html
在本指南中,我们将介绍扩展 torch.nn
、torch.autograd
、torch
的方法,以及如何编写自定义 C++ 扩展。
添加新运算符
PyTorch 提供了大量作用于张量的运算符库(例如 torch.add()
、torch.sum()
等)。但有时您可能希望为 PyTorch 引入新的自定义运算,并使其行为与 PyTorch 内置运算符一致。为此,您需要通过 Python 的 torch.library 或 C++ 的 TORCH_LIBRARY API 向 PyTorch 注册自定义运算。
更多详情请参阅 PyTorch 自定义运算符指南页。
扩展 torch.autograd
要为 autograd
添加新操作,需要为每个操作实现一个 Function
子类。请记住,autograd
正是通过 Function 类来记录操作历史并计算梯度的。
本文档第一部分重点介绍反向模式自动微分(AD),因为这是最常用的功能。最后一部分将讨论前向模式 AD 的扩展实现。
何时使用自定义函数
一般来说,当您需要在模型中执行以下两种计算时,应该实现自定义函数:
1、不可微分运算或依赖非PyTorch库(如NumPy)的计算
2、同时希望该操作能与其他运算串联使用,并与自动求导引擎兼容
在以下场景中,自定义函数还能优化性能和内存使用:
- 如果您使用C++扩展实现了前向和反向传播,可以通过封装成
Function
来对接自动求导引擎 - 当需要减少反向传播时保存的缓冲区数量时,可以通过自定义函数将多个运算合并
何时不应使用
如果你已经能够用 PyTorch 内置运算符编写函数,那么它的反向计算图(极有可能)已经可以被 autograd 自动记录。这种情况下,你不需要自己实现反向函数,直接使用普通的 Python 函数即可。
如果需要维护状态(例如可训练参数),你应该(同时)使用自定义模块。更多关于扩展 torch.nn
的信息,请参阅下文相关章节。
如果希望在反向传播过程中修改梯度或执行副作用操作,可以考虑注册 tensor 或 Module 钩子。
使用方法
按照以下步骤操作:
1、继承 Function
类并实现 forward()
、(可选的)setup_context()
和 backward()
方法。
2、在 ctx
参数上调用适当的方法。
3、声明你的函数是否支持双重反向传播。
4、使用 gradcheck 验证梯度计算是否正确。
步骤1: 继承 Function
后,需要定义3个方法:
forward()
是执行运算的代码。它可以接受任意数量的参数,其中部分参数可通过指定默认值设为可选。
这里允许传入所有类型的Python对象。会追踪历史记录的Tensor
参数(即requires_grad=True
)将在调用前被转换为不追踪历史的张量,它们的使用会被记录在计算图中。
注意:该逻辑不会遍历列表/字典/其他数据结构,仅处理直接作为调用参数的张量。可以返回单个Tensor
输出,或多个输出组成的tuple
。另请参阅Function
文档,了解只能在forward()
中调用的实用方法说明。setup_context()
(可选)。可以编写一个接受ctx
对象的"组合式"forward()
,或者(从PyTorch 2.0开始)使用不接收ctx
的独立forward()
配合专门修改ctx
的setup_context()
方法。前者应包含计算逻辑,后者应仅负责ctx
修改(不包含任何计算)。通常独立forward()
和setup_context()
更接近PyTorch原生操作的工作方式,因此能与各子系统更好地组合。详见组合式或分离式forward()与setup_context()。backward()
(或vjp()
)定义梯度公式。
它会接收与输出数量相同的Tensor
参数,每个参数表示对应输出的梯度。
注意:绝对不要就地修改这些参数。应返回与输入数量相同的张量,每个张量包含对应输入的梯度。
如果某些输入不需要梯度(needs_input_grad
是布尔元组,指示各输入是否需要梯度计算),或是非Tensor
对象,可以返回python:None
。
如果forward()
有可选参数,只要额外梯度都是None
,返回的梯度数量可以多于输入数量。
步骤2: 你有责任正确使用 ctx
中的函数,以确保新 Function
能与自动微分引擎协同工作。
- 必须使用
save_for_backward()
保存反向传播中需要使用的张量。
非张量应直接存储在 ctx 上。如果保存了既非输入也非输出的张量,你的Function
可能不支持双重反向传播(见步骤3)。 - 必须使用
mark_dirty()
标记被前向函数就地修改的输入。 - 必须使用
mark_non_differentiable()
告知引擎哪些输出不可微分。
默认情况下,所有可微分类型的输出张量都会被设为需要梯度。不可微分类型(如整数类型)的张量永远不会被标记为需要梯度。 - 可以使用
set_materialize_grads()
告诉自动微分引擎在输出不依赖输入的情况下优化梯度计算——不将传给 backward 的梯度张量具体化。
即设为 False 时,Python 中的 None 对象或 C++ 中的"未定义张量"(x.defined() 为 False 的张量 x)在调用 backward 前不会被转换为零填充张量,你的代码需要将这些对象视为零填充张量处理。该设置默认为 True。
步骤3: 如果你的 Function
不支持双重反向传播,应通过用 once_differentiable()
装饰 backward 来显式声明。使用该装饰器后,尝试通过你的函数执行双重反向传播将报错。关于双重反向传播的更多信息,请参阅我们的双重反向教程。
步骤4: 建议使用 torch.autograd.gradcheck()
来检查反向函数是否正确计算前向传播的梯度。该方法会使用你的反向函数计算雅可比矩阵,并通过有限差分法数值计算的雅可比矩阵进行逐元素比较验证。
示例
以下是一个带有附加注释的 Linear
函数代码:
# Inherit from Function
class LinearFunction(Function):# Note that forward, setup_context, and backward are @staticmethods@staticmethoddef forward(input, weight, bias):output = input.mm(weight.t())if bias is not None:output += bias.unsqueeze(0).expand_as(output)return output@staticmethod# inputs is a Tuple of all of the inputs passed to forward.# output is the output of the forward().def setup_context(ctx, inputs, output):input, weight, bias = inputsctx.save_for_backward(input, weight, bias)# This function has only a single output, so it gets only one gradient@staticmethoddef backward(ctx, grad_output):# This is a pattern that is very convenient - at the top of backward# unpack saved_tensors and initialize all gradients w.r.t. inputs to # None. Thanks to the fact that additional trailing Nones are # ignored, the return statement is simple even when the function has# optional inputs.input, weight, bias = ctx.saved_tensorsgrad_input = grad_weight = grad_bias = None# These needs_input_grad checks are optional and there only to # improve efficiency. If you want to make your code simpler, you can# skip them. Returning gradients for inputs that don't require it is# not an error.if ctx.needs_input_grad[0]:grad_input = grad_output.mm(weight)if ctx.needs_input_grad[1]:grad_weight = grad_output.t().mm(input)if bias is not None and ctx.needs_input_grad[2]:grad_bias = grad_output.sum(0)return grad_input, grad_weight, grad_bias
为了方便使用这些自定义操作,我们建议为其创建别名或封装成函数。封装成函数的方式可以支持默认参数和关键字参数:
# Option 1: alias
linear = LinearFunction.apply# Option 2: wrap in a function, to support default args and keyword args.
def linear(input, weight, bias=None):return LinearFunction.apply(input, weight, bias)
这里,我们提供一个额外示例,展示一个由非张量参数参数化的函数:
class MulConstant(Function):@staticmethoddef forward(tensor, constant):return tensor * constant@staticmethoddef setup_context(ctx, inputs, output):# ctx is a context object that can be used to stash information# for backward computationtensor, constant = inputsctx.constant = constant@staticmethoddef backward(ctx, grad_output):# We return as many input gradients as there were arguments.# Gradients of non-Tensor arguments to forward must be None.return grad_output * ctx.constant, None
这里我们通过调用 set_materialize_grads(False)
来优化上述示例:
class MulConstant(Function):@staticmethoddef forward(tensor, constant):return tensor * constant@staticmethoddef setup_context(ctx, inputs, output):tensor, constant = inputsctx.set_materialize_grads(False)ctx.constant = constant@staticmethoddef backward(ctx, grad_output):# Here we must handle None grad_output tensor. In this case we # can skip unnecessary computations and just return None.if grad_output is None:return None, None# We return as many input gradients as there were arguments.# Gradients of non-Tensor arguments to forward must be None.return grad_output * ctx.constant, None
如果你需要在 forward()
中计算并保存"中间"张量,要么将它们作为输出返回,要么结合使用 forward
和 setup_context()
(参见合并或分离 forward() 与 setup_context())。
请注意,若希望梯度能通过这些中间值反向传播,你需要为它们定义梯度公式(另见双重反向传播教程)。
class MyCube(torch.autograd.Function):@staticmethoddef forward(x):# We wish to save dx for backward. In order to do so, it must# be returned as an output.dx = 3 * x ** 2result = x ** 3return result, dx@staticmethoddef setup_context(ctx, inputs, output):x, = inputsresult, dx = outputctx.save_for_backward(x, dx)@staticmethoddef backward(ctx, grad_output, grad_dx):x, dx = ctx.saved_tensors# In order for the autograd.Function to work with higher-order# gradients, we must add the gradient contribution of `dx`, # which is grad_dx * 6 * x.result = grad_output * dx + grad_dx * 6 * xreturn result# Wrap MyCube in a function so that it is clearer what the output is
def my_cube(x):result, dx = MyCube.apply(x)return result
注意:backward
的输入(即grad_output
)也可以是记录历史信息的张量。因此,如果backward
通过可微操作实现(例如调用另一个自定义的Function
),高阶导数将能够正常工作。
在这种情况下,通过save_for_backward
保存的张量也可用于反向传播,并会有梯度回传。但保存在ctx
中的张量则不会有梯度回传。
如果需要让保存在ctx
中的张量也能回传梯度,应将其作为自定义Function
的输出,并通过save_for_backward
保存。
建议检查所实现的backward
方法是否正确计算了函数的导数。可通过与有限差分法的数值近似结果进行对比验证。
from torch.autograd import gradcheck# gradcheck takes a tuple of tensors as input, check if your gradient
# evaluated with these tensors are close enough to numerical
# approximations and returns True if they all verify this condition.
input = (torch.randn(20,20,dtype=torch.double,requires_grad=True), torch.randn(30,20,dtype=torch.double,requires_grad=True))
test = gradcheck(linear, input, eps=1e-6, atol=1e-4)
print(test)
更多关于有限差分梯度比较的细节,请参阅数值梯度检查。
如果您的函数用于高阶导数(对反向传播过程进行微分),可以使用同一包中的gradgradcheck
函数来检查高阶导数。
合并或分离 forward()
与 setup_context()
定义 Function
主要有两种方式:
- 定义一个将前向计算逻辑与
setup_context()
合并的forward()
- (从 PyTorch 2.0 开始)分别定义
forward()
和setup_context()
我们推荐第二种方式(分离 forward()
和 setup_context()
),因为这种方式更接近 PyTorch 原生操作的实现方式,并且能与 torch.func
转换兼容。不过,我们计划同时支持这两种方式;将 forward()
与 setup_context()
合并会带来更大的灵活性,因为你可以保存中间结果而不必将其作为输出返回。
关于如何分别定义 forward()
和 setup_context()
的 Function
,请参阅前一节。
以下是一个展示如何定义合并 forward()
与 setup_context()
的 Function
的示例:
class LinearFunction(Function):@staticmethod# ctx is the first argument to forwarddef forward(ctx, input, weight, bias=None):# The forward pass can use ctx.ctx.save_for_backward(input, weight, bias)output = input.mm(weight.t())if bias is not None:output += bias.unsqueeze(0).expand_as(output)return output@staticmethoddef backward(ctx, grad_output):input, weight, bias = ctx.saved_tensorsgrad_input = grad_weight = grad_bias = Noneif ctx.needs_input_grad[0]:grad_input = grad_output.mm(weight)if ctx.needs_input_grad[1]:grad_weight = grad_output.t().mm(input)if bias is not None and ctx.needs_input_grad[2]:grad_bias = grad_output.sum(0)return grad_input, grad_weight, grad_bias
前向模式自动微分
重写前向模式自动微分公式的API与反向模式非常相似,但存在一些细微差别。你可以实现jvp()
函数。
该函数会接收与输入数量相同的Tensor
参数,每个参数代表对应输入的梯度。它应该返回与输出数量相同的张量,每个张量包含对应输出的梯度。jvp()
会在forward()
方法之后、apply()
返回之前被调用。
jvp()
与backward()
函数有几个关键区别:
- 可以通过ctx对象将数据从
forward()
传递到jvp()
。如果这些状态在backward()
中不需要,可以在jvp()
结束时通过del ctx.foo
显式释放。 jvp()
的实现必须支持反向微分,或者显式检查所有前向模式梯度都没有设置requires_grad
标志。jvp()
必须与forward()
的视图/原地操作行为保持一致。例如,如果第i个输入被原地修改,那么第i个梯度也必须原地更新。
类似地,如果第j个输出是第k个输入的视图,那么返回的第j个输出梯度必须是给定第k个输入梯度的视图。- 由于用户无法指定需要计算哪些梯度,
jvp()
应该始终计算所有输出的梯度。 - 前向模式梯度会遵循
set_materialize_grads()
设置的标志,当禁用该选项时,可能会收到None输入梯度。
torch.func
转换和/或 torch.vmap()
详情请参阅使用 autograd.Function 扩展 torch.func。
扩展 torch.nn
nn
模块提供了两种接口——模块化接口及其对应的函数式版本。您可以通过这两种方式进行扩展,但我们建议:
- 对于包含参数或缓冲区的各类层结构,采用模块化方式
- 对于无参数的操作(如激活函数、池化等),采用函数式实现
函数式操作的添加方法已在前文章节中完整涵盖。
添加 Module
由于 nn
大量使用了 autograd
,添加一个新的 Module
需要实现一个 Function
,该函数执行操作并能够计算梯度。从现在开始,我们假设要实现一个 Linear
模块,并且已经像上面的代码清单那样实现了函数。添加这个模块所需的代码非常少。现在,需要实现两个函数:
__init__
(可选)- 接收诸如内核大小、特征数量等参数,并初始化参数和缓冲区。forward()
- 实例化一个Function
并使用它来执行操作。它与上面展示的函数式包装器非常相似。
以下是如何实现 Linear
模块的示例:
class Linear(nn.Module):def __init__(self, input_features, output_features, bias=True):super().__init__()self.input_features = input_featuresself.output_features = output_features# nn.Parameter is a special kind of Tensor, that will get# automatically registered as Module's parameter once it's assigned# as an attribute. Parameters and buffers need to be registered, or# they won't appear in .parameters() (doesn't apply to buffers), and # won't be converted when e.g. .cuda() is called. You can use# .register_buffer() to register buffers.# nn.Parameters require gradients by default.self.weight = nn.Parameter(torch.empty(output_features, input_features))if bias:self.bias = nn.Parameter(torch.empty(output_features))else:# You should always register all possible parameters, but the # optional ones can be None if you want.self.register_parameter('bias', None)# Not a very smart way to initialize weightsnn.init.uniform_(self.weight, -0.1, 0.1)if self.bias is not None:nn.init.uniform_(self.bias, -0.1, 0.1)def forward(self, input):# See the autograd section for explanation of what happens here.return LinearFunction.apply(input, self.weight, self.bias)def extra_repr(self):# (Optional)Set the extra information about this module. You can test# it by printing an object of this class.return 'input_features={}, output_features={}, bias={}'.format(self.input_features, self.output_features, self.bias is not None)
扩展 torch
Python API
你可以通过定义一个与 Tensor
方法匹配的自定义类,来创建模拟 Tensor
的自定义类型。但如果你想将这些类型传递给顶级 torch
命名空间中的函数(如 torch.add()
),而这些函数原本只接受 Tensor
操作数,该怎么办?
如果你的自定义 Python 类型定义了一个名为 __torch_function__
的方法,当你的自定义类实例被传递给 torch
命名空间中的函数时,PyTorch 会调用你的 __torch_function__
实现。
这使得你可以为 torch
命名空间中的任何函数定义自定义实现,你的 __torch_function__
实现可以调用这些函数,从而让你的用户能够将自定义类型用于现有的 PyTorch 工作流(这些工作流原本是为 Tensor
编写的)。这种方式不仅适用于与 Tensor
无关的“鸭子”类型,也适用于用户定义的 Tensor
子类。
扩展 torch
实现类 Tensor
类型
注意:此功能灵感来源于 NumPy 的 __array_function__
协议。更多细节请参阅 NumPy 文档 和 NEP-0018。
为了让概念更具体,我们从一个展示 API 调度机制的简单示例开始。我们将创建一个自定义类型来表示二维标量张量,该类型由阶数 N
和对角线元素值 value
参数化:
class ScalarTensor(object):def __init__(self, N, value):self._N = Nself._value = valuedef __repr__(self):return "ScalarTensor(N={}, value={})".format(self._N, self._value)def tensor(self):return self._value * torch.eye(self._N)
这个设计的第一个版本并不实用。ScalarTensor
的主要功能是提供一个比基础张量类更紧凑的标量张量字符串表示形式。
>>> d = ScalarTensor(5, 2)
>>> d
ScalarTensor(N=5, value=2)
>>> d.tensor()
tensor([[2., 0., 0., 0., 0.], [0., 2., 0., 0., 0.], [0., 0., 2., 0., 0.], [0., 0., 0., 2., 0.], [0., 0., 0., 0., 2.]])
如果我们尝试将这个对象与 torch
API 一起使用,就会遇到问题:
>>> import torch
>>> torch.mean(d)
TypeError: mean(): argument 'input' (position 1) must be Tensor, not ScalarTensor
为 ScalarTensor
添加 __torch_function__
实现后,上述操作就能成功执行。让我们重新实现一遍,这次加上 __torch_function__
的实现:
***
HANDLED_FUNCTIONS = {}
class ScalarTensor(object):def __init__(self, N, value):self._N = Nself._value = valuedef __repr__(self):return "ScalarTensor(N={}, value={})".format(self._N, self._value)def tensor(self):return self._value * torch.eye(self._N)@classmethoddef __torch_function__(cls, func, types, args=(), kwargs=None):if kwargs is None:kwargs = {}if func not in HANDLED_FUNCTIONS or not all(issubclass(t, (torch.Tensor, ScalarTensor))for t in types):return NotImplementedreturn HANDLED_FUNCTIONS[func](*args, **kwargs)
__torch_function__
方法接收四个参数:func
表示被覆盖的 torch API 函数引用,types
是实现了 __torch_function__
的类张量类型列表,args
是传递给函数的参数元组,kwargs
是传递给函数的关键字参数字典。该方法使用名为 HANDLED_FUNCTIONS
的全局调度表来存储自定义实现,该字典的键是 torch
命名空间中的函数,值是对应 ScalarTensor
的实现。
注意:使用全局调度表并非 __torch_function__
API 的强制要求,这只是组织覆盖实现的一种实用设计模式。
仅靠这个类定义还不足以让 torch.mean
在处理 ScalarTensor
时正确工作——我们还需要为 ScalarTensor
操作数定义 torch.mean
的实现,并将该实现添加到 HANDLED_FUNCTIONS
调度表字典中。一种实现方式是定义装饰器:
import functools
def implements(torch_function):"""Register a torch function override for ScalarTensor"""def decorator(func):functools.update_wrapper(func, torch_function)HANDLED_FUNCTIONS[torch_function] = funcreturn funcreturn decorator
这可以应用于我们重写逻辑的实现中。
@implements(torch.mean)
def mean(input):return float(input._value) / input._N
通过这一改动,我们现在可以在 ScalarTensor
上使用 torch.mean
方法了:
>>> d = ScalarTensor(5, 2)
>>> torch.mean(d)
0.4
当然,torch.mean
是最简单的一类可重写函数示例,因为它仅接受一个操作数。我们可以使用相同的机制来重写接受多个操作数的函数,其中任何一个操作数可能是:
定义了 __torch_function__
的张量或类张量对象,例如 torch.add()
:
def ensure_tensor(data):if isinstance(data, ScalarTensor):return data.tensor()return torch.as_tensor(data)@implements(torch.add)
def add(input, other):try:if input._N == other._N:return ScalarTensor(input._N, input._value + other._value)else:raise ValueError("Shape mismatch!")except AttributeError:return torch.add(ensure_tensor(input), ensure_tensor(other))
该版本针对两个操作数都是 ScalarTensor
实例的情况提供了快速路径,同时也包含一个较慢的备用路径——当任一操作数不是 ScalarTensor
时,会将数据转换为张量。这样的设计确保重载函数无论操作数是 ScalarTensor
还是常规 Tensor
都能正确运作。
>>> s = ScalarTensor(2, 2)
>>> torch.add(s, s)
ScalarTensor(N=2, value=4)
>>> t = torch.tensor([[1, 1,], [1, 1]])
>>> torch.add(s, t)
tensor([[3., 1.], [1., 3.]])
请注意,我们实现的 add
方法不像 torch.add()
那样接受 alpha
或 out
作为关键字参数:
>>> torch.add(s, s, alpha=2)
TypeError: add() got an unexpected keyword argument 'alpha'
为了追求速度和灵活性,__torch_function__
调度机制不会检查重写函数的签名是否与 torch
API 中被重写函数的签名匹配。对于某些应用场景,忽略可选参数是可以接受的,但为了确保与 Tensor
的完全兼容性,用户实现的 torch API 函数应当精确模拟被重写函数的 API。
在 torch
API 中没有显式重写的函数会通过 __torch_function__
返回 NotImplemented
。如果所有定义了 __torch_function__
的操作数都返回 NotImplemented
,PyTorch 将抛出 TypeError
。这意味着在大多数情况下,当传递该类型的实例时,没有为该类型显式重写的操作会引发 TypeError
。
>>> torch.mul(s, 3)
TypeError: no implementation found for 'torch.mul' on types that implement __torch_function__: [ScalarTensor]
在实践中,这意味着如果你想通过实现__torch_function__
来覆盖默认行为,就需要显式地实现完整的torch
API,或者针对你的使用场景实现相关API子集。这可能是个艰巨的任务,因为完整的torch
API非常庞大。
另一个方案是:对于未处理的运算操作,不返回NotImplemented
,而是在没有覆盖实现时,将Tensor
传递给原始的torch
函数。例如,如果我们把ScalarTensor
的__torch_function__
实现修改为如下版本:
@classmethod
def __torch_function__(cls, func, types, args=(), kwargs=None):if kwargs is None:kwargs = {}if func not in HANDLED_FUNCTIONS or not all(issubclass(t, (torch.Tensor, ScalarTensor))for t in types):args = [a.tensor() if hasattr(a, 'tensor') else a for a in args]return func(*args, **kwargs)return HANDLED_FUNCTIONS[func](*args, **kwargs)
这样 torch.mul()
就能正常工作,但返回值类型始终会是 Tensor
而非 ScalarTensor
,即使两个操作数都是 ScalarTensor
实例时也是如此:
>>> s = ScalarTensor(2, 2)
>>> torch.mul(s, s)
tensor([[4., 0.], [0., 4.]])
另请参阅下面的 MetadataTensor
示例,这是该模式的另一种变体,但它始终返回 MetadataTensor
以通过 torch
API 中的操作传播元数据。
__torch_function__
协议设计用于全面覆盖 API,部分覆盖可能导致不良结果,特别是某些函数会抛出 TypeError
。
对于子类尤其如此,即使 torch.add、torch.Tensor.add 和 torch.Tensor.add 返回完全相同的结果,也必须全部覆盖。
未能做到这一点还可能导致无限递归。如果需要在 torch.Tensor
子类中实现某个函数,必须在实现中使用 super().__torch_function__
。
继承 torch.Tensor
类
从 1.7.0 版本开始,torch.Tensor
类的方法以及公共 torch.*
命名空间中应用于 torch.Tensor
子类的函数,将返回子类实例而非 torch.Tensor
实例:
>>> class SubTensor(torch.Tensor):
... pass
>>> type(torch.add(SubTensor([0]), SubTensor([1]))).__name__
'SubTensor'
>>> type(torch.add(SubTensor([0]), torch.tensor([1]))).__name__
'SubTensor'
如果存在多个子类,默认会选择继承层级最低的那个。如果无法唯一确定这种情况,则会抛出 TypeError
错误:
>>> type(torch.add(SubTensor2([0]), SubTensor([1]))).__name__
'SubTensor2'
>>> type(torch.add(SubTensor2([0]), torch.tensor([1]))).__name__
'SubTensor2'
>>> torch.add(SubTensor([0]), OtherSubTensor([1]))
Traceback (most recent call last):File "<stdin>", line 1, in <module>
TypeError: no implementation found for 'torch.add' on types that implement __torch_function__: [SubTensor, OtherSubTensor]
如果想对所有张量方法进行全局覆盖,可以使用 __torch_function__
。以下是一个记录所有函数/方法调用的示例:
class LoggingTensor(torch.Tensor):@classmethoddef __torch_function__(cls, func, types, args=(), kwargs=None):# NOTE: Logging calls Tensor.__repr__, so we can't log __repr__ without infinite recursionif func is not torch.Tensor.__repr__:logging.info(f"func: {func.__name__}, args: {args!r}, kwargs: {kwargs!r}")if kwargs is None:kwargs = {}return super().__torch_function__(func, types, args, kwargs)
然而,如果想在 Tensor 子类中覆盖某个方法,可以通过两种方式实现:直接覆盖该方法(在子类中定义该方法),或者使用 __torch_function__
并通过 func
进行匹配。
需要注意的是,在子类的 __torch_function__
方法中,应该始终调用 super().__torch_function__(func, ...)
而不是直接调用 func
,这与 1.7.0 版本之前的情况不同。如果未能这样做,可能会导致 func
递归调用 __torch_function__
,从而引发无限递归。
扩展 torch
的 Tensor
包装类型
另一个实用场景是创建包装 Tensor
的类型,可以通过属性或子类化实现。
下面我们实现了这种类型的特例——MetadataTensor
,它在 Tensor
上附加了一个元数据字典,这些元数据会通过 torch
操作传播。
由于这是对完整 torch
API 的通用包装,我们不需要单独实现每个重写方法,因此可以让 __torch_function__
的实现对允许的操作更加宽松:
class MetadataTensor(object):def __init__(self, data, metadata=None, **kwargs):self._t = torch.as_tensor(data, **kwargs)self._metadata = metadatadef __repr__(self):return "Metadata:\n{}\n\ndata:\n{}".format(self._metadata, self._t)@classmethoddef __torch_function__(cls, func, types, args=(), kwargs=None):if kwargs is None:kwargs = {}metadatas = tuple(a._metadata for a in args if hasattr(a, '_metadata'))args = [getattr(a, '_t', a) for a in args]assert len(metadatas) > 0ret = func(*args, **kwargs)return MetadataTensor(ret, metadata=metadatas[0])
这个简单实现可能无法兼容 torch
API 中的所有函数,但对于捕获大多数常见操作已经足够:
>>> metadata = {'owner': 'Ministry of Silly Walks'}
>>> m = MetadataTensor([[1, 2], [3, 4]], metadata=metadata)
>>> t = torch.tensor([[1, 2], [1, 2]])
>>> torch.add(t, m)
Metadata:
{'owner': 'Ministry of Silly Walks'}data:
tensor([[2, 4], [4, 6]])
>>> torch.mul(t, m)
Metadata:
{'owner': 'Ministry of Silly Walks'}data:
tensor([[1, 4], [3, 8]])
对定义了__torch_function__
的多种类型进行操作
可以在PyTorch API中同时使用多个各自实现了__torch_function__
的不同类型,但需要特别注意以下规则:
- 调度操作会收集每个操作数所有不同的
__torch_function__
实现,并按以下顺序调用:子类优先于父类,运算符表达式中其余操作数按从左到右顺序处理。 - 如果任何实现返回了非
NotImplemented
的值,则该值将作为结果返回。实现可以通过返回NotImplemented
来表明不支持该操作。 - 如果所有
__torch_function__
实现都返回NotImplemented
,PyTorch将抛出TypeError
异常。
PyTorch API 覆盖测试范围
实现 __torch_function__
时的一个棘手问题是:如果某些操作有覆盖而其他操作没有,用户最多只能获得不一致的体验,最糟的情况下,当他们使用未覆盖的函数时会遇到运行时错误。为了简化这一过程,PyTorch 提供了一个面向开发者的 API,用于确保对 __torch_function__
覆盖的全面支持。该 API 是私有的,未来可能未经警告就发生变更。
首先,要获取所有可覆盖函数的列表,可使用 torch.overrides._get_overridable_functions
。它会返回一个字典,其键是 PyTorch
Python API 中的命名空间,值是该命名空间内可覆盖的函数列表。例如,我们打印 torch.nn.functional
中前 5 个可覆盖函数的名称:
>>> from torch.overrides import get_overridable_functions
>>> func_dict = get_overridable_functions()
>>> nn_funcs = func_dict[torch.nn.functional]
>>> print([f.__name__ for f in nn_funcs[:5])
['adaptive_avg_pool1d', 'adaptive_avg_pool2d', 'adaptive_avg_pool3d', 'adaptive_max_pool1d', 'adaptive_max_pool1d_with_indices']
这份函数列表让我们能够遍历所有可重写的函数,但在实际应用中,仅凭这一点还不足以高效地为所有函数编写测试——若不对每个函数签名进行繁琐的手动复制,测试工作将难以开展。为了简化这一流程,torch.overrides._get_testing_overrides
函数会返回一个字典,该字典将 PyTorch
API 中的可重写函数映射到对应的虚拟 lambda 函数。
这些 lambda 函数与原始函数具有相同的签名,但会无条件返回 -1。这些函数在与 inspect
结合使用时尤为有用,可用于分析原始 PyTorch
函数的签名特征。
>>> import inspect
>>> from torch.overrides import get_testing_overrides
>>> override_dict = get_testing_overrides()
>>> dummy_add = override_dict[torch.add]
>>> inspect.signature(dummy_add)
<Signature (input, other, out=None)>
最终,torch.overrides.get_ignored_functions
会返回一个函数元组,这些函数明确无法通过 __torch_function__
进行重写。当需要确认某个未出现在 get_overridable_functions
返回字典中的函数是否确实不可重写时,这个列表会非常有用。
扩展 torch
原生 API
虽然 __torch_function__
允许有效扩展 PyTorch 纯 Python 组件的行为,但它无法扩展 PyTorch 中 C++ 实现的部分。为此,Tensor
子类还可以定义 __torch_dispatch__
,从而能够在 C++ 层面覆盖行为。
要有效使用此功能,了解 PyTorch 原生部分的实现方式非常重要。其中最重要的组件是我们称为“调度器”的部分(最佳描述可在这篇博客文章中找到,尽管内容稍有过时)。
如其名所示,它负责为特定函数调用调用正确的后端函数。例如,当调用 torch.add(a, b)
时,调度器会检查两个参数,确定应为此特定调用使用的“功能”(自动微分、自动转换、函数化等)和“后端”(CPU、CUDA、MPS 等),并最终调用所有正确的内核。
内核常做的一件事是“重新调度”。例如,当在 GPU 上使用自动转换运行神经网络时,第一次调用将是处理任何潜在自动转换逻辑并向下重新调度的自动转换内核。
接下来的功能将是自动微分,它会正确创建自动微分图并向下重新调度。
最后,我们到达 CUDA 的后端内核,它将启动正确的 CUDA 内核并返回最终结果。在返回过程中,自动微分会将图附加到输出上,最后自动转换有机会在退出时进行任何需要的更新。
调度器的一个配置是所有这些功能和后端键的调用顺序。最新的列表及其顺序可以在 DispatchKey.h
中的 DispatchKey
枚举中找到。为了扩展 torch,本次讨论中重要的顺序子集是:
vmap -> 自动转换 -> 自动微分 -> ZeroTensor -> Neg/Conj -> 函数化 -> Python -> 后端
本次讨论中最重要的键是 Python
,因为每个定义了 __torch_dispatch__
方法的 Tensor
子类都会调用此功能。从这里调用用户定义的方法,并可以任意覆盖行为。从这里再次调用提供的 func
将执行“重新调度”。
此实现的一些重要含义包括:
- 此代码“在所有功能之下”运行。因此,它像常规后端一样,仅负责生成每个
Tensor
的输出值(并且可以且应该忽略所有高级功能,如自动微分、自动转换等)。 - 如果任何高级功能实现了给定函数而不重新调度,它将永远不会到达
Python
键,因此__torch_dispatch__
回调永远不会触发。特别是CompositeImplicitAutograd
函数在自动微分级别评估而不重新调度时会发生这种情况。这是因为CompositeImplicitAutograd
函数通过隐式调用其他原生操作来指定其自动微分公式,因此在自动微分级别,函数被分解为其原生操作并评估这些操作。 - 当回调到 Python 并包装结果时,使用与常规 PyTorch Python/C++ 绑定相同的转换。特别是,某些对象无法在 Python 中表示,需要特殊处理(例如未定义的
Tensor
变为None
)。 - 我们的原生函数作为
torch.ops.{namespace}.{func_name}.{overload_name}
被惰性填充为可调用的 Python 对象,以便从 Python 轻松与之交互。提供给__torch_dispatch__
的func
对象始终是此命名空间中的一个条目。此命名空间可用于直接调用原生操作并绕过常规的 Python API 和绑定代码。
与 __torch_function__
能够介入所有 torch 的 Python API 和 Tensor
方法类似,__torch_dispatch__
能够拦截所有对 aten 原生 API 的调用。
请注意,Tensor
上的所有方法在进入调度器之前都会转换为函数调用,因此在这里会显示为函数调用:torch.add(a, 2)
和 a + 2
将导致完全相同的 aten 调用。
这些函数大多定义在 native_functions.yaml
中,该文件指定了这些函数的属性及其后端实现。它们的实现连同指定的功能随后通过代码生成自动注册。一些更特殊的函数或功能也会在 C++ 代码库的其他地方或用户定义的 C++ 扩展中注册。
还可以使用 torch.library
添加新的原生函数。此 Python 功能允许定义和/或为原生函数添加新的实现。这可用于添加缺失的内核、替换现有内核或定义全新的原生函数。
你可以在 subclass zoo 仓库中找到许多基于 __torch_dispatch__
的子类示例。
__torch_dispatch__
调用约定
@classmethod
def __torch_dispatch__(cls, func, types, args=(), kwargs=None):pass
当用户使用带有__torch_dispatch__
属性的输入调用运算符时,该调用可能会被转发到__torch_dispatch__
方法。在调用__torch_dispatch__
之前,args和kwargs会被规范化处理,具体规则如下:
kwargs
仅包含运算符签名中的关键字参数。如果某个关键字参数的值等于其默认值(根据签名定义),则该参数不会被传递。args
包含所有其他参数,无论这些参数是以位置参数还是关键字参数形式传入的。如果某个位置参数的值等于其默认值,并且该参数是最右侧的位置参数或其右侧的所有参数均未传递,则该参数不会被传递。
通过模式扩展所有 torch
API
遗憾的是,有些函数并不接收张量作为输入。这意味着上述子类方法无法用于覆盖 PyTorch 所有函数的行为。此外,若使用场景需要拦截每个函数调用,将所有张量改为子类可能侵入性过强。
为解决这类需求,我们引入了"模式"概念。这些模式适用于 __torch_function__
和 __torch_dispatch__
的重写,分别通过继承 torch.overrides.TorchFunctionMode
和 torch.utils._python_dispatch.TorchDispatchMode
来创建,并作为上下文管理器使用。
为简化描述模式与子类及其他模式的交互逻辑:每当进入某个模式的上下文管理器时,所有函数的行为都如同在参数列表开头额外添加了一个以该模式为子类的张量参数。这意味着所有模式处理器的调用都会优先于任何子类处理器,且内层上下文管理器对应的模式总是最先执行。
需特别注意:在特定模式处理器内部,该模式会被禁用,可通过 with self:
手动重新启用。
以下是展示两种类型日志模式的示例:
import torch
from torch.overrides import TorchFunctionMode, resolve_name
from torch.utils._python_dispatch import TorchDispatchModeclass FunctionLog(TorchFunctionMode):def __torch_function__(self, func, types, args, kwargs=None):print(f"Function Log: {resolve_name(func)}(*{args}, **{kwargs})")return func(*args, **(kwargs or {}))class DispatchLog(TorchDispatchMode):def __torch_dispatch__(self, func, types, args, kwargs=None):print(f"Dispatch Log: {func}(*{args}, **{kwargs})")return func(*args, **(kwargs or {}))def f():a = torch.rand(10, requires_grad=True)b = a * 2b.sum().backward()print("TorchFunctionMode logging:")
with FunctionLog():f()print("TorchDispatchMode logging:")
with DispatchLog():f()
这将打印以下内容(附带额外注释):
TorchFunctionMode logging:
Function Log: torch.rand(*(10,), **{'requires_grad': True})
Function Log: torch.Tensor.mul(*(tensor([0.7164, 0.9897, 0.1745, 0.9336, 0.4287, 0.7989, 0.2169, 0.7474, 0.5624, 0.5970], requires_grad=True), 2), **None)
Function Log: torch.Tensor.sum(*(tensor([1.4328, 1.9794, 0.3490, 1.8671, 0.8573, 1.5977, 0.4338, 1.4948, 1.1249, 1.1939], grad_fn=<MulBackward0>),), **None)
# Note that at the python level, we only see the call to backward but not what happens in the autograd engine.
Function Log: torch.Tensor.backward(*(tensor(12.3307, grad_fn=<SumBackward0>),), **{'gradient': None, 'retain_graph': None, 'create_graph': False, 'inputs': None})TorchDispatchMode logging:
# Here the requires_grad flag from autograd is removed while default arguments were populated.
Dispatch Log: aten.rand.default(*([10],), **{'device': device(type='cpu'), 'pin_memory': False})
Dispatch Log: aten.mul.Tensor(*(tensor([0.2151, 0.6018, 0.8415, 0.9060, 0.2974, 0.7708, 0.6668, 0.0352, 0.7948, 0.6023], requires_grad=True), 2), **{})
Dispatch Log: aten.sum.default(*(tensor([0.4303, 1.2036, 1.6831, 1.8120, 0.5949, 1.5416, 1.3335, 0.0705, 1.5897, 1.2046], grad_fn=<MulBackward0>),), **{})
# Here we don't see the call to backward itself, but its constituents. Starting here with the factory function that creates the initial gradient.
Dispatch Log: aten.ones_like.default(*(tensor(11.4637, grad_fn=<SumBackward0>),), **{'pin_memory': False, 'memory_format': torch.preserve_format})
# This is the backward of the sum
Dispatch Log: aten.expand.default(*(tensor(1.), [10]), **{})
Dispatch Log: aten.mul.Tensor(*(tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 2), **{})
Dispatch Log: aten.detach.default(*(tensor([2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]),), **{})
Dispatch Log: aten.detach.default(*(tensor([2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]),), **{})
使用 autograd.Function 扩展 torch.func
https://pytorch.org/docs/stable/notes/extending.func.html
如果你想将 torch.autograd.Function
与 torch.func
的变换操作(如 torch.vmap()
、torch.func.grad()
等)结合使用,主要有两种应用场景:
- 你希望调用不包含 PyTorch 操作的代码,并使其支持函数变换。也就是说,
torch.autograd.Function
的 forward/backward 等方法会调用来自其他系统(如 C++、CUDA、numpy)的函数。 - 你希望指定自定义梯度规则,类似于 JAX 的 custom_vjp/custom_jvp
PyTorch 将这两个概念都整合到了 torch.autograd.Function
中。
基础用法
本指南假设您已熟悉扩展 torch.autograd文档,该文档详细说明了如何使用torch.autograd.Function
。
torch.autograd.Function
可以有两种形式:
1、包含接收ctx
对象的forward()
方法
2、分离的forward()
方法(不接收ctx
)配合修改ctx
对象的setup_context()
静态方法
函数变换仅支持第二种形式:
forward()
是执行运算的代码,不应接收ctx
对象setup_context(ctx, inputs, output)
是操作ctx
的代码区域,在此处可以:- 通过
ctx.save_for_backward(*tensors)
保存张量用于反向传播 - 通过赋值给
ctx
对象保存非张量数据
- 通过
由于setup_context()
仅接收inputs
和output
,可保存的数据仅限于:
- 输入/输出中的对象(如张量)
- 从它们派生的量(如
Tensor.shape
)
若需要保存Function.forward()
中的非输入中间激活值用于反向传播,必须将其作为forward()
的输出传递给setup_context()
。
根据变换类型的不同需求:
- 支持反向模式自动微分(
torch.func.grad()
、torch.func.vjp()
)需要实现backward()
静态方法 - 支持
torch.vmap()
需要实现vmap()
静态方法 - 支持
torch.func.jvp()
需要实现jvp()
静态方法 - 支持组合变换(如
torch.func.jacrev()
、torch.func.jacfwd()
、torch.func.hessian()
)可能需要同时实现多个上述方法
为使torch.autograd.Function
能任意组合函数变换,建议:
- 除
forward()
和setup_context()
外的所有静态方法必须可变换 - 这些方法应仅包含PyTorch运算符或调用其他
torch.autograd.Function
(可调用C++/CUDA等底层代码)
下面我们将通过典型用例进行具体说明。
示例1:autograd.Function调用外部系统
常见场景是 torch.autograd.Function
同时包含 forward() 和 backward() 方法,这两个方法会调用外部系统(如 C++、CUDA、numpy 或 triton)。
import torch
import numpy as npdef to_numpy(tensor):return tensor.cpu().numpy()class NumpySort(torch.autograd.Function):# Note that forward does not take ctx@staticmethoddef forward(x, dim):device = x.devicex = to_numpy(x)ind = np.argsort(x, axis=dim)ind_inv = np.argsort(ind, axis=dim)result = np.take_along_axis(x, ind, axis=dim)# Any intermediates to be saved in backward must be returned as # outputs.return (# The desired outputtorch.tensor(result, device=device), # intermediate to save for backwardtorch.tensor(ind, device=device), # intermediate to save for backwardtorch.tensor(ind_inv, device=device), )# setup_context is responsible for calling methods and/or assigning to # the ctx object. Please do not do additional compute (e.g. add# Tensors together) in setup_context.@staticmethoddef setup_context(ctx, inputs, output):x, dim = inputs# Note that output is whatever you returned from forward.# If you returned multiple values, then output is a Tuple of multiple values.# If you returned a single Tensor, then output is a Tensor.# If you returned a Tuple with a single Tensor, then output is a # Tuple with a single Tensor._, ind, ind_inv = outputctx.mark_non_differentiable(ind, ind_inv)# Tensors must be saved via ctx.save_for_backward. Please do not# assign them directly onto the ctx object.ctx.save_for_backward(ind, ind_inv)# Non-tensors may be saved by assigning them as attributes on the ctx object.ctx.dim = dim@staticmethoddef backward(ctx, grad_output, _0, _1):# For the autograd.Function to be arbitrarily composable with function# transforms, all staticmethod other than forward and setup_context# must be implemented in a "transformable" way; that is, they must# only consist of PyTorch operations or autograd.Function.## For example, this allows us to do double backwards and/or compute# second order gradients.## We've written the backward pass of NumpySort in terms of another# autograd.Function, NumpyTake.ind, ind_inv = ctx.saved_tensorsreturn NumpyTake.apply(grad_output, ind_inv, ind, ctx.dim), Noneclass NumpyTake(torch.autograd.Function):@staticmethoddef forward(x, ind, ind_inv, dim):device = x.devicex = to_numpy(x)ind = to_numpy(ind)return torch.tensor(np.take_along_axis(x, ind, dim), device=device)@staticmethoddef setup_context(ctx, inputs, output):x, ind, ind_inv, dim = inputsctx.save_for_backward(ind, ind_inv)ctx.dim = dim@staticmethoddef backward(ctx, grad_output):ind, ind_inv = ctx.saved_tensorsresult = NumpyTake.apply(grad_output, ind_inv, ind, ctx.dim)return result, None, None, None
现在,为了简化NumpySort
的使用(隐藏我们作为输出返回的中间结果,并允许默认参数和关键字参数),我们创建一个新的函数来调用它:
def numpy_sort(x, dim=-1):result, _, _ = NumpySort.apply(x, dim)return result
以下是完整性检查:
x = torch.randn(2, 3)
grad_x = torch.func.grad(lambda x: numpy_sort(x).sum())(x)
assert torch.allclose(grad_x, torch.ones_like(x))
示例2:autograd.Function自定义梯度规则
另一种常见情况是使用PyTorch操作实现的torch.autograd.Function
。PyTorch能够自动计算PyTorch操作的梯度,但有时我们可能希望自定义梯度计算方式。以下是一些需要自定义反向传播(而非使用PyTorch默认实现)的原因:
- 提高数值稳定性
- 改变反向传播的性能特征
- 修改边缘情况的处理方式(例如NaN、无穷大)
- 调整梯度值(例如梯度裁剪)
这里展示一个针对函数y = x ** 3
的torch.autograd.Function
示例,我们通过将部分本应在反向传播阶段计算dx的运算提前到前向传播阶段,从而改变了性能特征。
class MyCube(torch.autograd.Function):@staticmethoddef forward(x):result = x ** 3# In regular PyTorch, if we had just run y = x ** 3, then the backward# pass computes dx = 3 * x ** 2、In this autograd.Function, we've done# that computation here in the forward pass instead.dx = 3 * x ** 2return result, dx@staticmethoddef setup_context(ctx, inputs, output):x, = inputsresult, dx = outputctx.save_for_backward(x, dx)@staticmethoddef backward(ctx, grad_output, grad_dx):x, dx = ctx.saved_tensors# In order for the autograd.Function to work with higher-order# gradients, we must add the gradient contribution of `dx`.result = grad_output * dx + grad_dx * 6 * xreturn result
现在,为了让使用NumpySort
更加便捷(同时隐藏我们作为输出返回的中间变量),我们创建了一个调用它的新函数:
def my_cube(x):result, _ = MyCube.apply(x)return result
这是一个用于计算二阶梯度的完整性检查:
x = torch.randn([])
ggx = torch.func.grad(torch.func.grad(my_cube))(x)
assert torch.allclose(ggx, 6 * x)
限制与注意事项
警告:请仔细阅读以下关于 torch.autograd.Function
与 torch.func 转换的限制。我们无法捕获其中许多情况并优雅地报错,因此它们可能导致未定义行为。
请勿在 torch.autograd.Function
的方法中捕获以下张量:正在被转换的张量、requires_grad=True 的张量或对偶张量。确保完全安全的做法是:任何 torch.autograd.Function
方法内部使用的张量必须直接作为输入传递(或通过 ctx 对象),而非来自函数外部。
torch.autograd.Function
不处理 pytree 中的张量(可能包含张量的任意嵌套 Python 数据结构)。要使这些张量被 autograd 追踪,必须将它们直接作为参数传递给 torch.autograd.Function
。这与 jax.{custom_vjp, custom_jvp} 不同,后者支持 pytree。
请仅使用 save_for_backward()
或 save_for_forward()
保存张量。切勿直接将张量或张量集合赋值到 ctx 对象上——这些张量将无法被追踪。
torch.vmap()
支持
要在 torch.vmap()
中使用 torch.autograd.Function
,您必须满足以下条件之一:
- 提供一个
vmap()
静态方法,用于说明该torch.autograd.Function
在torch.vmap()
下的行为 - 通过设置
generate_vmap_rule=True
要求系统自动生成该规则
自动生成vmap规则
如果你的 torch.autograd.Function
满足以下额外约束条件,我们就能为它自动生成vmap规则。如果不满足这些约束条件,或者你想要在vmap下实现自定义行为,请手动定义vmap静态方法(参见下一节)。
警告:我们无法轻松检查以下约束条件并优雅地报错。违反这些约束条件可能导致未定义行为。
torch.autograd.Function
的forward()
、backward()
(如果存在)和jvp()
(如果存在)静态方法必须能够通过torch.vmap()
进行转换。也就是说,它们必须仅包含PyTorch操作(而不是NumPy或自定义CUDA内核等)。
示例:
class MyCube(torch.autograd.Function):# Set generate_vmap_rule to True to ask PyTorch to automatically generate# a vmap rule.generate_vmap_rule = True@staticmethoddef forward(x):result = x ** 3dx = 3 * x ** 2return result, dx@staticmethoddef setup_context(ctx, inputs, output):x, = inputsresult, dx = outputctx.save_for_backward(x, dx)@staticmethoddef backward(ctx, grad_output, grad_dx):x, dx = ctx.saved_tensorsresult = grad_output * dx + grad_dx * 6 * xreturn resultdef my_cube(x):result, dx = MyCube.apply(x)return resultx = torch.randn(3)
result = torch.vmap(my_cube)(x)
assert torch.allclose(result, x ** 3)
定义 vmap 静态方法
如果你的 torch.autograd.Function
调用了其他系统(如 NumPy、C++、CUDA、triton),为了让其与 torch.vmap()
或使用它的变换兼容,你需要手动定义一个 vmap()
静态方法。
根据你想使用的变换和具体场景,可能不需要为所有 torch.autograd.Function
添加 vmap()
静态方法:
- 例如,
torch.func.jacrev()
会对反向传播过程执行vmap()
。因此,如果你仅关注torch.func.jacrev()
的使用,只需确保backward()
静态方法支持向量化映射即可。
不过,我们建议确保所有 torch.autograd.Function
都支持 torch.vmap()
,尤其是当你开发第三方库时,希望你的 torch.autograd.Function
能与所有 torch.func()
变换组合兼容。
从概念上讲,vmap
静态方法负责定义 forward()
在 torch.vmap()
下的行为。也就是说,它定义了如何转换 forward()
,使其能处理带额外维度(即被向量化映射的维度)的输入。这与 torch.vmap()
在 PyTorch 操作上的实现方式类似:为每个操作定义一个 vmap 规则(有时也称为“批处理规则”)。
以下是定义 vmap()
静态方法的要点:
- 方法签名为
vmap(info, in_dims: Tuple[Optional[int]], *args)
,其中*args
与forward()
的参数一致。 vmap
静态方法需定义forward()
在torch.vmap()
下的行为。即,给定带额外维度(由in_dims
指定)的输入,如何计算forward()
的批处理版本?- 对于
args
中的每个参数,in_dims
都有一个对应的Optional[int]
。如果参数不是张量或未被向量化映射,则为None
;否则为一个整数,指定张量中被映射的维度。 info
是包含额外元数据的集合:info.batch_size
指定被映射维度的大小,info.randomness
是传递给torch.vmap()
的randomness
选项。vmap
静态方法的返回是一个元组(output, out_dims)
。与in_dims
类似,out_dims
的结构应与output
一致,并为每个输出包含一个out_dim
,指明输出是否包含被映射的维度及其位置索引。
示例:
def to_numpy(tensor):return tensor.cpu().numpy()class NumpySort(torch.autograd.Function):@staticmethoddef forward(x, dim):device = x.devicex = to_numpy(x)ind = np.argsort(x, axis=dim)ind_inv = np.argsort(ind, axis=dim)result = np.take_along_axis(x, ind, axis=dim)return (torch.tensor(result, device=device), torch.tensor(ind, device=device), torch.tensor(ind_inv, device=device), )@staticmethoddef setup_context(ctx, inputs, output):x, dim = inputs_, ind, ind_inv = outputctx.mark_non_differentiable(ind, ind_inv)ctx.save_for_backward(ind, ind_inv)ctx.dim = dim@staticmethoddef backward(ctx, grad_output, _0, _1):ind, ind_inv = ctx.saved_tensorsreturn NumpyTake.apply(grad_output, ind_inv, ind, ctx.dim), None# The signature of the vmap staticmethod is:# vmap(info, in_dims: Tuple[Optional[int]], *args)# where *args is the same as the arguments to `forward`.@staticmethoddef vmap(info, in_dims, x, dim):# For every input (x and dim), in_dims stores an Optional[int]# that is:# - None if the input is not being vmapped over or if the input# is not a Tensor# - an integer if the input is being vmapped over that represents# the index of the dimension being vmapped over.x_bdim, _ = in_dims# A "vmap rule" is the logic of how to perform the operation given# inputs with one additional dimension. In NumpySort, x has an# additional dimension (x_bdim). The vmap rule is simply# to call NumpySort again but pass it a different `dim`.x = x.movedim(x_bdim, 0)# Handle negative dims correctlydim = dim if dim >= 0 else dim + x.dim() - 1result = NumpySort.apply(x, dim + 1)# The vmap rule must return a tuple of two things# 1、the output. Should be the same amount of things# as returned by the forward().# 2、one Optional[int] for each output specifying if each output# is being vmapped over, and if so, the index of the # dimension being vmapped over.## NumpySort.forward returns a Tuple of 3 Tensors. Since we moved the # dimension being vmapped over to the front of `x`, that appears at # dimension 0 of all outputs.# The return is (output, out_dims) -- output is a tuple of 3 Tensors# and out_dims is a Tuple of 3 Optional[int]return NumpySort.apply(x, dim + 1), (0, 0, 0)class NumpyTake(torch.autograd.Function):@staticmethoddef forward(x, ind, ind_inv, dim):device = x.devicex = to_numpy(x)ind = to_numpy(ind)return torch.tensor(np.take_along_axis(x, ind, dim), device=device)@staticmethoddef setup_context(ctx, inputs, output):x, ind, ind_inv, dim = inputsctx.save_for_backward(ind, ind_inv)ctx.dim = dim@staticmethoddef backward(ctx, grad_output):ind, ind_inv = ctx.saved_tensorsresult = NumpyTake.apply(grad_output, ind_inv, ind, ctx.dim)return result, None, None, None@staticmethoddef vmap(info, in_dims, x, ind, ind_inv, dim):x_bdim, ind_bdim, ind_inv_bdim, _ = in_dims# The strategy is: expand {x, ind, ind_inv} to all have the dimension# being vmapped over.# Then, call back into NumpyTake(expanded_x, expanded_ind, expanded_ind_inv, new_dim).# Handle negative dims by wrapping them to be positivelogical_dim = x.dim() if x_bdim is None else x_bdim - 1dim = dim if dim >= 0 else dim + logical_dimdef maybe_expand_bdim_at_front(x, x_bdim):if x_bdim is None:return x.expand(info.batch_size, *x.shape)return x.movedim(x_bdim, 0)# If the Tensor doesn't have the dimension being vmapped over, # expand it out. Otherwise, move it to the front of the Tensorx = maybe_expand_bdim_at_front(x, x_bdim)ind = maybe_expand_bdim_at_front(ind, ind_bdim)ind_inv = maybe_expand_bdim_at_front(ind_inv, ind_inv_bdim)# The return is a tuple (output, out_dims). Since output is a Tensor, # then out_dims is an Optional[int] (instead of being a Tuple).return NumpyTake.apply(x, ind, ind_inv, dim + 1), 0def numpy_sort(x, dim=-1):result, _, _ = NumpySort.apply(x, dim)return resultx = torch.randn(2, 3)
result = torch.vmap(numpy_sort)(x)
assert torch.allclose(result, numpy_sort(result, 1))
注意:vmap
静态方法应确保保持整个Function
的语义不变。也就是说,(伪代码)grad(vmap(MyFunc))
应当能够被grad(map(MyFunc))
替代。
如果您的autograd.Function在反向传播中有任何自定义行为,请牢记这一点。
注意:为PyTorch能通过generate_vmap_rule=True
生成vmap规则的Function
编写自定义vmap静态方法是一个合理的用例。当生成的vmap规则不符合您期望的语义时,您可能需要这样做。
torch.func.jvp()
支持
要实现前向模式自动微分(AD),torch.autograd.Function
必须包含一个 jvp()
静态方法。具体细节请参阅前向模式AD文档。
常见问题解答
https://pytorch.org/docs/stable/notes/faq.html
我的模型报错"cuda runtime error(2): out of memory"
如错误信息所示,您的GPU内存已耗尽。由于在PyTorch中我们经常处理大量数据,细微的错误可能快速耗尽GPU内存;幸运的是,这类问题的修复通常很简单。以下是几个常见的检查点:
不要在训练循环中累积历史记录
默认情况下,涉及需要梯度的变量的计算会保留历史记录。这意味着您应该避免在训练循环之外的计算中使用这类变量(例如跟踪统计信息时)。相反,您应该分离变量或访问其底层数据。
有时,可微分变量的出现可能并不明显。考虑以下训练循环示例(节选自来源):
total_loss = 0
for i in range(10000):optimizer.zero_grad()output = model(input)loss = criterion(output)loss.backward()optimizer.step()total_loss += loss
在这里,total_loss
会在训练循环中持续累积历史记录,因为loss
是一个带有自动求导历史记录的可微分变量。你可以通过改用total_loss += float(loss)
来解决这个问题。
该问题的其他实例:1。
不要保留不需要的张量和变量
如果将Tensor或Variable赋值给局部变量,Python会等到该局部变量超出作用域后才释放内存。你可以通过使用del x
来释放这个引用。同样地,如果将Tensor或Variable赋值给对象的成员变量,该对象超出作用域后才会释放内存。如果不保留不需要的临时变量,可以获得最佳的内存使用效率。
局部变量的作用域可能比你想象的要大。例如:
for i in range(5):intermediate = f(input[i])result += g(intermediate)
output = h(result)
return output
在这里,intermediate
变量在 h
执行期间仍然保持活跃状态,因为它的作用域延伸到了循环结束之后。若想提前释放它,应在使用完毕后执行 del intermediate
。
避免在过长的序列上运行RNN。
RNN反向传播所需的内存会随输入序列长度线性增长。因此,如果尝试处理过长的序列,将会耗尽内存。
这种现象的技术术语称为时间反向传播。关于如何实现截断式BPTT(Truncated BPTT)有大量参考资料,例如单词语言模型示例中的实现——截断操作由repackage
函数处理,具体说明可参考此论坛帖子。
不要使用过大的线性层。
线性层 nn.Linear(m, n)
的内存占用为 O(nm):即权重的内存需求会随特征数量呈平方级增长。这种方式极易耗尽内存(需注意权重内存至少需要两倍空间,因为还需存储梯度)。
考虑使用检查点机制。
可以通过检查点功能来权衡内存与计算资源的消耗。
我的GPU内存未正确释放
PyTorch使用缓存内存分配器来加速内存分配。因此,nvidia-smi
显示的值通常不能反映真实的内存使用情况。有关GPU内存管理的更多详情,请参阅内存管理。
如果即使在Python退出后GPU内存仍未释放,很可能是某些Python子进程仍在运行。你可以通过ps -elf | grep python
查找这些进程,并使用kill -9 [pid]
手动终止它们。
我的内存不足异常处理程序无法分配内存
你可能编写了一些试图从内存不足错误中恢复的代码。
try:run_model(batch_size)
except RuntimeError: # Out of memoryfor _ in range(batch_size):run_model(1)
但你会发现,当内存确实耗尽时,你的恢复代码也无法分配内存。这是因为 Python 异常对象持有一个指向错误发生时的堆栈帧的引用,这会阻止原始张量对象被释放。解决方案是将 OOM(内存不足)恢复代码移到 except
子句之外。
oom = False
try:run_model(batch_size)
except RuntimeError: # Out of memoryoom = Trueif oom:for _ in range(batch_size):run_model(1)
我的数据加载工作进程返回相同的随机数
问题可能出在您使用了其他库来生成数据集中的随机数,并且工作进程是通过fork
方式启动的。请参阅torch.utils.data.DataLoader
文档,了解如何通过其worker_init_fn
选项正确设置工作进程的随机种子。
我的循环神经网络在数据并行环境下无法正常工作
在将打包序列 -> 循环网络 -> 解包序列
的模式与Module
中的DataParallel
或data_parallel()
一起使用时存在一个细节问题。每个设备上的forward()
函数接收的输入只是整个输入的一部分。由于解包操作torch.nn.utils.rnn.pad_packed_sequence()
默认只会填充到该设备上看到的最长输入长度,因此在结果汇总时会出现尺寸不匹配的情况。
解决方法是可以利用pad_packed_sequence()
的total_length
参数,确保所有forward()
调用返回相同长度的序列。例如,你可以这样编写代码:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequenceclass MyModule(nn.Module):# ... __init__, other methods, etc.# padded_input is of shape [B x T x *] (batch_first mode) and contains# the sequences sorted by lengths# B is the batch size# T is max sequence lengthdef forward(self, padded_input, input_lengths):total_length = padded_input.size(1) # get the max sequence lengthpacked_input = pack_padded_sequence(padded_input, input_lengths, batch_first=True)packed_output, _ = self.my_lstm(packed_input)output, _ = pad_packed_sequence(packed_output, batch_first=True, total_length=total_length)return outputm = MyModule().cuda()
dp_m = nn.DataParallel(m)
此外,当批次维度为 1
(即 batch_first=False
)时,在使用数据并行时需要格外注意。这种情况下,pack_padded_sequence
的第一个参数 padding_input
的形状为 [T x B x *]
,应该沿维度 1
进行分散;而第二个参数 input_lengths
的形状为 [B]
,应该沿维度 0
进行分散。此时需要额外的代码来处理张量形状的转换。
FSDP 说明文档
https://pytorch.org/docs/stable/notes/fsdp.html
FSDP 预取机制的细节
要实现 forward
阶段的 all-gather 操作与 forward
计算的并行重叠,有两种可能的机制:
1、隐式前向预取(默认启用)
2、显式前向预取(通过 forward_prefetch=True
启用)
隐式 forward
预取 是指通过将 all-gather 操作分配到独立的 CUDA 流中,使得 all-gather 操作能够与 CPU 视角中先发出的 forward
计算重叠。例如,执行顺序为:第 0 层 all-gather → 第 0 层 forward
计算 → 第 1 层 all-gather → … 时,第 1 层的 all-gather 可以与第 0 层的 forward
计算重叠,尽管 CPU 线程是在之后才发出该 all-gather 指令的。(注意:第一个 all-gather 操作无法与任何计算重叠)
显式 forward
预取 是指改变 CPU 线程的指令发出顺序,例如:第 0 层 all-gather → 第 1 层 all-gather → 第 0 层 forward
计算 → …。在 eager 模式下,当仍在执行第 0 层时,通常无法预知下一层(例如示例中的第 1 层)是什么。因此,显式 forward
预取仅适用于每次迭代执行顺序固定的模型(我们有时称为"静态图")。不满足此约束的模型示例可参考 FLAVA。
显式 forward
预取仅能节省发出某层 forward
计算内核的时间,代价是必须在当前 all-gather 输出张量仍在使用时就预分配下一个 all-gather 的输出张量。通过在当前 forward
计算内核之前发出下一个 all-gather 指令,可以让下一个 all-gather 更早地在 GPU 上启动。对于大多数 LLM 工作负载而言,这种情况并不常见,因此没有启用 forward_prefetch=True
的必要性。
相比之下,在 backward
阶段,我们必须使用显式 backward
预取,否则通信与计算将完全无法重叠。这是因为我们为 all-gather 和 reduce-scatter 使用同一个 NCCL 进程组(部分原因是早期 NCCL 版本中,同一设备上相同 rank 的多个并发操作不安全)。单个 NCCL 进程组意味着只有一个内部 NCCL 流,reduce-scatter 和 all-gather 操作会串行执行。因此,除非我们显式调整 CPU 指令顺序为:下一个 all-gather → 当前 reduce-scatter,否则当前 reduce-scatter 会阻塞下一个 all-gather,进而阻塞下一个 backward
计算,导致当前 reduce-scatter 无法与其他操作重叠。
通信负载大小
在FSDP中,通信操作包括:
1、forward
阶段对参数进行all-gather操作
2、backward
阶段对参数进行all-gather操作
3、backward
阶段对梯度进行reduce-scatter操作
如果使用了激活检查点(checkpoint()
),则不会产生额外通信,因为参数在backward
阶段无论如何都会被预取。
FSDP设计中,每个rank的通信负载确定方式如下:每次调用FullyShardedDataParallel
会创建一个通信组,包含module.parameters()
中除已分配给嵌套FullyShardedDataParallel
实例外的所有参数。例如对于Llama模型,如果对每个transformer块和根模块都应用FullyShardedDataParallel
,那么每个transformer块会对应一个通信组,最后初始嵌入层和最终线性层会组成另一个通信组。
每个通信组对应一次all-gather调用和一次reduce-scatter调用。因此,FullyShardedDataParallel
的应用方式决定了通信量大小。通常对每个transformer块应用FSDP是LLMs的良好启发式方法,在当前设计下很难做得更好。
举例说明:假设一个基于Transformer的模型在8个GPU上进行分片,分片仅发生在transformer块级别,每个transformer块包含16亿参数且为fp32格式(每个参数4字节)。这意味着分片后每个transformer块在每个rank上包含2亿参数。
forward
阶段将以0.2*4=0.8GB
的块大小进行all-gather通信backward
阶段将进行2次0.8GB
的通信(1次all-gather和1次reduce-scatter)
换句话说,每次通信负载为0.8GB
,共进行3次通信。如果模型包含10个transformer块,则总共需要进行30次通信,总量为30*0.8=24GB
。
形式化表达:每个rank每次通信的负载大小为total_transformer_block_params_in_B*dtype_bytes/num_gpus
(单位GB)。
请注意,本例未包含嵌入层所需的额外通信(这部分也应计入)。具体计算取决于输入和输出嵌入是否绑定——如果不绑定,通信量将增加2倍。
FSDP缓冲区大小
首先,我们介绍通信分配的缓冲区:
当前forward
需要2倍全收集(all-gather)缓冲区大小。原因如下:
如FSDP预取细节所述,在显式forward
预取(forward_prefetch=True
)情况下,第0层全收集→第0层前向计算→第1层全收集的流程需要2个全收集大小的缓冲区。因为一个缓冲区用于当前forward
,另一个用于预取操作。
而隐式forward
预取(forward_prefetch=False
,默认情况)理论上只需1个缓冲区,但实际上仍需要2倍全收集缓冲区。这是因为在扁平参数(flat-parameter)的FSDP设计中,我们不会从全收集缓冲区复制数据。用于计算的参数直接映射到全收集缓冲区(实际上,"扁平参数"的主要优势正是这个特性)。因此当"第1层全收集"与"第0层前向计算"重叠时,"第0层前向计算"使用的是映射到"第0层全收集"缓冲区的参数。
那么自然会问:何时需要使用forward_prefetch=False
?对于静态图模型(如大多数LLM),存在一个主要技术原因。实际上我们快速添加这个选项是为了某些CPU密集型内部模型,但尚未在单元测试中验证所有代码路径,因此对其可靠性信心较低。forward_prefetching=False
可能更易理解,因为我们不必检查记录的前向顺序作为可能的"故障模式";模块的全收集操作始终可以在性能分析追踪中通过自身的record_function
标签找到。
当前backward
需要至少2倍全收集缓冲区大小,可能更多。原因如下:
当前FSDP设计使用recordStream
管理不同流之间的内存分配,这可能导致比预期更多的内存使用。具体多出的量是"非确定性的",取决于GPU内核与CPU的相对时序。limit_all_gathers=True
参数可以缓解此问题——详见FSDP与CUDACachingAllocator讨论。
现有FSDP与autograd的协作方式:
- FSDP全收集作为autograd叶节点的
flat_param
- 调用
torch.split
获取对应原始参数的1D视图 - 对每个1D分割调用
torch.view
恢复为ND形状 - 因此在
backward
中会产生ViewBackward
(ND→1D)和SplitWithSizesBackward
(实际上是concat操作)。每个梯度作为独立分配计算,最终通过显式concat构建reduce-scatter输入缓冲区。这意味着在峰值内存点时reduce-scatter需要2倍缓冲区大小。
总结来说,backward
需要约2倍reduce-scatter缓冲区大小加上recordStream
的影响。
其次讨论额外缓冲区:
当从所有rank收集分片参数后,需要额外的完整参数缓冲区(total_transformer_block_params_in_Bdtype_bytes)。延续之前的例子,如果每个transformer块有16亿参数且为fp32格式,则需要1.64=6.4GB缓冲区。
由于存在一个使用中的缓冲区和另一个预取缓冲区,实际需要2个这样的缓冲区。
汇总如下:
1、2倍通信缓冲区:total_transformer_block_params_in_B*dtype_bytes/num_gpus
2、2倍未分片transformer块参数缓冲区:total_transformer_block_params_in_B*dtype_bytes
以示例数据计算:
1、2*1.6*4/8=1.6GB
2、2*1.6*4=12.8GB
总计14.4GB
最后简要讨论未计入计算的嵌入层处理:
根据笔记中"通信缓冲区大小按以下规则确定"的说明,分析如下:
- 假设对根模块(如
Transformer
类)应用FSDP,并对每个transformer块(如TransformerBlock
类)也应用FSDP - 通常嵌入层和最终线性投影是根
Transformer
类的直接子模块 - 根据规则,嵌入层和最终线性投影会被分配到根
Transformer
的扁平参数 - 另有一条特殊规则:根模块在前向计算后不释放参数,因为它们在后向计算中会立即被全收集
- 这意味着包含嵌入层和最终投影的根扁平参数在前向开始时全收集,并保留在GPU内存中直到后向结束
- 如果嵌入层和最终线性层未权重共享,可对它们分别应用FSDP。权重共享的参数必须属于同一扁平参数(否则会被重复计算),这样嵌入层可在前向使用后释放,仅在后向结束时全收集
- 这说明了嵌套
nn.Module
结构会影响全收集/释放调度,进而影响内存/吞吐性能——每个FSDP模块分配其module.parameters()
中的参数(已被嵌套FSDP模块分配的除外),且FSDP模块的forward
定义了其参数的"存活"区间。
英特尔 GPU 入门指南
https://pytorch.org/docs/stable/notes/get_start_xpu.html
硬件要求
英特尔数据中心 GPU
设备 | Red Hat* Enterprise Linux* 9.2 | SUSE Linux Enterprise Server* 15 SP5 | Ubuntu* Server 22.04 (内核版本 >= 5.15 LTS) |
---|---|---|---|
Intel® Data Center GPU Max 系列 (代号: Ponte Vecchio) | 支持 | 支持 | 支持 |
英特尔客户端 GPU
支持的操作系统 | 已验证硬件 |
---|---|
Windows 10/11 和 Ubuntu 24.10 | Intel® Arc A-Series Graphics (CodeName: Alchemist) Intel® Arc B-Series Graphics (CodeName: Battlemage) Intel® Core™ Ultra Processors with Intel® Arc™ Graphics (CodeName: Meteor Lake) Intel® Core™ Ultra 200V Series with Intel® Arc™ Graphics (CodeName: Lunar Lake) Intel® Core™ Ultra Series 2 Processors with Intel® Arc™ Graphics (CodeName: Arrow Lake) |
Ubuntu 24.04 和 WSL2 (Ubuntu 24.04) | Intel® Arc A-Series Graphics (CodeName: Alchemist) Intel® Core™ Ultra Processors with Intel® Arc™ Graphics (CodeName: Meteor Lake) Intel® Core™ Ultra 200V Series with Intel® Arc™ Graphics (CodeName: Lunar Lake) Intel® Core™ Ultra Series 2 Processors with Intel® Arc™ Graphics (CodeName: Arrow Lake) |
从 PyTorch* 2.5 版本开始,英特尔 GPU (原型阶段) 已支持 Linux 和 Windows 平台上的英特尔® 客户端 GPU 和英特尔® 数据中心 GPU Max 系列。这一更新将英特尔 GPU 和 SYCL* 软件栈引入官方 PyTorch 生态,通过一致的用户体验支持更多 AI 应用场景。
软件前提条件
要在英特尔GPU上使用PyTorch,您需要先安装英特尔GPU驱动程序。安装指南请参阅英特尔GPU驱动程序安装。
若通过二进制文件安装,请跳过安装Intel® Deep Learning Essentials的步骤。若从源码构建,请参考PyTorch在英特尔GPU上的安装前提条件获取英特尔GPU驱动程序和Intel® Deep Learning Essentials的安装说明。
安装
二进制文件
现在我们已经安装了 Intel GPU 驱动,可以使用以下命令在 Linux 系统上安装 pytorch
、torchvision
和 torchaudio
。
针对发布的 wheel 包:
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/xpu
对于夜间构建的wheel包
pip3 install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/xpu
从源码构建
现在我们已经安装了 Intel GPU 驱动程序和 Intel® Deep Learning Essentials。接下来按照指南从源码构建 pytorch
、torchvision
和 torchaudio
。
- 构建
torch
的源码请参考 PyTorch 安装指南:从源码构建。 - 构建
torchvision
的源码请参考 Torchvision 安装指南:从源码构建。 - 构建
torchaudio
的源码请参考 Torchaudio 安装指南:从源码构建。
检查 Intel GPU 的可用性
要检查您的 Intel GPU 是否可用,通常可以使用以下代码:
import torch
torch.xpu.is_available() # torch.xpu is the API for Intel GPU support
如果输出结果为 False
,请重新检查 Intel GPU 的驱动程序安装情况。
最小代码改动
如果您正在从 cuda
迁移代码,只需将所有 cuda
引用改为 xpu
即可。例如:
# CUDA CODE
tensor = torch.tensor([1.0, 2.0]).to("cuda")# CODE for Intel GPU
tensor = torch.tensor([1.0, 2.0]).to("xpu")
以下是PyTorch在Intel GPU上的支持与限制要点:
1、支持训练和推理工作流程。
2、支持即时执行模式(eager mode)和torch.compile
。从PyTorch* 2.7版本开始,Windows平台上的Intel GPU也支持torch.compile
功能,具体可参考如何在Windows上使用CPU/XPU运行Inductor。
3、支持FP32、BF16、FP16等数据类型及自动混合精度(AMP)功能。
示例
本节包含推理和训练工作流的使用示例。
推理示例
以下是几个推理工作流程的示例。
使用FP32进行推理
import torch
import torchvision.models as modelsmodel = models.resnet50(weights="ResNet50_Weights.DEFAULT")
model.eval()
data = torch.rand(1, 3, 224, 224)model = model.to("xpu")
data = data.to("xpu")with torch.no_grad():model(data)print("Execution finished")
使用AMP进行推理
import torch
import torchvision.models as modelsmodel = models.resnet50(weights="ResNet50_Weights.DEFAULT")
model.eval()
data = torch.rand(1, 3, 224, 224)model = model.to("xpu")
data = data.to("xpu")with torch.no_grad():d = torch.rand(1, 3, 224, 224)d = d.to("xpu")# set dtype=torch.bfloat16 for BF16with torch.autocast(device_type="xpu", dtype=torch.float16, enabled=True):model(data)print("Execution finished")
使用 torch.compile
进行推理
import torch
import torchvision.models as models
import timemodel = models.resnet50(weights="ResNet50_Weights.DEFAULT")
model.eval()
data = torch.rand(1, 3, 224, 224)
ITERS = 10model = model.to("xpu")
data = data.to("xpu")for i in range(ITERS):start = time.time()with torch.no_grad():model(data)torch.xpu.synchronize()end = time.time()print(f"Inference time before torch.compile for iteration {i}: {(end-start)*1000} ms")model = torch.compile(model)
for i in range(ITERS):start = time.time()with torch.no_grad():model(data)torch.xpu.synchronize()end = time.time()print(f"Inference time after torch.compile for iteration {i}: {(end-start)*1000} ms")print("Execution finished")
训练示例
这里提供几个训练工作流示例。
使用 FP32 进行训练
import torch
import torchvisionLR = 0.001
DOWNLOAD = True
DATA = "datasets/cifar10/"transform = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)),torchvision.transforms.ToTensor(),torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),]
)
train_dataset = torchvision.datasets.CIFAR10(root=DATA,train=True,transform=transform,download=DOWNLOAD,
)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128)
train_len = len(train_loader)model = torchvision.models.resnet50()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR, momentum=0.9)
model.train()
model = model.to("xpu")
criterion = criterion.to("xpu")print(f"Initiating training")
for batch_idx, (data, target) in enumerate(train_loader):data = data.to("xpu")target = target.to("xpu")optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if (batch_idx + 1) % 10 == 0:iteration_loss = loss.item()print(f"Iteration [{batch_idx+1}/{train_len}], Loss: {iteration_loss:.4f}")
torch.save({"model_state_dict": model.state_dict(),"optimizer_state_dict": optimizer.state_dict(),},"checkpoint.pth",
)print("Execution finished")
使用 AMP 进行训练
注意:使用 GradScaler
进行训练需要硬件支持 FP64
。Intel® Arc™ A 系列显卡原生不支持 FP64
。如果在 Intel® Arc™ A 系列显卡上运行工作负载,请禁用 GradScaler
。
import torch
import torchvisionLR = 0.001
DOWNLOAD = True
DATA = "datasets/cifar10/"use_amp=Truetransform = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)),torchvision.transforms.ToTensor(),torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),]
)
train_dataset = torchvision.datasets.CIFAR10(root=DATA,train=True,transform=transform,download=DOWNLOAD,
)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128)
train_len = len(train_loader)model = torchvision.models.resnet50()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR, momentum=0.9)
scaler = torch.amp.GradScaler(device="xpu", enabled=use_amp)model.train()
model = model.to("xpu")
criterion = criterion.to("xpu")print(f"Initiating training")
for batch_idx, (data, target) in enumerate(train_loader):data = data.to("xpu")target = target.to("xpu")# set dtype=torch.bfloat16 for BF16with torch.autocast(device_type="xpu", dtype=torch.float16, enabled=use_amp):output = model(data)loss = criterion(output, target)scaler.scale(loss).backward()scaler.step(optimizer)scaler.update()optimizer.zero_grad()if (batch_idx + 1) % 10 == 0:iteration_loss = loss.item()print(f"Iteration [{batch_idx+1}/{train_len}], Loss: {iteration_loss:.4f}")torch.save({"model_state_dict": model.state_dict(),"optimizer_state_dict": optimizer.state_dict(),},"checkpoint.pth",
)print("Execution finished")
使用 torch.compile
进行训练
import torch
import torchvisionLR = 0.001
DOWNLOAD = True
DATA = "datasets/cifar10/"transform = torchvision.transforms.Compose([torchvision.transforms.Resize((224, 224)),torchvision.transforms.ToTensor(),torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),]
)
train_dataset = torchvision.datasets.CIFAR10(root=DATA,train=True,transform=transform,download=DOWNLOAD,
)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128)
train_len = len(train_loader)model = torchvision.models.resnet50()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR, momentum=0.9)
model.train()
model = model.to("xpu")
criterion = criterion.to("xpu")
model = torch.compile(model)print(f"Initiating training with torch compile")
for batch_idx, (data, target) in enumerate(train_loader):data = data.to("xpu")target = target.to("xpu")optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if (batch_idx + 1) % 10 == 0:iteration_loss = loss.item()print(f"Iteration [{batch_idx+1}/{train_len}], Loss: {iteration_loss:.4f}")
torch.save({"model_state_dict": model.state_dict(),"optimizer_state_dict": optimizer.state_dict(),},"checkpoint.pth",
)print("Execution finished")
梯度检验机制
本文概述了 gradcheck()
和 gradgradcheck()
函数的工作原理。
内容涵盖:
- 实数函数和复数函数的前向模式与反向模式自动微分
- 高阶导数计算
- 默认梯度检验行为
- 传递
fast_mode=True
参数时的快速梯度检验模式(下文简称快速梯度检验)
符号约定与背景信息
在本说明中,我们将采用以下约定:
1、 和 uiuiui
为实值向量,zzz
为复值向量,可表示为两个实值向量 z = a + i b
。
2、NNN
和 MMM
为两个整数,分别表示输入和输出空间的维度。
3、f: R^N → R^M
是我们的基础实到实函数,满足 y = f(x)
。
4、g: C^N → R^M
是我们的基础复到实函数,满足 y = g(z)
。
对于简单的实到实情况,我们将 f
的雅可比矩阵记为 J_f
,其大小为 M×N
。该矩阵包含所有偏导数,其中位置 (i,j)
的元素为 ∂y_i/∂x_j
。反向模式自动微分(AD)会针对给定大小为 M
的向量 vvv
计算 v^T J_f
。而正向模式 AD 则针对给定大小为 N
的向量 uuu
计算 J_f u
。
对于包含复数的函数,情况要复杂得多。这里我们仅提供概要,完整描述可参阅 Autograd for Complex Numbers。
由于复可微性(柯西-黎曼方程)的约束对所有实值损失函数过于严格,我们转而采用 Wirtinger 微积分。在 Wirtinger 微积分的基本设定中,链式法则需要同时使用 Wirtinger 导数(下文称为 W
)和共轭 Wirtinger 导数(下文称为 CW
)。尽管名称如此,但一般情况下 W
和 CW
并非互为复共轭,因此两者都需要传播。
为避免同时传播两个值,在反向模式 AD 中,我们始终假设被求导的函数要么是实值函数,要么是某个更大实值函数的一部分。这一假设意味着反向传播过程中计算的所有中间梯度也都与实值函数相关联。实际上,在进行优化时这一假设并不受限,因为此类问题需要实值目标(复数本身没有自然的序关系)。
在此假设下,利用 W
和 CW
的定义可以证明 W = CW^*
(此处 *
表示复共轭),因此实际上只需反向传播其中一个值,另一个值可以轻松恢复。为简化内部计算,PyTorch 使用 2*CW
作为反向传播的值,并在用户请求梯度时返回该值。与实数情况类似,当输出实际位于 R^M
时,反向模式 AD 并不计算 2*CW
,而是针对给定向量 v ∈ R^M
计算 v^T (2 * CW)
。
对于正向模式 AD,我们采用类似的逻辑,假设函数是某个输入位于 R
的更大函数的一部分。在此假设下,我们可以类似地断言每个中间结果都对应于输入位于 R
的函数,此时利用 W
和 CW
的定义可以证明中间函数满足 W = CW
。为确保正向和反向模式在一维函数的基本情况下计算结果一致,正向模式同样计算 2*CW
。与实数情况类似,当输入实际位于 R^N
时,正向模式 AD 并不计算 2*CW
,而是针对给定向量 u ∈ R^N
计算 (2 * CW) u
。
默认反向模式梯度检查行为
实数到实数函数
为了测试函数 f:ℝᴺ → ℝᴵᴹ, x → y,我们通过两种方式重构完整的 M×N 雅可比矩阵 J_f:解析法和数值法。解析版本使用我们的反向模式自动微分(AD),而数值版本采用有限差分法。随后对这两个重构的雅可比矩阵进行逐元素比对以验证一致性。
默认实数输入数值求导方法
考虑一维函数的基本情况(N=M=1),我们可以采用维基百科文章中的基本有限差分公式。为了获得更好的数值特性,这里使用"中心差分法":
∂y/∂x ≈ [f(x+eps) - f(x-eps)] / (2*eps)
该公式可以轻松推广到多输出场景(M>1),此时∂y/∂x 是一个尺寸为 M×1 的列向量,与 f(x+eps) 的形式相同。在这种情况下,上述公式可以直接复用,仅需两次用户函数求值(即计算 f(x+eps) 和 f(x-eps))即可近似得到完整的雅可比矩阵。
处理多输入情况(N>1)的计算成本更高。在此场景下,我们需要逐个遍历所有输入变量,依次对 x 的每个元素施加 eps 扰动。这种方法使我们能够逐列重构 Jf 矩阵。
默认实数输入的解析评估
在解析评估中,我们利用了前文所述的反向模式自动微分(AD)特性,即它能计算 vᵀJ_f。对于单输出函数,只需设 v=1 即可通过一次反向传播完整获取雅可比矩阵。
针对多输出函数,我们采用循环遍历输出的方案:每次将 v 设为对应输出的独热向量,逐个处理。这种方法能逐行重构出 J_f 矩阵。
(注:原文中的数学公式 vᵀJ_f 和变量 J_f 按代码保护原则保留原格式)
复数到实数的函数映射
为了测试一个函数 g: ℂᴺ → ℝᴹ, z ↦ y(其中 z = a + ib),我们需要重构包含 2*CW 的(复数值)矩阵。
默认复数输入数值评估
首先考虑 N=M=1 的基本情况。根据这篇研究论文(第3章)可知:
CW := \frac{\partial y}{\partial z^*} = \frac{1}{2} * (\frac{\partial y}{\partial a} + i \frac{\partial y}{\partial b})
注意,上述方程中的 \frac{\partial y}{\partial a} 和 \frac{\partial y}{\partial b} 都是 R→R 导数。为了进行数值计算,我们采用前文所述的实对实导数计算方法。这样就能计算出 CW 矩阵,然后将其乘以 2。
需要注意的是,截至撰写本文时,代码中计算该值的方式略显复杂:
# Code from https://github.com/pytorch/pytorch/blob/58eb23378f2a376565a66ac32c93a316c45b6131/torch/autograd/gradcheck.py#L99-L105
# Notation changes in this code block:
# s here is y above
# x, y here are a, b aboveds_dx = compute_gradient(eps)
ds_dy = compute_gradient(eps * 1j)
# conjugate wirtinger derivative
conj_w_d = 0.5 * (ds_dx + ds_dy * 1j)
# wirtinger derivative
w_d = 0.5 * (ds_dx - ds_dy * 1j)
d[d_idx] = grad_out.conjugate() * conj_w_d + grad_out * w_d.conj()# Since grad_out is always 1, and W and CW are complex conjugate of each other, the last line ends up computing exactly `conj_w_d + w_d.conj() = conj_w_d + conj_w_d = 2 * conj_w_d`.
默认复杂输入解析评估
由于反向模式自动微分已经精确计算了两倍的 CWCWCW 导数,我们在此直接沿用实数到实数情况下的相同技巧——当存在多个实数输出时,逐行重构矩阵。
处理复杂输出的函数
在这种情况下,用户提供的函数不符合自动微分(autograd)的基本假设——即我们计算反向自动微分的函数应为实数值函数。这意味着直接对该函数使用自动微分没有明确定义。
为解决这个问题,我们将把函数 h:PN→CMh: \mathcal{P}^N \to \mathcal{C}^Mh:PN→CM(其中 P\mathcal{P}P 可以是 R\mathcal{R}R 或 C\mathcal{C}C)的测试替换为两个函数:hrhrhr 和 hihihi,其定义为:
hr(q):=real(f(q))hi(q):=imag(f(q))\begin{aligned}
hr(q) &:= real(f(q)) \\
hi(q) &:= imag(f(q))
\end{aligned}
hr(q)hi(q):=real(f(q)):=imag(f(q))其中 q∈Pq \in \mathcal{P}q∈P。
然后,我们根据 P\mathcal{P}P 的类型(实数或复数),使用前文所述的"实数到实数"或"复数到实数"方法,分别对 hrhrhr 和 hihihi 进行基础的梯度检查。
需要注意的是,截至撰写本文时,代码并未显式创建这两个函数,而是通过手动传递 grad_out\text{grad_out}grad_out 参数到不同函数,结合 realrealreal 或 imagimagimag 函数来执行链式法则。
- 当 grad_out=1\text{grad_out} = 1grad_out=1 时,对应的是 hrhrhr 的情况。
- 当 grad_out=1j\text{grad_out} = 1jgrad_out=1j 时,对应的是 hihihi 的情况。
快速反向模式梯度检查
虽然上述梯度检查方案在确保正确性和可调试性方面表现优异,但由于需要重构完整的雅可比矩阵,其执行速度非常缓慢。本节将介绍一种在不影响正确性的前提下加速梯度检查的方法。当检测到错误时,我们可以通过添加特殊逻辑来恢复可调试性——此时运行会重构完整矩阵的默认版本,为用户提供详尽信息。
该方案的高层策略是:寻找一个既能被数值法和解析法高效计算,又能充分代表慢速梯度检查所计算的完整矩阵的标量值。这个标量值需要确保能捕捉到雅可比矩阵中的所有差异。
实函数到实函数的快速梯度检验
这里我们需要计算的标量是 vᵀJᶠu,其中 v ∈ ℝᴹ 是一个随机向量,u ∈ ℝᴺ 是一个随机单位范数向量。
在数值计算中,我们可以高效地通过以下公式近似计算:
Jᶠu ≈ [f(x + ueps) - f(x - ueps)] / (2*eps)
然后将该向量与 v 进行点积运算,即可得到目标标量值。
对于解析版本,我们可以直接使用反向模式自动微分来计算 vᵀJᶠ。随后通过与 u 的点积运算得到期望值。
复数到实数函数的快速梯度检验
与实数到实数的情况类似,我们希望进行全矩阵的降维处理。但这里的2∗CW矩阵是复数矩阵,因此我们需要将其与复数标量进行比较。
由于数值计算中存在一些效率限制,并且为了尽量减少数值计算的次数,我们计算了以下(看似令人意外的)标量值:
s := 2 * v^T (real(CW) ur + i * imag(CW) ui)
其中,v ∈ ℝᴹ,ur ∈ ℝᴺ,ui ∈ ℝᴺ。
快速复数输入数值评估方法
我们首先探讨如何用数值方法计算sss。在此过程中,需要牢记我们考虑的是映射关系 g:CN→RM,z→yg: \mathcal{C}^N \to \mathcal{R}^M, z \to yg:CN→RM,z→y,其中 z=a+ibz = a + i bz=a+ib,且 CW=12∗(∂y∂a+i∂y∂b)CW = \frac{1}{2} * (\frac{\partial y}{\partial a} + i \frac{\partial y}{\partial b})CW=21∗(∂a∂y+i∂b∂y)。我们可以将其改写为以下形式:
s=2∗vT(real(CW)ur+i∗imag(CW)ui)=2∗vT(12∗∂y∂aur+i∗12∗∂y∂bui)=vT(∂y∂aur+i∗∂y∂bui)=vT((∂y∂aur)+i∗(∂y∂bui))\begin{aligned}
s &= 2 * v^T (real(CW) ur + i * imag(CW) ui) \\
&= 2 * v^T (\frac{1}{2} * \frac{\partial y}{\partial a} ur + i * \frac{1}{2} * \frac{\partial y}{\partial b} ui) \\
&= v^T (\frac{\partial y}{\partial a} ur + i * \frac{\partial y}{\partial b} ui) \\
&= v^T ((\frac{\partial y}{\partial a} ur) + i * (\frac{\partial y}{\partial b} ui))
\end{aligned}
s=2∗vT(real(CW)ur+i∗imag(CW)ui)=2∗vT(21∗∂a∂yur+i∗21∗∂b∂yui)=vT(∂a∂yur+i∗∂b∂yui)=vT((∂a∂yur)+i∗(∂b∂yui))通过这个公式可以看出,∂y∂aur\frac{\partial y}{\partial a} ur∂a∂yur 和 ∂y∂bui\frac{\partial y}{\partial b} ui∂b∂yui 的计算方式与实数到实数情况下的快速版本完全相同。
当这些实数值量计算完成后,我们可以重构右侧的复数向量,并与实值向量 vvv 进行点积运算。
快速复数输入解析评估
对于解析情况,问题更为简单,我们将公式重写为:
s=2∗vT(real(CW)ur+i∗imag(CW)ui)=vTreal(2∗CW)ur+i∗vTimag(2∗CW)ui)=real(vT(2∗CW))ur+i∗imag(vT(2∗CW))ui\begin{aligned}
s &= 2 * v^T (real(CW) ur + i * imag(CW) ui) \\
&= v^T real(2 * CW) ur + i * v^T imag(2 * CW) ui) \\
&= real(v^T (2 * CW)) ur + i * imag(v^T (2 * CW)) ui
\end{aligned}
s=2∗vT(real(CW)ur+i∗imag(CW)ui)=vTreal(2∗CW)ur+i∗vTimag(2∗CW)ui)=real(vT(2∗CW))ur+i∗imag(vT(2∗CW))ui因此,我们可以利用反向模式自动微分(AD)提供的有效方法来计算 vT(2∗CW)v^T (2 * CW)vT(2∗CW),然后将其实部与 ururur 进行点积,虚部与 uiuiui 进行点积,最终重构出复数标量 sss。
为何不使用复数uuu
此时,你可能会疑惑为何我们没有选择复数uuu,而是直接计算了简化形式2∗vTCWu′2 * v^T CW u’2∗vTCWu′。
为了深入探讨这一点,本段我们将采用复数形式的uuu,记作u′=ur′+iui′u’ = ur’ + i ui’u′=ur′+iui′。
使用这种复数u′u’u′时,问题在于进行数值计算时需要求解:
2∗CWu′=(∂y∂a+i∂y∂b)(ur′+iui′)=∂y∂aur′+i∂y∂aui′+i∂y∂bur′−∂y∂bui′\begin{aligned}
2*CW u’ &= (\frac{\partial y}{\partial a} + i \frac{\partial y}{\partial b})(ur’ + i ui’) \\
&= \frac{\partial y}{\partial a} ur’ + i \frac{\partial y}{\partial a} ui’ + i \frac{\partial y}{\partial b} ur’ - \frac{\partial y}{\partial b} ui’
\end{aligned}
2∗CWu′=(∂a∂y+i∂b∂y)(ur′+iui′)=∂a∂yur′+i∂a∂yui′+i∂b∂yur′−∂b∂yui′这将需要四次实对实有限差分计算(是前文提出方法的两倍)。
由于该方案并未增加自由度(实值变量数量相同),且我们追求最快的计算速度,因此采用了上文中的另一种表述形式。
支持复数输出的快速梯度检验
与慢速情况类似,我们会考虑两个实值函数,并针对每个函数应用上文对应的求导法则。
Gradgradcheck 实现
PyTorch 还提供了一个用于验证二阶梯度的实用工具。其目的是确保反向传播实现同样可正确微分,并能计算出正确结果。
该功能的实现原理是:考虑函数 F:x,v→vTJfF: x, v \to v^T J_fF:x,v→vTJf,并在此函数上应用前文定义的 gradcheck。需要注意的是,这里的 vvv 只是一个与 f(x)f(x)f(x) 同类型的随机向量。
gradgradcheck 的快速版本是通过在该函数 FFF 上使用快速版 gradcheck 来实现的。
HIP (ROCm) 语义说明
https://pytorch.org/docs/stable/notes/hip.html
ROCm™ 是 AMD 推出的开源软件平台,专注于 GPU 加速的高性能计算和机器学习领域。HIP 作为 ROCm 的 C++ 方言,旨在简化将 CUDA 应用程序转换为可移植 C++ 代码的过程。该技术主要应用于两种场景:
1、将 PyTorch 等现有 CUDA 应用迁移至可移植 C++ 代码
2、开发需要兼容 AMD 和 NVIDIA 硬件的新项目
HIP 接口复用 CUDA 接口
PyTorch 的 HIP 实现特意复用了现有的 torch.cuda
接口。这种做法能显著加速现有 PyTorch 代码和模型的移植过程,因为几乎不需要修改代码(即使需要改动也很少)。
CUDA 语义中的示例在 HIP 环境下会完全同样地运行:
cuda = torch.device('cuda') # Default HIP device
cuda0 = torch.device('cuda:0') # 'rocm' or 'hip' are not valid, use 'cuda'
cuda2 = torch.device('cuda:2') # GPU 2 (these are 0-indexed)x = torch.tensor([1., 2.], device=cuda0)
# x.device is device(type='cuda', index=0)
y = torch.tensor([1., 2.]).cuda()
# y.device is device(type='cuda', index=0)with torch.cuda.device(1):# allocates a tensor on GPU 1a = torch.tensor([1., 2.], device=cuda)# transfers a tensor from CPU to GPU 1b = torch.tensor([1., 2.]).cuda()# a.device and b.device are device(type='cuda', index=1)# You can also use ``Tensor.to`` to transfer a tensor:b2 = torch.tensor([1., 2.]).to(device=cuda)# b.device and b2.device are device(type='cuda', index=1)c = a + b# c.device is device(type='cuda', index=1)z = x + y# z.device is device(type='cuda', index=0)# even within a context, you can specify the device# (or give a GPU index to the .cuda call)d = torch.randn(2, device=cuda2)e = torch.randn(2).to(cuda2)f = torch.randn(2).cuda(cuda2)# d.device, e.device, and f.device are all device(type='cuda', index=2)
检查 HIP 支持
无论你使用的是 PyTorch 的 CUDA 还是 HIP 版本,调用 is_available()
的结果都相同。如果你使用的 PyTorch 版本支持 GPU,该函数会返回 True。如果需要确认当前使用的 PyTorch 版本,请参考以下示例:
if torch.cuda.is_available() and torch.version.hip:# do something specific for HIP
elif torch.cuda.is_available() and torch.version.cuda:# do something specific for CUDA
ROCm 上的 TensorFloat-32 (TF32)
ROCm 不支持 TF32。
内存管理
PyTorch 使用缓存内存分配器来加速内存分配。这种方式能实现快速内存释放而无需设备同步。但分配器管理的未使用内存在 rocm-smi
中仍会显示为已占用状态。您可以通过以下方法监控内存使用情况:
- 使用
memory_allocated()
和max_memory_allocated()
监控张量占用的内存 - 使用
memory_reserved()
和max_memory_reserved()
监控缓存分配器管理的总内存
调用 empty_cache()
会释放 PyTorch 中所有未使用的缓存内存,这些内存可被其他 GPU 应用程序使用。但已分配给张量的 GPU 内存不会被释放,因此不会增加 PyTorch 可用的 GPU 内存总量。
对于高级用户,我们提供更全面的内存分析工具:
- 通过
memory_stats()
进行内存统计 - 通过
memory_snapshot()
捕获内存分配器状态的完整快照,帮助理解代码产生的底层分配模式
调试内存错误时,可在环境中设置以下变量禁用缓存:
PYTORCH_NO_HIP_MEMORY_CACHING=1
- 为便于移植,也接受
PYTORCH_NO_CUDA_MEMORY_CACHING=1
hipBLAS 工作空间
对于每个 hipBLAS 句柄与 HIP 流的组合,当该组合执行需要工作空间的 hipBLAS 内核时,系统会分配一个 hipBLAS 工作空间。为了避免重复分配工作空间,这些空间不会被释放,除非调用 torch._C._cuda_clearCublasWorkspaces()
函数(注意该函数同样适用于 CUDA 或 HIP)。
每个分配的工作空间大小可通过环境变量 HIPBLAS_WORKSPACE_CONFIG
指定,格式为 :[SIZE]:[COUNT]
。例如,环境变量 HIPBLAS_WORKSPACE_CONFIG=:4096:2:16:8
表示总大小为 2 * 4096 + 8 * 16 KiB
(即 8 MiB)。默认工作空间大小为 32 MiB;MI300 及更新型号默认值为 128 MiB。若要强制 hipBLAS 不使用工作空间,可设置为 HIPBLAS_WORKSPACE_CONFIG=:0:0
。为方便起见,系统也接受 CUBLAS_WORKSPACE_CONFIG
作为等效配置。
hipFFT/rocFFT 计划缓存
不支持设置 hipFFT/rocFFT 计划缓存的大小。
torch.distributed 后端支持
目前,ROCm 平台上仅支持 torch.distributed 的 “nccl” 和 “gloo” 两种后端。
C++中CUDA API到HIP API的映射
请参考:https://rocmdocs.amd.com/en/latest/Programming_Guides/HIP_API_Guide.html
注意:CUDA_VERSION
宏、cudaRuntimeGetVersion
和cudaDriverGetVersion
API在语义上并不对应HIP_VERSION
宏、hipRuntimeGetVersion
和hipDriverGetVersion
API的相同值。在进行版本检查时,请不要互换使用它们。
例如:不要使用
#if defined(CUDA_VERSION) && CUDA_VERSION >= 11000
来隐式排除ROCm/HIP,
而应使用以下方式避免为ROCm/HIP执行代码路径:
#if defined(CUDA_VERSION) && CUDA_VERSION >= 11000 && !defined(USE_ROCM)
或者,如果需要为ROCm/HIP执行代码路径:
#if (defined(CUDA_VERSION) && CUDA_VERSION >= 11000) || defined(USE_ROCM)
又或者,如果仅需要针对特定HIP版本为ROCm/HIP执行代码路径:
#if (defined(CUDA_VERSION) && CUDA_VERSION >= 11000) || (defined(USE_ROCM) && ROCM_VERSION >= 40300)
请参阅CUDA语义文档
对于本文档未列出的部分,请参考CUDA语义文档:CUDA语义
启用内核断言
ROCm 平台支持内核断言功能,但出于性能考虑默认处于禁用状态。如需启用,需要从源码重新编译 PyTorch。
请在 cmake 命令参数中添加以下配置行:
-DROCM_FORCE_ENABLE_GPU_ASSERTS:BOOL=ON
大规模部署特性
https://pytorch.org/docs/stable/notes/large_scale_deployments.html
本文档探讨了在大型系统中运行PyTorch,或在组织内多系统协同使用PyTorch时可能用到的若干扩展技巧。
内容不涉及模型生产部署相关主题,请参考 torch.jit
或对应教程。
文档假设您满足以下任一条件:
1、在组织内从源码构建PyTorch
2、能够静态链接额外代码并在PyTorch运行时加载
因此,大多数钩子以C++ API形式暴露,可通过集中位置(例如静态初始化代码)一次性触发。
全集群算子性能分析
PyTorch 提供了 torch.autograd.profiler
模块,能够按需测量单个算子的执行时间。开发者可以利用相同机制,对任何运行 PyTorch 的进程进行"持续开启"的性能测量。该功能对于收集指定进程或整个机器集群中 PyTorch 工作负载的运行信息非常有用。
通过 torch::addGlobalCallback
可以添加针对任意算子调用的新回调函数。这些钩子函数会接收描述调用上下文(例如算子名称)的 torch::RecordFunction
结构体作为参数。若启用记录功能,RecordFunction::inputs()
会以 torch::IValue
变体类型的形式包含函数参数。请注意,记录输入参数会产生较高开销,因此需要显式启用。
算子回调函数还可以通过 c10::ThreadLocalDebugInfo::get()
接口获取指向调试信息结构的指针。这些调试信息可以通过 at::DebugInfoGuard
对象预先设置。调试信息会在前向传播(包括异步的 fork
任务)和反向传播过程中传递,可用于将执行环境的相关信息(例如模型ID)从应用高层传递到算子回调层。
调用回调函数会产生额外开销,因此通常建议采用随机采样方式记录算子调用。这可以通过向 torch::addGlobalCallback
传入可选采样率参数来实现,该设置可按回调函数单独配置。
请注意,addGlobalCallback
不是线程安全的,只能在没有任何 PyTorch 算子运行时调用。通常建议在初始化阶段一次性完成回调注册。
示例代码如下:
// Called somewhere in the program beginning
void init() {// Sample one in a hundred operator runs randomlyaddGlobalCallback(RecordFunctionCallback(&onFunctionEnter,&onFunctionExit).needsInputs(true).samplingProb(0.01));// Note, to enable observers in the model calling thread,// call enableRecordFunction() in the thread before running a model
}void onFunctionEnter(const RecordFunction& fn) {std::cerr << "Before function " << fn.name()<< " with " << fn.inputs().size() << " inputs" << std::endl;
}void onFunctionExit(const RecordFunction& fn) {std::cerr << "After function " << fn.name();
}
API 使用日志记录
在更广泛的生态系统中运行时(例如在托管作业调度器中),追踪哪些二进制文件调用了特定的 PyTorch API 通常很有帮助。系统在几个关键 API 点注入了简单的 instrumentation 机制,用于触发指定的回调函数。由于 PyTorch 通常是在一次性 Python 脚本中调用的,因此对于每个 API,回调函数在每个进程中只会触发一次。
可以通过 c10::SetAPIUsageHandler
来注册 API 使用情况的 instrumentation 处理程序。传入的参数将是一个用于标识使用点的 “api key”,例如:
python.import
表示 PyTorch 扩展导入torch.script.compile
表示触发了 TorchScript 编译
SetAPIUsageLogger([](const std::string& event_name) {std::cerr << "API was used: " << event_name << std::endl;
});
开发者注意:可以通过在C++代码中添加C10_LOG_API_USAGE_ONCE("my_api")
或在Python中添加torch._C._log_api_usage_once("my.api")
来新增API触发点。
为保存的TorchScript模型附加元数据
TorchScript模块可以保存为一个归档文件,该文件将序列化的参数和模块代码打包为TorchScript格式(参见torch.jit.save()
)。通常需要将额外信息与模型一起打包,例如模型生产者的描述或辅助文件。
这可以通过向torch.jit.save()
和torch::jit::load
传递_extra_files
参数来实现,以便在保存过程中存储和检索任意二进制数据块。由于TorchScript文件是常规的ZIP归档,额外信息会作为普通文件存储在归档的extra/
目录中。
此外,还有一个全局钩子允许向当前进程中生成的任何TorchScript归档附加额外文件。这类似于数码相机生成的JPEG元数据,可用于为模型添加生产者元数据标签。示例用法可能如下:
SetExportModuleExtraFilesHook([](const Module&) {ExtraFilesMap files;files["producer_info.json"] = "{\"user\": \"" + getenv("USER") + "\"}";return files;
});
构建环境注意事项
TorchScript 的编译过程需要访问原始 Python 文件,因为它使用了 Python 的 inspect.getsource
调用。在某些生产环境中,可能需要显式部署 .py
文件与预编译的 .pyc
文件一起使用。
常见扩展点
PyTorch API 通常采用松耦合设计,可以轻松用定制化组件替换原有模块。主要扩展方式包括:
- 用C++实现自定义运算符 - 详见教程文档
- 自定义数据读取通常可直接通过调用对应Python库集成。通过继承
Dataset
或IterableDataset
即可复用torch.utils.data
现有功能
LibTorch 稳定ABI
本文档最终将详细介绍如何使用 torch/csrc/stable 中的API。目前包含一个内部表示形式的对照表:
1、自定义扩展中的类型:终端用户自定义库中使用的类型
2、StableIValue表示:以ABI稳定的方式将类型转换为用户模型与libtorch.so之间的桥梁
3、libtorch中的类型:libtorch.so(或任何与libtorch绑定的二进制代码)内部使用的类型
4、模式类型:由模式描述的类型,我们将其视为native_functions.yaml中ATen操作和通过TORCH_LIBRARY或torch.library注册到调度器的用户自定义操作的真实来源
自定义扩展中的类型 | StableIValue表示 | libtorch中的类型 | 模式类型 |
---|---|---|---|
std::optional<S> | reinterpret_cast<(StableIValue)*>,指向递归定义的StableIValue指针 | std::optional<T> | Type? |
std::nullopt | reinterpret_cast<nullptr_t> | IValue() | None |
RAIIATH | *reinterpret_cast<uint64_t*> of AtenTensorHandle | at::Tensor | Tensor |
int32_t | *reinterpret_cast<uint64_t*> | at::ScalarType | ScalarType |
int32_t | *reinterpret_cast<uint64_t*> | at::Layout | Layout |
int32_t | *reinterpret_cast<uint64_t*> | at::MemoryFormat | MemoryFormat |
bool | *reinterpret_cast<uint64_t*> | bool | bool |
int64_t | *reinterpret_cast<uint64_t*> | int64_t | int |
double | *reinterpret_cast<uint64_t*> | double | float |
? | ? | c10::Device | Device |
? | ? | c10::Stream | Stream |
? | ? | c10::complex | complex |
? | ? | at::Scalar | Scalar |
? | ? | std::string/const char*/ivalue::ConstantString | str |
? | ? | at::Storage | Storage |
? | ? | at::Generator | Generator |
? | ? | c10::List<T> | Type[] |
? | ? | ivalue::Tuple<T> | (Type, …) |
? | ? | c10::SymInt | SymInt |
? | ? | c10::SymFloat | SymFloat |
? | ? | c10::SymBool | SymBool |
? | ? | at::QScheme | QScheme |
我们完全支持的类型是表格中已完成的行。对于有限的用例集,我们还隐式支持任何可以用64位表示的StableIValue字面类型,因为默认的reinterpret_cast会成功。即使自定义扩展中没有设备的标准定义表示形式,您也可以在自定义内核中使用StableIValue抽象来处理诸如c10::Device之类的类型。例如,自定义操作可以将StableIValue设备作为参数,并通过aoti_torch_call_dispatcher直接传递给aten操作。