偏标记学习+图像分类(论文复现)
概述
算法原理
核心逻辑
import models
import datasets
import torch
from torch.utils.data import DataLoader
import numpy as np
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
import torchvision.transforms as transforms
from tqdm import tqdm
def CE_loss(probs, targets):
    """Cross-entropy loss between predicted probabilities and soft targets.

    Args:
        probs: (batch, classes) tensor of predicted probabilities
            (e.g. softmax output).
        targets: (batch, classes) tensor of target weights (soft labels).

    Returns:
        Scalar tensor: batch mean of -sum(targets * log(probs), dim=-1).
    """
    # Clamp probabilities away from zero so log() cannot produce -inf/NaN
    # when the model assigns (near-)zero probability to a target class.
    log_probs = torch.log(probs.clamp_min(1e-12))
    loss = -torch.sum(targets * log_probs, dim=-1)
    # Mean over the batch (equivalent to sum / probs.shape[0]).
    return loss.mean()
class Proden:
    """PRODEN partial-label learning trainer (Lv et al., ICML 2020).

    Trains an image classifier on partially labeled data: each sample carries
    a set of candidate labels, and the soft targets are progressively
    re-weighted by the model's own predictions during training.
    """

    def __init__(self, configs):
        """Store the configuration dict; keys are read lazily in train()."""
        self.configs = configs

    def train(self, save = False):
        """Run the PRODEN training loop.

        Args:
            save: accepted for interface compatibility; not used in the
                visible code — TODO confirm intended behavior.

        Raises:
            ValueError: if configs['dataset'] or configs['model'] names an
                unsupported option.
        """
        configs = self.configs
        # Load the dataset.
        dataset_path = configs['dataset path']
        if configs['dataset'] == 'CIFAR-10':
            train_data, train_labels, test_data, test_labels = datasets.cifar10_read(dataset_path)
            train_dataset = datasets.Cifar(train_data, train_labels)
            test_dataset = datasets.Cifar(test_data, test_labels)
            output_dimension = 10
        elif configs['dataset'] == 'CIFAR-100':
            train_data, train_labels, test_data, test_labels = datasets.cifar100_read(dataset_path)
            train_dataset = datasets.Cifar(train_data, train_labels)
            test_dataset = datasets.Cifar(test_data, test_labels)
            output_dimension = 100
        else:
            # Fail fast instead of the NameError a fall-through would cause.
            raise ValueError("Unknown dataset: %r" % configs['dataset'])
        # Generate the candidate (partial) label sets.
        partial_labels = datasets.generate_partial_labels(train_labels, configs['partial rate'])
        train_dataset.load_partial_labels(partial_labels)
        # Per-channel mean/std of the training data, used to standardize inputs.
        mean = [np.mean(train_data[:, i, :, :]) for i in range(3)]
        std = [np.std(train_data[:, i, :, :]) for i in range(3)]
        normalize = transforms.Normalize(mean, std)
        # Device: GPU if available, otherwise CPU.
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Build the model.
        if configs['model'] == 'ResNet18':
            model = models.ResNet18(output_dimension = output_dimension).to(device)
        elif configs['model'] == 'ConvNet':
            model = models.ConvNet(output_dimension = output_dimension).to(device)
        else:
            raise ValueError("Unknown model: %r" % configs['model'])
        # Optimizer and learning-rate schedule hyperparameters.
        lr = configs['learning rate']
        weight_decay = configs['weight decay']
        momentum = configs['momentum']
        optimizer = optim.SGD(model.parameters(), lr = lr, weight_decay = weight_decay, momentum = momentum)
        lr_step = configs['learning rate decay step']
        lr_decay = configs['learning rate decay rate']
        lr_scheduler = StepLR(optimizer, step_size=lr_step, gamma=lr_decay)
        for epoch_id in range(configs['epoch count']):
            # Train for one epoch.
            train_dataloader = DataLoader(train_dataset, batch_size = configs['batch size'], shuffle = True)
            model.train()
            for batch in tqdm(train_dataloader, desc='Training(Epoch %d)' % epoch_id, ascii=' 123456789#'):
                ids = batch['ids']
                # Standardize the inputs.
                data = normalize(batch['data'].to(device))
                # Renamed from `partial_labels` to avoid shadowing the
                # dataset-level variable created above.
                batch_partial_labels = batch['partial_labels'].to(device)
                targets = batch['targets'].to(device)
                optimizer.zero_grad()
                # Predicted class probabilities.
                logits = model(data)
                probs = F.softmax(logits, dim=-1)
                # PRODEN update: re-weight the soft targets by the current
                # predictions, restricted to the candidate label set, and
                # write them back so the next epoch trains against them.
                with torch.no_grad():
                    new_targets = F.normalize(probs * batch_partial_labels, p=1, dim=-1)
                    train_dataset.targets[ids] = new_targets.cpu().numpy()
                # Cross-entropy against the soft targets from the batch.
                loss = CE_loss(probs, targets)
                loss.backward()
                # Update the model parameters.
                optimizer.step()
            # Step the learning-rate schedule once per epoch.
            lr_scheduler.step()
效果演示
使用方式
unzip Proden-implemention.zip
cd Proden-implemention
pip install -r requirements.txt
bash download.sh
python main.py -c [你的配置文件路径] -r [选择以下之一:"train"、"test"、"infer"]
python main-flask.py