Notes on MIT's BEVFusion code: bevfusion.py
File location: bevfusion\mmdet3d\models\fusion_models\bevfusion.py
from typing import Any, Dict
import torch
from mmcv.runner import auto_fp16, force_fp32
# auto_fp16 enables automatic mixed-precision training (switching precision where appropriate), while force_fp32 explicitly forces single-precision (FP32) computation
from torch import nn
from torch.nn import functional as F
from mmdet3d.models.builder import (
build_backbone,
build_fuser,
build_head,
build_neck,
build_vtransform,
)
from mmdet3d.ops import Voxelization, DynamicScatter
from mmdet3d.models import FUSIONMODELS
from .base import Base3DFusionModel
__all__ = ["BEVFusion"]
# The imports above load the building blocks of the BEVFusion pipeline
# Once the class is registered, a config file can name this fusion model type, and FUSIONMODELS.build(cfg) constructs the corresponding instance
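# A minimal sketch (not part of this file) of how that registry mechanism is used;
# the config keys below mirror the constructor arguments, but the values are
# placeholders, not this repo's actual configs:
#   cfg = dict(
#       type="BEVFusion",
#       encoders=dict(camera=..., lidar=...),
#       fuser=dict(type="ConvFuser"),   # fuser type name is an assumption here
#       decoder=dict(backbone=..., neck=...),
#       heads=dict(object=...),
#   )
#   model = FUSIONMODELS.build(cfg)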
@FUSIONMODELS.register_module()  # this decorator registers the BEVFusion class in the FUSIONMODELS registry
class BEVFusion(Base3DFusionModel):
def __init__(
self,
        encoders: Dict[str, Any],  # dict mapping string keys to values of any type
fuser: Dict[str, Any],
decoder: Dict[str, Any],
heads: Dict[str, Any],
        **kwargs,  # collects any extra keyword arguments
) -> None:
super().__init__()
        self.encoders = nn.ModuleDict()  # ModuleDict can be indexed like a regular Python dict and, like other containers, automatically registers each module's parameters with the network
        if encoders.get("camera") is not None:  # whether a camera encoder is used
self.encoders["camera"] = nn.ModuleDict(
{
"backbone": build_backbone(encoders["camera"]["backbone"]),
"neck": build_neck(encoders["camera"]["neck"]),
"vtransform": build_vtransform(encoders["camera"]["vtransform"]),
}
)
if encoders.get("lidar") is not None:#是否使用lidar encoder
if encoders["lidar"]["voxelize"].get("max_num_points", -1) > 0: #若键不存在,返回默认值 -1
voxelize_module = Voxelization(**encoders["lidar"]["voxelize"])
else:
voxelize_module = DynamicScatter(**encoders["lidar"]["voxelize"])
self.encoders["lidar"] = nn.ModuleDict(
{
"voxelize": voxelize_module,
"backbone": build_backbone(encoders["lidar"]["backbone"]),
}
)
self.voxelize_reduce = encoders["lidar"].get("voxelize_reduce", True)
if encoders.get("radar") is not None:
if encoders["radar"]["voxelize"].get("max_num_points", -1) > 0:
voxelize_module = Voxelization(**encoders["radar"]["voxelize"])
else:
voxelize_module = DynamicScatter(**encoders["radar"]["voxelize"])
self.encoders["radar"] = nn.ModuleDict(
{
"voxelize": voxelize_module,
"backbone": build_backbone(encoders["radar"]["backbone"]),
}
)
self.voxelize_reduce = encoders["radar"].get("voxelize_reduce", True)
if fuser is not None:
self.fuser = build_fuser(fuser)
else:
self.fuser = None
self.decoder = nn.ModuleDict(
{
"backbone": build_backbone(decoder["backbone"]),
"neck": build_neck(decoder["neck"]),
}
)
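        # In typical BEVFusion configs the decoder is a SECOND-style BEV backbone plus a
        # SECONDFPN neck; the dict below is an illustrative sketch, not this repo's config:
        #   decoder=dict(
        #       backbone=dict(type="SECOND", in_channels=256, layer_nums=[5, 5]),
        #       neck=dict(type="SECONDFPN", in_channels=[128, 256], out_channels=[256, 256]),
        #   )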
self.heads = nn.ModuleDict()
        for name in heads:  # keys of the heads dict name the different heads, e.g. an object detection head ("object") or a map segmentation head ("map")
if heads[name] is not None:
self.heads[name] = build_head(heads[name])
if "loss_scale" in kwargs:
self.loss_scale = kwargs["loss_scale"]
else:
self.loss_scale = dict()
for name in heads:
if heads[name] is not None:
self.loss_scale[name] = 1.0
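        # Sketch of the two paths above: passing loss_scale=dict(object=1.0, map=0.5) via
        # kwargs would down-weight the map head's losses; if loss_scale is omitted, every
        # head gets 1.0. Note the dict must cover every active head, since forward_single
        # indexes self.loss_scale[type] directly.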
# If the camera's vtransform is a BEVDepth version, then we're using depth loss.
self.use_depth_loss = ((encoders.get('camera', {}) or {}).get('vtransform', {}) or {}).get('type', '') in ['BEVDepth', 'AwareBEVDepth', 'DBEVDepth', 'AwareDBEVDepth']
        # if 'camera' is missing, the `or {}` fallbacks keep this an empty dict; 'type' is then read from the vtransform config, defaulting to '' when absent
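        # Standalone sketch of the chained .get pattern above:
        #   encoders = dict(camera=dict(vtransform=dict(type="BEVDepth")))
        #   vt = ((encoders.get("camera", {}) or {}).get("vtransform", {}) or {}).get("type", "")
        #   assert vt == "BEVDepth"   # -> use_depth_loss becomes True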
self.init_weights()
def init_weights(self) -> None:
if "camera" in self.encoders:
self.encoders["camera"]["backbone"].init_weights()
    def extract_camera_features(  # extract camera features
self,
x,
points,
radar_points,
        camera2ego,  # transform from camera to ego (vehicle) coordinates
        lidar2ego,  # transform from lidar to ego coordinates
        lidar2camera,  # transform from lidar to camera coordinates
        lidar2image,  # transform from lidar to image coordinates
        camera_intrinsics,  # camera intrinsics: batched intrinsic matrices, one per camera per sample
        camera2lidar,  # transform from camera to lidar coordinates
        img_aug_matrix,  # image augmentation matrix
        lidar_aug_matrix,  # lidar augmentation matrix
        img_metas,  # image meta information
gt_depths=None,
    ) -> torch.Tensor:  # returns a torch.Tensor
B, N, C, H, W = x.size()
x = x.view(B * N, C, H, W)
x = self.encoders["camera"]["backbone"](x)#图像编码
x = self.encoders["camera"]["neck"](x)
if not isinstance(x, torch.Tensor):
x = x[0]
BN, C, H, W = x.size()
x = x.view(B, int(BN / B), C, H, W)#调整为原来的形状
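        # Shape walk-through with illustrative numbers (B=2 samples, N=6 cameras):
        #   x = torch.randn(2, 6, 3, 256, 704)   # (B, N, C, H, W)
        #   x = x.view(12, 3, 256, 704)          # fold cameras into the batch for the 2D backbone
        #   x = x.view(2, 6, 3, 256, 704)        # restore (B, N, C, H, W), as done above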
x = self.encoders["camera"]["vtransform"]( #models/vtransforms/lss.py
x,
points,
radar_points,
camera2ego,
lidar2ego,
lidar2camera,
lidar2image,
camera_intrinsics,
camera2lidar,
img_aug_matrix,
lidar_aug_matrix,
img_metas,
depth_loss=self.use_depth_loss,
gt_depths=gt_depths,
)
return x
def extract_features(self, x, sensor) -> torch.Tensor:
feats, coords, sizes = self.voxelize(x, sensor)
        batch_size = coords[-1, 0] + 1  # batch index of the last voxel, plus one, gives the batch size (see the worked example after this method)
        x = self.encoders[sensor]["backbone"](feats, coords, batch_size, sizes=sizes)  # feed the voxelized features to the backbone
return x
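    # Worked example of the batch_size line above (coords rows are [batch_idx, z, y, x]
    # after the padding done in voxelize below, and are ordered by batch index):
    #   coords = torch.tensor([[0, 1, 2, 3],
    #                          [0, 4, 5, 6],
    #                          [1, 7, 8, 9]])
    #   coords[-1, 0] + 1   # last voxel belongs to sample 1, so batch_size == 2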
# def extract_lidar_features(self, x) -> torch.Tensor:
    #     feats, coords, sizes = self.voxelize(x)  # convert the input point cloud x into voxels
# batch_size = coords[-1, 0] + 1
# x = self.encoders["lidar"]["backbone"](feats, coords, batch_size, sizes=sizes)
# return x
# def extract_radar_features(self, x) -> torch.Tensor:
# feats, coords, sizes = self.radar_voxelize(x)
# batch_size = coords[-1, 0] + 1
# x = self.encoders["radar"]["backbone"](feats, coords, batch_size, sizes=sizes)
# return x
@torch.no_grad()
@force_fp32()
def voxelize(self, points, sensor):
feats, coords, sizes = [], [], []
        for k, res in enumerate(points):  # k is the sample index, res its point cloud  # note: a TypeError "'DataContainer' object is not iterable" here means points was not unwrapped
            ret = self.encoders[sensor]["voxelize"](res)  # discretize the point cloud into voxels
if len(ret) == 3:
                # hard voxelization (more efficient)
                f, c, n = ret  # ret has three elements: features f, coordinates c, and sizes n (the number of points in each voxel)
else:
                assert len(ret) == 2  # dynamic ("soft") voxelization (more precise): points are still discretized into voxels, but a voxel may hold any number of points and each point's contribution can be continuous and weighted, which better reflects the density of the point cloud
f, c = ret
n = None
feats.append(f)
            coords.append(F.pad(c, (1, 0), mode="constant", value=k))  # prepend the batch index k as the first column of the coordinates
if n is not None:
sizes.append(n)
feats = torch.cat(feats, dim=0)
coords = torch.cat(coords, dim=0)
        if len(sizes) > 0:  # size info exists (hard voxelization)
sizes = torch.cat(sizes, dim=0)
            if self.voxelize_reduce:
                feats = feats.sum(dim=1, keepdim=False) / sizes.type_as(feats).view(
                    -1, 1
                )  # average the per-point features within each voxel (padded slots are zero, so sum / count is the mean)
                feats = feats.contiguous()  # lay the feature tensor out in one contiguous memory block
return feats, coords, sizes
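    # Standalone sketch of the two key steps in voxelize above:
    #   c = torch.tensor([[10, 20, 30]])              # one voxel coordinate (z, y, x)
    #   F.pad(c, (1, 0), value=2)                     # -> [[2, 10, 20, 30]]: batch index prepended
    #   f = torch.zeros(1, 10, 4); f[0, :3] = 1.0     # 10 point slots, 3 valid points, rest zero-padded
    #   n = torch.tensor([3])
    #   f.sum(dim=1) / n.type_as(f).view(-1, 1)       # per-voxel mean over the valid points (all 1.0 here)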
# @torch.no_grad()
# @force_fp32()
# def radar_voxelize(self, points):
# feats, coords, sizes = [], [], []
# for k, res in enumerate(points):
# ret = self.encoders["radar"]["voxelize"](res)
# if len(ret) == 3:
# # hard voxelize
# f, c, n = ret
# else:
# assert len(ret) == 2
# f, c = ret
# n = None
# feats.append(f)
# coords.append(F.pad(c, (1, 0), mode="constant", value=k))
# if n is not None:
# sizes.append(n)
# feats = torch.cat(feats, dim=0)
# coords = torch.cat(coords, dim=0)
# if len(sizes) > 0:
# sizes = torch.cat(sizes, dim=0)
# if self.voxelize_reduce:
# feats = feats.sum(dim=1, keepdim=False) / sizes.type_as(feats).view(
# -1, 1
# )
# feats = feats.contiguous()
# return feats, coords, sizes
@auto_fp16(apply_to=("img", "points")) #装饰器仅对函数的 "img" 和 "points" 参数进行操作
def forward(
self,
img,
points,
camera2ego,
lidar2ego,
lidar2camera,
lidar2image,
camera_intrinsics,
camera2lidar,
img_aug_matrix,
lidar_aug_matrix,
metas,
depths,
radar=None,
gt_masks_bev=None,
gt_bboxes_3d=None,
gt_labels_3d=None,
**kwargs,
):
if isinstance(img, list):
raise NotImplementedError
else:
outputs = self.forward_single(
img,
points,
camera2ego,
lidar2ego,
lidar2camera,
lidar2image,
camera_intrinsics,
camera2lidar,
img_aug_matrix,
lidar_aug_matrix,
metas,
depths,
radar,
gt_masks_bev,
gt_bboxes_3d,
gt_labels_3d,
**kwargs,
)
return outputs
@auto_fp16(apply_to=("img", "points"))
def forward_single(
self,
img,
points,
camera2ego,
lidar2ego,
lidar2camera,
lidar2image,
camera_intrinsics,
camera2lidar,
img_aug_matrix,
lidar_aug_matrix,
metas,
depths=None,
radar=None,
gt_masks_bev=None,
gt_bboxes_3d=None,
gt_labels_3d=None,
**kwargs,
):
features = []
auxiliary_losses = {}
        for sensor in (
            self.encoders if self.training else list(self.encoders.keys())[::-1]
        ):  # in training mode, iterate over the sensors in self.encoders in config order; at inference, iterate over self.encoders.keys() in reverse
if sensor == "camera":
feature = self.extract_camera_features(
img,
points,
radar,
camera2ego,
lidar2ego,
lidar2camera,
lidar2image,
camera_intrinsics,
camera2lidar,
img_aug_matrix,
lidar_aug_matrix,
metas,
gt_depths=depths,
)
if self.use_depth_loss:
feature, auxiliary_losses['depth'] = feature[0], feature[-1]
elif sensor == "lidar":
feature = self.extract_features(points, sensor)
elif sensor == "radar":
feature = self.extract_features(radar, sensor)
else:
raise ValueError(f"unsupported sensor: {sensor}")
features.append(feature)
if not self.training:
            # avoid OOM (out of memory)
            features = features[::-1]  # reverse the features back to their canonical order
if self.fuser is not None:
            x = self.fuser(features)  # fuse the multi-sensor features
else:
            assert len(features) == 1, features  # without a fuser there must be exactly one feature; otherwise the assertion stops execution and prints features to help debugging
x = features[0]
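        # Illustrative shapes for the fuser branch above: with camera+lidar, features is
        # [cam_bev, lidar_bev], each (B, C_i, H, W) on the same BEV grid; a concatenate-
        # and-convolve style fuser reduces them to a single (B, C_out, H, W) map.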
batch_size = x.shape[0]
x = self.decoder["backbone"](x)
x = self.decoder["neck"](x)
if self.training:
outputs = {}
            for type, head in self.heads.items():  # iterate over each head in self.heads and run the corresponding computation
if type == "object":
pred_dict = head(x, metas)
                    losses = head.loss(gt_bboxes_3d, gt_labels_3d, pred_dict)  # the loss method computes the loss between the predictions and the labels
elif type == "map":
                    losses = head(x, gt_masks_bev)  # computes the loss between the predictions and the ground-truth BEV masks
else:
raise ValueError(f"unsupported head: {type}")
                for name, val in losses.items():  # name is the loss name, val its value
if val.requires_grad:
outputs[f"loss/{type}/{name}"] = val * self.loss_scale[type] #缩放因子
else:
outputs[f"stats/{type}/{name}"] = val
if self.use_depth_loss:
if 'depth' in auxiliary_losses:
outputs["loss/depth"] = auxiliary_losses['depth']
else:
raise ValueError('Use depth loss is true, but depth loss not found')
return outputs
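            # Illustrative result: the training dict maps scaled, gradient-carrying losses
            # to "loss/..." keys and detached metrics to "stats/..." keys, e.g. (loss names
            # here are hypothetical, they depend on the configured head):
            #   {"loss/object/loss_bbox": tensor(...), "stats/object/matched_ious": tensor(...)}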
else:
            outputs = [{} for _ in range(batch_size)]  # [{}, {}, ...], one dict per sample
for type, head in self.heads.items():
if type == "object":
                    pred_dict = head(x, metas)  # this dict holds the detection predictions: bounding boxes, confidence scores, class labels, etc.
                    bboxes = head.get_bboxes(pred_dict, metas)  # decode the predictions into box coordinates, confidence scores, and class labels
for k, (boxes, scores, labels) in enumerate(bboxes):
                        outputs[k].update(  # add the results for sample k
{
"boxes_3d": boxes.to("cpu"),
"scores_3d": scores.cpu(), #将数据的处理设备从其他设备(如.cuda()拿到cpu上),不会改变变量类型,转换后仍然是Tensor变量
"labels_3d": labels.cpu(),
}
)
elif type == "map":
logits = head(x)
for k in range(batch_size):
outputs[k].update(
{
"masks_bev": logits[k].cpu(),
"gt_masks_bev": gt_masks_bev[k].cpu(),
}
)
else:
raise ValueError(f"unsupported head: {type}")
return outputs
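# Illustrative inference output for one sample (keys follow the code above; the map
# entries appear only when a map head is configured):
#   outputs[k] == {
#       "boxes_3d": <3D boxes on CPU>,
#       "scores_3d": <Tensor>,
#       "labels_3d": <Tensor>,
#       "masks_bev": <Tensor>,
#       "gt_masks_bev": <Tensor>,
#   }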