Reading MIT's BEVFusion code: bevfusion.py

File location: bevfusion\mmdet3d\models\fusion_models\bevfusion.py

from typing import Any, Dict

import torch
from mmcv.runner import auto_fp16, force_fp32
# auto_fp16 enables automatic mixed precision: when fp16 mode is on, the decorated method casts
# the selected tensor arguments to half precision; force_fp32 explicitly casts arguments back to FP32
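# A minimal sketch of the two usage patterns, as they appear later in this file:
#   @force_fp32()                            # float tensor args arrive as FP32, regardless of mode
#   def voxelize(self, points, sensor): ...
#   @auto_fp16(apply_to=("img", "points"))   # only these args are cast to FP16 in fp16 mode
#   def forward(self, img, points, ...): ...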
from torch import nn
from torch.nn import functional as F

from mmdet3d.models.builder import (
    build_backbone,
    build_fuser,
    build_head,
    build_neck,
    build_vtransform,
)
from mmdet3d.ops import Voxelization, DynamicScatter
from mmdet3d.models import FUSIONMODELS


from .base import Base3DFusionModel

__all__ = ["BEVFusion"]
# The modules imported above make up the BEVFusion pipeline

# Once the class is registered, a config file can name the fusion model type, and FUSIONMODELS.build(cfg) constructs the corresponding instance
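# A minimal sketch of that flow (config values are placeholders, not from this repo):
#   cfg = dict(type="BEVFusion", encoders=..., fuser=..., decoder=..., heads=...)
#   model = FUSIONMODELS.build(cfg)  # looks up "BEVFusion" and calls BEVFusion(**rest_of_cfg)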
@FUSIONMODELS.register_module()  # the decorator registers the BEVFusion class in the FUSIONMODELS registry
class BEVFusion(Base3DFusionModel):
    def __init__(
        self,
        encoders: Dict[str, Any],  # dict mapping string keys to values of any type
        fuser: Dict[str, Any],
        decoder: Dict[str, Any],
        heads: Dict[str, Any],
        **kwargs,  # collects any extra keyword arguments
    ) -> None:
        super().__init__()

        self.encoders = nn.ModuleDict()  # indexes like a regular Python dict, but registers each module so its parameters become part of the network
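        # e.g. nn.ModuleDict({"backbone": some_backbone}).parameters() includes the backbone's
        # parameters, whereas storing it in a plain dict would hide them from the optimizer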
        if encoders.get("camera") is not None:#是否使用camera encoder
            self.encoders["camera"] = nn.ModuleDict(
                {
                    "backbone": build_backbone(encoders["camera"]["backbone"]),
                    "neck": build_neck(encoders["camera"]["neck"]),
                    "vtransform": build_vtransform(encoders["camera"]["vtransform"]),
                }
            )
        if encoders.get("lidar") is not None:#是否使用lidar encoder
            if encoders["lidar"]["voxelize"].get("max_num_points", -1) > 0: #若键不存在,返回默认值 -1
                voxelize_module = Voxelization(**encoders["lidar"]["voxelize"])
            else:
                voxelize_module = DynamicScatter(**encoders["lidar"]["voxelize"])
            self.encoders["lidar"] = nn.ModuleDict(
                {
                    "voxelize": voxelize_module,
                    "backbone": build_backbone(encoders["lidar"]["backbone"]),
                }
            )
            self.voxelize_reduce = encoders["lidar"].get("voxelize_reduce", True)

        if encoders.get("radar") is not None:
            if encoders["radar"]["voxelize"].get("max_num_points", -1) > 0:
                voxelize_module = Voxelization(**encoders["radar"]["voxelize"])
            else:
                voxelize_module = DynamicScatter(**encoders["radar"]["voxelize"])
            self.encoders["radar"] = nn.ModuleDict(
                {
                    "voxelize": voxelize_module,
                    "backbone": build_backbone(encoders["radar"]["backbone"]),
                }
            )
            self.voxelize_reduce = encoders["radar"].get("voxelize_reduce", True)

        if fuser is not None:
            self.fuser = build_fuser(fuser)
        else:
            self.fuser = None

        self.decoder = nn.ModuleDict(
            {
                "backbone": build_backbone(decoder["backbone"]),
                "neck": build_neck(decoder["neck"]),
            }
        )
        self.heads = nn.ModuleDict()
        for name in heads:  # keys name the individual heads; in this model they are "object" (detection) and "map" (BEV segmentation), see forward_single below
            if heads[name] is not None:
                self.heads[name] = build_head(heads[name])

        if "loss_scale" in kwargs:
            self.loss_scale = kwargs["loss_scale"]
        else:
            self.loss_scale = dict()
            for name in heads:
                if heads[name] is not None:
                    self.loss_scale[name] = 1.0

        # If the camera's vtransform is a BEVDepth variant, depth loss is enabled.
        self.use_depth_loss = ((encoders.get('camera', {}) or {}).get('vtransform', {}) or {}).get('type', '') in ['BEVDepth', 'AwareBEVDepth', 'DBEVDepth', 'AwareDBEVDepth']
        # If 'camera' is missing (or present but None), fall back to {}; likewise for 'vtransform';
        # finally read 'type' from the vtransform config, defaulting to '' when absent
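        # The `or {}` matters when a key exists but is explicitly None, e.g.:
        #   {'camera': None}.get('camera', {})          -> None  (the default is NOT used)
        #   ({'camera': None}.get('camera', {}) or {})  -> {}    (safe to chain .get on)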

        self.init_weights()

    def init_weights(self) -> None:
        if "camera" in self.encoders:
            self.encoders["camera"]["backbone"].init_weights()

    def extract_camera_features(  # extract camera (image) features
        self,
        x,
        points,
        radar_points,
        camera2ego,  # transform from the camera frame to the ego-vehicle frame
        lidar2ego,  # transform from the lidar frame to the ego-vehicle frame
        lidar2camera,  # transform from the lidar frame to the camera frame
        lidar2image,  # transform from the lidar frame to the image plane
        camera_intrinsics,  # per-camera intrinsic matrices
        camera2lidar,  # transform from the camera frame to the lidar frame
        img_aug_matrix,  # image augmentation matrix
        lidar_aug_matrix,  # lidar augmentation matrix
        img_metas,  # image metadata
        gt_depths=None,
    ) -> torch.Tensor:  # returns a torch.Tensor
        B, N, C, H, W = x.size()
        x = x.view(B * N, C, H, W)

        x = self.encoders["camera"]["backbone"](x)#图像编码
        x = self.encoders["camera"]["neck"](x)

        if not isinstance(x, torch.Tensor):
            x = x[0]

        BN, C, H, W = x.size()
        x = x.view(B, int(BN / B), C, H, W)  # restore the per-camera view (B, N, C, H, W)

        x = self.encoders["camera"]["vtransform"](  #models/vtransforms/lss.py
            x,
            points,
            radar_points,
            camera2ego,
            lidar2ego,
            lidar2camera,
            lidar2image,
            camera_intrinsics,
            camera2lidar,
            img_aug_matrix,
            lidar_aug_matrix,
            img_metas,
            depth_loss=self.use_depth_loss, 
            gt_depths=gt_depths,
        )
        return x
    
    def extract_features(self, x, sensor) -> torch.Tensor:
        feats, coords, sizes = self.voxelize(x, sensor)
        batch_size = coords[-1, 0] + 1  # batch index of the last voxel + 1
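        # coords rows are (batch_idx, z, y, x) and samples were concatenated in order, so e.g.
        #   coords[:, 0] = [0, 0, 1, 1, 1]  ->  batch_size = 2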
        x = self.encoders[sensor]["backbone"](feats, coords, batch_size, sizes=sizes)#将体素化后的特征传递给骨干网
        return x
    
    # def extract_lidar_features(self, x) -> torch.Tensor:
    #     feats, coords, sizes = self.voxelize(x)  # convert the input point cloud x to voxel format
    #     batch_size = coords[-1, 0] + 1
    #     x = self.encoders["lidar"]["backbone"](feats, coords, batch_size, sizes=sizes)
    #     return x

    # def extract_radar_features(self, x) -> torch.Tensor:
    #     feats, coords, sizes = self.radar_voxelize(x)
    #     batch_size = coords[-1, 0] + 1
    #     x = self.encoders["radar"]["backbone"](feats, coords, batch_size, sizes=sizes)
    #     return x

    @torch.no_grad()
    @force_fp32()
    def voxelize(self, points, sensor):
        feats, coords, sizes = [], [], []
        for k, res in enumerate(points):  # k is the sample index, res its point cloud (caveat: raises "TypeError: 'DataContainer' object is not iterable" if points is still wrapped in a DataContainer)
            ret = self.encoders[sensor]["voxelize"](res)  # discretize the point cloud into voxels
            if len(ret) == 3:
                # hard voxelization (more efficient)
                f, c, n = ret  # ret has three elements: features f, coordinates c, and n, the number of points in each voxel
            else:
                assert len(ret) == 2  # dynamic ("soft") voxelization: no fixed cap on points per voxel, so every point contributes and the density of the cloud is preserved
                f, c = ret
                n = None
            feats.append(f)
            coords.append(F.pad(c, (1, 0), mode="constant", value=k))  # prepend a column holding the sample index k
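            # e.g. a coordinate row [z, y, x] becomes [k, z, y, x], so after the batch-wise
            # torch.cat below every voxel stays attributable to sample k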
            if n is not None:
                sizes.append(n)

        feats = torch.cat(feats, dim=0)
        coords = torch.cat(coords, dim=0)
        if len(sizes) > 0:  # per-voxel point counts exist (hard voxelization)
            sizes = torch.cat(sizes, dim=0)
            if self.voxelize_reduce:
                feats = feats.sum(dim=1, keepdim=False) / sizes.type_as(feats).view(
                    -1, 1
                )  # average the per-point features within each voxel (mean pooling, not normalization)
                feats = feats.contiguous()  # lay the tensor out contiguously in memory
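                # feats has shape (num_voxels, max_points, C); dividing the per-voxel sum by the
                # true point count gives a mean, e.g. a voxel holding 3 points (zero-padded up to
                # max_points) is divided by 3, not by max_points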

        return feats, coords, sizes

    # @torch.no_grad()
    # @force_fp32()
    # def radar_voxelize(self, points):
    #     feats, coords, sizes = [], [], []
    #     for k, res in enumerate(points):
    #         ret = self.encoders["radar"]["voxelize"](res)
    #         if len(ret) == 3:
    #             # hard voxelize
    #             f, c, n = ret
    #         else:
    #             assert len(ret) == 2
    #             f, c = ret
    #             n = None
    #         feats.append(f)
    #         coords.append(F.pad(c, (1, 0), mode="constant", value=k))
    #         if n is not None:
    #             sizes.append(n)

    #     feats = torch.cat(feats, dim=0)
    #     coords = torch.cat(coords, dim=0)
    #     if len(sizes) > 0:
    #         sizes = torch.cat(sizes, dim=0)
    #         if self.voxelize_reduce:
    #             feats = feats.sum(dim=1, keepdim=False) / sizes.type_as(feats).view(
    #                 -1, 1
    #             )
    #             feats = feats.contiguous()

    #     return feats, coords, sizes

    @auto_fp16(apply_to=("img", "points"))  # the decorator casts only the "img" and "points" arguments
    def forward(
        self,
        img,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        metas,
        depths,
        radar=None,
        gt_masks_bev=None,
        gt_bboxes_3d=None,
        gt_labels_3d=None,
        **kwargs,
    ):
        if isinstance(img, list):
            raise NotImplementedError
        else:
            outputs = self.forward_single(
                img,
                points,
                camera2ego,
                lidar2ego,
                lidar2camera,
                lidar2image,
                camera_intrinsics,
                camera2lidar,
                img_aug_matrix,
                lidar_aug_matrix,
                metas,
                depths,
                radar,
                gt_masks_bev,
                gt_bboxes_3d,
                gt_labels_3d,
                **kwargs,
            )
            return outputs

    @auto_fp16(apply_to=("img", "points"))
    def forward_single(
        self,
        img,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        metas,
        depths=None,
        radar=None,
        gt_masks_bev=None,
        gt_bboxes_3d=None,
        gt_labels_3d=None,
        **kwargs,
    ):
        features = []
        auxiliary_losses = {}
        for sensor in (
            self.encoders if self.training else list(self.encoders.keys())[::-1]
        ):  # in training, iterate the encoders in insertion order; at test time, iterate them in reverse
            if sensor == "camera":
                feature = self.extract_camera_features(
                    img,
                    points,
                    radar,
                    camera2ego,
                    lidar2ego,
                    lidar2camera,
                    lidar2image,
                    camera_intrinsics,
                    camera2lidar,
                    img_aug_matrix,
                    lidar_aug_matrix,
                    metas,
                    gt_depths=depths,
                )
                if self.use_depth_loss:
                    feature, auxiliary_losses['depth'] = feature[0], feature[-1]
            elif sensor == "lidar":
                feature = self.extract_features(points, sensor)
            elif sensor == "radar":
                feature = self.extract_features(radar, sensor)
            else:
                raise ValueError(f"unsupported sensor: {sensor}")

            features.append(feature)

        if not self.training:
            # avoid OOM (out of memory)
            features = features[::-1]  # iteration was reversed at test time, so reverse back to keep the fuser's input order consistent with training

        if self.fuser is not None:
            x = self.fuser(features)  # fuse the per-sensor features
        else:
            assert len(features) == 1, features  # without a fuser there must be exactly one feature; on failure the assert dumps features to aid debugging
            x = features[0]

        batch_size = x.shape[0]

        x = self.decoder["backbone"](x)
        x = self.decoder["neck"](x)

        if self.training:
            outputs = {}
            for type, head in self.heads.items():  # run each head in self.heads
                if type == "object":
                    pred_dict = head(x, metas)
                    losses = head.loss(gt_bboxes_3d, gt_labels_3d, pred_dict)  # compute the losses between predictions and ground-truth boxes/labels
                elif type == "map":
                    losses = head(x, gt_masks_bev)  # the map head returns losses against the ground-truth BEV masks directly
                else:
                    raise ValueError(f"unsupported head: {type}")
                for name, val in losses.items():  # name is the loss name, val its value
                    if val.requires_grad:
                        outputs[f"loss/{type}/{name}"] = val * self.loss_scale[type] #缩放因子
                    else:
                        outputs[f"stats/{type}/{name}"] = val
            if self.use_depth_loss:
                if 'depth' in auxiliary_losses:
                    outputs["loss/depth"] = auxiliary_losses['depth']
                else:
                    raise ValueError('Use depth loss is true, but depth loss not found')
            return outputs
        else:
            outputs = [{} for _ in range(batch_size)]  # one empty dict per sample: [{}, {}, ...]
            for type, head in self.heads.items():
                if type == "object":
                    pred_dict = head(x, metas)  # detection predictions, e.g. boxes, confidence scores, class labels
                    bboxes = head.get_bboxes(pred_dict, metas)  # decode per-sample (boxes, scores, labels) tuples
                    for k, (boxes, scores, labels) in enumerate(bboxes):
                        outputs[k].update(  # merge into the k-th sample's output dict
                            {
                                "boxes_3d": boxes.to("cpu"),
                                "scores_3d": scores.cpu(), #将数据的处理设备从其他设备(如.cuda()拿到cpu上),不会改变变量类型,转换后仍然是Tensor变量
                                "labels_3d": labels.cpu(),
                            }
                        )
                elif type == "map":
                    logits = head(x)
                    for k in range(batch_size):
                        outputs[k].update(
                            {
                                "masks_bev": logits[k].cpu(),
                                "gt_masks_bev": gt_masks_bev[k].cpu(),
                            }
                        )
                else:
                    raise ValueError(f"unsupported head: {type}")
            return outputs
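
At inference time, forward therefore returns one dict per sample. A sketch of the shape (illustrative; exact types depend on the configured heads):

# outputs[k] with an "object" head (boxes are typically a LiDARInstance3DBoxes instance):
#   {"boxes_3d": <3D boxes>, "scores_3d": Tensor(N,), "labels_3d": Tensor(N,)}
# outputs[k] additionally, when a "map" head is present:
#   {"masks_bev": Tensor(C, H, W), "gt_masks_bev": Tensor(C, H, W)}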