• 3.模型架构
    本文使用hfl/rbt3模型[2],参数量约3800万。基本思路为使用一个预训练模型从文本中抽取数据特征,再对每个字的数据特征做分类任务,最终得到和原文一一对应的标签序列(BIO)。

    二.准备数据集
    1.使用编码工具
    使用hfl/rbt3编码器编码工具如下所示:

    def load_encode_tool(pretrained_model_name_or_path):
        """
        加载编码工具
        "
    ""
        tokenizer = AutoTokenizer.from_pretrained(Path(f'{pretrained_model_name_or_path}'))
        return tokenizer
    if __name__ == '__main__':
        # 测试编码工具
        pretrained_model_name_or_path = r'L:/20230713_HuggingFaceModel/rbt3'
        tokenizer = load_encode_tool(pretrained_model_name_or_path)
        print(tokenizer)
        # 测试编码句子
        out = tokenizer.batch_encode_plus(
            batch_text_or_text_pairs=[
                [ '海''钓''比''赛''地''点''在''厦''门''与''金''门''之''间''的''海''域''。'],
                [ '这''座''依''山''傍''水''的''博''物''馆''由''国''内''′''一''流''的''设''计''师''主''持''设''计''。']],
            truncation=True, # 截断
            padding='max_length'# [PAD]
            max_length=20, # 最大长度
            return_tensors='pt'# 返回pytorch张量
            is_split_into_words=True # 按词切分
        )
        # 查看编码输出
        for k, v in out.items():
            print(k, v.shape)
        # 将编码还原为句子
        print(tokenizer.decode(out['input_ids'][0]))
        print(tokenizer.decode(out['input_ids'][1]))

    输出结果如下所示:

    BertTokenizerFast(name_or_path='L:\20230713_HuggingFaceModel\rbt3', vocab_size=21128, model_max_length=1000000000000000019884624838656, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token''[UNK]''sep_token''[SEP]''pad_token''[PAD]''cls_token''[CLS]''mask_token''[MASK]'}, clean_up_tokenization_spaces=True)
    input_ids torch.Size([2, 20])
    token_type_ids torch.Size([2, 20])
    attention_mask torch.Size([2, 20])
    [CLS] 海 钓 比 赛 地 点 在 厦 门 与 金 门 之 间 的 海 域 。 [SEP]
    [CLS] 这 座 依 山 傍 水 的 博 物 馆 由 国 内 一 流 的 设 计 [SEP]

    需要说明参数is_split_into_words=True让编码器跳过分词步骤,即告诉编码器输入句子是分好词的,不用再进行分词。

    2.定义数据集
    定义数据集代码如下所示:

    class Dataset(torch.utils.data.Dataset):
        def __init__(self, split):
            # 在线加载数据集
            # dataset = load_dataset(path='people_daily_ner', split=split)
            # dataset.save_to_disk(dataset_dict_path='L:/20230713_HuggingFaceModel/peoples_daily_ner')
            # 离线加载数据集
            dataset = load_from_disk(dataset_path='L:/20230713_HuggingFaceModel/peoples_daily_ner')[split]
            # print(dataset.features['ner_tags'].feature.num_classes) #7
            # print(dataset.features['ner_tags'].feature.names) # ['O','B-PER','I-PER','B-ORG','I-ORG','B-LOC','I-LOC']
            self.dataset = dataset

        def __len__(self):
            return len(self.dataset)

        def __getitem__(self, i):
            tokens = self.dataset[i]['tokens']
            labels = self.dataset[i]['ner_tags']
            return tokens, labels
    if __name__ == '__main__':
        # 测试编码工具
        pretrained_model_name_or_path = r'L:/20230713_HuggingFaceModel/rbt3'
        tokenizer = load_encode_tool(pretrained_model_name_or_path)
        # 加载数据集
        dataset = Dataset('train')
        tokens, labels = dataset[0]
        print(tokens, labels, dataset)
        print(len(dataset))

    输出结果如下所示:

    ['海''钓''比''赛''地''点''在''厦''门''与''金''门''之''间''的''海''域''。'] [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0] <__main__.Dataset object at 0x0000027B01DC3940>
    20865

    其中,20865表示训练数据集的大小。在people_daily_ner数据集中,每条数据包括两个字段,即tokens和ner_tags,分别代表句子和标签,在__getitem__()函数中把这两个字段取出并返回即可。

    3.定义计算设备

    device = 'cpu'
    if torch.cuda.is_available():
        device = 'cuda'
    print(device)

    4.定义数据整理函数

    def collate_fn(data):
        tokens = [i[0] for i in data]
        labels = [i[1] for i in data]
        inputs = tokenizer.batch_encode_plus(tokens, # 文本列表
                                       truncation=True, # 截断
                                       padding=True, # [PAD]
                                       max_length=512, # 最大长度
                                       return_tensors='pt'# 返回pytorch张量
                                       is_split_into_words=True) # 分词完成,无需再次分词
        # 求一批数据中最长的句子长度
        lens = inputs['input_ids'].shape[1]
        # 在labels的头尾补充7,把所有的labels补充成统一的长度
        for i in range(len(labels)):
            labels[i] = [7] + labels[i]
            labels[i] += [7] * lens
            labels[i] = labels[i][:lens]
        # 把编码结果移动到计算设备上
        for k, v in inputs.items():
            inputs[k] = v.to(device)
        # 把统一长度的labels组装成矩阵,移动到计算设备上
        labels = torch.tensor(labels).to(device)
        return inputs, labels

    形参data表示一批数据,主要是对句子和标签进行编码,这里会涉及到一个填充的问题。标签的开头和尾部填充7,因为0-6都有物理意义),而句子开头会被插入[CLS]标签。无论是句子还是标签,最终都被转换为矩阵。测试数据整理函数如下所示:

    data = [
        (
            ['海''钓''比''赛''地''点''在''厦''门''与''金''门''之''间''的''海''域''。'], [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0]
        ),
        (
            ['这''座''依''山''傍''水''的''博''物''馆''由''国''内''一''流''的''设''计''师''主''持''设''计'',''整''个''建''筑''群''精''美''而''恢''宏''。'],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        )]
    inputs, labels = collate_fn(data)
    for k, v in inputs.items():
        print(k, v.shape)
    print('labels', labels.shape)

    输出结果如下所示:

    input_ids torch.Size([2, 37])
    token_type_ids torch.Size([2, 37])
    attention_mask torch.Size([2, 37])
    labels torch.Size([2, 37])

    5.定义数据集加载器

    loader = torch.utils.data.DataLoader(dataset=dataset, batch_size=16, collate_fn=collate_fn, shuffle=True, drop_last=True)

    通过数据集加载器查看一批样例数据,如下所示:

    for i, (inputs, labels) in enumerate(loader):
        break
    print(tokenizer.decode(inputs['input_ids'][0]))
    print(labels[0])
    for k, v in inputs.items():
        print(k, v.shape)

    输出结果如下所示:

    [CLS] 这 种 输 液 器 不 必 再 悬 吊 药 瓶 , 改 用 气 压 推 动 液 体 流 动 , 自 闭 防 回 流 , 安 全 、 简 便 、 抗 污 染 , 堪 称 输 液 器 历 史 上 的 一 次 革 命 。 [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD]
    tensor([7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
            7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7], device='cuda:0')
    input_ids torch.Size([16, 87])
    token_type_ids torch.Size([16, 87])
    attention_mask torch.Size([16, 87])

    三.定义模型
    1.加载预训练模型

    # 加载预训练模型
    pretrained = AutoModel.from_pretrained(Path(f'{pretrained_model_name_or_path}'))
    # 统计参数量
    # print(sum(i.numel() for i in pretrained.parameters()) / 10000)
    # 测试预训练模型
    pretrained.to(device)

    2.定义下游任务模型
    先介绍一个两段式训练的概念,通常是先单独对下游任务模型进行训练,然后再连同预训练模型和下游任务模型一起进行训练的模式。

    class Model(torch.nn.Module):
        def __init__(self):
            super().__init__()
            # 标识当前模型是否处于tuning模式
            self.tuning = False
            # 当处于tuning模式时backbone应该属于当前模型的一部分,否则该变量为空
            self.pretrained = None
            # 当前模型的神经网络层
            self.rnn = torch.nn.GRU(input_size=768, hidden_size=768, batch_first=True)
            self.fc = torch.nn.Linear(in_features=768, out_features=8)

        def forward(self, inputs):
            # 根据当前模型是否处于tuning模式而使用外部backbone或内部backbone计算
            if self.tuning:
                out = self.pretrained(**inputs).last_hidden_state
            else:
                with torch.no_grad():
                    out = pretrained(**inputs).last_hidden_state
            # backbone抽取的特征输入RNN网络进一步抽取特征
            out, _ = self.rnn(out)
            # RNN网络抽取的特征最后输入FC神经网络分类
            out = self.fc(out).softmax(dim=2)
            return out

        # 切换下游任务模型的tuning模式
        def fine_tuning(self, tuning):
            self.tuning = tuning
            # tuning模式时,训练backbone的参数
            if tuning:
                for i in pretrained.parameters():
                    i.requires_grad = True
                pretrained.train()
                self.pretrained = pretrained
            # 非tuning模式时,不训练backbone的参数
            else:
                for i in pretrained.parameters():
                    i.requires_grad_(False)
                pretrained.eval()
                self.pretrained = None

    (1)tuning表示当前模型是否处于微调模型,pretrained表示微调模式时预训练模型属于当前模型。
    (2)在__init__()中定义了下游任务模型的2个层,分别为GRU网络和全连接神经网络层,GRU作用是进一步抽取特征,提高模型预测正确率。
    (3)fine_tuning()用来切换训练模式pretrained.train()和评估模式pretrained.eval()

    四.训练和测试
    1.模型训练

    def train(epochs):
        lr = 2e-5 if model.tuning else 5e-4 # 根据模型的tuning模式设置学习率
        optimizer = AdamW(model.parameters(), lr=lr) # 优化器
        criterion = torch.nn.CrossEntropyLoss() # 损失函数
        scheduler = get_scheduler(name='linear', num_warmup_steps=0, num_training_steps=len(loader) * epochs, optimizer=optimizer) # 学习率衰减策略
        model.train()
        for epoch in range(epochs):
            for step, (inputs, labels) in enumerate(loader):
                # 模型计算
                # [b,lens] -> [b,lens,8]
                outs = model(inputs)
                # 对outs和labels变形,并且移除PAD
                # outs -> [b, lens, 8] -> [c, 8]
                # labels -> [b, lens] -> [c]
                outs, labels = reshape_and_remove_pad(outs, labels, inputs['attention_mask'])
                # 梯度下降
                loss = criterion(outs, labels) # 计算损失
                loss.backward() # 反向传播
                optimizer.step() # 更新参数
                scheduler.step() # 更新学习率
                optimizer.zero_grad() # 清空梯度
                if step % (len(loader) * epochs // 30) == 0:
                    counts = get_correct_and_total_count(labels, outs)
                    accuracy = counts[0] / counts[1]
                    accuracy_content = counts[2] / counts[3]
                    lr = optimizer.state_dict()['param_groups'][0]['lr']
                    print(epoch, step, loss.item(), lr, accuracy, accuracy_content)
        torch.save(model, 'model/中文命名实体识别.model')

    训练过程基本步骤如下所示:
    (1)从数据集加载器中获取一个批次的数据。
    (2)让模型计算预测结果。
    (2)使用工具函数对预测结果和labels进行变形,移除预测结果和labels中的PAD。
    (4)计算loss并执行梯度下降优化模型参数。
    (5)每隔一定的steps,输出一次模型当前的各项数据,便于观察。
    (6)每训练完一个epoch,将模型的参数保存到磁盘。
    接下来介绍两段式训练过程,第一阶段是训练下游任务模型,第二阶段是联合训练下游任务模型和预训练模型如下所示:

    # 两段式训练第一阶段,训练下游任务模型
    model.fine_tuning(False)
    # print(sum(p.numel() for p in model.parameters() / 10000))
    train(1)

    # 两段式训练第二阶段,联合训练下游任务模型和预训练模型
    model.fine_tuning(True)
    # print(sum(p.numel() for p in model.parameters() / 10000))
    train(5)

    2.模型测试
    模型测试基本思路:从磁盘加载模型,然后切换到评估模式,将模型移动到计算设备,从测试集中取批次数据,输入模型中,统计正确率。

    def test():
        # 加载训练完的模型
        model_load = torch.load('model/中文命名实体识别.model')
        model_load.eval() # 切换到评估模式
        model_load.to(device)
        # 测试数据集加载器
        loader_test = torch.utils.data.DataLoader(dataset=Dataset('validation'), batch_size=128, collate_fn=collate_fn, shuffle=True, drop_last=True)
        correct = 0
        total = 0
        correct_content = 0
        total_content = 0
        # 遍历测试数据集
        for step, (inputs, labels) in enumerate(loader_test):
            # 测试5个批次即可,不用全部遍历
            if step == 5:
                break
            print(step)
            # 计算
            with torch.no_grad():
                # [b, lens] -> [b, lens, 8] -> [b, lens]
                outs = model_load(inputs)
            # 对outs和labels变形,并且移除PAD
            # fouts -> [b, lens, 8] -> [c, 8]
            # labels -> [b, lens] -> [c]
            outs, labels = reshape_and_remove_pad(outs, labels, inputs['attention_mask'])
            # 统计正确数量
            counts = get_correct_and_total_count(labels, outs)
            correct += counts[0]
            total += counts[1]
            correct_content += counts[2]
            total_content += counts[3]
        print(correct / total, correct_content / total_content)

    3.预测任务

    def predict():
        # 加载模型
        model_load = torch.load('model/中文命名实体识别.model')
        model_load.eval()
        model_load.to(device)
        # 测试数据集加载器
        loader_test = torch.utils.data.DataLoader(dataset=Dataset('validation'), batch_size=32, collate_fn=collate_fn, shuffle=True, drop_last=True)
        # 取一个批次的数据
        for i, (inputs, labels) in enumerate(loader_test):
            break
        # 计算
        with torch.no_grad():
            # [b, lens] -> [b, lens, 8] -> [b, lens]
            outs = model_load(inputs).argmax(dim=2)
        for i in range(32):
            # 移除PAD
            select = inputs['attention_mask'][i] == 1
            input_id = inputs['input_ids'][i, select]
            out = outs[i, select]
            label = labels[i, select]
            # 输出原句子
            print(tokenizer.decode(input_id).replace(' '''))
            # 输出tag
            for tag in [label, out]:
                s = ''
                for j in range(len(tag)):
                    if tag[j] == 0:
                        s += '.'
                        continue
                    s += tokenizer.decode(input_id[j])
                    s += str(tag[j].item())
                print(s)
            print('=====================')

    参考文献:
    [1]HuggingFace自然语言处理详解:基于BERT中文模型的任务实战
    [2]https://huggingface.co/hfl/rbt3
    [3]https://huggingface.co/datasets/peoples_daily_ner/tree/main
    [4]https://github.com/OYE93/Chinese-NLP-Corpus/
    [5]https://github.com/ai408/nlp-engineering/blob/main/20230625_HuggingFace自然语言处理详解/第10章:中文命名实体识别.py

    09-03 15:48