1. 代码整体功能概述
这段代码实现了一个半监督学习的食品图像分类任务,核心流程如下:
2. 核心模块逐行拆解
(1)随机种子固定:保证实验可复现
def seed_everything(seed):
torch.manual_seed(seed) # CPU随机种子
torch.cuda.manual_seed(seed) # 单个GPU随机种子
torch.cuda.manual_seed_all(seed) # 多GPU随机种子
torch.backends.cudnn.benchmark = False # 关闭自动优化,避免随机性
torch.backends.cudnn.deterministic = True # 强制确定性算法
random.seed(seed) # Python原生随机种子
np.random.seed(seed) # Numpy随机种子
os.environ['PYTHONHASHSEED'] = str(seed) # 哈希种子
seed_everything(0) # 固定种子为0,每次运行结果一致
关键知识点:
- 深度学习中 GPU 运算、数据洗牌等操作自带随机性,固定种子是实验可复现的核心
- cudnn.deterministic = True 会牺牲一点速度,但保证每次计算结果完全一致
(2)图像预处理:数据增强(训练)vs 纯转换(验证)
HW = 224 # 图像统一缩放到224×224(适配VGG等预训练模型)
# 训练集:数据增强(提升泛化能力)
train_transform = transforms.Compose([
transforms.ToPILImage(), # 把numpy数组转成PIL图像(因为后续操作需要PIL格式)
transforms.RandomResizedCrop(224), # 随机裁剪+缩放(模拟不同视角)
transforms.RandomRotation(50), # 随机旋转±50度
transforms.ToTensor() # 转Tensor:(H,W,C)→(C,H,W),值归一化到[0,1]
])
# 验证集:无增强(避免引入噪声,保证评估准确)
val_transform = transforms.Compose([
transforms.ToPILImage(),
transforms.ToTensor()
])
核心逻辑:
- 训练集增强:通过随机裁剪、旋转增加数据多样性,防止过拟合
- 验证集不增强:用 “干净” 的数据评估模型真实性能
- ToTensor() 是关键:PyTorch 模型要求输入是 (通道数, 高度, 宽度) 的 Tensor,而 PIL/OpenCV 读取的是 (H,W,C) 的数组,H是高度、W是宽度、C是通道数
(3)自定义数据集类 food_Dataset:处理有标签 / 无标签数据
这是代码的核心之一,支持 3 种模式(train/val/semi),适配不同数据类型:
class food_Dataset(Dataset):
def __init__(self, path, mode="train"):
self.mode = mode
# 半监督模式:只读取无标签数据(无Y)
if mode == "semi":
self.X = self.read_file(path)
# 训练/验证模式:读取有标签数据(X+Y)
else:
self.X, self.Y = self.read_file(path)
self.Y = torch.LongTensor(self.Y) # 标签转LongTensor(分类任务要求)
# 绑定预处理策略
if mode == "train":
self.transform = train_transform
else:
self.transform = val_transform
# 核心:读取文件的函数(区分有标签/无标签)
def read_file(self, path):
# 半监督模式:读取无标签文件夹下的所有图片
if self.mode == "semi":
file_list = os.listdir(path) # 列出文件夹下所有图片名
# 初始化数组:(样本数, 224, 224, 3),uint8节省内存
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
for j, img_name in enumerate(file_list):
img_path = os.path.join(path, img_name)
img = Image.open(img_path).resize((HW, HW)) # 缩放至224×224
xi[j, …] = img # 存入数组
print("读到了%d个数据" % len(xi))
return xi
# 有标签模式:按类别文件夹读取(00/01/…/10共11类)
else:
for i in tqdm(range(11)): # tqdm显示进度条
file_dir = path + "/%02d" % i # 拼接类别文件夹路径(00~10)
file_list = os.listdir(file_dir)
# 初始化当前类别的数据/标签
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
yi = np.zeros(len(file_list), dtype=np.uint8)
for j, img_name in enumerate(file_list):
img_path = os.path.join(file_dir, img_name)
img = Image.open(img_path).resize((HW, HW))
xi[j, …] = img
yi[j] = i # 标签=文件夹对应的类别(0~10)
# 拼接所有类别的数据
if i == 0:
X = xi
Y = yi
else:
X = np.concatenate((X, xi), axis=0)
Y = np.concatenate((Y, yi), axis=0)
print("读到了%d个数据" % len(Y))
return X, Y
# 获取单个样本(Dataset必须实现)
def __getitem__(self, item):
if self.mode == "semi":
# 半监督:返回预处理后的图片 + 原始图片(后续打伪标签用)
return self.transform(self.X[item]), self.X[item]
else:
# 有标签:返回预处理后的图片 + 标签
return self.transform(self.X[item]), self.Y[item]
# 返回数据集长度(Dataset必须实现)
def __len__(self):
return len(self.X)
对于图像数据集的读取函数 有两种模式
1.在半监督模式下:
if self.mode == "semi":
# 列出文件夹下所有文件
file_list = os.listdir(path)
# 初始化数组: [文件数, 图片高度, 图片宽度, 3通道]
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
# 遍历每个图片文件
for j, img_name in enumerate(file_list):
# 完整的图片路径
img_path = os.path.join(path, img_name)
# 打开图片
img = Image.open(img_path)
# 调整大小到统一尺寸 (HW × HW)
img = img.resize((HW, HW))
# 存储图片数据
xi[j, …] = img # … 表示所有维度
print("读到了%d个数据" % len(xi))
return xi # 只返回图像数据,不返回标签
半监督模式特点:
-
所有图片在同一个文件夹
-
没有标签(只返回X,不返回Y)
-
用于训练时不需要标签的情况
2. 完整监督模式:
else:
# 使用tqdm显示进度条
for i in tqdm(range(11)): # 遍历0-10共11个类别
# 类别文件夹路径: path/00, path/01, …
file_dir = path + "/%02d" % i # %02d 表示两位数字,如00,01
# 列出该类别下所有图片
file_list = os.listdir(file_dir)
# 初始化当前类别的数据
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8) # 图像
yi = np.zeros(len(file_list), dtype=np.uint8) # 标签
# 遍历该类别的所有图片
for j, img_name in enumerate(file_list):
img_path = os.path.join(file_dir, img_name)
img = Image.open(img_path)
img = img.resize((HW, HW))
xi[j, …] = img
yi[j] = i # 文件夹名就是标签(0,1,2…)
# 拼接所有类别的数据
if i == 0:
X = xi # 第一个类别
Y = yi
else:
X = np.concatenate((X, xi), axis=0) # 沿第0维(样本维度)拼接
Y = np.concatenate((Y, yi), axis=0)
print("读到了%d个数据" % len(Y))
return X, Y # 返回图像和标签
(4)半监督核心:semiDataset 类(给无标签数据打伪标签)
这是半监督学习的关键,用训练好的模型对无标签数据预测,筛选置信度高的样本:
class semiDataset(Dataset):
def __init__(self, no_label_loder, model, device, thres=0.99):
# 核心:用模型给无标签数据打伪标签
x, y = self.get_label(no_label_loder, model, device, thres)
# 无高置信度样本:标记为无效
if x == []:
self.flag = False
# 有高置信度样本:构建数据集
else:
self.flag = True
self.X = np.array(x)
self.Y = torch.LongTensor(y)
self.transform = train_transform # 用训练集增强策略
def get_label(self, no_label_loder, model, device, thres):
model = model.to(device)
pred_prob = [] # 存储预测置信度
labels = [] # 存储预测标签
x = [] # 存储高置信度的原始图片
y = [] # 存储高置信度的伪标签
soft = nn.Softmax() # 把模型输出转成概率
with torch.no_grad(): # 禁用梯度,提速+省内存
for bat_x, _ in no_label_loder: # 遍历无标签数据
bat_x = bat_x.to(device)
pred = model(bat_x) # 模型预测(logits)
pred_soft = soft(pred) # 转概率(0~1)
# 取每个样本的最大概率和对应标签
pred_max, pred_value = pred_soft.max(1)
pred_prob.extend(pred_max.cpu().numpy().tolist())
labels.extend(pred_value.cpu().numpy().tolist())
# 筛选置信度>阈值的样本
for index, prob in enumerate(pred_prob):
if prob > thres: # 置信度>0.99才保留
x.append(no_label_loder.dataset[index][1]) # 原始图片
y.append(labels[index]) # 伪标签
return x, y
def __getitem__(self, item):
return self.transform(self.X[item]), self.Y[item]
def __len__(self):
return len(self.X)
# 辅助函数:创建半监督数据加载器
def get_semi_loader(no_label_loder, model, device, thres):
semiset = semiDataset(no_label_loder, model, device, thres)
if semiset.flag == False:
return None
else:
semi_loader = DataLoader(semiset, batch_size=16, shuffle=False)
return semi_loader
对于代码中的核心函数get_label这里给出详细注释:
def get_label(self, no_label_loder, model, device, thres):
# 准备工作
model = model.to(device) # 把模型搬到GPU(如果有)
pred_prob = [] # 存储所有图片的预测置信度
labels = [] # 存储所有图片的预测标签
x = [] # 存储高置信度图片(要返回的)
y = [] # 存储高置信度伪标签(要返回的)
soft = nn.Softmax() # Softmax层:把模型输出转成概率(0~1之间)
with torch.no_grad(): # 🚩非常重要!禁用梯度计算,加速推理,节省显存
for bat_x, _ in no_label_loder: # 遍历无标签数据,_占位符表示“忽略标签”
bat_x = bat_x.to(device) # 把图片搬到GPU
pred = model(bat_x) # 模型前向传播,输出logits(未归一化的分数)
pred_soft = soft(pred) # Softmax归一化,变成概率
# max(1)的含义:
# – 参数1表示在维度1(类别维度)上取最大值
# – 返回值1: 最大概率值(如0.99)
# – 返回值2: 最大概率对应的索引(如3,代表第3类)
pred_max, pred_value = pred_soft.max(1)
# 收集这一批次的结果
# .cpu():从GPU搬到CPU
# .numpy():转成numpy数组
# .tolist():转成Python列表
pred_prob.extend(pred_max.cpu().numpy().tolist())
labels.extend(pred_value.cpu().numpy().tolist())
# 🚩筛选置信度>阈值的样本
for index, prob in enumerate(pred_prob):
if prob > thres: # 只有高置信度的才保留
# 注意这里:no_label_loder.dataset[index][1]
# – no_label_loder.dataset: 原始数据集
# – [index]: 第index个样本
# – [1]: 取这个样本的第2个元素(通常是图片)
x.append(no_label_loder.dataset[index][1]) # 原始图片
y.append(labels[index]) # 对应的伪标签
return x, y
核心逻辑(半监督关键):
(5)自定义 CNN 模型 myModel
代码中实际用了预训练 VGG,但保留了自定义 CNN 作为备选,结构如下:
class myModel(nn.Module):
def __init__(self, num_class):
super(myModel, self).__init__()
# 卷积层:提取图像特征(类似VGG的简化版)
self.conv1 = nn.Conv2d(3, 64, 3, 1, 1) # 3→64通道,3×3卷积,padding=1(保持尺寸)
self.bn1 = nn.BatchNorm2d(64) # 批量归一化:加速训练,防止梯度消失
self.relu = nn.ReLU()
self.pool1 = nn.MaxPool2d(2) # 池化:缩小尺寸,保留关键特征
# 堆叠卷积层(每层通道数翻倍,尺寸减半)
self.layer1 = nn.Sequential(nn.Conv2d(64, 128, 3, 1, 1), nn.BatchNorm2d(128), nn.ReLU(), nn.MaxPool2d(2))
self.layer2 = nn.Sequential(nn.Conv2d(128, 256, 3, 1, 1), nn.BatchNorm2d(256), nn.ReLU(), nn.MaxPool2d(2))
self.layer3 = nn.Sequential(nn.Conv2d(256, 512, 3, 1, 1), nn.BatchNorm2d(512), nn.ReLU(), nn.MaxPool2d(2))
self.pool2 = nn.MaxPool2d(2) # 最终特征图:512x7x7
# 全连接层:分类
self.fc1 = nn.Linear(25088, 1000) # 512*7*7=25088 → 1000维隐藏层
self.relu2 = nn.ReLU()
self.fc2 = nn.Linear(1000, num_class) # 1000→11类(最终输出)
def forward(self, x):
# 卷积层提取特征
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.pool1(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.pool2(x)
# 展平:(batch, 512,7,7) → (batch, 25088)
x = x.view(x.size()[0], -1)
# 全连接层分类
x = self.fc1(x)
x = self.relu2(x)
x = self.fc2(x)
return x
Conv2d参数含义速查表:
| 第1个 | 3 | in_channels | 输入通道数 | 彩色图片有RGB 3个通道 |
| 第2个 | 64 | out_channels | 输出通道数 | 用64个不同的卷积核提取特征 |
| 第3个 | 3 | kernel_size | 卷积核大小 | 3×3的过滤器 |
| 第4个 | 1 | stride | 步长 | 每次移动1个像素 |
| 第5个 | 1 | padding | 填充 | 周围补1圈0 |
计算公式:输出高度 H_out = (H_in + 2*padding – kernel_size) // stride + 1
输出宽度 W_out = (W_in + 2*padding – kernel_size) // stride + 1
输入: 224×224
conv = nn.Conv2d(3, 64, 3, 1, 1)
H_out = (224 + 2*1 – 3) // 1 + 1
= (224 + 2 – 3) // 1 + 1
= 223 // 1 + 1
= 223 + 1
= 224 ✅ 尺寸保持不变!
关键知识点:
- 卷积层规律:通道数从 3→64→128→256→512(逐步提取高层特征)
- 池化层作用:每次尺寸减半,最终 224×224→7×7
- x.view(x.size()[0], -1):展平特征图,为全连接层做准备(PyTorch 中-1表示自动计算维度)
(6)核心训练函数 train_val:有监督 + 半监督混合训练
这是代码的执行核心,整合了有监督训练、半监督样本筛选、验证评估:
def train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path):
model = model.to(device)
semi_loader = None # 半监督数据加载器(初始为空)
# 记录训练/验证的损失和准确率
plt_train_loss = []
plt_val_loss = []
plt_train_acc = []
plt_val_acc = []
max_acc = 0.0 # 记录最高验证准确率
for epoch in range(epochs):
train_loss = 0.0
val_loss = 0.0
train_acc = 0.0
val_acc = 0.0
semi_loss = 0.0
semi_acc = 0.0
start_time = time.time()
# ========== 1. 有监督训练(训练集) ==========
model.train() # 训练模式(启用BN/Dropout)
for batch_x, batch_y in train_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x) # 模型预测
train_bat_loss = loss(pred, target) # 计算损失
train_bat_loss.backward() # 反向传播
optimizer.step() # 更新参数
optimizer.zero_grad() # 梯度清零(必须!)
train_loss += train_bat_loss.cpu().item()# 累加损失
# 计算准确率:预测标签=argmax(pred),和真实标签比较
train_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
# 记录平均损失和准确率
plt_train_loss.append(train_loss / len(train_loader))
plt_train_acc.append(train_acc / len(train_loader.dataset))
# ========== 2. 半监督训练(可选) ==========
if semi_loader != None: # 有半监督数据才训练
for batch_x, batch_y in semi_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
semi_bat_loss = loss(pred, target)
semi_bat_loss.backward()
optimizer.step()
optimizer.zero_grad()
semi_loss += train_bat_loss.cpu().item() # 注意:这里代码有笔误,应该是semi_bat_loss
semi_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
print("半监督数据集的训练准确率为", semi_acc / len(semi_loader.dataset)) # 修正:用semi_loader的长度
# ========== 3. 验证集评估 ==========
model.eval() # 评估模式(关闭BN/Dropout)
with torch.no_grad(): # 禁用梯度,提速
for batch_x, batch_y in val_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
val_bat_loss = loss(pred, target)
val_loss += val_bat_loss.cpu().item()
val_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
# 记录验证损失/准确率(注意:代码原笔误是val_loader.dataset.__len__(),统一用len(val_loader.dataset))
plt_val_loss.append(val_loss / len(val_loader.dataset))
plt_val_acc.append(val_acc / len(val_loader.dataset))
# ========== 4. 每3轮筛选一次半监督数据 ==========
if epoch % 3 == 0 and plt_val_acc[-1] > 0.6: # 验证准确率>0.6才开始半监督(模型足够好)
semi_loader = get_semi_loader(no_label_loader, model, device, thres)
# ========== 5. 保存最优模型 ==========
if val_acc > max_acc: # 验证准确率更高时保存
torch.save(model, save_path)
max_acc = val_acc # 修正:原代码是max_acc = val_loss,明显错误!
# ========== 6. 打印训练日志 ==========
print('[%03d/%03d] %2.2f sec(s) TrainLoss : %.6f | valLoss: %.6f Trainacc : %.6f | valacc: %.6f' % \\
(epoch, epochs, time.time() – start_time, plt_train_loss[-1], plt_val_loss[-1], plt_train_acc[-1], plt_val_acc[-1]))
# ========== 7. 绘制训练曲线 ==========
plt.plot(plt_train_loss)
plt.plot(plt_val_loss)
plt.title("loss")
plt.legend(["train", "val"])
plt.show()
plt.plot(plt_train_acc)
plt.plot(plt_val_acc)
plt.title("acc")
plt.legend(["train", "val"])
plt.show()
(7)主程序:配置参数 + 执行训练
# 数据路径(适配样本集)
train_path = r"E:\\fenlei\\food_classification\\food-11_sample\\training\\labeled"
val_path = r"E:\\fenlei\\food_classification\\food-11_sample\\validation"
no_label_path = r"E:\\fenlei\\food_classification\\food-11_sample\\training\\unlabeled\\00"
# 构建数据集
train_set = food_Dataset(train_path, "train")
val_set = food_Dataset(val_path, "val")
no_label_set = food_Dataset(no_label_path, "semi")
# 构建数据加载器(batch_size=16,训练集shuffle=True)
train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, shuffle=True)
no_label_loader = DataLoader(no_label_set, batch_size=16, shuffle=False)
# 模型选择:用预训练VGG(而非自定义CNN)
model, _ = initialize_model("vgg", 11, use_pretrained=True)
# 训练配置
lr = 0.001
loss = nn.CrossEntropyLoss() # 分类任务标配损失
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4) # AdamW带权重衰减(L2正则)
device = "cuda" if torch.cuda.is_available() else "cpu" # GPU优先
save_path = "model_save/best_model.pth"
epochs = 15
thres = 0.99
# 启动训练
train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path)
-
关键知识点:
- 预训练模型:initialize_model("vgg", 11) 加载 VGG16/19 预训练权重,冻结底层特征提取层(代码中未显式冻结,但use_pretrained=True会加载权重)
- 优化器:AdamW 是 Adam + 权重衰减,比 SGD 收敛更快,比原始 Adam 更稳定
- weight_decay=1e-4:L2 正则,防止过拟合
网硕互联帮助中心






评论前必须登录!
注册