vaihingen数据集

ISPRS Vaihingen Dataset 是国际摄影测量与遥感学会(ISPRS)发布的一个用于评估遥感图像语义分割和目标识别算法的标准数据集。该数据集主要覆盖了德国瓦伊因根(Vaihingen)地区的高分辨率航空影像,广泛用于遥感、地理信息系统(GIS)、城市规划、环境监测等领域的研究和应用。

数据集将区域划分为了33个patch,如上图所示,编号不是连续的

  • 图像类型: 高分辨率航空影像,提供了3个波段:红外(IR)、红(R)和绿(G),因此是一个近红外-红-绿(IRRG)组合图像。
  • 分辨率: 影像的空间分辨率为 9cm/pixel,这意味着每个像素代表地面上 9 厘米见方的区域,数据的精细度非常高。
  • 地区覆盖: 数据集覆盖了Vaihingen的部分城区,包含住宅区、工业区、绿地、道路和河流等多种地表类型。

地面真值(Ground Truth)标注:

  • 数据集提供了经过人工标注的地面真值,这些标注被细分为不同的地物类别,包括:
    • 建筑物 (Buildings)
    • 树木 (Trees)
    • 低植被 (Low Vegetation)
    • 道路 (Roads)
    • 汽车 (Cars)
    • 背景 (Impervious Surfaces,主要指不可渗透的地面,如水泥和沥青)
  • 每个像素都分配了一个类别标签,用于语义分割任务。

Dataset

1
2
3
4
5
6
CLASSES = ('ImSurf', 'Building', 'LowVeg', 'Tree', 'Car', 'Clutter') #分类
PALETTE = [[255, 255, 255], [0, 0, 255], [0, 255, 255], [0, 255, 0], [255, 204, 0], [255, 0, 0]] #分类标记色彩

ORIGIN_IMG_SIZE = (1024, 1024) #原始图像大小
INPUT_IMG_SIZE = (1024, 1024)
TEST_IMG_SIZE = (1024, 1024)

图像旋转预处理

1
2
3
4
5
6
def get_training_transform():
train_transform = [
albu.RandomRotate90(p=0.5),#随机旋转
albu.Normalize()#归一化
]
return albu.Compose(train_transform)#组合返回

随机缩放和裁剪

1
2
3
4
5
6
7
8
9
def train_aug(img, mask):
crop_aug = Compose([RandomScale(scale_list=[0.5, 0.75, 1.0, 1.25, 1.5], mode='value'),
SmartCropV1(crop_size=512, max_ratio=0.75,
ignore_index=len(CLASSES), nopad=False)]) #随机缩放,智能裁剪到512x512
img, mask = crop_aug(img, mask)#对图像和编码处理
img, mask = np.array(img), np.array(mask)
aug = get_training_transform()(image=img.copy(), mask=mask.copy()) #旋转增强
img, mask = aug['image'], aug['mask']
return img, mask

测试集数据处理

1
2
3
4
5
6
7
8
9
10
11
12
def get_val_transform():
val_transform = [
albu.Normalize()#归一化
]
return albu.Compose(val_transform)


def val_aug(img, mask):
img, mask = np.array(img), np.array(mask)
aug = get_val_transform()(image=img.copy(), mask=mask.copy())
img, mask = aug['image'], aug['mask']
return img, mask

__getitem__方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def __getitem__(self, index):
p_ratio = random.random() #概率
if p_ratio > self.mosaic_ratio or self.mode == 'val' or self.mode == 'test':
img, mask = self.load_img_and_mask(index)
if self.transform:
img, mask = self.transform(img, mask) #旋转
else:
img, mask = self.load_mosaic_img_and_mask(index) #使用mosaic增强数据
if self.transform:
img, mask = self.transform(img, mask)

img = torch.from_numpy(img).permute(2, 0, 1).float()
mask = torch.from_numpy(mask).long()
img_id = self.img_ids[index]
results = dict(img_id=img_id, img=img, gt_semantic_seg=mask)
return results #返回数据

提取图像ID

1
2
3
4
5
6
def get_img_ids(self, data_root, img_dir, mask_dir):
img_filename_list = os.listdir(osp.join(data_root, img_dir)) #图片路径
mask_filename_list = os.listdir(osp.join(data_root, mask_dir)) #掩码路径
assert len(img_filename_list) == len(mask_filename_list) #声明大小相同一一对应
img_ids = [str(id.split('.')[0]) for id in mask_filename_list] #图片ID列表
return img_ids

加载图片和对应掩码

1
2
3
4
5
6
7
def load_img_and_mask(self, index):
img_id = self.img_ids[index] #图片ID
img_name = osp.join(self.data_root, self.img_dir, img_id + self.img_suffix) #类型后缀
mask_name = osp.join(self.data_root, self.mask_dir, img_id + self.mask_suffix) #类型后缀
img = Image.open(img_name).convert('RGB') #以RGB三通道形式打开图片
mask = Image.open(mask_name).convert('L') #以灰度图形式打开图片
return img, mask

Mosaic图像生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def load_mosaic_img_and_mask(self, index):
indexes = [index] + [random.randint(0, len(self.img_ids) - 1) for _ in range(3)] #随机抽取3个其他图像进行图像生成
img_a, mask_a = self.load_img_and_mask(indexes[0])
img_b, mask_b = self.load_img_and_mask(indexes[1])
img_c, mask_c = self.load_img_and_mask(indexes[2])
img_d, mask_d = self.load_img_and_mask(indexes[3])

img_a, mask_a = np.array(img_a), np.array(mask_a)
img_b, mask_b = np.array(img_b), np.array(mask_b)
img_c, mask_c = np.array(img_c), np.array(mask_c)
img_d, mask_d = np.array(img_d), np.array(mask_d) #变换形式,便于组合

h = self.img_size[0] #图片高度h
w = self.img_size[1] #图片宽度

start_x = w // 4
strat_y = h // 4
# The coordinates of the splice center
offset_x = random.randint(start_x, (w - start_x)) #中心点选取
offset_y = random.randint(strat_y, (h - strat_y)) #中心点选取

crop_size_a = (offset_x, offset_y)
crop_size_b = (w - offset_x, offset_y)
crop_size_c = (offset_x, h - offset_y)
crop_size_d = (w - offset_x, h - offset_y) #四张图片的大小

random_crop_a = albu.RandomCrop(width=crop_size_a[0], height=crop_size_a[1])
random_crop_b = albu.RandomCrop(width=crop_size_b[0], height=crop_size_b[1])
random_crop_c = albu.RandomCrop(width=crop_size_c[0], height=crop_size_c[1])
random_crop_d = albu.RandomCrop(width=crop_size_d[0], height=crop_size_d[1])

croped_a = random_crop_a(image=img_a.copy(), mask=mask_a.copy())
croped_b = random_crop_b(image=img_b.copy(), mask=mask_b.copy())
croped_c = random_crop_c(image=img_c.copy(), mask=mask_c.copy())
croped_d = random_crop_d(image=img_d.copy(), mask=mask_d.copy()) #随机裁剪图片到大小符合

img_crop_a, mask_crop_a = croped_a['image'], croped_a['mask']
img_crop_b, mask_crop_b = croped_b['image'], croped_b['mask']
img_crop_c, mask_crop_c = croped_c['image'], croped_c['mask']
img_crop_d, mask_crop_d = croped_d['image'], croped_d['mask']

top = np.concatenate((img_crop_a, img_crop_b), axis=1)
bottom = np.concatenate((img_crop_c, img_crop_d), axis=1)
img = np.concatenate((top, bottom), axis=0) #上下拼接

top_mask = np.concatenate((mask_crop_a, mask_crop_b), axis=1)
bottom_mask = np.concatenate((mask_crop_c, mask_crop_d), axis=1)
mask = np.concatenate((top_mask, bottom_mask), axis=0) #掩码拼接
mask = np.ascontiguousarray(mask) # 将图像和掩码转换为连续内存格式。
img = np.ascontiguousarray(img)
img = Image.fromarray(img) #将 NumPy 数组转换回 PIL 图像格式
mask = Image.fromarray(mask)
# print(img.shape)

return img, mask

真实掩码与预测结果对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def show_img_mask_seg(seg_path, img_path, mask_path, start_seg_index):
seg_list = os.listdir(seg_path)
seg_list = [f for f in seg_list if f.endswith('.png')] #筛选后缀图片
fig, ax = plt.subplots(2, 3, figsize=(18, 12)) #2x3的图片显示窗口
seg_list = seg_list[start_seg_index:start_seg_index+2] #选取两张图片
patches = [mpatches.Patch(color=np.array(PALETTE[i])/255., label=CLASSES[i]) for i in range(len(CLASSES))] #创建图例
for i in range(len(seg_list)):
seg_id = seg_list[i]
img_seg = cv2.imread(f'{seg_path}/{seg_id}', cv2.IMREAD_UNCHANGED) #加载原始图
img_seg = img_seg.astype(np.uint8)
img_seg = Image.fromarray(img_seg).convert('P') #转化为图片并调色
img_seg.putpalette(np.array(PALETTE, dtype=np.uint8)) #自定义调色板
img_seg = np.array(img_seg.convert('RGB')) #格式转化为RGB数组
mask = cv2.imread(f'{mask_path}/{seg_id}', cv2.IMREAD_UNCHANGED)
mask = mask.astype(np.uint8)
mask = Image.fromarray(mask).convert('P')
mask.putpalette(np.array(PALETTE, dtype=np.uint8))
mask = np.array(mask.convert('RGB'))
img_id = str(seg_id.split('.')[0])+'.tif'
img = cv2.imread(f'{img_path}/{img_id}', cv2.IMREAD_COLOR)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #读取图片并格式转换
ax[i, 0].set_axis_off()
ax[i, 0].imshow(img)
ax[i, 0].set_title('RS IMAGE ' + img_id)
ax[i, 1].set_axis_off()
ax[i, 1].imshow(mask)
ax[i, 1].set_title('Mask True ' + seg_id)
ax[i, 2].set_axis_off()
ax[i, 2].imshow(img_seg)
ax[i, 2].set_title('Mask Predict ' + seg_id)
ax[i, 2].legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0., fontsize='large') #展示

DataLoader

参数名 类型 默认值 描述
dataset Dataset 数据集实例,继承自 torch.utils.data.Dataset,是 DataLoader 加载数据的核心。
batch_size int 1 每个批次加载的数据量,即每次迭代中有多少样本被处理。
shuffle bool False 是否在每个 epoch 开始时打乱数据顺序。通常在训练时设置为 True,而在验证或测试时为 False
sampler SamplerNone None 自定义样本的选择顺序,通常与 shuffle 互斥。可用于复杂的采样策略,如平衡类不平衡数据集。
batch_sampler SamplerNone None 每次迭代返回批次索引,而不是样本索引,不能与 batch_sizeshufflesampler 一起使用。
num_workers int 0 用于加载数据的子进程数。0 表示数据将由主进程加载,多线程可以加速数据加载。
collate_fn Callable None 自定义的函数,用于将不同样本组合成批次。默认使用 default_collate,可处理张量、序列和字典等类型。
pin_memory bool False 如果设置为 True,会将数据加载到 CUDA 固定内存中,加速 GPU 训练。
drop_last bool False 是否丢弃最后一个不足 batch_size 的批次。如果数据大小不是批次的整数倍时,设为 True 可防止出现小批次。
timeout numeric 0 如果大于 0,则表示加载一个批次数据的最大等待时间(秒),超时将引发错误。
worker_init_fn CallableNone None 每个工作进程初始化时的函数,用于设定子进程环境(如设置随机种子)。
prefetch_factor int 2 每个 worker 预取数据的批次数量。只有 num_workers > 0 时有效,默认是每次加载两个批次的数据。
persistent_workers bool False 如果为 True,在数据加载完毕后不会关闭子进程,允许在多个 epoch 中保持 worker 进程持久运行。
1
2
3
4
5
6
7
8
9
10
11
# training hparam
max_epoch = 45
ignore_index = len(CLASSES)
train_batch_size = 8
val_batch_size = 8
lr = 6e-4
weight_decay = 0.01
backbone_lr = 6e-5
backbone_weight_decay = 0.01
num_classes = len(CLASSES)
classes = CLASSES

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
train_dataset = VaihingenDataset(data_root='data/vaihingen/train', mode='train',
mosaic_ratio=0.25, transform=train_aug)

val_dataset = VaihingenDataset(transform=val_aug)
test_dataset = VaihingenDataset(data_root='data/vaihingen/test',
transform=val_aug)

train_loader = DataLoader(dataset=train_dataset,
batch_size=train_batch_size,
num_workers=4,
pin_memory=True,
shuffle=True,
drop_last=True)

val_loader = DataLoader(dataset=val_dataset,
batch_size=val_batch_size,
num_workers=4,
shuffle=False,
pin_memory=True,
drop_last=False)