Large Model Fundamentals Series (7) --- The Transformer (Multi-Head Attention, Self-Attention, Positional Encoding), with a Worked Example and Tests


PS: Please credit the source when reposting; all rights reserved.

PS: This post is based solely on my own understanding. If it conflicts with your principles or views, please bear with me and be kind.

Environment

  None

Preface


   This is the seventh post in this series; the earlier posts are:

  • "Large Model Fundamentals Series (1) --- Revisiting Some Math for Deep Learning" https://www.cnblogs.com/Iflyinsky/p/18717317
  • "Large Model Fundamentals Series (2) --- Word Embedding" https://www.cnblogs.com/Iflyinsky/p/18775451
  • "Large Model Fundamentals Series (3) --- An RNN Example and Tests" https://www.cnblogs.com/Iflyinsky/p/18967569
  • "Large Model Fundamentals Series (4) --- An LSTM Example and Tests (an Improvement over RNN)" https://www.cnblogs.com/Iflyinsky/p/19091089
  • "Large Model Fundamentals Series (5) --- A seq2seq Example and Tests (Encoder-Decoder Architecture)" https://www.cnblogs.com/Iflyinsky/p/19150535
  • "Large Model Fundamentals Series (6) --- A seq2seq-with-Attention Example and Tests (Bahdanau Attention)" https://www.cnblogs.com/Iflyinsky/p/19184558

  The core of this post is the Transformer model architecture. Below is a diagram of the Transformer network structure (image source: see the References section).

[Figure: Transformer architecture diagram (source: see References)]

  As the architecture diagram shows, before walking through the full model we first need to cover a few prerequisites: multi-head attention, self-attention, and positional encoding.





Dot-Product Attention and Self-Attention


   Let us first introduce a new attention scoring function, scaled dot-product attention, which is computed as $$\text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{Q K^T}{\sqrt{d_k}}\right) V,$$ where \(d_k\) is the dimensionality of the queries and keys.

   Recall the attention mechanism in the seq2seq model from the previous post (an additive scoring function): its K and V come from the encoder outputs, while Q comes from the decoder hidden state. Now suppose that Q, K, and V are all the same data. Every query then attends over the entire set of keys and values, that is, over the sequence itself. This special case is called self-attention.

   The code for scaled dot-product attention is below; when Q, K, and V are all the same input, it computes self-attention.

class DotProductAttention(nn.Module):  #@save
    """Scaled dot product attention."""
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of keys with keys.transpose(1, 2)
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
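   As a quick sanity check, here is a minimal usage sketch (the batch size, lengths, and dimensions below are made-up values, dropout is 0 so the output is deterministic, and masked_softmax from the complete code later in this post is assumed to be in scope). Passing the same tensor as queries, keys, and values turns the module above into self-attention.

queries = keys = values = torch.randn(2, 5, 16)    # (batch_size, seq_len, d), assumed sizes
valid_lens = torch.tensor([3, 5])                  # only the first 3 / 5 positions are valid
attention = DotProductAttention(dropout=0.0)
attention.eval()
output = attention(queries, keys, values, valid_lens)
print(output.shape)                       # torch.Size([2, 5, 16])
print(attention.attention_weights.shape)  # torch.Size([2, 5, 5]); each row sums to 1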




Positional Encoding


   Every element of a sequence carries positional information, yet because dot-product attention processes all positions in parallel, the computation itself has no notion of order. To let the model perceive the order of the sequence during this parallel computation, positional information has to be injected into the input, which is what positional encoding was designed for. The implementation is as follows:
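   For reference (this matches the implementation below), the sinusoidal positional encoding fills row \(pos\) of the matrix \(P\) with $$P_{pos,\,2i} = \sin\!\left(\frac{pos}{10000^{2i/d}}\right), \qquad P_{pos,\,2i+1} = \cos\!\left(\frac{pos}{10000^{2i/d}}\right),$$ where \(d\) is num_hiddens. Each position therefore receives a unique, fixed pattern of phases, and \(P\) is simply added onto the input embeddings.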

class PositionalEncoding(nn.Module):  #@save
    """Positional encoding."""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long enough P
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)

   Once the sequence data has passed through the positional encoding, the inputs to the dot-product attention carry order information, so the model can learn order-dependent patterns in the sequence.





Multi-Head Attention


   A single attention mechanism can already focus selectively on the input. What we would like, however, is for attention to focus on several different aspects of the data at once, because the data carries many kinds of properties: an English sentence, for example, has grammatical structure, context, and relations between individual words.

   Multi-head attention was proposed to address exactly this problem. Given the description above, the mechanism is easy to grasp: each head analyses its own view of the data's properties, so the model can attend to several aspects of the input simultaneously, which improves its ability to understand the data.

   The implementation is as follows:

class MultiHeadAttention(nn.Module):  #@save
    """Multi-head attention."""
    def __init__(self, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super().__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)

    def transpose_qkv(self, X):
        """Transposition for parallel computation of multiple attention heads."""
        # Shape of input X: (batch_size, no. of queries or key-value pairs,
        # num_hiddens). Shape of output X: (batch_size, no. of queries or
        # key-value pairs, num_heads, num_hiddens / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        # Shape of output X: (batch_size, num_heads, no. of queries or
        # key-value pairs, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        # Shape of output: (batch_size * num_heads, no. of queries or
        # key-value pairs, num_hiddens / num_heads)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """Reverse the operation of transpose_qkv."""
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.permute(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, or values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
        # After transposing, shape of output queries, keys, or values:
        # (batch_size * num_heads, no. of queries or key-value pairs,
        # num_hiddens / num_heads)
        queries = self.transpose_qkv(self.W_q(queries))
        keys = self.transpose_qkv(self.W_k(keys))
        values = self.transpose_qkv(self.W_v(values))

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for num_heads
            # times, then copy the next item, and so on
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = self.transpose_output(output)
        return self.W_o(output_concat)

   The code above reveals an important detail: multi-head attention is not simply N identical attention modules run side by side. Instead, the inputs are first projected with nn.LazyLinear and then split along the num_hiddens dimension into num_heads groups. Note that after the nn.LazyLinear projection, every element along the num_hiddens dimension is already a mixture of the whole input, so splitting into num_heads groups is meaningful: each group still carries information about the entire input.
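   A shape check makes the head split concrete. The sketch below uses made-up sizes; num_hiddens must be divisible by num_heads, and each head internally works on num_hiddens / num_heads = 20 channels here.

num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_heads, dropout=0.5)
attention.eval()
batch_size, num_queries, num_kvpairs = 2, 4, 6
X = torch.ones((batch_size, num_queries, num_hiddens))   # queries
Y = torch.ones((batch_size, num_kvpairs, num_hiddens))   # keys and values
valid_lens = torch.tensor([3, 2])
print(attention(X, Y, Y, valid_lens).shape)   # torch.Size([2, 4, 100]): num_hiddens restored by W_o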





Position-Wise Feed-Forward Network


   This introduces a non-linear computation that strengthens the representational power of the network; the same small MLP is applied to every position independently. The code is as follows:

class PositionWiseFFN(nn.Module):  #@save
    """The positionwise feed-forward network."""
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.LazyLinear(ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))
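   A short sketch (the sizes are made up) showing that the same two-layer MLP is applied independently at every position: the position axis is untouched, and identical inputs at different positions produce identical outputs.

ffn = PositionWiseFFN(ffn_num_hiddens=8, ffn_num_outputs=4)
ffn.eval()
X = torch.ones((2, 3, 4))                           # (batch, positions, features)
print(ffn(X).shape)                                 # torch.Size([2, 3, 4])
print(torch.allclose(ffn(X)[0, 0], ffn(X)[0, 1]))   # True: weights are shared across positions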




Residual Connection and Layer Normalization


   This block adds the original input onto the output of some other computation (attention, for example) and then applies layer normalization, which guarantees that the output never loses the information carried by the original input. This works wonders when the network is deep. The code is as follows:

class AddNorm(nn.Module):  #@save
    """The residual connection followed by layer normalization."""
    def __init__(self, norm_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(norm_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
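   A minimal sketch (shapes are assumptions): AddNorm requires X and Y to have the same shape and preserves it, which is what allows these blocks to be stacked.

add_norm = AddNorm(norm_shape=4, dropout=0.5)
add_norm.eval()
X = torch.randn(2, 3, 4)        # original input
Y = torch.randn(2, 3, 4)        # output of a sub-layer such as attention
print(add_norm(X, Y).shape)     # torch.Size([2, 3, 4])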




Transformer Encoder Structure


   Below is the code for the Transformer encoder block:

class TransformerEncoderBlock(nn.Module):  #@save
    """The Transformer encoder block."""
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout,
                 use_bias=False):
        super().__init__()
        self.attention = MultiHeadAttention(num_hiddens, num_heads,
                                            dropout, use_bias)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(num_hiddens, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))

   As the code shows, the computation is: multi-head self-attention, residual connection with layer normalization, the position-wise feed-forward network, then another residual connection with layer normalization.
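   A hedged shape check of one encoder block under made-up hyper-parameters: the block keeps the (batch, seq_len, num_hiddens) shape, which is why several of them can be chained.

X = torch.ones((2, 100, 24))                       # (batch, seq_len, num_hiddens), assumed
valid_lens = torch.tensor([3, 2])
encoder_blk = TransformerEncoderBlock(num_hiddens=24, ffn_num_hiddens=48,
                                      num_heads=8, dropout=0.5)
encoder_blk.eval()
print(encoder_blk(X, valid_lens).shape)            # torch.Size([2, 100, 24])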





Transformer Decoder Structure


  Below is the code for the Transformer decoder block:

class TransformerDecoderBlock(nn.Module):
    # The i-th block in the Transformer decoder
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout, i):
        super().__init__()
        self.i = i
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # During training, all the tokens of any output sequence are processed
        # at the same time, so state[2][self.i] is None as initialized. When
        # decoding any output sequence token by token during prediction,
        # state[2][self.i] contains representations of the decoded output at
        # the i-th block up to the current time step
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Shape of dec_valid_lens: (batch_size, num_steps), where every
            # row is [1, 2, ..., num_steps]
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None
        # Self-attention
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention. Shape of enc_outputs:
        # (batch_size, num_steps, num_hiddens)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state

   As the code shows, the computation is: masked multi-head self-attention, residual connection with layer normalization, encoder-decoder multi-head attention (keys and values come from the encoder outputs), residual connection with layer normalization, the position-wise feed-forward network, and a final residual connection with layer normalization. The sketch below shows how the causal mask used during training is constructed.
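   The key detail during training is the causal mask: dec_valid_lens lets query position t attend only to the first t positions. The sketch below (made-up sizes, dummy all-zero scores, and masked_softmax from the complete code later in this post) shows how it produces a lower-triangular attention pattern.

batch_size, num_steps = 2, 4
dec_valid_lens = torch.arange(1, num_steps + 1).repeat(batch_size, 1)
print(dec_valid_lens)
# tensor([[1, 2, 3, 4],
#         [1, 2, 3, 4]])
scores = torch.zeros(batch_size, num_steps, num_steps)   # placeholder attention scores
print(masked_softmax(scores, dec_valid_lens)[0])
# query t only puts weight on the first t+1 positions, e.g.
# [1.00, 0.00, 0.00, 0.00], [0.50, 0.50, 0.00, 0.00], ...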





A Transformer-Based seq2seq-Style English-to-Chinese Translation Example


   For the dataset code, please refer to the earlier seq2seq posts in this series.



The complete code is as follows

  

import os
import random
import torch
import math
from torch import nn
from torch.nn import functional as F
import numpy as np
import time
import visdom
import collections
import dataset
class Accumulator:
    """Accumulate sums over n variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


class Timer:
    """Record multiple running times."""
    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        """Start the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the elapsed time."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the average time."""
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the total time."""
        return sum(self.times)

    def cumsum(self):
        """Return the accumulated times."""
        return np.array(self.times).cumsum().tolist()


class Encoder(nn.Module):
    """The base encoder interface for the encoder-decoder architecture."""
    def __init__(self, **kwargs):
        super(Encoder, self).__init__(**kwargs)

    def forward(self, X, *args):
        # Must be implemented by subclasses
        raise NotImplementedError


class Decoder(nn.Module):
    """The base decoder interface for the encoder-decoder architecture."""
    def __init__(self, **kwargs):
        super(Decoder, self).__init__(**kwargs)

    def init_state(self, enc_outputs, *args):
        # Must be implemented by subclasses
        raise NotImplementedError

    def forward(self, X, state):
        # Must be implemented by subclasses
        raise NotImplementedError


class EncoderDecoder(nn.Module):
    """The base class for the encoder-decoder architecture."""
    def __init__(self, encoder, decoder, **kwargs):
        super(EncoderDecoder, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, enc_X, dec_X, enc_X_valid_len, *args):
        # Encode the source sequence
        enc_outputs = self.encoder(enc_X, enc_X_valid_len, *args)
        # Initialize the decoder state from the encoder outputs
        dec_state = self.decoder.init_state(enc_outputs, enc_X_valid_len)
        # Decode using the decoder input dec_X and the initialized state
        return self.decoder(dec_X, dec_state)


def masked_softmax(X, valid_lens):  #@save
    """Perform softmax operation by masking elements on the last axis."""
    # X: 3D tensor, valid_lens: 1D or 2D tensor
    def _sequence_mask(X, valid_len, value=0):
        maxlen = X.size(1)
        mask = torch.arange((maxlen), dtype=torch.float32,
                            device=X.device)[None, :] < valid_len[:, None]
        X[~mask] = value
        return X

    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # On the last axis, replace masked elements with a very large negative
        # value, whose exponentiation outputs 0
        X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)


class DotProductAttention(nn.Module):  #@save
    """Scaled dot product attention."""
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of keys with keys.transpose(1, 2)
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)


class MultiHeadAttention(nn.Module):  #@save
    """Multi-head attention."""
    def __init__(self, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super().__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)

    def transpose_qkv(self, X):
        """Transposition for parallel computation of multiple attention heads."""
        # Shape of input X: (batch_size, no. of queries or key-value pairs,
        # num_hiddens). Shape of output X: (batch_size, no. of queries or
        # key-value pairs, num_heads, num_hiddens / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        # Shape of output X: (batch_size, num_heads, no. of queries or
        # key-value pairs, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        # Shape of output: (batch_size * num_heads, no. of queries or
        # key-value pairs, num_hiddens / num_heads)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """Reverse the operation of transpose_qkv."""
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.permute(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, or values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
        # After transposing, shape of output queries, keys, or values:
        # (batch_size * num_heads, no. of queries or key-value pairs,
        # num_hiddens / num_heads)
        queries = self.transpose_qkv(self.W_q(queries))
        keys = self.transpose_qkv(self.W_k(keys))
        values = self.transpose_qkv(self.W_v(values))

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for num_heads
            # times, then copy the next item, and so on
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = self.transpose_output(output)
        return self.W_o(output_concat)


class PositionWiseFFN(nn.Module):  #@save
    """The positionwise feed-forward network."""
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.LazyLinear(ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))


class AddNorm(nn.Module):  #@save
    """The residual connection followed by layer normalization."""
    def __init__(self, norm_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(norm_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)


class TransformerEncoderBlock(nn.Module):  #@save
    """The Transformer encoder block."""
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout,
                 use_bias=False):
        super().__init__()
        self.attention = MultiHeadAttention(num_hiddens, num_heads,
                                            dropout, use_bias)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(num_hiddens, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))


class PositionalEncoding(nn.Module):  #@save
    """Positional encoding."""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long enough P
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)


class TransformerEncoder(Encoder):  #@save
    """The Transformer encoder."""
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens,
                 num_heads, num_blks, dropout, use_bias=False):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_blks):
            self.blks.add_module("block"+str(i), TransformerEncoderBlock(
                num_hiddens, ffn_num_hiddens, num_heads, dropout, use_bias))

    def forward(self, X, valid_lens):
        # Since positional encoding values are between -1 and 1, the embedding
        # values are multiplied by the square root of the embedding dimension
        # to rescale before they are summed up
        # X[batch_size, seq_len, num_hidden]
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[i] = blk.attention.attention.attention_weights
        # X[batch_size, seq_len, num_hidden]
        return X


class TransformerDecoderBlock(nn.Module):
    # The i-th block in the Transformer decoder
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout, i):
        super().__init__()
        self.i = i
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # During training, all the tokens of any output sequence are processed
        # at the same time, so state[2][self.i] is None as initialized. When
        # decoding any output sequence token by token during prediction,
        # state[2][self.i] contains representations of the decoded output at
        # the i-th block up to the current time step
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Shape of dec_valid_lens: (batch_size, num_steps), where every
            # row is [1, 2, ..., num_steps]
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None
        # Self-attention
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention. Shape of enc_outputs:
        # (batch_size, num_steps, num_hiddens)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state


class TransformerDecoder(Decoder):
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens, num_heads,
                 num_blks, dropout):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.num_blks = num_blks
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_blks):
            self.blks.add_module("block"+str(i), TransformerDecoderBlock(
                num_hiddens, ffn_num_hiddens, num_heads, dropout, i))
        self.dense = nn.LazyLinear(vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens):
        return [enc_outputs, enc_valid_lens, [None] * self.num_blks]

    def forward(self, X, state):
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            # Decoder self-attention weights
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            # Encoder-decoder attention weights
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        return self._attention_weights


def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences."""
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """The softmax cross-entropy loss with masks."""
    # Shape of pred: (batch_size, num_steps, vocab_size)
    # Shape of label: (batch_size, num_steps)
    # Shape of valid_len: (batch_size,)
    def forward(self, pred, label, valid_len):
        weights = torch.ones_like(label)
        weights = sequence_mask(weights, valid_len)
        self.reduction = 'none'
        unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
            pred.permute(0, 2, 1), label)
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss


def grad_clipping(net, theta):  #@save
    """Clip the gradient."""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm


def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """Train a sequence-to-sequence model."""
    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = MaskedSoftmaxCELoss()
    net.train()
    vis = visdom.Visdom(env=u'test1', server="http://127.0.0.1", port=8097)
    animator = vis
    for epoch in range(num_epochs):
        timer = Timer()
        metric = Accumulator(2)  # Sum of training loss, no. of tokens
        for batch in data_iter:
            # Reset the gradients cached in the optimizer
            optimizer.zero_grad()
            # X.shape = [batch_size, num_steps]
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            # bos: batch_size copies of the <bos> id
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                               device=device).reshape(-1, 1)
            # dec_input.shape = (batch_size, num_steps)
            # The decoder input is <bos> followed by the target sequence
            # without its last token (teacher forcing)
            dec_input = torch.cat([bos, Y[:, :-1]], 1)
            # Shape of Y_hat: (batch_size, num_steps, vocab_size)
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            l.sum().backward()  # Backpropagate from the scalar loss
            grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            # print(predict('你是?'))
            # print(epoch)
            # animator.add(epoch + 1, )
            if epoch == 9:
                # Clear the chart by replacing its content with an empty series
                vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8',
                         update='replace')
            # _loss_val = l
            # _loss_val = _loss_val.cpu().sum().detach().numpy()
            vis.line(X=np.array([epoch + 1]),
                     Y=[metric[0] / metric[1]],
                     win='train_ch8',
                     update='append',
                     opts={
                         'title': 'train_ch8',
                         'xlabel': 'epoch',
                         'ylabel': 'loss',
                         'linecolor': np.array([[0, 0, 255]]),  # blue line
                     })
    print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
          f'tokens/sec on {str(device)}')
    torch.save(net.cpu().state_dict(), 'model_h.pt')
    torch.save(net.cpu(), 'model.pt')


def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """Predict with a sequence-to-sequence model."""
    # Set net to eval mode for prediction
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = dataset.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    # Add the batch axis
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    # Add the batch axis
    dec_X = torch.unsqueeze(
        torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device),
        dim=0)
    output_seq, attention_weight_seq = [], []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        # Use the token with the highest prediction likelihood as the decoder
        # input at the next time step
        dec_X = Y.argmax(dim=2)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        # Save attention weights (discussed later)
        if save_attention_weights:
            # 2nd block & 2nd (encoder-decoder) attention
            attention_weight_seq.append(
                net.decoder.attention_weights[1][1].cpu())
        # Once the end-of-sequence token is predicted, the generation of the
        # output sequence is complete
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq


def bleu(pred_seq, label_seq, k):  #@save
    """Compute the BLEU score."""
    pred_tokens, label_tokens = pred_seq.split(' '), [i for i in label_seq]
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    score = math.exp(min(0, 1 - len_label / len_pred))
    for n in range(1, k + 1):
        num_matches, label_subs = 0, collections.defaultdict(int)
        for i in range(len_label - n + 1):
            label_subs[' '.join(label_tokens[i: i + n])] += 1
        for i in range(len_pred - n + 1):
            if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                num_matches += 1
                label_subs[' '.join(pred_tokens[i: i + n])] -= 1
        score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
    return score


def try_gpu(i=0):
    """Return gpu(i) if it exists, otherwise return cpu()."""
    if torch.cuda.device_count() >= i + 1:
        return torch.device(f'cuda:{i}')
    return torch.device('cpu')


from matplotlib import pyplot as plt
import matplotlib
# from matplotlib_inline import backend_inline


def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """Show heatmaps of matrices as a grid of subplots.

    Typically used to visualize attention weights.

    Args:
        matrices: a 4-D array of shape (num_rows, num_cols, height, width),
            where num_rows and num_cols define the subplot grid and
            height/width are the dimensions of each heatmap.
        xlabel (str): x-axis label for the bottom row of subplots.
        ylabel (str): y-axis label for the leftmost column of subplots.
        titles (list of str, optional): one title per column of subplots.
        figsize (tuple, optional): size of the whole figure.
        cmap (str, optional): colormap used to draw the heatmaps.
    """
    # backend_inline.set_matplotlib_formats('svg')
    matplotlib.use('TkAgg')
    # Unpack the subplot grid layout from the shape of matrices
    num_rows, num_cols, _, _ = matrices.shape
    fig, axes = plt.subplots(num_rows, num_cols, figsize=figsize,
                             sharex=True,    # share x-axis ticks
                             sharey=True,    # share y-axis ticks
                             squeeze=False)  # always return a 2-D array of axes
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            # Detach the tensor from the graph and draw it as a heatmap
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            # Only the bottom row shows the x-axis label
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            # Only the leftmost column shows the y-axis label
            if j == 0:
                ax.set_ylabel(ylabel)
            # Column titles (shared by all rows)
            if titles:
                ax.set_title(titles[j])
    # One colorbar for the whole figure, shrunk to 60% of its height
    fig.colorbar(pcm, ax=axes, shrink=0.6)
    plt.show()


if __name__ == '__main__':
    num_hiddens, num_blks, dropout = 256, 2, 0.2
    ffn_num_hiddens, num_heads = 64, 4
    batch_size = 1024
    num_steps = 10
    lr, num_epochs, device = 0.001, 2000, try_gpu()

    train_iter, src_vocab, tgt_vocab, source, target = dataset.load_data(
        batch_size, num_steps)
    encoder = TransformerEncoder(len(src_vocab), num_hiddens, ffn_num_hiddens,
                                 num_heads, num_blks, dropout)
    decoder = TransformerDecoder(len(tgt_vocab), num_hiddens, ffn_num_hiddens,
                                 num_heads, num_blks, dropout)
    net = EncoderDecoder(encoder, decoder)

    is_train = False
    is_show = True
    if is_train:
        train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
    elif is_show:
        state_dict = torch.load('model_h.pt')
        net.load_state_dict(state_dict)
        net.to(device)
        src_text = "Call us."
        translation, attention_weight_seq = predict_seq2seq(
            net, src_text, src_vocab, tgt_vocab, num_steps, device, True)
        # attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
        # (num_rows, num_cols, height, width)
        print(f'translation={translation}')
        # print(attention_weight_seq.shape)
        stacked_tensor = torch.stack(attention_weight_seq, dim=0).permute(2, 1, 0, 3)
        print(stacked_tensor.shape)
        show_heatmaps(stacked_tensor,
                      xlabel='Attention weight', ylabel='Decode Step',
                      titles=['Head %d' % i for i in range(1, 5)])
    else:
        state_dict = torch.load('model_h.pt')
        net.load_state_dict(state_dict)
        net.to(device)
        C = 0
        C1 = 0
        for i in range(2000):
            # print(source[i])
            # print(target[i])
            translation, attention_weight_seq = predict_seq2seq(
                net, source[i], src_vocab, tgt_vocab, num_steps, device)
            score = bleu(translation, target[i], k=2)
            if score > 0.0:
                C = C + 1
            if score > 0.8:
                C1 = C1 + 1
                print(f'{source[i]} => {translation}, bleu {score:.3f}')
        print(f'Counter(bleu > 0) = {C}')
        print(f'Valid-Counter(bleu > 0.8) = {C1}')

  Let us first look at what TransformerEncoder does:

  • As before, the input is first embedded and the positional encoding is added
  • Each TransformerEncoderBlock is then applied in turn

  What happens inside TransformerEncoderBlock:

  • compute self-attention
  • residual connection and layer normalization
  • position-wise feed-forward network
  • residual connection and layer normalization

  Next, let us look at what TransformerDecoder does (an end-to-end shape check of the assembled model follows after the lists below):

  • As in TransformerEncoder, the input is first embedded and the positional encoding is added
  • Each TransformerDecoderBlock is then applied in turn
  • A final fully connected layer maps the output to the vocabulary size

  What happens inside TransformerDecoderBlock:

  • First prepare the self-attention keys and values \(K_1, V_1\); during prediction they are updated by concatenating each new input X onto the cached representations
  • Use the input X as Q and \(K_1, V_1\) as K and V to run the (masked) self-attention
  • Residual connection and layer normalization, producing Y
  • Use enc_output as K and V and Y as Q to compute the encoder-decoder attention
  • Residual connection and layer normalization
  • Position-wise feed-forward network
  • Residual connection and layer normalization
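  To tie the pieces together, here is a hedged end-to-end shape check using the classes from the complete code above; the vocabulary sizes, dimensions, and dummy token ids below are all made-up values. The decoder output has one score per target-vocabulary token at every target position.

src_vocab_size, tgt_vocab_size = 200, 300
num_hiddens, ffn_num_hiddens, num_heads, num_blks, dropout = 32, 64, 4, 2, 0.1
batch_size, num_steps = 2, 10
encoder = TransformerEncoder(src_vocab_size, num_hiddens, ffn_num_hiddens,
                             num_heads, num_blks, dropout)
decoder = TransformerDecoder(tgt_vocab_size, num_hiddens, ffn_num_hiddens,
                             num_heads, num_blks, dropout)
net = EncoderDecoder(encoder, decoder)
net.eval()
enc_X = torch.randint(0, src_vocab_size, (batch_size, num_steps))   # dummy source token ids
dec_X = torch.randint(0, tgt_vocab_size, (batch_size, num_steps))   # dummy target token ids
enc_valid_lens = torch.tensor([10, 7])
Y_hat, state = net(enc_X, dec_X, enc_valid_lens)
print(Y_hat.shape)   # torch.Size([2, 10, 300])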

  Below are some training and test results.

[Figures: training and test results]

  As the figures show, this model performs noticeably better than both the vanilla seq2seq model and the seq2seq model with attention.

  In addition, below is the attention-weight visualization for the translation "Call us." -> "联 系 我 们 。" (block=2, head=4, mask=3).

[Figure: per-head attention-weight heatmaps for each decoding step]

  Looking at the attention weights of each head at every decode step, different heads focus on different things; they pick up different properties of the features, which improves the capability of the model.





Postscript


    This post introduced the Transformer architecture along with a working example. It also touched on several concepts that today's LLMs still build on, such as positional encoding.

References

  • https://d2l.ai/chapter_attention-mechanisms-and-transformers/transformer.html



For tips, subscriptions, favorites, bananas, or coins, please follow my WeChat official account (攻城狮的搬砖之路).
[QR code image]

PS: Please respect original work; if you don't like it, just move on.

PS: Please credit the source when reposting; all rights reserved.

PS: If you have questions, leave a comment and I will reply as soon as I see it.
