现场拍照是农业保险验标过程中的一个重要环节。不同的拍照人员对拍摄要求的理解可能存在差异,导致拍摄的照片在角度、内容完整性等方面参差不齐。不同设备的清晰度不同,拍摄环境也复杂多样,可能存在光线不足、天气恶劣等情况,影响照片的清晰度和质量。例如,在阴天或傍晚拍摄的照片可能比较昏暗,难以看清细节;在暴雨天气拍摄时,照片容易被雨水遮挡或变得模糊。在核保环节,很多核保人员面对照片,也可能难以分辨图片中的作物种类。基于深度学习模型来识别现场验标照片中的作物种类信息,是提高核保准确率与效率的一种可靠方案。
以往年的历史验标照片数据作为样本,可以省去样本收集的工作量。
模型选择
ResNet50 是一种深度卷积神经网络,具有 50 层的网络结构。这种深度结构使其能够自动学习到非常丰富和复杂的图像特征。在识别作物种类时,它可以从作物图像中提取到从低级的边缘、颜色、纹理等特征,到高级的语义特征,例如作物的整体形状、叶片的排列方式、花朵或果实的特征等。通过对这些多维度特征的综合学习和分析,ResNet50 能够准确地区分不同种类的作物。
随着神经网络深度的增加,梯度消失和退化问题往往会变得非常严重,这会导致模型难以训练,准确率难以提升。ResNet50 引入了残差连接的创新设计,有效地解决了这些问题。残差连接允许梯度直接跳过一些层进行反向传播,使得训练深层网络变得更加容易,模型能够收敛到更优的解。这意味着 ResNet50 可以充分利用其深度结构来学习更具代表性的特征,而不会受到梯度问题的困扰,从而提高了对作物种类识别的准确率。
采用ResNet50还有一个好处是,不需要做额外的样本标注:只需按作物类别将图片分文件夹存放,文件夹名即为类别标签。
开发环境
- python 3.10
- torch 2.6.0
- torchvision 0.21.0
样本准备
准备了34类作物,共200746张图片
数据预处理
ResNet50 输入图像的分辨率通常要求不低于模型默认的输入尺寸,一般为 224×224 像素。更高分辨率的图像能提供更多细节,但也会增加计算量和内存占用。如果图像分辨率过低,可能会丢失一些关键特征,影响模型的识别精度。为了将图片尺寸统一为 224×224,采用了以下代码:
import os
from PIL import Image
from torchvision import transforms

# File extensions (lower-case) that the resize pass treats as images.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')


def resize_images_torchvision(input_folder, output_folder, target_size=(224, 224)):
    """Resize every image directly inside ``input_folder`` into ``output_folder``.

    Only regular files whose name ends in .png/.jpg/.jpeg (case-insensitive)
    are processed; sub-folders are not descended into. Files that already
    exist in ``output_folder`` are skipped, so an interrupted run can simply
    be restarted.

    Args:
        input_folder: Folder containing the source images.
        output_folder: Folder that receives the resized copies (same file
            names). Created if it does not exist.
        target_size: ``(height, width)`` passed to ``transforms.Resize``.

    A failure on a single file is printed and the loop moves on to the next
    file, so one corrupt photo does not abort the whole batch.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Resize accepts and returns a PIL image, so no tensor round-trip is needed.
    resize = transforms.Resize(target_size)

    for filename in os.listdir(input_folder):
        file_path = os.path.join(input_folder, filename)
        if not (os.path.isfile(file_path)
                and filename.lower().endswith(_IMAGE_EXTENSIONS)):
            continue

        output_path = os.path.join(output_folder, filename)
        if os.path.exists(output_path):
            continue

        try:
            # The context manager releases the file handle promptly.
            with Image.open(file_path) as image:
                # Normalise to RGB: palette/alpha images cannot be written
                # to a .jpg output, and the model expects 3 channels anyway.
                resized_image = resize(image.convert('RGB'))
            resized_image.save(output_path)
        except Exception as exc:
            # Deliberately best-effort: report the bad file and keep going.
            print(file_path)
            print(f"Error resizing image: {exc}")


if __name__ == '__main__':
    base_path = 'E:/tmp/zy2'
    out_path = 'E:/tmp/rsnet50/train'

    # Each first-level sub-folder of base_path is one crop class. Only that
    # level is listed so nested folders are not flattened into out_path.
    for class_name in sorted(os.listdir(base_path)):
        class_dir = os.path.join(base_path, class_name)
        if not os.path.isdir(class_dir):
            continue
        print(class_dir)
        resize_images_torchvision(class_dir, os.path.join(out_path, class_name))
从训练集中抽取10%的图片作为验证集使用
import os
import shutil
import random


def count_files_in_folder(folder_path):
    """Return the number of files under ``folder_path``, sub-folders included.

    :param folder_path: folder to scan
    :return: total file count (0 if the folder is empty or does not exist)
    """
    return sum(len(files) for _, _, files in os.walk(folder_path))


def move_files_to_folder(source_folder, target_folder, percentage=10):
    """Move a random ``percentage`` of the files under ``source_folder``.

    Files are collected recursively, a random sample is drawn, and each
    sampled file is moved directly into ``target_folder``.

    :param source_folder: folder the files are taken from
    :param target_folder: folder the files are moved into (created if missing)
    :param percentage: share of files to move, 0-100; defaults to 10
    :raises ValueError: if ``percentage`` is outside 0-100
    """
    if not 0 <= percentage <= 100:
        raise ValueError(f"percentage must be between 0 and 100, got {percentage}")

    # Without an existing directory shutil.move would rename the first file
    # to the target path instead of moving it *into* a folder.
    os.makedirs(target_folder, exist_ok=True)

    all_files = []
    for root, _, files in os.walk(source_folder):
        all_files.extend(os.path.join(root, name) for name in files)

    num_files_to_move = int(len(all_files) * (percentage / 100))
    for file_path in random.sample(all_files, num_files_to_move):
        shutil.move(file_path, target_folder)


def resize_images_torchvision(input_folder, output_folder, percentage=10):
    """Move a random share of ``input_folder``'s files into ``output_folder``.

    Despite its name this function does not resize anything; the name is kept
    so existing callers keep working. It prints the source file count, moves
    ``percentage`` percent of the files, and prints a confirmation.

    :param input_folder: folder holding the training images of one class
    :param output_folder: validation folder for the same class (created if missing)
    :param percentage: share of files to move, 0-100; defaults to 10
    """
    os.makedirs(output_folder, exist_ok=True)

    file_count = count_files_in_folder(input_folder)
    print(f"源文件夹下的文件数量为: {file_count}")

    move_files_to_folder(input_folder, output_folder, percentage)
    print(f"已抽取{percentage}%的文件移动到目标文件夹")


if __name__ == '__main__':
    base_path = 'E:/tmp/rsnet50/train'
    out_path = 'E:/tmp/rsnet50/val'

    # Split the resized training set itself: files are moved out of train
    # into val, so the two sets never share an image.
    for class_name in sorted(os.listdir(base_path)):
        class_dir = os.path.join(base_path, class_name)
        if not os.path.isdir(class_dir):
            continue
        print(class_dir)
        resize_images_torchvision(class_dir, os.path.join(out_path, class_name))
模型训练
- 这里采用的预训练模型,毕竟测试时使用的样本数量还是有限
- 采用的是有显卡的机器训练的,CPU训练太慢
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import time
import os
import pickle
from torchvision.models import ResNet50_Weights# 定义数据预处理操作
# Preprocessing pipelines. Training uses random crop + horizontal flip as
# augmentation; validation uses a deterministic resize + centre crop.
# Both normalise with the mean/std triples shown below.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}

# Dataset root; adjust to the actual location. It must contain 'train' and
# 'val' sub-folders, each with one sub-folder per class.
data_dir = '/data/test'

# Load the training and validation sets (class label = sub-folder name).
image_datasets = {
    'train': datasets.ImageFolder(os.path.join(data_dir, 'train'), data_transforms['train']),
    'val': datasets.ImageFolder(os.path.join(data_dir, 'val'), data_transforms['val']),
}

# Data loaders; only the training set is shuffled.
dataloaders = {
    'train': DataLoader(image_datasets['train'], batch_size=32, shuffle=True, num_workers=4),
    'val': DataLoader(image_datasets['val'], batch_size=32, shuffle=False, num_workers=4),
}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

# Load the pretrained ResNet50. Only the `weights` argument is passed: the
# legacy `pretrained` flag is deprecated and must not be combined with it.
model = models.resnet50(weights=ResNet50_Weights.DEFAULT)

# Replace the final fully connected layer so its output size matches the
# number of classes in this task.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_names))

# Pick the device (GPU when available, otherwise CPU).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Function that trains the model
def train_model(model, criterion, optimizer, num_epochs=25):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a training phase followed by a validation phase over the
    module-level ``dataloaders``; per-phase loss and accuracy are printed.
    The weights of the epoch with the highest validation accuracy are kept
    and restored into the model before it is returned.

    Args:
        model: Network to train; expected to already be on ``device``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object, holding the best validation weights.

    Relies on the module-level names ``dataloaders``, ``dataset_sizes`` and
    ``device``.
    """
    import copy  # local import: keeps this block self-contained

    since = time.time()

    # state_dict() returns references to the live parameter tensors, so a
    # deep copy is required to get a real snapshot of the weights.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Every epoch has a training phase and a validation phase.
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # training mode
            else:
                model.eval()   # evaluation mode

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward pass + optimizer step only when training.
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean; weight it by batch size so the epoch
                # loss is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Restore the best weights before handing the model back.
    model.load_state_dict(best_model_wts)
    return model


if __name__ == '__main__':
    # Train the model.
    model = train_model(model, criterion, optimizer, num_epochs=25)

    # Save the trained weights.
    torch.save(model.state_dict(), 'trained_resnet50.pth')

    # Save class_names so predictions can be mapped back to class labels.
    with open('class_names.pkl', 'wb') as f:
        pickle.dump(class_names, f)
模型使用
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import torch.nn as nn
import pickle

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def predict(image_path, model_path, class_names_path):
    """Classify one image and return its top-scoring classes.

    Args:
        image_path: Path of the image to classify.
        model_path: Path of the saved ResNet50 ``state_dict``.
        class_names_path: Path of the pickled class-name list written at
            training time.

    Returns:
        ``(top5_indices, top5_classes)``: a 1-D tensor of class indices
        ordered by descending score, and the matching list of class names.
        Fewer than five entries are returned when the model has fewer than
        five classes.
    """
    # SECURITY NOTE: pickle.load can execute arbitrary code. Only load a
    # class_names file that you produced yourself.
    with open(class_names_path, 'rb') as f:
        class_names = pickle.load(f)

    # Rebuild the architecture used for training, then load the weights.
    model = models.resnet50(weights=None)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, len(class_names))
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # NOTE(review): this resizes straight to 224x224, which is not the same
    # pipeline as a Resize(256) + CenterCrop(224) validation transform;
    # confirm which one should be used at inference time.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Load and preprocess the image; the context manager closes the file.
    with Image.open(image_path) as raw_image:
        image = transform(raw_image.convert('RGB')).unsqueeze(0)  # add batch dim
    image = image.to(device)

    with torch.no_grad():
        outputs = model(image)
        print(outputs)

    # topk raises if k exceeds the number of classes, so clamp it.
    k = min(5, len(class_names))
    top5_values, top5_indices = torch.topk(outputs, k, dim=1)
    # Take the single batch row; unlike squeeze() this stays 1-D when k == 1.
    top5_values = top5_values[0]
    top5_indices = top5_indices[0]
    print(f"Top 5 values: {top5_values}")
    print(f"Top 5 indices: {top5_indices}")

    # Map the indices back to class names.
    top5_classes = [class_names[idx] for idx in top5_indices.tolist()]
    print(f"Top 5 classes: {top5_classes}")
    return top5_indices, top5_classes


if __name__ == '__main__':
    # Example: classify a single image.
    image_path = '07.jpg'
    class_names_path = "class_names.pkl"
    top5_indices, top5_classes = predict(image_path, "trained_resnet50.pth", class_names_path)
    print(f"预测的前5个类别索引是: {top5_indices}")
    print(f"预测的前5个类别名称是: {top5_classes}")
经验证,准确率在98%以上。使用这种方法,可以有效提高农业保险核保的效率与准确性,为核保人员减轻工作量。