transformer代码
哔哩哔哩很多课程,但是资源很难领取,代码和PPT不好找到
学习的过程中对照网课视频敲了代码,分享给大家使用
只包含代码主体,测试部分放到下方
顺便请教一个问题:视频中 mask = Variable(torch.zeros(8,4,4))。输出是(2,4,512)
我这边的代码会报错。
mask = Variable(torch.zeros(2,4,4))的时候是没问题的,当然此时的输入是
x1 = Variable(torch.LongTensor([[100,2,658,50],[15,898,52,145]]))
这个错误还没明白。明白后会更新的。
模型测试环节,优化部分的代码,放在最下方。日后更新。
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# from torch import nn.Batch
import math
import copy
from torch.autograd import Variable
import matplotlib.pyplot as plt
# 构建embeddings类来实现文本嵌入层
class Embeddings(nn.Module):
def __init__(self, d_model, vocab):
"""
类的初始化函数
:param d_model: 词嵌入的维度
:param vocab:词表的大小
"""
super(Embeddings, self).__init__()
# 调用预定义层Embedding,获得一个词嵌入对象self.lut
self.lut = nn.Embedding(vocab, d_model)
# 最后是将d_model传入类中
self.d_model = d_model
def forward(self, x):
"""
理解为该层的前向传播逻辑
传给该类的实例化对象参数时,自动调用该函数
:param x: 因为Embedding层是首层,所以代表输入给模型的文本通过词汇映射后的向量
:return:
"""
# 缩放的作用
return self.lut(x) * math.sqrt(self.d_model)
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout, max_len = 5000):
"""
位置编码器的初始化函数
:param d_model: 词嵌入维度
:param dropout: 置0比率
:param max_len: 每个句子的最大长度
"""
super(PositionalEncoding, self).__init__()
# 实例化nn中预定义的Dropout层,并传入,获得对象self.dropout
self.dropout = nn.Dropout(p=dropout)
# 初始化一个位置编码矩阵,大小是 max_len * d_model
pe = torch.zeros(max_len, d_model)
# 初始化一个绝对位置矩阵,词汇的绝对位置就是用它的索引来表示
# 使用arange获得连续的自然数向量,使用unsqueeze方法拓展向量维度
# 因为参数传的是1,代表矩阵拓展的位置,会是向量变成一个 max_len *1 的矩阵
position = torch.arange(0, max_len).unsqueeze(1)
"""
绝对位置矩阵初始化之后,接下来将位置信息加入到位置编码矩阵中
思路:将 max_len *1 的绝对位置矩阵,变成 max_len * d_model 形状,然后覆盖原来的初始矩阵
需要 1 * d_model 形状的变换矩阵div_term
将自然数的绝对位置编码缩放为足够小的数字,有助于之后的梯度下降过程中更快的收敛
首先使用arange获得一个自然数矩阵
"""
div_term = torch.exp(torch.arange(0, d_model, 2) *
-(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) #偶数列用sin赋值
pe[:, 1::2] = torch.cos(position * div_term) #奇数列用cos赋值
# 得到了位置编码矩阵pe,pe还是二维的。需要拓展维度才能和embedding的输出一样
pe = pe.unsqueeze(0)
# pe位置编码矩阵注册成模型的buffer
self.register_buffer('pe',pe)
def forward(self, x):
"""
:param x: 表示文本序列的词嵌入表示
:return:
进行pe与输入张量的适配
"""
x = x + Variable(self.pe[:, :x.size(1)],
requires_grad=False)
return self.dropout(x)
# 生成掩码张量
def subsequent_mask(size):
"""
生成向后遮掩的掩码张量
:param size: 最后两个维度的大小,最后两维形成一个仿真
:return:
"""
# 函数中,首先定义掩码张量的形状
attn_shape = (1,size,size)
# 往这个形状中添加1元素,形成上三角阵,最后为了节约空间,修改数据类型
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
# 转换为torch中的tensor,内部做1-操作。做三角阵的反转
return torch.from_numpy(1 - subsequent_mask)
def attention(query, key, value, mask = None, dropout = None):
"""
注意力机制的实现
:param query:
:param key:
:param value:
:param mask:
:param dropout:
:return:
"""
# 取query最后一维的大小,一般情况就等于我们的词嵌入维度
d_k = query.size(-1)
# 注意力公式, q和转置的k相乘
scores = torch.matmul(query, key.transpose(-2,-1)) / math.sqrt(d_k)
# 判断是否使用掩码张量
if mask is not None:
# 使用masked_fill方法,掩码张量和scores张量每个位置一一比较,如果为0,用小数值-1e9替换
scores = scores.masked_fill(mask == 0, -1e9)
# 对scores的最后一维进行softmax操作,得到最后的注意力张量
p_attn = F.softmax(scores, dim = -1)
if dropout is not None:
# 将p_attn传入dropout对象中进行丢弃处理
p_attn = dropout(p_attn)
# 最后根据公式进行相乘
return torch.matmul(p_attn, value), p_attn
def clones(module, N):
"""
生成相同网络层的克隆函数
:param module: 要克隆的目标网络层
:param N: 克隆n份
:return:
"""
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
# 实现多头注意力
class MutiHeadedAttention(nn.Module):
def __init__(self, head, embedding_dim, dropout = 0.1):
"""
:param head: 头数
:param embedding_dim:词嵌入维度
:param dropout:置0比率
"""
super(MutiHeadedAttention, self).__init__()
# 测试embedding_dim/head是否能整除
assert embedding_dim % head == 0
# 得到每个头获得的分割词向量维度d_k
self.d_k = embedding_dim // head
# 传入头数
self.head = head
# 获得线性层对象,实例化。 qkv 3个 +最后拼接的矩阵 一共4个
self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)
# 代表最后得到的注意力张量
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask = None):
"""
前向逻辑函数
:param query:
:param key:
:param value:
:param mask: 可能需要的掩码张量,默认是None
:return:
"""
if mask is not None:
mask = mask.unsqueeze(1)
# 如果有掩码张 量,就扩展维度
batch_size = query.size(0)
# 是query尺寸的第一个数字,代表有多少条样本
# 进入多头处理环节
# zip组合QKV三个线性层,-1代表自适应维度
query, key, value = \
[model(x).view(batch_size, -1 ,self.head, self.d_k).transpose(1,2)
for model,x in zip (self.linears, (query,key,value))]
x, self.attn = attention(query,key,value,mask =mask, dropout=self.dropout)
# 能让转置后的张量应用view方法,否则无法使用
x = x.transpose(1,2).contiguous().view(batch_size, -1, self.head * self.d_k)
# 现在x是三维张 量了
# 最后使用最后一个线性层对输入进行线性变换得到的最终的多头注意力结构的输出
return self.linears[-1](x)
# 前馈全连接层,注意力机制可能拟合不够,增加模型的能力
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout = 0.1):
"""
初始化函数有三个参数
:param d_model:词嵌入,线性层的输入维度
:param d_ff:希望通过前馈全连接层后的输入和输出维度不变,第二个线性层的输入维度和第一个线性层的输出维度
:param dropout:
"""
super(PositionwiseFeedForward, self).__init__()
# 实例化两个线性层对象
self.w1 = nn.Linear(d_model, d_ff)
self.w2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
"""
输入参数为x,代表来自上一层的输出
:param x:
:return:
首先经过一个线性层,使用relu函数进行激活
然后用dropout进行置0,然后通过第二个线性层w2,返回结果
"""
return self.w2(self.dropout(F.relu(self.w1(x))))
class LayerNorm(nn.Module):
"""
随着网络层数的增加,通过多层的计算参数后可能出现过大或过小的情况,导致学习异常
收敛很慢,需要连接规范化层进行数值的规范化,使其特征数值再合理范围内。
"""
def __init__(self, features, eps=1e-6):
"""
:param features: 词嵌入的维度
:param eps: 足够小的数,出现在分母中
"""
super(LayerNorm, self).__init__()
"""
根据features的形状初始化两个参数张量a2,b2
第一个初始化张量都是1,第二个都是0
既满足规范化的要求,又能不改变针对目标的表征,nn.parameter进行封装
"""
self.a2 = nn.Parameter(torch.ones(features))
self.b2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
# 输入参数x代表上一层的输出
# 对x求其最后一个维度的均值,并保持输出维度与输入维度一致
# 求最后一个维度的标准差,规范化公式。最后对结果乘缩放参数,点乘加上位移参数b2
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a2 * (x - mean) / (std + self.eps) + self.b2
# 子层连接,残差连接
# 两个子层加上周围的连接结构就形成了两个子层结构
class SublayerConnection(nn.Module):
def __init__(self, size, dropout=0.1):
"""
实现子层连接
:param size:词嵌入维度的大小
:param dropout:对模型结构中的节点数进行随机抑制的比率
"""
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, sublayer):
"""
前向逻辑函数中,接收上一个层或者子层的输入作为第一个参数
将该层连接中的子层函数作为第二个参数
规范化后传给子层处理,再dropout,随机停止一些神经元的作用防止过拟合
因为存在跳跃连接,所以是将输入x与dropout后的子层输出结果相加作为最终的子层连接输出
:param x:
:param sublayer:
:return:
"""
return x + self.dropout(sublayer(self.norm(x)))
# 编码器层
# 每个编码器层完成一次对输入的特征提取过程,即编码过程
class EncoderLayer(nn.Module):
def __init__(self, size, self_attn, feed_forward, dropout):
"""
:param size:词嵌入维度的大小
:param self_attn:自注意力机制
:param feed_forward:前馈全连接层
:param dropout:置零比率
"""
super(EncoderLayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout),2)
self.size = size
def forward(self, x, mask):
"""
:param x:
:param mask:
:return:
"""
x = self.sublayer[0](x , lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
# 编码器,N个编码器层堆叠而成
class Encoder(nn.Module):
def __init__(self, layer, N):
"""
:param layer:编码器层
:param N: 编码器层的个数
"""
super(Encoder, self).__init__()
# 克隆多个
self.layers = clones(layer, N)
# 初始化一个规范化层,用在编码器最后面
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"""
forward的输入和编码器层相同
对克隆的编码器进行循环,每次会得到一个新的x
这个循环的过程就相当于输出的X经过了N个编码器层的处理,最后通过规范化层处理,得到返回结果
:param self:
:param x: 上一层的输出
:param mask: 掩码张量
:return:
"""
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
"""
解码器由三个子层连接结构组成
第一个子层连接结构包括一个多头自注意力子层、规范化层和一个残差连接
第二个子层连接结构包括一个多头注意力子层和规范化层以及一个残差连接
第三个子层连接结构包括一个前馈全连接层和规范化层以及一个残差连接
"""
class DecoderLayer(nn.Module):
# 每个解码器层根据给定的输入向目标方向进行特征提取操作,即解码过程
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
"""
:param size: 词嵌入维度的大小,代表解码器层的尺寸
:param self_attn: 多头自注意力对象, q k v 相等
:param src_attn: q 不等于 k v , q!=k=v
:param feed_forward:
:param dropout:
"""
super(DecoderLayer, self).__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, source_mask, target_mask):
"""
:param x: 上一层的输入x
:param memory: 编码器的语义存储变量memory,
:param source_mask:源数据掩码张量,
:param target_mask:目标数据掩码张量
:return:
"""
m = memory
"""
x传入第一个子层结构,对目标数据进行遮掩
"""
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))
# 接着进入第二个子层,遮蔽掉对结果没有意义的字符而产生的注意力值,以提升模型效果和训练速度
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))
# 最后一个子层是前馈全连接层,处理后返回结果
return self.sublayer[2](x, self.feed_forward)
# 使用类Decoder来实现解码器
class Decoder(nn.Module):
def __init__(self, layer, N):
"""
:param layer: 解码器层
:param N: 解码器层的个数 N
"""
super(Decoder, self).__init__()
# 克隆N个layer,实例化一个规范化层
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, source_mask, target_mask):
for layer in self.layers:
x = layer(x, memory, source_mask, target_mask)
return self.norm(x)
# 线性层,通过对上一步的线性变化得到指定维度的输出,也就是转换维度的作用
# softmax层是使最后一维的向量中的数字缩放到0-1的概率值之内,并满足他们的和为1
# 线性层+softmax层
class Generator(nn.Module):
def __init__(self, d_model, vocad_size):
"""
:param d_model:嵌入维度
:param vocad_size: 词表大小
"""
super(Generator, self).__init__()
# 实例化,得到一个self.project等待使用
self.project = nn.Linear(d_model, vocad_size)
def forward(self, x):
# 前向逻辑函数中输入是上一层的输出张量x
return F.log_softmax(self.project(x), dim=-1)
# 模型构建,编码器-解码器的实现过程。
# 掌握transformer模型的构建过程
class EncoderDecoder(nn.Module):
def __init__(self, encoder, decoder, source_embed, target_embed, generator):
"""
:param encoder:编码器对象
:param decoder: 解码器对象
:param source_embed: 源数据嵌入函数
:param target_embed: 目标数据嵌入函数
:param generator: 输出部分的类别生成器对象
"""
super(EncoderDecoder, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = source_embed
self.tgt_embed = target_embed
self.generator = generator
def forward(self, source, target, source_mask, target_mask):
"""
将source,source_mask传入编码函数,得到结果后
于source_mask, target, 和target_mask一同传给解码函数
:param source:源数据
:param target:目标数据
:param source_mask:
:param target_mask:
:return:
"""
return self.decode(self.encode(source, source_mask), source_mask, target, target_mask)
def encode(self, sourec, source_mask):
"""
使用src_embed对source做处理,然后和source_mask一起传给self.encoder
:param sourec:
:param source_mask:
:return:
"""
return self.encoder(self.src_embed(sourec), source_mask)
def decode(self, memory, source_mask, target, target_mask):
"""
解码函数,
使用tgt_embed 对 target做处理,然后一起传给self.decoder
:param memory:编码器的输出
:param source_mask:
:param target:
:param target_mask:
:return:
"""
return self.decoder(self.tgt_embed(target), memory, source_mask, target_mask)
def make_model(source_vocab, target_vocab, N=6,
d_model=512, d_ff=2048, head=8, dropout=0.1):
"""
:param source_vocab: 源数据特征词汇总数
:param target_vocab: 目标数据特征词汇总数
:param N: 编码器和解码器堆叠数
:param d_model: 词向量映射维度
:param d_ff: 前馈全连接网络中变换矩阵的维度
:param head:
:param dropout:
:return:
"""
c = copy.deepcopy
# 实例化多头注意力类
attn = MutiHeadedAttention(head, d_model)
# 实例化前馈全连接类
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
# 实例化位置编码类
position = PositionalEncoding(d_model, dropout)
"""
根据结构图,最外层是EncoderDecoder,在EncederDecoder中
分别是编码器层、解码器层、源数据embedding层和位置编码组成的有序结构
目标数据embedding层和位置编码层组成的有序结构,以及类别生成器层
在编码器层中有attention子层以及前馈全连接层
在解码器层中有两个attention子层以及前馈全连接层
"""
model = EncoderDecoder(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
nn.Sequential(Embeddings(d_model, source_vocab), c(position)),
nn.Sequential(Embeddings(d_model, source_vocab), c(position)),
Generator(d_model, target_vocab)
)
# 模型结构完成后,接下来就是初始化模型中的参数,比如线性层中的变换矩阵
# 判断参数的维度大于1,则会将其初始化成一个服从均匀分布的矩阵
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
if __name__ == '__main__':
res = make_model(source_vocab, target_vocab, N)
print(res)
代码测试部分在这里
head = 8
embedding_dim = 512
dropout = 0.2
query = key = value = pe_result
mask = Variable(torch.zeros(2,4,4))
mha = MutiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
size = 512
x = mha_result
d_ff = 64
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)
features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayNorm(features, eps)
ln_result = ln(x)
print(ln_result)
sublayer = lambda x: self_attn(x,x,x,mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)
print(sc_result)
print(sc_result.shape)
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)
print(el_result)
print(el_result.shape)
d_model = 512
vocab = 1000
dropout = 0.1
max_len = 60
x1 = Variable(torch.LongTensor([[100,2,658,50],[15,898,52,145]]))
emb = Embeddings(d_model, vocab)
emr = emb(x1)
x =emr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)
print(pe_result)
size = d_model = 512
head = 8
d_ff = 64
x = pe_result
dropout = 0.2
attn = MutiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(2,4,4))
N = 8
c = copy.deepcopy
layer = EncoderLayer(size, c(attn), c(ff), dropout)
en = Encoder(layer, N)
en_result = en(x, mask)
print(en_result)
print(en_result.shape)
self_attn = src_attn = MutiHeadedAttention(head, d_model , dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)
memory = en_result
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
print(dl_result)
de = Decoder(layer , N)
de_result = de(x, memory, source_mask, target_mask)
print(de_result)
vocab_size = 1000
x = de_result
gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print(gen_result.shape)
source_embed = nn.Embedding(vocab_size, d_model)
target_embed = nn.Embedding(vocab_size, d_model)
generator = gen
encoder = en
decoder = de
source = target = x1
source_mask = target_mask = mask
ed = EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)
ed_result = ed(source, target, source_mask, target_mask)
print(ed_result)
print(ed_result.shape)
# print("----------------------------------------")
# print(x1.shape)
# print(mask.shape)
source_vocab = 11
target_vocab = 11
N = 6
if __name__ == '__main__':
res = make_model(source_vocab, target_vocab, N)
print(res)
模型测试环节,优化部分的代码
from pyitcast.transformer_utils import get_std_opt
# 导入优化器工具包,用于获得标准的针对transformer模型的优化器,
# 该标准优化器基于Adam优化器,使其对序列到序列的任务更有效
from pyitcast.transformer_utils import SimpleLossCompute
# 损失计算工具包,使用标签平滑后的结果进行损失的计算,方法可以认为是交叉熵损失函数
from pyitcast.transformer_utils import LabelSmoothing
# 导入标签平滑工具包,该工具用于标签平滑,小幅度改变原有标签值的值域
# 弥补标签受外界因素导致的偏差,减少模型对某一条规律的绝对认知,防止过拟合
from pyitcast.transformer_utils import run_epoch
from pyitcast.transformer_utils import greedy_decode
from pyitcast.transformer_utils import Batch
def data_generator(V, batch_size, num_batch):
"""
:param V:随机生成数据的最大值+1
:param batch_size: 每次输送给模型的样本数量,经历这些样本训练后进行一次参数的更新
:param num_batch:一共输送模型多少轮数据
:return:
"""
for i in range(num_batch):
# 随机生成V,分布的形状(batch,0)
data = torch.from_numpy(np.random.randint(1, V, size=(batch_size, 10)))
# 将数据的第一列全部设置为1,作为起始标志
data[:, 0] = 1
# 样本的参数不需要参与梯度的计算
source = Variable(data, requires_grad=False)
target = Variable(data, requires_grad=False)
yield Batch(source, target)
V = 11
batch_size = 20
num_batch = 30
# 获得model
model = make_model(V, V, N=2)
# 获得模型的优化器
model_optimizer = get_std_opt(model)
# 标签平滑对象
criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)
# 标签平滑结果的损失计算方法
loss = SimpleLossCompute(model.generator, criterion, model_optimizer)
crit = LabelSmoothing(size=5, padding_idx=0, smoothing=0.5)
# 实例化一个crit对象,size代表目标数据的词汇总数,也是模型最后一层得到张量的最后一维大小
# 5说明词汇总数有5个,第二个参数padding_idx表示要将那些tensor中的数字替换成0
# padding_idx = 0表示不进行替换,smooth表示标签的平滑程度
# 假定一个任意的模型最后输出预测结果和真实结果
predict = Variable(torch.FloatTensor([
[0.3, 0.5, 0.9, 0, 0.1],
[0.3, 0.5, 0.9, 0, 0.1],
[0.3, 0.5, 0.9, 0, 0.1]]))
# 标签的表示值是0, 1, 2
target = Variable(torch.LongTensor([2, 1, 0]))
# 传入对象中
crit(predict, target)
# 绘制标签平滑图像
plt.imshow(crit.true_dist)
日后更新~