基于 PyTorch 的 MNIST 手写数字识别教程

本文将介绍如何使用 PyTorch 库来构建一个神经网络模型,以实现 MNIST 手写数字的识别。完整代码在文末提供。本文将逐段解释代码,以帮助读者理解各个部分的功能。

导入库

首先,我们需要导入必要的库,其中包括 PyTorch,以及用于加载 MNIST 数据集的torchvision

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

这段代码段是一个示例的Python脚本,涉及了一些常用的深度学习模块和库。具体内容包括:

  1. argparse:Python标准库中的一个命令行解析模块,用于解析命令行参数传入的参数。

  2. torch:PyTorch是一个深度学习框架,提供了构建神经网络模型和训练模型所需的各种工具和函数。

  3. torch.nn:PyTorch的神经网络包,提供了构建神经网络模型的各种工具和模块。

  4. torch.nn.functional:PyTorch的函数式接口模块,提供了一些常用的函数操作,如激活函数和损失函数。

  5. torch.optim:PyTorch的优化器模块,提供了各种常用的优化器算法,如SGD、Adagrad、Adam等。

  6. datasets:PyTorch中用于加载数据集的模块,提供了常见的数据集类,如MNIST、CIFAR-10等。

  7. transforms:PyTorch中用于数据预处理的模块,提供了各种数据预处理操作,如归一化、随机裁剪等。

  8. StepLR:PyTorch中的学习率调度器模块,用于根据指定的步骤调整学习率。

定义神经网络模型

我们定义一个简单的卷积神经网络类Net,该类继承自torch.nn.Module

# 定义神经网络模型
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)  # 定义第一个卷积层
        self.conv2 = nn.Conv2d(32, 64, 3, 1)  # 定义第二个卷积层
        self.dropout1 = nn.Dropout(0.25)  # 定义第一个dropout层
        self.dropout2 = nn.Dropout(0.5)  # 定义第二个dropout层
        self.fc1 = nn.Linear(9216, 128)  # 定义第一个全连接层
        self.fc2 = nn.Linear(128, 10)  # 定义第二个全连接层

    def forward(self, x):
        x = self.conv1(x)  # 第一个卷积层
        x = F.relu(x)  # 激活函数
        x = self.conv2(x)  # 第二个卷积层
        x = F.relu(x)  # 激活函数
        x = F.max_pool2d(x, 2)  # 最大池化层
        x = self.dropout1(x)  # 第一个dropout层
        x = torch.flatten(x, 1)  # 展平
        x = self.fc1(x)  # 第一个全连接层
        x = F.relu(x)  # 激活函数
        x = self.dropout2(x)  # 第二个dropout层
        x = self.fc2(x)  # 第二个全连接层
        output = F.log_softmax(x, dim=1)  # 损失函数
        return output

这段代码定义了一个简单的神经网络模型,包括两个卷积层、两个全连接层以及两个dropout层。

__init__ 函数中,神经网络模型中的各层被初始化。首先定义了两个卷积层 conv1conv2,然后定义了两个dropout层 dropout1dropout2,最后是两个全连接层 fc1fc2

forward 函数中,描述了数据在神经网络中的前向传播过程。首先通过第一个卷积层并经过激活函数 ReLU,然后通过第二个卷积层同样经过激活函数 ReLU,接着经过最大池化层、第一个dropout层、将张量展平、第一个全连接层并经过 ReLU 激活函数、第二个dropout层、最后通过第二个全连接层生成输出,并通过 log_softmax 函数计算损失函数。

这个神经网络模型的整体结构包括了卷积层、激活函数、池化层、dropout层和全连接层,适用于处理图像数据的分类任务。

训练函数

定义一个用于训练模型的函数train,每个epoch中会循环遍历训练数据并更新模型的参数。

# 训练函数
def train(args, model, device, train_loader, optimizer, epoch):
    model.train()  # 将模型设置为训练模式
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)  # 数据和标签移动到指定设备上
        optimizer.zero_grad()  # 优化器的梯度清零
        output = model(data)  # 前向传播
        loss = F.nll_loss(output, target)  # 计算损失
        loss.backward()  # 反向传播
        optimizer.step()  # 更新参数
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            if args.dry_run:
                break

这段代码是一个用于训练神经网络模型的训练函数。让我一步步解释它:

  1. def train(args, model, device, train_loader, optimizer, epoch): - 这行定义了一个名为train的函数,它接受一些参数,包括训练数据、模型、设备、数据加载器、优化器和当前的训练轮数(epoch)。

  2. model.train() - 这一行将模型设置为训练模式,这通常会影响一些层(比如Dropout)的行为,使其在训练过程中工作正常。

  3. for batch_idx, (data, target) in enumerate(train_loader): - 这是一个循环,遍历训练数据加载器中的每个批次。每次迭代中,它会获取一个批次的数据和对应的标签。

  4. data, target = data.to(device), target.to(device) - 这行将数据和标签移动到指定的设备上,以便在该设备上进行计算。

  5. optimizer.zero_grad() - 这一行将优化器中的梯度值归零,以准备计算新的梯度。

  6. output = model(data) - 这是模型的前向传播,它使用输入数据来生成预测的输出。

  7. loss = F.nll_loss(output, target) - 这一行计算模型的预测输出和真实标签之间的负对数似然损失(negative log-likelihood loss)。

  8. loss.backward() - 这一行执行反向传播,计算模型中各参数的梯度。

  9. optimizer.step() - 这行根据反向传播计算得到的梯度来更新模型的参数,以最小化损失函数。

  10. if batch_idx % args.log_interval == 0: - 这个条件判断语句用于在每经过一定数量的批次之后输出训练过程的日志信息。

  11. print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(...)) - 这行代码打印出当前的训练轮数、已处理的数据量、总数据量和当前批次的损失值。

  12. if args.dry_run: - 这个条件判断检查是否是在测试阶段,如果是测试阶段,便中断训练,以便快速测试代码的正确性。

这个函数的作用是在每个epoch中训练神经网络模型,它通过前向传播、计算损失、反向传播和参数更新来逐步提升模型的性能。

测试函数

定义一个用于验证模型的函数test,计算模型在测试集上的损失和准确度。

# 测试函数
def test(model, device, test_loader):
    model.eval()  # 将模型设置为评估模式
    test_loss = 0  # 初始化测试损失
    correct = 0  # 初始化正确预测数量
    with torch.no_grad():  # 不计算梯度,节省内存
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)  # 数据移动到指定设备上
            output = model(data)  # 获取模型输出
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # 计算测试损失
            pred = output.argmax(dim=1, keepdim=True)  # 获取预测结果
            correct += pred.eq(target.view_as(pred)).sum().item()  # 计算正确预测数量
    test_loss /= len(test_loader.dataset)  # 计算平均测试损失
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

这是一个用于测试机器学习模型性能的函数。函数中的主要步骤如下:

  1. model.eval(): 将模型设置为评估模式,这通常用于在测试阶段,告诉模型不要进行梯度更新,也不要进行正则化过程,只是进行前向传播计算。
  2. test_losscorrect 分别用于记录测试损失和正确预测的数量。
  3. with torch.no_grad(): 块用于在其内部的代码块中停止梯度计算,这样可以节省内存和加快计算速度。
  4. for data, target in test_loader: 循环遍历测试数据集。
  5. data, target = data.to(device), target.to(device): 将数据和标签移动到指定的设备上,通常是GPU。
  6. output = model(data): 使用模型进行前向传播,得到输出。
  7. test_loss += F.nll_loss(output, target, reduction='sum').item(): 使用负对数似然损失函数计算测试损失,并将其累加到 test_loss 中。
  8. pred = output.argmax(dim=1, keepdim=True): 获取预测结果,选择输出中概率最高的类别作为预测结果。
  9. correct += pred.eq(target.view_as(pred)).sum().item(): 将正确预测的数量累加到 correct 中。
  10. test_loss /= len(test_loader.dataset): 计算平均测试损失。
  11. 打印输出测试结果,包括平均损失和准确率。

这段代码主要用于在给定测试数据集上对模型进行评估,并输出测试结果。

主函数

main函数包含了模型训练的所有主要步骤,包括参数设置、数据加载、模型初始化及训练过程。

# 主函数
def main():
    # 训练设置
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='训练时的输入批次大小(默认:64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='测试时的输入批次大小(默认:1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='训练的轮数(默认:14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='学习率(默认:1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='学习率步长gamma(默认:0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='禁用CUDA训练')
    parser.add_argument('--no-mps', action='store_true', default=False,
                        help='禁用macOS GPU训练')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='快速检查一次迭代')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='随机种子(默认:1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='等待多少个批次后记录训练状态')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='保存当前模型')
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()  # 判断是否使用CUDA
    use_mps = not args.no_mps and torch.backends.mps.is_available()  # 判断是否使用MPS

    torch.manual_seed(args.seed)  # 设置随机种子

    if use_cuda:  # 判断使用哪种设备
        device = torch.device("cuda")
    elif use_mps:
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    train_kwargs = {'batch_size': args.batch_size}  # 设置训练参数
    test_kwargs = {'batch_size': args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1, 'pin_memory': True, 'shuffle': True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)
'''
这段代码定义了两个字典train_kwargs和test_kwargs,分别存储训练和测试时的参数配置。其中,train_kwargs包含了一个键值对{'batch_size': args.batch_size},表示训练时的批大小;同理,test_kwargs包含了一个键值对{'batch_size': args.test_batch_size},表示测试时的批大小。

接着,代码通过判断是否使用GPU来设置特定的CUDA参数。如果使用CUDA,则定义一个cuda_kwargs字典,包含三个键值对{'num_workers': 1, 'pin_memory': True, 'shuffle': True},分别表示使用1个worker、将数据置于固定基地址以提高数据加载速度、以及是否在每次迭代时打乱数据。然后通过update()方法将cuda_kwargs中的键值对合并到train_kwargs和test_kwargs中,用于训练和测试时的参数配置。
'''
    # 数据预处理
    transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])
    dataset1 = datasets.MNIST('../data', train=True, download=True, transform=transform)  # 加载训练数据集
    dataset2 = datasets.MNIST('../data', train=False, transform=transform)  # 加载测试数据集
    train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)  # 加载训练数据
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)  # 加载测试数据

    model = Net().to(device)  # 初始化模型
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)  # 初始化优化器

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)  # 初始化学习率调度器
    for epoch in range(1, args.epochs + 1):  # 训练模型
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    if args.save_model:  # 保存模型
        torch.save(model.state_dict(), "mnist_cnn.pt")

这段代码是一个简单的基于MNIST数据集的神经网络模型训练过程。下面是代码的大致解释:

  1. 数据预处理:首先定义了数据处理的操作,包括将图像转换为张量(transforms.ToTensor())和对图像进行归一化(transforms.Normalize())。

  2. 数据加载:使用PyTorch的Datasets模块加载MNIST数据集,分为训练集和测试集,其中传入了之前定义的数据处理操作transform。

  3. 模型和优化器初始化:定义了一个名为Net的模型,并将其移动到指定的设备(如CPU或GPU)。然后初始化了Adadelta优化器,传入模型的参数和学习率。

  4. 学习率调度器初始化:定义了一个StepLR学习率调度器,以便在训练过程中调整学习率。

  5. 模型训练:使用一个循环来训练模型,循环的次数由参数args.epochs决定。在每个循环中,调用train函数对模型进行训练,然后调用test函数对模型进行测试,最后通过调度器更新学习率。

  6. 模型保存:如果参数args.save_model为True,则保存训练好的模型参数到文件mnist_cnn.pt中。

整体来说,这段代码实现了一个简单的MNIST数据集上的神经网络模型训练过程,包括数据预处理、数据加载、模型初始化、模型训练、学习率调度器设置和模型保存等步骤。

运行主函数

最后在Python脚本的末尾,调用main函数以开始训练和测试模型。

if __name__ == '__main__':
    main()

完整代码

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR


# 定义神经网络模型
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)  # 定义第一个卷积层
        self.conv2 = nn.Conv2d(32, 64, 3, 1)  # 定义第二个卷积层
        self.dropout1 = nn.Dropout(0.25)  # 定义第一个dropout层
        self.dropout2 = nn.Dropout(0.5)  # 定义第二个dropout层
        self.fc1 = nn.Linear(9216, 128)  # 定义第一个全连接层
        self.fc2 = nn.Linear(128, 10)  # 定义第二个全连接层

    def forward(self, x):
        x = self.conv1(x)  # 第一个卷积层
        x = F.relu(x)  # 激活函数
        x = self.conv2(x)  # 第二个卷积层
        x = F.relu(x)  # 激活函数
        x = F.max_pool2d(x, 2)  # 最大池化层
        x = self.dropout1(x)  # 第一个dropout层
        x = torch.flatten(x, 1)  # 展平
        x = self.fc1(x)  # 第一个全连接层
        x = F.relu(x)  # 激活函数
        x = self.dropout2(x)  # 第二个dropout层
        x = self.fc2(x)  # 第二个全连接层
        output = F.log_softmax(x, dim=1)  # 损失函数
        return output


# 训练函数
def train(args, model, device, train_loader, optimizer, epoch):
    # 将模型设置为训练模式
    model.train()
    # 遍历训练数据集
    for batch_idx, (data, target) in enumerate(train_loader):
        # 将数据和标签移动到指定设备上
        data, target = data.to(device), target.to(device)
        # 将优化器的梯度清零
        optimizer.zero_grad()
        # 前向传播
        output = model(data)
        # 计算损失
        loss = F.nll_loss(output, target)
        # 反向传播
        loss.backward()
        # 更新参数
        optimizer.step()
        # 每隔一定间隔打印训练信息
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            # 如果是dry_run模式,则退出循环
            if args.dry_run:
                break


# 测试函数
def test(model, device, test_loader):
    # 将模型设置为评估模式
    model.eval()
    # 初始化测试损失和正确预测数量
    test_loss = 0
    correct = 0
    # 不计算梯度,节省内存
    with torch.no_grad():
        # 遍历测试数据集
        for data, target in test_loader:
            # 将数据移动到指定设备上
            data, target = data.to(device), target.to(device)
            # 获取模型输出
            output = model(data)
            # 计算测试损失
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            # 获取预测结果
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            # 计算正确预测数量
            correct += pred.eq(target.view_as(pred)).sum().item()

    # 计算平均测试损失
    test_loss /= len(test_loader.dataset)

    # 打印测试结果
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


# 主函数
def main():
    # 训练设置
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='训练时的输入批次大小(默认:64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='测试时的输入批次大小(默认:1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='训练的轮数(默认:14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='学习率(默认:1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='学习率步长gamma(默认:0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='禁用CUDA训练')
    parser.add_argument('--no-mps', action='store_true', default=False,
                        help='禁用macOS GPU训练')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='快速检查一次迭代')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='随机种子(默认:1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='等待多少个批次后记录训练状态')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='保存当前模型')
    args = parser.parse_args()
    # 判断是否使用CUDA
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    # 判断是否使用MPS
    use_mps = not args.no_mps and torch.backends.mps.is_available()

    # 设置随机种子
    torch.manual_seed(args.seed)

    # 判断使用哪种设备
    if use_cuda:
        device = torch.device("cuda")
    elif use_mps:
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    # 设置训练和测试的参数
    train_kwargs = {'batch_size': args.batch_size}
    test_kwargs = {'batch_size': args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1,
                       'pin_memory': True,
                       'shuffle': True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    # 数据预处理
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
        ])
    # 加载数据集
    dataset1 = datasets.MNIST('../data', train=True, download=True,
                       transform=transform)
    dataset2 = datasets.MNIST('../data', train=False,
                       transform=transform)
    # 加载数据
    train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    # 初始化模型
    model = Net().to(device)
    # 初始化优化器
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    # 初始化学习率调度器
    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    # 训练模型
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    # 保存模型
    if args.save_model:
        torch.save(model.state_dict(), "mnist_cnn.pt")


if __name__ == '__main__':
    main()

如何运行这个脚本

要运行这段代码,您需要确保您的计算机已经安装了Python以及PyTorch库。以下是详细的步骤:

安装Python

如果尚未安装Python,请前往 Python官方网站 下载并安装适用于您的操作系统的Python。

安装必要的库

在运行代码之前,您需要安装一些Python库。这些库可以通过pip命令安装。打开终端或命令提示符,然后运行以下命令:

pip install torch torchvision

下载代码

将上述完整代码复制并粘贴到一个新的Python文件中。例如,可以命名文件为 mnist.py

运行代码

打开终端或命令提示符,导航到保存Python文件的位置,并运行以下命令:

python mnist.py --batch-size 64 --test-batch-size 1000 --epochs 14 --lr 1.0 --gamma 0.7 --log-interval 10 --save-model

可选的命令行参数

您可以通过调整命令行参数来更改训练设置:

  • --batch-size:训练时的输入批次大小(默认64)。
  • --test-batch-size:测试时的输入批次大小(默认1000)。
  • --epochs:训练的轮数(默认14)。
  • --lr:学习率(默认1.0)。
  • --gamma:学习率步长的衰减因子(默认0.7)。
  • --log-interval:每隔多少批次记录一次训练状态。
  • --save-model:如果设置此标志,训练完成后将保存模型。

例如,您可以通过以下命令来运行训练不过多轮次,并且不保存模型:

python mnist.py --epochs 5 --save-model

查看结果

运行训练脚本后,终端或命令提示符会输出训练和测试的进度,包括训练损失和测试准确率。

Train Epoch: 1 [0/60000 (0%)]   Loss: 2.281690
Train Epoch: 1 [640/60000 (1%)] Loss: 1.549805
Train Epoch: 1 [1280/60000 (2%)]        Loss: 1.005921
Train Epoch: 1 [1920/60000 (3%)]        Loss: 0.594626
Train Epoch: 1 [2560/60000 (4%)]        Loss: 0.388489
Train Epoch: 1 [3200/60000 (5%)]        Loss: 0.363996
Train Epoch: 1 [3840/60000 (6%)]        Loss: 0.278429
Train Epoch: 1 [4480/60000 (7%)]        Loss: 0.533427
Train Epoch: 1 [5120/60000 (9%)]        Loss: 0.225273
Train Epoch: 1 [5760/60000 (10%)]       Loss: 0.175600
...
Train Epoch: 14 [53760/60000 (90%)]     Loss: 0.005433
Train Epoch: 14 [54400/60000 (91%)]     Loss: 0.045242
Train Epoch: 14 [55040/60000 (92%)]     Loss: 0.024471
Train Epoch: 14 [55680/60000 (93%)]     Loss: 0.001584
Train Epoch: 14 [56320/60000 (94%)]     Loss: 0.064491
Train Epoch: 14 [56960/60000 (95%)]     Loss: 0.003183
Train Epoch: 14 [57600/60000 (96%)]     Loss: 0.001113
Train Epoch: 14 [58240/60000 (97%)]     Loss: 0.008324
Train Epoch: 14 [58880/60000 (98%)]     Loss: 0.002318
Train Epoch: 14 [59520/60000 (99%)]     Loss: 0.001486

Test set: Average loss: 0.0257, Accuracy: 9916/10000 (99%)

如果使用了--save-model参数,训练完成后,模型会被保存到一个文件中,默认名为 mnist.pt。您可以在后续的脚本或项目中加载这个模型进行推理或进一步的训练。

07-25 03:32