最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

Transformer详解

来源:博客园

1.理论知识讲解

transfromer这个模型在机器翻译方面就是做如下事情由一种语言到另一种语言

下图中六个encoder在结构上是完全相同的但是每个encoder的内部的参数不完全相同,也就是在训练的时候6个encoder都在训练,并不是一个在训练,然后其它五个去拷贝这个encoder,六个decoder的结构也是完全相同的;每个encoder和decoder是不一样的

1.1 Encoder

1.1.1输入部分

该部分分为两个小部分

1⃣️Embedding下图中有12个字,按字切分,然后每个字定义一个512维的字向量(可以使用word2vec或者随机初始化)


(资料图片)

2⃣️位置嵌入使用transformer为什么需要位置嵌入,因为在encoder中的第二个部分中多头注意力机制中是将输入的词并行输入的,并不是像rnn那样是一个词一个词的输入,但是这样并行输入有缺少了rnn那种词与词之间的先后关系,因此需要引入词位置的编码。下图rnn是一个词一个词的输入,输入“我”,处理“我”,在输入“爱”,在接收“我”的信息之后再处理“爱”,这样保证词与词之间的先后顺序不会乱,但是一个词一个词的输入导致效率低

位置编码公式:公式中的pos就是单词或者是字的位置,2i和2i+1分别对应,在偶数位置使用sin,在基数位置使用cos。对于爱这个字所在的位置pos,如果爱这个字对应的词向量为512维,则对于这512位的向量偶数位用cos来计算,基数位置用cos来计算公式中的d_model它表示模型中每个输入和输出 token 的向量表示的维度大小。也就是说,模型中的每个单词都被表示为一个长度为 d_model 的向量,对于一下这个例子d_model的值为512,在我的代码中d_model的值为128。

由公式计算出位置编码之后,与原来的词向量进行相加得到最终整个transformer的输入。

1.1.2注意力机制

1⃣️基本的注意力机制下图中,对于“婴儿在干嘛”这句话应该关注图片中的哪个区域,通过公式计算得到这句话应该更关注图片中的哪个位置

注意力机制的公式,其中Q,K,V,三个都是矩阵

2⃣️在transformer中怎么操作x1与wq矩阵相乘的到q1矩阵,依次类推

计算attention值Score的值等于,你当前关注的词的q,分别乘以这个句子中所有的词的k,就会得到句子中每个词对当前的词的打分结果

1.1.3残差和LayNorm

1⃣️残差

可以有效缓解梯度消失

1.1.3前馈神经网络

1.2decoder

在多头注意力机制中多了一个mask机制,表示将当前单词和之后的单词做mask

为什么使用mask的原因:

在 Transformer 模型中,为了生成下一个目标单词,解码器需要访问已经生成的单词和源序列的信息。在训练时,我们可以将目标序列的所有单词都输入到解码器中,并在每个时间步生成一个单词。但是,在生成过程中,我们需要逐步地生成目标序列中的单词,而在每个时间步中,我们只能访问当前时刻之前生成的单词。这意味着,在生成第 i 个单词时,我们不能使用第 i+1 个单词的信息。因此为了解决这一问题才引入了mask机制k v矩阵是由encoder得到,而q矩阵是decoder中得到,encoder中得到的每个k v矩阵将会和decoder中的q矩阵进行交互

下图讲述了mask的过程,一行一行的看,当输入的s的时候就只能看到s看不到 后面的“卷”,“起”,“来”,因为此时这三个字的mask为1,当输入“卷”的时候就只能看到“S”和“卷”,看不到后面的两个字,这也是代码中上三角矩阵的作用

2.代码部分讲解

import torchimport numpy as npfrom config import ngpu, device# 计算角度:pos * 1/(10000^(2i/d))def get_angles(pos, i, d_model):    # 2*(i//2)保证了2i,这部分计算的是1/10000^(2i/d)    angle_rates = 1 / np.power(10000, 2 * (i // 2) / np.float32(d_model))  # => [1, 512]    return pos * angle_rates  # [50,1]*[1,512]=>[50, 512]# np.arange()函数返回一个有终点和起点的固定步长的排列,如[1,2,3,4,5],起点是1,终点是5,步长为1# 注意:起点终点是左开右闭区间,即start=1,end=6,才会产生[1,2,3,4,5]# 只有一个参数时,参数值为终点,起点取默认值0,步长取默认值1。def positional_encoding(position, d_model):  #d_model是位置编码的长度,相当于position encoding的embedding_dim?    angle_rads = get_angles(np.arange(position)[:, np.newaxis],  # [50, 1]                            np.arange(d_model)[np.newaxis, :],  # [1, d_model=512]                            d_model)    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  #从0开始步长为2,2i    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  #从1开始步长为2,2i+1    pos_encoding = angle_rads[np.newaxis, ...]  #[50, 512] => [1,50,512]    return torch.tensor(pos_encoding, dtype=torch.float32)pos_encoding = positional_encoding(50, 512)pad = 1  #重要!def create_padding_mask(seq):  # seq [b, seq_len]    # seq = torch.eq(seq, torch.tensor(0)).float() # pad=0的情况    seq = torch.eq(seq, torch.tensor(pad)).float()  # pad!=0    return seq[:, np.newaxis, np.newaxis, :]  # =>[b, 1, 1, seq_len]def create_look_ahead_mask(size):  # seq_len    mask = torch.triu(torch.ones((size, size)), diagonal=1)    # mask = mask.device() #    return mask  # [seq_len, seq_len]def scaled_dot_product_attention(q, k, v, mask=None):    """    #计算注意力权重。    q, k, v 必须具有匹配的前置维度。 且dq=dk    k, v 必须有匹配的倒数第二个维度,例如:seq_len_k = seq_len_v。    #虽然 mask 根据其类型(填充或前瞻)有不同的形状,    #但是 mask 必须能进行广播转换以便求和。    #参数:        q: 请求的形状 == (..., seq_len_q, depth)        k: 主键的形状 == (..., seq_len_k, depth)        v: 数值的形状 == (..., seq_len_v, depth_v)  seq_len_k = seq_len_v        mask: Float 张量,其形状能转换成              (..., seq_len_q, seq_len_k)。默认为None。    #返回值:        #输出,注意力权重    """    # matmul(a,b)矩阵乘:a b的最后2个维度要能做乘法,即a的最后一个维度值==b的倒数第2个纬度值,    # 除此之外,其他维度值必须相等或为1(为1时会广播)    matmul_qk = torch.matmul(q, k.transpose(-1, -2))  # 矩阵乘 =>[..., seq_len_q, seq_len_k]    # 缩放matmul_qk    dk = torch.tensor(k.shape[-1], dtype=torch.float32)  # k的深度dk,或叫做depth_k    scaled_attention_logits = matmul_qk / torch.sqrt(dk)  # [..., seq_len_q, seq_len_k]    # 将 mask 加入到缩放的张量上(重要!)    if mask is not None:  # mask: [b, 1, 1, seq_len]        # mask=1的位置是pad,乘以-1e9(-1*10^9)成为负无穷,经过softmax后会趋于0        scaled_attention_logits += (mask * -1e9)    # softmax 在最后一个轴(seq_len_k)上归一化    attention_weights = torch.nn.functional.softmax(scaled_attention_logits, dim=-1)  # [..., seq_len_q, seq_len_k]    output = torch.matmul(attention_weights, v)  # =>[..., seq_len_q, depth_v]    return output, attention_weights  # [..., seq_len_q, depth_v], [..., seq_len_q, seq_len_k]class MultiHeadAttention(torch.nn.Module):    def __init__(self, d_model, num_heads):        super(MultiHeadAttention, self).__init__()        self.num_heads = num_heads        self.d_model = d_model        assert d_model % self.num_heads == 0  #因为输入要被(平均?)split到不同的head        self.depth = d_model//self.num_heads  #512/8=64,所以在scaled dot-product atten中dq=dk=64,dv也是64        self.wq = torch.nn.Linear(d_model, d_model)        self.wk = torch.nn.Linear(d_model, d_model)        self.wv = torch.nn.Linear(d_model, d_model)        self.final_linear = torch.nn.Linear(d_model, d_model)    def split_heads(self, x, batch_size):  # x [b, seq_len, d_model]        x = x.view(batch_size, -1, self.num_heads,                   self.depth)  # [b, seq_len, d_model=512]=>[b, seq_len, num_head=8, depth=64]        return x.transpose(1, 2)  # [b, seq_len, num_head=8, depth=64]=>[b, num_head=8, seq_len, depth=64]    def forward(self, q, k, v, mask):  # q=k=v=x [b, seq_len, embedding_dim] embedding_dim其实也=d_model        batch_size = q.shape[0]        q = self.wq(q)  # => [b, seq_len, d_model]        k = self.wk(k)  # => [b, seq_len, d_model]        v = self.wv(v)  # => [b, seq_len, d_model]        q = self.split_heads(q, batch_size)  # => [b, num_head=8, seq_len, depth=64]        k = self.split_heads(k, batch_size)  # => [b, num_head=8, seq_len, depth=64]        v = self.split_heads(v, batch_size)  # => [b, num_head=8, seq_len, depth=64]        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)        # => [b, num_head=8, seq_len_q, depth=64], [b, num_head=8, seq_len_q, seq_len_k]        scaled_attention = scaled_attention.transpose(1, 2)  # =>[b, seq_len_q, num_head=8, depth=64]        # 转置操作让张量存储结构扭曲,直接使用view方法会失败,可以使用reshape方法        concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model)  # =>[b, seq_len_q, d_model=512]        output = self.final_linear(concat_attention)  # =>[b, seq_len_q, d_model=512]        return output, attention_weights  # [b, seq_len_q, d_model=512], [b, num_head=8, seq_len_q, seq_len_k]# 点式前馈网络def point_wise_feed_forward_network(d_model, dff):    feed_forward_net = torch.nn.Sequential(        torch.nn.Linear(d_model, dff),  # [b, seq_len, d_model]=>[b, seq_len, dff=2048]        torch.nn.ReLU(),        torch.nn.Linear(dff, d_model),  # [b, seq_len, dff=2048]=>[b, seq_len, d_model=512]    )    return feed_forward_netclass EncoderLayer(torch.nn.Module):    def __init__(self, d_model, num_heads, dff, rate=0.1):        super(EncoderLayer, self).__init__()        self.mha = MultiHeadAttention(d_model, num_heads)  # 多头注意力(padding mask)(self-attention)        self.ffn = point_wise_feed_forward_network(d_model, dff)#前馈神经网络层        self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)        self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)        self.dropout1 = torch.nn.Dropout(rate)        self.dropout2 = torch.nn.Dropout(rate)    # x [b, inp_seq_len, embedding_dim] embedding_dim其实也=d_model    # mask [b,1,1,inp_seq_len]    def forward(self, x, mask):        attn_output, _ = self.mha(x, x, x, mask)  # =>[b, seq_len, d_model] self-attention        attn_output = self.dropout1(attn_output)        out1 = self.layernorm1(x + attn_output)  # 残差&层归一化 =>[b, seq_len, d_model]  残差网络思路        ffn_output = self.ffn(out1)  # =>[b, seq_len, d_model]        ffn_output = self.dropout2(ffn_output)        out2 = self.layernorm2(out1 + ffn_output)  # 残差&层归一化 =>[b, seq_len, d_model]        return out2  # [b, seq_len, d_model]class DecoderLayer(torch.nn.Module):    def __init__(self, d_model, num_heads, dff, rate=0.1):        super(DecoderLayer, self).__init__()        self.mha1 = MultiHeadAttention(d_model,                                       num_heads)  # masked的多头注意力(look ahead mask 和 padding mask)(self-attention)        self.mha2 = MultiHeadAttention(d_model, num_heads)  # 多头注意力(padding mask)(encoder-decoder attention)        self.ffn = point_wise_feed_forward_network(d_model, dff)        self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)        self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)        self.layernorm3 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)        self.dropout1 = torch.nn.Dropout(rate)        self.dropout2 = torch.nn.Dropout(rate)        self.dropout3 = torch.nn.Dropout(rate)    # x [b, targ_seq_len, embedding_dim] embedding_dim其实也=d_model=512    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask    # enc_output [b, inp_seq_len, d_model]    # padding_mask [b, 1, 1, inp_seq_len]    def forward(self, x, enc_output, look_ahead_mask, padding_mask):        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len]  self-attention,训练阶段用于编码结果        attn1 = self.dropout1(attn1)        out1 = self.layernorm1(x + attn1)  # 残差&层归一化 [b, targ_seq_len, d_model]        # Q: receives the output from decoder"s first attention block,即 masked multi-head attention sublayer        # K V: V (value) and K (key) receive the encoder output as inputs        attn2, attn_weights_block2 = self.mha2(out1, enc_output, enc_output,                                               padding_mask)  # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, inp_seq_len]        attn2 = self.dropout2(attn2)        out2 = self.layernorm2(out1 + attn2)  # 残差&层归一化 [b, targ_seq_len, d_model]        ffn_output = self.ffn(out2)  # =>[b, targ_seq_len, d_model]        ffn_output = self.dropout3(ffn_output)        out3 = self.layernorm3(out2 + ffn_output)  # 残差&层归一化 =>[b, targ_seq_len, d_model]        return out3, attn_weights_block1, attn_weights_block2        #[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]class Encoder(torch.nn.Module):    def __init__(self,                 num_layers,  # N个encoder layer                 d_model,  # 当前网络的维度                 num_heads,                 dff,  # 点式前馈网络内层fn的维度                 input_vocab_size,  # 输入词表大小(源语言(法语))                 maximun_position_encoding,                 rate=0.1):        super(Encoder, self).__init__()        self.num_layers = num_layers        self.d_model = d_model        self.embedding = torch.nn.Embedding(num_embeddings=input_vocab_size, embedding_dim=d_model)#可以对应看transformer的图,这个是编码器的词向量层        self.pos_encoding = positional_encoding(maximun_position_encoding,                                                d_model).to(device)  #在这个相当于关系图中编码的词嵌入层 =>[1, max_pos_encoding, d_model=512] # 相当于作者手动实现了pos_emb        # self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行        #对num_layers个encoder进行堆叠        self.enc_layers = torch.nn.ModuleList([EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)])        self.dropout = torch.nn.Dropout(rate)    # x [b, inp_seq_len]    # mask [b, 1, 1, inp_sel_len]    def forward(self, x, mask):        inp_seq_len = x.shape[-1]        # adding embedding and position encoding        x = self.embedding(x)  # [b, inp_seq_len]=>[b, inp_seq_len, d_model]        # 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \sqrt{d_model}.        x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))        pos_encoding = self.pos_encoding[:, :inp_seq_len, :]        #pos_encoding = pos_encoding.cuda()  #调用了显卡资源        x += pos_encoding  #将得到的位置编码于原来数据的输入进行相加得到最终的输入 [b, inp_seq_len, d_model]        x = self.dropout(x)        for i in range(self.num_layers):#将每一层decoder的输出当作下一层decoder的输入            x = self.enc_layers[i](x, mask)  # [b, inp_seq_len, d_model]=>[b, inp_seq_len, d_model]        return x  # [b, inp_seq_len, d_model]class Decoder(torch.nn.Module):    def __init__(self,                 num_layers,  # N个encoder layer                 d_model,                 num_heads,                 dff,  # 点式前馈网络内层fn的维度                 target_vocab_size,  # target词表大小(目标语言(英语))                 maximun_position_encoding,                 rate=0.1):        super(Decoder, self).__init__()        self.num_layers = num_layers        self.d_model = d_model        self.embedding = torch.nn.Embedding(num_embeddings=target_vocab_size, embedding_dim=d_model)        self.pos_encoding = positional_encoding(maximun_position_encoding,                                                d_model).to(device)  # =>[1, max_pos_encoding, d_model=512]        # self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行        self.dec_layers = torch.nn.ModuleList([DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)])  # decoder比encoder多了一组attention层        self.dropout = torch.nn.Dropout(rate)    # x [b, targ_seq_len]    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask    # enc_output [b, inp_seq_len, d_model]    # padding_mask [b, 1, 1, inp_seq_len]    def forward(self, x, enc_output, look_ahead_mask, padding_mask):        targ_seq_len = x.shape[-1]        attention_weights = {}        # adding embedding and position encoding        x = self.embedding(x)  # [b, targ_seq_len]=>[b, targ_seq_len, d_model]        # 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \sqrt{d_model}.        x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))        # x += self.pos_encoding[:, :targ_seq_len, :]  # [b, targ_seq_len, d_model]        pos_encoding = self.pos_encoding[:, :targ_seq_len, :]  # [b, targ_seq_len, d_model]        #pos_encoding = pos_encoding.cuda() #调用显卡资源        x += pos_encoding  # [b, inp_seq_len, d_model]        x = self.dropout(x)        for i in range(self.num_layers):            x, attn_block1, attn_block2 = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask)  # 因为是解码器,需要将编码器的输出结果enc_output传入            # => [b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]            attention_weights[f"decoder_layer{i + 1}_block1"] = attn_block1            attention_weights[f"decoder_layer{i + 1}_block2"] = attn_block2        return x, attention_weightsclass Transformer(torch.nn.Module):    def __init__(self,                 num_layers,  # N个encoder layer                 d_model,                 num_heads,                 dff,  # 点式前馈网络内层fn的维度                 input_vocab_size,  # input此表大小(源语言(法语))                 target_vocab_size,  # target词表大小(目标语言(英语))                 pe_input,  # input max_pos_encoding                 pe_target,  # input max_pos_encoding                 rate=0.1):        super(Transformer, self).__init__()        self.encoder = Encoder(num_layers,# Encoder中的EncoderLayer层数                               d_model,#模型的维度大小,即每个词语的向量维度大小                               num_heads,#多头注意力机制中head的数量。                               dff,#前向网络中的隐层大小                               input_vocab_size,                               pe_input,                               rate)        self.decoder = Decoder(num_layers,                               d_model,                               num_heads,                               dff,                               target_vocab_size,                               pe_target,                               rate)        self.final_layer = torch.nn.Linear(d_model, target_vocab_size)    # inp [b, inp_seq_len]    # targ [b, targ_seq_len]    # enc_padding_mask [b, 1, 1, inp_seq_len]    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len]    # dec_padding_mask [b, 1, 1, inp_seq_len] # 注意这里的维度是inp_seq_len    def forward(self, inp, targ, enc_padding_mask, look_ahead_mask, dec_padding_mask):        enc_output = self.encoder(inp, enc_padding_mask)  # =>[b, inp_seq_len, d_model]        dec_output, attention_weights = self.decoder(targ, enc_output, look_ahead_mask, dec_padding_mask)        # => [b, targ_seq_len, d_model],        # {"..block1": [b, num_heads, targ_seq_len, targ_seq_len],        #  "..block2": [b, num_heads, targ_seq_len, inp_seq_len], ...}        final_output = self.final_layer(dec_output)  # =>[b, targ_seq_len, target_vocab_size]        return final_output, attention_weights

关键词: