✅ NLP 研 0 选手的学习笔记
文章目录
● 上一篇文章链接: NLP冻手之路(4)——pipeline管道函数的使用
● 本文的代码由小编认真改写和函数封装过,并附加必要注释,力求能够简洁明了,并测试过,均无误。
一、需要的环境
● python
需要 3.7+,pytorch
需要 1.10+
● 本文使用的库基于 Hugging Face Transformer,官网链接:https://huggingface.co/docs/transformers/index 【一个很不错的开源网站,针对于 transformer 框架做了很多大集成,目前 github 72.3k ⭐️】
● 安装 Hugging Face Transformer 的库只需要在终端输入 pip install transformers
【这是 pip 安装方法】;如果你用的是 conda
,则输入 conda install -c huggingface transformers
● 本文除了要安装上述配置,还要安装名为 datasets
的数据集处理包,只需要在终端输入 pip install datasets
【这是 pip 安装方法】;如果你用的是 conda
,则输入 conda install -c huggingface -c conda-forge datasets
二、模型搭建
2.1 项目环境
● 要用到的包如下:
import torch
import torch.utils.data as Data
from transformers import BertModel
from datasets import load_from_disk
from transformers import BertTokenizer
from transformers import AdamW
● 项目环境如下,sst_main.py
即是代码文件,my_model
是预训练模型,my_vocab
是词典文件,save_data
是数据集:
2.2 整体调用函数 main()
● 当我们运行整个程序时,将执行一遍 main()
。
● 补充说明:cache_dir='./my_model'
的意思是,我们会从 Hugging Face 官网下载 bert-base-chinese
模型到本地文件夹(名为 my_model
)中。其中 Model
、Dataset
是类,train
和 test
是函数,后面会讲。另外,load_from_disk()
函数是用来加载本地数据集的,数据集如何下载到本地请参考博客 NLP冻手之路(2)——文本数据集的下载与各种操作(Datasets)。
def main():
pretrained_model = BertModel.from_pretrained('bert-base-chinese', cache_dir='./my_model') # 加载预训练模型
model = Model(pretrained_model) # 构建自己的模型
# 如果有 gpu, 就用 gpu
if torch.cuda.is_available():
model.to(device)
train_data = load_from_disk('./save_data')['train'] # 加载训练数据
test_data = load_from_disk('./save_data')['test'] # 加载测试数据
optimizer = AdamW(model.parameters(), lr=5e-4) # 优化器
criterion = torch.nn.CrossEntropyLoss() # 损失函数
epochs = 2 # 训练次数
# 训练模型
for i in range(epochs):
print("--------------- >>>> epoch : {} <<<< -----------------".format(i))
train(model, train_data, criterion, optimizer)
test(model, test_data)
2.3 整体模型 class Model()
● 补充说明:我们对预训练模型 pretrained_model
不进行梯度更新,只利用它已经训练好的参数。注意,torch.nn.Linear(768, 2)
中的 768
是词嵌入的维度,2
是对情感做二分类,积极或消极。另外,self.fc(output[0][:, 0])
中的 [:, 0]
是指取一句话最开头的 [CLS]
处的 embedding
的特征,为什么要这么做,要追溯到 BERT 的原理。
# 定义下游任务模型
class Model(torch.nn.Module):
def __init__(self, pretrained_model):
super().__init__()
self.pretrain_model = pretrained_model
self.fc = torch.nn.Linear(768, 2)
def forward(self, input_ids, attention_mask, token_type_ids):
with torch.no_grad(): # 上游的模型不进行梯度更新
output = self.pretrain_model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
# token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子的位置是 1
token_type_ids=token_type_ids)
output = self.fc(output[0][:, 0]) # 取出每个 batch 的第一列作为 CLS, 即 (16, 786)
output = output.softmax(dim=1) # 通过 softmax 函数, 并使其在 1 的维度上进行缩放,使元素位于[0,1] 范围内,总和为 1
return output
2.4 训练函数 train()
● 没当我们要训练一轮的时候,我们就调用一次 train()
。
● 补充说明:Data.DataLoader
中的 collate_fn
是一个 lambda 函数,作用是合并 samples 成一个列表以形成 mini-batch,当对 loader_train
使用批量加载时会被自动使用,关于这个 lambda 函数,后面会讲。另外,每对 enumerate(loader_train)
做一次提取,都会取一个 batch_size
的数据。
def train(model, dataset, criterion, optimizer):
loader_train = Data.DataLoader(dataset=dataset,
batch_size=32,
collate_fn=collate_fn,
shuffle=True, # 顺序打乱
drop_last=True) # 设置为'True'时,如果数据集大小不能被批处理大小整除,则删除最后一个不完整的批次
model.train()
total_acc_num = 0
train_num = 0
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_train):
output = model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids=token_type_ids) # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子
# 计算 loss, 反向传播, 梯度清零
loss = criterion(output, labels)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# 算 acc
output = output.argmax(dim=1) # 取出所有在维度 1 上的最大值的下标
accuracy_num = (output == labels).sum().item()
total_acc_num += accuracy_num
train_num += loader_train.batch_size
if i % 50 == 0:
print("train_schedule: [{}/{}] train_loss: {} train_acc: {}".format(i, len(loader_train),
loss.item(), total_acc_num / train_num))
print("total train_acc: {}".format(total_acc_num / train_num))
2.5 测试函数 test()
● 没当我们要训练完一轮后,一般我们要测试一下 test()
。
● 补充说明:test()
和 train()
类似,只不过不需要反向传播、梯度更新。
def test(model, dataset):
correct_num = 0
test_num = 0
loader_test = Data.DataLoader(dataset=dataset,
batch_size=32,
collate_fn=collate_fn,
shuffle=True,
drop_last=True)
model.eval()
for t, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
with torch.no_grad():
output = model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids=token_type_ids) # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子
output = output.argmax(dim=1)
correct_num += (output == labels).sum().item()
test_num += loader_test.batch_size
if t % 10 == 0:
print("schedule: [{}/{}] acc: {}".format(t, len(loader_test), correct_num / test_num))
print("total test_acc: {}".format(correct_num / test_num))
2.6 打包函数 collate_fn()
● 这个函数是一个 lambda 函数,它将作为 形参 传入到 Data.DataLoader() 当中,作用是合并 samples 成一个列表以形成 mini-batch,当对 loader_train
使用批量加载时会被自动使用,说白了就是一个 “将批量数据进行打包的函数”。
● 补充说明:关于 BertTokenizer.from_pretrained()
和 batch_encode_plus()
的使用可以参考博客 NLP冻手之路(1)——中文/英文字典与分词操作(Tokenizer)
def collate_fn(data):
# 将数据中的文本和标签分别提取出来
sentences = [tuple_x['text'] for tuple_x in data]
labels = [tuple_x['label'] for tuple_x in data]
# 加载字典和分词工具
token = BertTokenizer.from_pretrained('bert-base-chinese', cache_dir='./my_vocab')
# 对数据进行编码
data = token.batch_encode_plus(batch_text_or_text_pairs=sentences,
truncation=True,
max_length=500,
padding='max_length',
return_tensors='pt',
return_length=True)
input_ids = data['input_ids'] # input_ids: 编码之后的数字(即token)
attention_mask = data['attention_mask'] # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids = data['token_type_ids'] # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子的位置是 1
labels = torch.LongTensor(labels)
if torch.cuda.is_available(): # 如果有 gpu, 就用 gpu
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)
token_type_ids = token_type_ids.to(device)
labels = labels.to(device)
return input_ids, attention_mask, token_type_ids, labels
三、完整代码
# 作者: CSDN@一支王同学, 参考: B站up主 蓝斯诺特
import torch
import torch.utils.data as Data
from transformers import BertModel
from datasets import load_from_disk
from transformers import BertTokenizer
from transformers import AdamW
def main():
pretrained_model = BertModel.from_pretrained('bert-base-chinese', cache_dir='./my_model') # 加载预训练模型
model = Model(pretrained_model) # 构建自己的模型
# 如果有 gpu, 就用 gpu
if torch.cuda.is_available():
model.to(device)
train_data = load_from_disk('./save_data')['train'] # 加载训练数据
test_data = load_from_disk('./save_data')['test'] # 加载测试数据
optimizer = AdamW(model.parameters(), lr=5e-4) # 优化器
criterion = torch.nn.CrossEntropyLoss() # 损失函数
epochs = 2 # 训练次数
# 训练模型
for i in range(epochs):
print("--------------- >>>> epoch : {} <<<< -----------------".format(i))
train(model, train_data, criterion, optimizer)
test(model, test_data)
# 定义下游任务模型
class Model(torch.nn.Module):
def __init__(self, pretrained_model):
super().__init__()
self.pretrain_model = pretrained_model
self.fc = torch.nn.Linear(768, 2)
def forward(self, input_ids, attention_mask, token_type_ids):
with torch.no_grad(): # 上游的模型不进行梯度更新
output = self.pretrain_model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
# token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子的位置是 1
token_type_ids=token_type_ids)
output = self.fc(output[0][:, 0]) # 取出每个 batch 的第一列作为 CLS, 即 (16, 786)
output = output.softmax(dim=1) # 通过 softmax 函数, 并使其在 1 的维度上进行缩放,使元素位于[0,1] 范围内,总和为 1
return output
def train(model, dataset, criterion, optimizer):
loader_train = Data.DataLoader(dataset=dataset,
batch_size=32,
collate_fn=collate_fn,
shuffle=True, # 顺序打乱
drop_last=True) # 设置为'True'时,如果数据集大小不能被批处理大小整除,则删除最后一个不完整的批次
model.train()
total_acc_num = 0
train_num = 0
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_train):
output = model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids=token_type_ids) # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子
# 计算 loss, 反向传播, 梯度清零
loss = criterion(output, labels)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# 算 acc
output = output.argmax(dim=1) # 取出所有在维度 1 上的最大值的下标
accuracy_num = (output == labels).sum().item()
total_acc_num += accuracy_num
train_num += loader_train.batch_size
if i % 50 == 0:
print("train_schedule: [{}/{}] train_loss: {} train_acc: {}".format(i, len(loader_train),
loss.item(), total_acc_num / train_num))
print("total train_acc: {}".format(total_acc_num / train_num))
def test(model, dataset):
correct_num = 0
test_num = 0
loader_test = Data.DataLoader(dataset=dataset,
batch_size=32,
collate_fn=collate_fn,
shuffle=True,
drop_last=True)
model.eval()
for t, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
with torch.no_grad():
output = model(input_ids=input_ids, # input_ids: 编码之后的数字(即token)
attention_mask=attention_mask, # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids=token_type_ids) # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子
output = output.argmax(dim=1)
correct_num += (output == labels).sum().item()
test_num += loader_test.batch_size
if t % 10 == 0:
print("schedule: [{}/{}] acc: {}".format(t, len(loader_test), correct_num / test_num))
print("total test_acc: {}".format(correct_num / test_num))
def collate_fn(data):
# 将数据中的文本和标签分别提取出来
sentences = [tuple_x['text'] for tuple_x in data]
labels = [tuple_x['label'] for tuple_x in data]
# 加载字典和分词工具
token = BertTokenizer.from_pretrained('bert-base-chinese', cache_dir='./my_vocab')
# 对数据进行编码
data = token.batch_encode_plus(batch_text_or_text_pairs=sentences,
truncation=True,
max_length=500,
padding='max_length',
return_tensors='pt',
return_length=True)
input_ids = data['input_ids'] # input_ids: 编码之后的数字(即token)
attention_mask = data['attention_mask'] # attention_mask: 其中 pad 的位置是 0 , 其他位置是 1
token_type_ids = data['token_type_ids'] # token_type_ids: 第一个句子和特殊符号的位置是 0 , 第二个句子的位置是 1
labels = torch.LongTensor(labels)
if torch.cuda.is_available(): # 如果有 gpu, 就用 gpu
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)
token_type_ids = token_type_ids.to(device)
labels = labels.to(device)
return input_ids, attention_mask, token_type_ids, labels
if __name__ == '__main__':
device = 'cuda' if torch.cuda.is_available() else 'cpu' # 全局变量
print('所用的设备为(cuda即为gpu): ', device)
main()
四、运行结果
● 可以看到,随着训练时间的增加,差不多一两轮就收敛了,因为我们只有一个 fc层
在训练,所以很快。
五、小结
● 通过本小节的学习,以及代码实践,即可算是 NLP 中文文本处理小小地入门了吧。
● Hugging Face 的很多组件都封装得很好,有什么不懂的,可以查看它的 doc 手册:Hugging Face Documentations
六、补充说明
● 上一篇文章链接: NLP冻手之路(4)——pipeline管道函数的使用
● 若有写得不对的地方,或有疑问,欢迎评论交流。
● 参考视频:HuggingFace简明教程,BERT中文模型实战示例.NLP预训练模型,Transformers类库,datasets类库快速入门.
⭐️ ⭐️