Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlov721 committed Sep 19, 2024
1 parent d2a95e5 commit af77626
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 95 deletions.
57 changes: 39 additions & 18 deletions luxonis_train/attached_modules/losses/obb_detection_loss.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
xywh2xyxy,
xyxyxyxy2xywhr,
)
from luxonis_train.utils.types import IncompatibleException, Labels, LabelType, Packet
from luxonis_train.utils.types import (
IncompatibleException,
Labels,
LabelType,
Packet,
)

from .base_loss import BaseLoss


class OBBDetectionLoss(BaseLoss[Tensor, Tensor, Tensor, Tensor, Tensor, Tensor]):
class OBBDetectionLoss(
BaseLoss[Tensor, Tensor, Tensor, Tensor, Tensor, Tensor]
):
node: EfficientOBBoxHead
supported_labels = [LabelType.OBOUNDINGBOX]

Expand Down Expand Up @@ -127,7 +134,9 @@ def prepare(
self.grid_cell_offset,
multiply_with_stride=True,
)
self.anchor_points_strided = self.anchor_points / self.stride_tensor
self.anchor_points_strided = (
self.anchor_points / self.stride_tensor
)

target = self._preprocess_target(
target, batch_size
Expand All @@ -144,7 +153,9 @@ def prepare(
)
pred_bboxes = torch.cat(
(
dist2rbbox(pred_distri_tensor, pred_angles, self.anchor_points_strided),
dist2rbbox(
pred_distri_tensor, pred_angles, self.anchor_points_strided
),
pred_angles,
),
dim=-1,
Expand Down Expand Up @@ -198,10 +209,14 @@ def forward(
assigned_scores: Tensor,
mask_positive: Tensor,
):
one_hot_label = F.one_hot(assigned_labels.long(), self.n_classes + 1)[..., :-1]
one_hot_label = F.one_hot(assigned_labels.long(), self.n_classes + 1)[
..., :-1
]

# CLS loss
loss_cls = self.varifocal_loss(pred_scores, assigned_scores, one_hot_label)
loss_cls = self.varifocal_loss(
pred_scores, assigned_scores, one_hot_label
)
# loss_cls = self.bce(pred_scores, assigned_scores)
if assigned_scores.sum() > 1:
loss_cls /= assigned_scores.sum()
Expand Down Expand Up @@ -234,8 +249,8 @@ def forward(
return loss, sub_losses

def _preprocess_target(self, target: Tensor, batch_size: int):
"""Preprocess target in shape [batch_size, N, 6] where N is maximum number of
instances in one image."""
"""Preprocess target in shape [batch_size, N, 6] where N is
maximum number of instances in one image."""
idx_cls = target[:, :2]
xyxyxyxy = target[:, 2:]
cxcywhr = xyxyxyxy2xywhr(xyxyxyxy)
Expand All @@ -244,7 +259,8 @@ def _preprocess_target(self, target: Tensor, batch_size: int):
else:
target = torch.cat([idx_cls, torch.tensor(cxcywhr)], dim=-1)
sample_ids, counts = cast(
tuple[Tensor, Tensor], torch.unique(target[:, 0].int(), return_counts=True)
tuple[Tensor, Tensor],
torch.unique(target[:, 0].int(), return_counts=True),
)
c_max = int(counts.max()) if counts.numel() > 0 else 0
out_target = torch.zeros(batch_size, c_max, 6, device=target.device)
Expand Down Expand Up @@ -283,7 +299,8 @@ def forward(
self, pred_score: Tensor, target_score: Tensor, label: Tensor
) -> Tensor:
weight = (
self.alpha * pred_score.pow(self.gamma) * (1 - label) + target_score * label
self.alpha * pred_score.pow(self.gamma) * (1 - label)
+ target_score * label
)
ce_loss = F.binary_cross_entropy(
pred_score.float(), target_score.float(), reduction="none"
Expand All @@ -296,8 +313,8 @@ class DFLoss(nn.Module):
"""Criterion class for computing DFL losses during training.
@type reg_max: int
@param reg_max: Number of bins for predicting the distributions of bounding box
coordinates.
@param reg_max: Number of bins for predicting the distributions of
bounding box coordinates.
"""

def __init__(self, reg_max=16) -> None:
Expand All @@ -318,9 +335,13 @@ def __call__(self, pred_dist, target):
wl = tr - target # weight left
wr = 1 - wl # weight right
return (
F.cross_entropy(pred_dist, tl.view(-1), reduction="none").view(tl.shape)
F.cross_entropy(pred_dist, tl.view(-1), reduction="none").view(
tl.shape
)
* wl
+ F.cross_entropy(pred_dist, tr.view(-1), reduction="none").view(tl.shape)
+ F.cross_entropy(pred_dist, tr.view(-1), reduction="none").view(
tl.shape
)
* wr
).mean(-1, keepdim=True)

Expand All @@ -329,13 +350,13 @@ class RotatedBboxLoss(nn.Module):
"""Criterion class for computing training losses during training.
@type reg_max: int
@param reg_max: Number of bins for predicting the distributions of bounding box
coordinates.
@param reg_max: Number of bins for predicting the distributions of
bounding box coordinates.
"""

def __init__(self, reg_max):
"""Initialize the BboxLoss module with regularization maximum and DFL
settings."""
"""Initialize the BboxLoss module with regularization maximum
and DFL settings."""
super().__init__()
self.dfl_loss = DFLoss(reg_max) if reg_max > 1 else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@


class MeanAveragePrecisionOBB(BaseMetric):
"""Compute the Mean-Average-Precision (mAP) and Mean-Average-Recall (mAR) for object
detection predictions using oriented bounding boxes.
"""Compute the Mean-Average-Precision (mAP) and Mean-Average-Recall
(mAR) for object detection predictions using oriented bounding
boxes.
Partially adapted from U{YOLOv8 OBBMetrics
<https://github.com/ultralytics/ultralytics/blob/ba438aea5ae4d0e7c28d59ed8408955d16ca71ec/ultralytics/utils/metrics.py#L1223>}.
Expand Down Expand Up @@ -39,15 +40,16 @@ def update(
outputs: list[Tensor], # preds
labels: list[Tensor], # batch
):
"""Update metrics without erasing stats from the previous batch, i.e. the
metrics are calculated cumulatively.
"""Update metrics without erasing stats from the previous batch,
i.e. the metrics are calculated cumulatively.
@type outputs: list[Tensor]
@param outputs: Network predictions [x1, y1, x2, y2, conf, cls_idx, r]
unnormalized (not in [0, 1] range) [Tensor(n_bboxes, 7)]
@param outputs: Network predictions [x1, y1, x2, y2, conf,
cls_idx, r] unnormalized (not in [0, 1] range)
[Tensor(n_bboxes, 7)]
@type labels: list[Tensor]
@param labels: [cls_idx, x1, y1, x2, y2, r] unnormalized (not in [0, 1] range)
[Tensor(n_bboxes, 6)]
@param labels: [cls_idx, x1, y1, x2, y2, r] unnormalized (not in
[0, 1] range) [Tensor(n_bboxes, 6)]
"""
for si, output in enumerate(outputs):
self.stats["conf"].append(output[:, 4])
Expand Down Expand Up @@ -97,9 +99,11 @@ def prepare(

return output_nms, output_labels

def _preprocess_target(self, target: Tensor, batch_size: int, img_size) -> Tensor:
"""Preprocess target in shape [batch_size, N, 6] where N is maximum number of
instances in one image."""
def _preprocess_target(
self, target: Tensor, batch_size: int, img_size
) -> Tensor:
"""Preprocess target in shape [batch_size, N, 6] where N is
maximum number of instances in one image."""
cls_idx = target[:, 1].unsqueeze(-1)
xyxyxyxy = target[:, 2:]
xyxyxyxy[:, 0::2] *= img_size[1] # scale x
Expand All @@ -120,7 +124,8 @@ def reset(self) -> None:
def compute(
self,
) -> tuple[Tensor, dict[str, Tensor]]:
"""Process predicted results for object detection and update metrics."""
"""Process predicted results for object detection and update
metrics."""
results = self._process(
torch.cat(self.stats["tp"]).cpu().numpy(),
torch.cat(self.stats["conf"]).cpu().numpy(),
Expand All @@ -143,8 +148,9 @@ def compute(
def _process_batch(
self, detections: Tensor, gt_bboxes: Tensor, gt_cls: Tensor
) -> Tensor:
"""Perform computation of the correct prediction matrix for a batch of # "fp":
torch.from_numpy(results[1]), detections and ground truth bounding boxes.
"""Perform computation of the correct prediction matrix for a
batch of # "fp": torch.from_numpy(results[1]), detections and
ground truth bounding boxes.
@type detections: Tensor
@param detections: A tensor of shape (N, 7) representing the detected bounding boxes and associated
Expand Down Expand Up @@ -182,23 +188,26 @@ def match_predictions(
iou: Tensor,
use_scipy: bool = False,
) -> Tensor:
"""Matches predictions to ground truth objects (pred_classes, true_classes)
using IoU.
"""Matches predictions to ground truth objects (pred_classes,
true_classes) using IoU.
@type pred_classes: Tensor
@param pred_classes: Predicted class indices of shape(N,).
@type true_classes: Tensor
@param true_classes: Target class indices of shape(M,).
@type iou: Tensor
@param iou: An NxM tensor containing the pairwise IoU values for predictions and
ground of truth
@param iou: An NxM tensor containing the pairwise IoU values for
predictions and ground of truth
@type use_scipy: bool
@param use_scipy: Whether to use scipy for matching (more precise).
@param use_scipy: Whether to use scipy for matching (more
precise).
@rtype: Tensor
@return: Correct tensor of shape(N,10) for 10 IoU thresholds.
"""
# Dx10 matrix, where D - detections, 10 - IoU thresholds
correct = np.zeros((pred_classes.shape[0], self.iouv.shape[0])).astype(bool)
correct = np.zeros((pred_classes.shape[0], self.iouv.shape[0])).astype(
bool
)
# LxD matrix where L - labels (rows), D - detections (columns)
correct_class = true_classes[:, None] == pred_classes
iou = iou * correct_class # zero out the wrong classes
Expand All @@ -210,8 +219,10 @@ def match_predictions(

cost_matrix = iou * (iou >= threshold)
if cost_matrix.any():
labels_idx, detections_idx = scipy.optimize.linear_sum_assignment(
cost_matrix, maximize=True
labels_idx, detections_idx = (
scipy.optimize.linear_sum_assignment(
cost_matrix, maximize=True
)
)
valid = cost_matrix[labels_idx, detections_idx] > 0
if valid.any():
Expand All @@ -234,10 +245,13 @@ def match_predictions(
np.unique(matches[:, 0], return_index=True)[1]
]
correct[matches[:, 1].astype(int), i] = True
return torch.tensor(correct, dtype=torch.bool, device=pred_classes.device)
return torch.tensor(
correct, dtype=torch.bool, device=pred_classes.device
)

def _update_metrics(self, results: tuple[np.ndarray, ...]):
"""Updates the evaluation metrics of the model with a new set of results.
"""Updates the evaluation metrics of the model with a new set of
results.
@type results: tuple[np.ndarray, ...]
@param results: A tuple containing the following evaluation metrics:
Expand Down Expand Up @@ -277,7 +291,8 @@ def _process(
pred_cls: np.ndarray,
target_cls: np.ndarray,
) -> tuple[np.ndarray, ...]:
"""Process predicted results for object detection and update metrics."""
"""Process predicted results for object detection and update
metrics."""
results = MeanAveragePrecisionOBB.ap_per_class(
tp,
conf,
Expand All @@ -303,7 +318,8 @@ def ap_per_class(
eps: float = 1e-16,
# prefix="",
) -> tuple[np.ndarray, ...]:
"""Computes the average precision per class for object detection evaluation.
"""Computes the average precision per class for object detection
evaluation.
Args:
tp (np.ndarray): Binary array indicating whether the detection is correct (True) or not (False).
Expand Down Expand Up @@ -414,7 +430,8 @@ def ap_per_class(
def compute_ap(
recall: list[float], precision: list[float]
) -> tuple[float, np.ndarray, np.ndarray]:
"""Compute the average precision (AP) given the recall and precision curves.
"""Compute the average precision (AP) given the recall and
precision curves.
Args:
recall (list): The recall curve.
Expand All @@ -441,14 +458,18 @@ def compute_ap(
i = np.where(mrec[1:] != mrec[:-1])[
0
] # points where x-axis (recall) changes
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1]) # area under curve
ap = np.sum(
(mrec[i + 1] - mrec[i]) * mpre[i + 1]
) # area under curve

return ap, mpre, mrec

@staticmethod
def smooth(y: np.ndarray, f: float = 0.05) -> np.ndarray:
"""Box filter of fraction f."""
nf = round(len(y) * f * 2) // 2 + 1 # number of filter elements (must be odd)
nf = (
round(len(y) * f * 2) // 2 + 1
) # number of filter elements (must be odd)
p = np.ones(nf // 2) # ones padding
yp = np.concatenate((p * y[0], y, p * y[-1]), 0) # y padded
return np.convolve(yp, np.ones(nf) / nf, mode="valid") # y-smoothed
Expand Down
Loading

0 comments on commit af77626

Please sign in to comment.