在深度学习领域,尤其是当模型部署到资源有限的环境中时,模型压缩技术变得尤为重要。剪枝(Pruning)是一种常见的模型压缩方法,通过减少模型中不重要的参数,可以在不显著降低模型性能的情况下提升效率。在本文中,我们将详细介绍如何在PyTorch中使用剪枝技术,并通过一些实验展示其效果。

1. 加载数据集与预处理

        我们将使用TorchText库加载常用的AG_NEWS数据集,并进行预处理。首先,导入必要的库并设置随机种子以保证实验的可重复性。        

import torch, torchdata, torchtext
from torch import nn
import time

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

SEED = 1234
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

        接下来,加载AG_NEWS数据集,并将其拆分为训练集、验证集和测试集。我们使用TorchText的random_split方法来进行数据划分。

from torchtext.datasets import AG_NEWS
train, test = AG_NEWS()

train_size = len(list(iter(train)))
too_much, train, valid = train.random_split(total_length=train_size, weights = {"too_much": 0.7, "smaller_train": 0.2, "valid": 0.1}, seed=999)

数据预处理

        我们将使用Spacy作为分词器,并将文本转换为整数表示。这里,我们使用build_vocab_from_iterator来为数据集生成词汇表。

from torchtext.data.utils import get_tokenizer
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

from torchtext.vocab import build_vocab_from_iterator
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)

vocab = build_vocab_from_iterator(yield_tokens(train), specials=['<unk>', '<pad>', '<bos>', '<eos>'])
vocab.set_default_index(vocab["<unk>"])

2. FastText 预训练词向量

        接下来,我们将加载FastText的预训练词向量,并将其应用到我们的词汇表中。

from torchtext.vocab import FastText
fast_vectors = FastText(language='simple') # 使用FastText预训练词向量

fast_embedding = fast_vectors.get_vecs_by_tokens(vocab.get_itos()).to(device)
fast_embedding.shape

3. 数据加载器

        我们需要定义一个数据加载器collate_fn,来确保批处理中的序列长度一致(通过填充)。在这里,我们还会生成序列长度信息,以便后续用于LSTM中的打包序列处理。

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

pad_idx = vocab['<pad>']

def collate_batch(batch):
    label_list, text_list, length_list = [], [], []
    for (_label, _text) in batch:
        label_list.append(int(_label) - 1)  # 标签从0开始
        processed_text = torch.tensor([vocab[token] for token in tokenizer(_text)], dtype=torch.int64)
        text_list.append(processed_text)
        length_list.append(processed_text.size(0))
    return torch.tensor(label_list, dtype=torch.int64), pad_sequence(text_list, padding_value=pad_idx, batch_first=True), torch.tensor(length_list, dtype=torch.int64)

train_loader = DataLoader(train, batch_size=64, shuffle=True, collate_fn=collate_batch)
valid_loader = DataLoader(valid, batch_size=64, shuffle=False, collate_fn=collate_batch)
test_loader = DataLoader(test, batch_size=64, shuffle=False, collate_fn=collate_batch)

4. 模型定义

        我们将定义一个双向LSTM模型,并将预训练的FastText词向量作为模型的嵌入层权重初始化。

class LSTM(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, output_dim, num_layers, bidirectional, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers=num_layers, bidirectional=bidirectional, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hid_dim * 2, output_dim)

    def forward(self, text, text_lengths):
        embedded = self.embedding(text)
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.to('cpu'), enforce_sorted=False, batch_first=True)
        packed_output, (hn, cn) = self.lstm(packed_embedded)
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        hn = torch.cat((hn[-2,:,:], hn[-1,:,:]), dim=1)
        return self.fc(hn)

5. 模型训练与评估

        我们定义模型的训练与评估函数,并加载预训练好的模型进行测试。

criterion = nn.CrossEntropyLoss()

def accuracy(preds, y):
    predicted = torch.max(preds.data, 1)[1]
    return (predicted == y).sum().item() / len(y)

def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss, epoch_acc = 0, 0
    with torch.no_grad():
        for label, text, text_length in loader:
            label, text = label.to(device), text.to(device)
            predictions = model(text, text_length).squeeze(1)
            loss = criterion(predictions, label)
            acc = accuracy(predictions, label)
            epoch_loss += loss.item()
            epoch_acc += acc
    return epoch_loss / len(loader), epoch_acc / len(loader)

test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

6. 剪枝(Pruning)

随机剪枝

        首先,我们使用PyTorch的torch.nn.utils.prune库对模型进行随机剪枝。例如,以下代码将随机剪掉全连接层中95%的连接。

import torch.nn.utils.prune as prune

fc = model.fc
prune.random_unstructured(fc, name="weight", amount=0.95)
print(list(fc.named_buffers()))  # 打印权重掩码

基于L1范数的剪枝

        我们还可以基于权重的L1范数进行剪枝,以下代码展示了如何根据最小的L1范数剪枝95%的连接。

prune.l1_unstructured(fc, name="weight", amount=0.95)
print(fc.weight)

全局剪枝

        全局剪枝是通过在整个模型中移除最低重要性的连接,而不是逐层进行剪枝。我们可以使用global_unstructured来实现这一目标。

parameters_to_prune = [(model.embedding, 'weight'), (model.lstm, 'weight_ih_l0'), (model.lstm, 'weight_hh_l0'), (model.fc, 'weight')]
prune.global_unstructured(parameters_to_prune, pruning_method=prune.L1Unstructured, amount=0.7)

7. 自定义剪枝方法

        我们还可以通过继承torch.nn.utils.prune.BasePruningMethod类来自定义剪枝方法。下面是一个简单的自定义剪枝示例,剪去张量中的每隔一个元素。

class ExamplePruningMethod(prune.BasePruningMethod):
    PRUNING_TYPE = 'unstructured'
    def compute_mask(self, t, default_mask):
        mask = default_mask.clone()
        mask.view(-1)[::2] = 0 
        return mask

 

 

如果你觉得这篇博文对你有帮助,请点赞、收藏、关注我,并且可以打赏支持我!

欢迎关注我的后续博文,我将分享更多关于人工智能、自然语言处理和计算机视觉的精彩内容。

谢谢大家的支持!

 

10-07 23:29