diff --git a/layers.py b/layers.py index d27697e..6d511f6 100644 --- a/layers.py +++ b/layers.py @@ -2,25 +2,36 @@ from typing import Union, Tuple import torch import torch.nn as nn +import torch.nn.functional as F from .utils.logger import DummyLogger +class LayerInfo(): + def __init__(self): + self.memory = 0.0 + self.ops = 0.0 + self.output = 0.0 + + class Layer(nn.Module): # Default layer arguments - ACTIVATION = torch.nn.LeakyReLU - ACTIVATION_KWARGS = {"negative_slope": 0.1} + ACTIVATION = F.leaky_relu - USE_BATCH_NORM = True + BATCH_NORM = True BATCH_NORM_TRAINING = True BATCH_NORM_MOMENTUM = 0.01 IS_TRAINING = False METRICS = False + VERBOSE = 0 LOGGER = DummyLogger() - def __init__(self, activation): + def __init__(self, activation, batch_norm): super().__init__() + self.name = 'Layer' + self.info = LayerInfo() + # Preload default if activation == 0: activation = Layer.ACTIVATION @@ -28,93 +39,74 @@ class Layer(nn.Module): self.activation = activation() else: self.activation = activation - self.batch_norm: torch.nn._BatchNorm + self.batch_norm = Layer.BATCH_NORM if batch_norm is None else batch_norm def forward(self, input_data: torch.Tensor) -> torch.Tensor: output = input_data - if self.activation: + if self.activation is not None: output = self.activation(output) - if self.batch_norm: + if self.batch_norm is not None: output = self.batch_norm(output) return output - @staticmethod - def add_weight_decay(module: nn.Module, weight_decay: float, exclude=()): - decay = [] - no_decay = [] - for name, param in module.named_parameters(): - if not param.requires_grad: - continue - if len(param.shape) == 1 or name.endswith('.bias') or name in exclude: - no_decay.append(param) - else: - decay.append(param) - return [ - {'params': no_decay, 'weight_decay': 0.0}, - {'params': decay, 'weight_decay': weight_decay}] - class Linear(Layer): - def __init__(self, in_channels: int, out_channels: int, activation=0, use_batch_norm: bool = None, **kwargs): - super().__init__(activation) + def __init__(self, in_channels: int, out_channels: int, activation=0, batch_norm=None, **kwargs): + super().__init__(activation, batch_norm) - use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm - self.linear = nn.Linear(in_channels, out_channels, bias=not use_batch_norm, **kwargs) + self.fc = nn.Linear(in_channels, out_channels, **kwargs) self.batch_norm = nn.BatchNorm1d( out_channels, momentum=Layer.BATCH_NORM_MOMENTUM, - track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None + track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: - return super().forward(self.linear(input_data)) + return super().forward(self.fc(input_data)) class Conv1d(Layer): def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3, - stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs): - super().__init__(activation) + stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs): + super().__init__(activation, batch_norm) - use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, stride=stride, - bias=use_batch_norm, **kwargs) + bias=not self.batch_norm, **kwargs) self.batch_norm = nn.BatchNorm1d( out_channels, momentum=Layer.BATCH_NORM_MOMENTUM, - track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None + track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: return super().forward(self.conv(input_data)) class Conv2d(Layer): - def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int]] = 3, - stride: Union[int, tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs): - super().__init__(activation) + def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3, + stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs): + super().__init__(activation, batch_norm) - use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, - bias=not use_batch_norm, **kwargs) + bias=not self.batch_norm, **kwargs) self.batch_norm = nn.BatchNorm2d( out_channels, momentum=Layer.BATCH_NORM_MOMENTUM, - track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None + track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: return super().forward(self.conv(input_data)) class Conv3d(Layer): - def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int, int]] = 3, - stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs): - super().__init__(activation) + def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3, + stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs): + super().__init__(activation, batch_norm) - use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride=stride, - bias=use_batch_norm, **kwargs) + bias=not self.batch_norm, **kwargs) self.batch_norm = nn.BatchNorm3d( out_channels, momentum=Layer.BATCH_NORM_MOMENTUM, - track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None + track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: return super().forward(self.conv(input_data)) @@ -122,32 +114,16 @@ class Conv3d(Layer): class Deconv2d(Layer): def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3, - stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs): - super().__init__(activation) + stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs): + super().__init__(activation, batch_norm) - use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm self.deconv = nn.ConvTranspose2d( in_channels, out_channels, kernel_size, stride=stride, - bias=not use_batch_norm, **kwargs) + bias=not self.batch_norm, **kwargs) self.batch_norm = nn.BatchNorm2d( out_channels, momentum=Layer.BATCH_NORM_MOMENTUM, - track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None + track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: return super().forward(self.deconv(input_data)) - - -class DropPath(nn.Module): - def __init__(self, drop_prob=None): - super().__init__() - self.drop_prob = drop_prob - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - if self.drop_prob == 0.0: - return input_data - keep_prob = 1 - self.drop_prob - shape = (input_data.shape[0],) + (1,) * (input_data.ndim - 1) - random_tensor = keep_prob + torch.rand(shape, dtype=input_data.dtype, device=input_data.device) - random_tensor.floor_() # binarize - return input_data.div(keep_prob) * random_tensor diff --git a/residual.py b/residual.py index e35a009..bdc14c8 100644 --- a/residual.py +++ b/residual.py @@ -3,51 +3,65 @@ from typing import Union, Tuple import torch import torch.nn as nn -from .layers import Conv2d, LayerInfo, Layer +from .layers import LayerInfo, Layer class ResBlock(Layer): - def __init__(self, in_channels: int, out_channels: int = -1, kernel_size: int = 3, padding: int = 1, - stride: Union[int, Tuple[int, int]] = 1, activation=None, batch_norm=None, **kwargs): + def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3, + activation=None, **kwargs): super().__init__(activation if activation is not None else 0, False) - self.batch_norm = None - if out_channels == -1: - out_channels = in_channels self.seq = nn.Sequential( - Conv2d(in_channels, in_channels, kernel_size=kernel_size, stride=stride, padding=padding, **kwargs), - Conv2d(in_channels, out_channels, kernel_size=3, padding=1, - activation=None, batch_norm=batch_norm)) - self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if ( - out_channels != in_channels or stride != 1) else None + nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, bias=False, **kwargs), + nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING), + torch.nn.LeakyReLU(), + nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, bias=False, padding=1), + nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING)) + self.batch_norm = nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING) if self.batch_norm else None def forward(self, input_data: torch.Tensor) -> torch.Tensor: - if self.residual is not None: - return super().forward(self.residual(input_data) + self.seq(input_data)) return super().forward(input_data + self.seq(input_data)) class ResBottleneck(Layer): - def __init__(self, in_channels: int, out_channels: int = -1, bottleneck_channels: int = -1, kernel_size: int = 3, - stride: Union[int, Tuple[int, int]] = 1, padding=1, - activation=None, batch_norm=None, **kwargs): + def __init__(self, in_channels: int, out_channels: int, planes: int = 1, kernel_size: int = 3, + stride: Union[int, Tuple[int, int]] = 1, activation=None, **kwargs): super().__init__(activation if activation is not None else 0, False) self.batch_norm = None - if out_channels == -1: - out_channels = in_channels - if bottleneck_channels == -1: - bottleneck_channels = in_channels // 4 self.seq = nn.Sequential( - Conv2d(in_channels, bottleneck_channels, kernel_size=1), - Conv2d(bottleneck_channels, bottleneck_channels, kernel_size=kernel_size, - stride=stride, padding=padding, **kwargs), - Conv2d(bottleneck_channels, out_channels, kernel_size=1, - activation=None, batch_norm=batch_norm)) - self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if ( - out_channels != in_channels or stride != 1) else None + nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False), + nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING), + torch.nn.LeakyReLU(), + nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, bias=False, **kwargs), + nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING), + torch.nn.LeakyReLU(), + nn.Conv2d(out_channels, planes * out_channels, kernel_size=1, bias=False), + nn.BatchNorm2d( + out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING)) + self.downsample = nn.Sequential( + nn.Conv2d(in_channels, planes * out_channels, stride=stride, kernel_size=1), + nn.BatchNorm2d( + planes * out_channels, + momentum=Layer.BATCH_NORM_MOMENTUM, + track_running_stats=not Layer.BATCH_NORM_TRAINING)) def forward(self, input_data: torch.Tensor) -> torch.Tensor: - if self.residual is not None: - return super().forward(self.residual(input_data) + self.seq(input_data)) - return super().forward(input_data + self.seq(input_data)) + return super().forward(self.downsample(input_data) + self.seq(input_data)) diff --git a/ssd/box.py b/ssd/box.py deleted file mode 100644 index e255fe4..0000000 --- a/ssd/box.py +++ /dev/null @@ -1,86 +0,0 @@ -import numpy as np - - -def create_box(y_pos: float, x_pos: float, height: float, width: float) -> tuple[float, float, float, float]: - y_min, x_min, y_max, x_max = check_rectangle( - y_pos - (height / 2), x_pos - (width / 2), y_pos + (height / 2), x_pos + (width / 2)) - return (y_min + y_max) / 2, (x_min + x_max) / 2, y_max - y_min, x_max - x_min - - -def check_rectangle(y_min: float, x_min: float, y_max: float, x_max: float) -> tuple[float, float, float, float]: - if y_min < 0: - y_min = 0 - if x_min < 0: - x_min = 0 - if y_min > 1: - y_min = 1 - if x_min > 1: - x_min = 1 - if y_max < 0: - y_max = 0 - if x_max < 0: - x_max = 0 - if y_max >= 1: - y_max = 1 - if x_max >= 1: - x_max = 1 - return y_min, x_min, y_max, x_max - - -def get_boxes(predictions: np.ndarray, anchors: np.ndarray, class_index: int) -> np.ndarray: - boxes = np.zeros(anchors.shape) - boxes[:, 0] = (predictions[:, 0] * anchors[:, 2]) + anchors[:, 0] - boxes[:, 1] = (predictions[:, 1] * anchors[:, 3]) + anchors[:, 1] - boxes[:, 2] = np.exp(predictions[:, 2]) * anchors[:, 2] - boxes[:, 3] = np.exp(predictions[:, 3]) * anchors[:, 3] - boxes = np.asarray([create_box(*box) for box in boxes]) - - # return np.insert(boxes, 4, predictions[:, class_index], axis=-1) - return np.concatenate([boxes, predictions[:, class_index:class_index + 1]], axis=1) - - -def fast_nms(boxes: np.ndarray, min_iou: float) -> np.ndarray: - # if there are no boxes, return an empty list - if len(boxes) == 0: - return [] - - # initialize the list of picked indexes - pick = [] - - # grab the coordinates of the bounding boxes - y_min = boxes[:, 0] - (boxes[:, 2] / 2) - y_max = boxes[:, 0] + (boxes[:, 2] / 2) - x_min = boxes[:, 1] - (boxes[:, 3] / 2) - x_max = boxes[:, 1] + (boxes[:, 3] / 2) - scores = boxes[:, 4] - - # compute the area of the bounding boxes and sort the bounding boxes by the scores - areas = (x_max - x_min) * (y_max - y_min) - idxs = np.argsort(scores) - - # keep looping while some indexes still remain in the indexes - # list - while len(idxs) > 0: - # grab the last index in the indexes list and add the - # index value to the list of picked indexes - last = len(idxs) - 1 - i = idxs[last] - pick.append(i) - - inter_tops = np.maximum(y_min[i], y_min[idxs[:last]]) - inter_bottoms = np.minimum(y_max[i], y_max[idxs[:last]]) - inter_lefts = np.maximum(x_min[i], x_min[idxs[:last]]) - inter_rights = np.minimum(x_max[i], x_max[idxs[:last]]) - inter_areas = (inter_rights - inter_lefts) * (inter_bottoms - inter_tops) - - # compute the ratio of overlap - union_area = (areas[idxs[:last]] + areas[i]) - inter_areas - overlap = inter_areas / union_area - - # delete all indexes from the index list that have less overlap than min_iou - idxs = np.delete( - idxs, np.concatenate(([last], np.where(overlap > min_iou)[0]))) - - # return only the bounding boxes that were picked using the - # integer data type - return boxes[pick] diff --git a/ssd/loss.py b/ssd/loss.py deleted file mode 100644 index 1b3d259..0000000 --- a/ssd/loss.py +++ /dev/null @@ -1,112 +0,0 @@ -import torch -import torch.nn as nn - - -class JacardOverlap(nn.Module): - def forward(self, anchors: torch.Tensor, labels: torch.Tensor) -> torch.Tensor: - """ - Assuming rank 2 (number of boxes, locations), location is (y, x, h, w) - Jaccard overlap : A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B) - Return: - jaccard overlap: (tensor) Shape: [predictions.size(0), labels.size(0)] - """ - anchors_count = anchors.size(0) - labels_count = labels.size(0) - - # Getting coords (y_min, x_min, y_max, x_max) repeated to fill (anchor count, label count) - anchor_coords = torch.cat([ - anchors[:, :2] - (anchors[:, 2:] / 2), - anchors[:, :2] + (anchors[:, 2:] / 2)], 1).unsqueeze(1).expand(anchors_count, labels_count, 4) - label_coords = torch.cat([ - labels[:, :2] - (labels[:, 2:] / 2), - labels[:, :2] + (labels[:, 2:] / 2)], 1).unsqueeze(0).expand(anchors_count, labels_count, 4) - - mins = torch.max(anchor_coords, label_coords)[:, :, :2] - maxes = torch.min(anchor_coords, label_coords)[:, :, 2:] - - inter_coords = torch.clamp(maxes - mins, min=0) - inter_area = inter_coords[:, :, 0] * inter_coords[:, :, 1] - - anchor_areas = (anchors[:, 2] * anchors[:, 3]).unsqueeze(1).expand_as(inter_area) - label_areas = (labels[:, 2] * labels[:, 3]).unsqueeze(0).expand_as(inter_area) - - union_area = anchor_areas + label_areas - inter_area - return inter_area / union_area - - -class SSDLoss(nn.Module): - def __init__(self, anchors: torch.Tensor, label_per_image: int, - negative_mining_ratio: int, matching_iou: float, - location_dimmension: int = 4, localization_loss_weight: float = 1.0): - super().__init__() - self.anchors = anchors - self.anchor_count = anchors.size(0) - self.label_per_image = label_per_image - self.location_dimmension = location_dimmension - self.negative_mining_ratio = negative_mining_ratio - self.matching_iou = matching_iou - self.localization_loss_weight = localization_loss_weight - - self.overlap = JacardOverlap() - self.matches = [] - # self.negative_matches = [] - self.positive_class_loss = torch.Tensor() - self.negative_class_loss = torch.Tensor() - self.localization_loss = torch.Tensor() - self.class_loss = torch.Tensor() - self.final_loss = torch.Tensor() - - def forward(self, input_data: torch.Tensor, input_labels: torch.Tensor) -> torch.Tensor: - batch_size = input_data.size(0) - expanded_anchors = self.anchors[:, :4].unsqueeze(0).unsqueeze(2).expand( - batch_size, self.anchor_count, self.label_per_image, 4) - expanded_labels = input_labels[:, :, :self.location_dimmension].unsqueeze(1).expand( - batch_size, self.anchor_count, self.label_per_image, self.location_dimmension) - objective_pos = (expanded_labels[:, :, :, :2] - expanded_anchors[:, :, :, :2]) / ( - expanded_anchors[:, :, :, 2:]) - objective_size = torch.log(expanded_labels[:, :, :, 2:] / expanded_anchors[:, :, :, 2:]) - - positive_objectives = [] - positive_predictions = [] - positive_class_loss = [] - negative_class_loss = [] - self.matches = [] - # self.negative_matches = [] - for batch_index in range(batch_size): - predictions = input_data[batch_index] - labels = input_labels[batch_index] - overlaps = self.overlap(self.anchors[:, :4], labels[:, :4]) - mask = (overlaps >= self.matching_iou).long() - match_indices = torch.nonzero(mask, as_tuple=False) - self.matches.append(match_indices.detach().cpu()) - - mining_count = int(self.negative_mining_ratio * len(self.matches[-1])) - masked_prediction = predictions[:, self.location_dimmension] + torch.max(mask, dim=1)[0] - non_match_indices = torch.argsort(masked_prediction, dim=-1, descending=False)[:mining_count] - # self.negative_matches.append(non_match_indices.detach().cpu()) - - for anchor_index, label_index in match_indices: - positive_predictions.append(predictions[anchor_index]) - positive_objectives.append( - torch.cat(( - objective_pos[batch_index, anchor_index, label_index], - objective_size[batch_index, anchor_index, label_index]), dim=-1)) - positive_class_loss.append(torch.log( - predictions[anchor_index, self.location_dimmension + labels[label_index, -1].long()])) - - for anchor_index in non_match_indices: - negative_class_loss.append( - torch.log(predictions[anchor_index, self.location_dimmension])) - - if not positive_predictions: - return None - positive_predictions = torch.stack(positive_predictions) - positive_objectives = torch.stack(positive_objectives) - self.positive_class_loss = -torch.sum(torch.stack(positive_class_loss)) - self.negative_class_loss = -torch.sum(torch.stack(negative_class_loss)) - self.localization_loss = nn.functional.smooth_l1_loss( - positive_predictions[:, self.location_dimmension], - positive_objectives) - self.class_loss = self.positive_class_loss + self.negative_class_loss - self.final_loss = (self.localization_loss_weight * self.localization_loss) + self.class_loss - return self.final_loss diff --git a/ssd/ssd.py b/ssd/ssd.py deleted file mode 100644 index eba064d..0000000 --- a/ssd/ssd.py +++ /dev/null @@ -1,165 +0,0 @@ -import colorsys -import math - -import numpy as np -import torch -import torch.nn as nn - -from .box import check_rectangle -from ..layers import Conv2d - - -class SSD(nn.Module): - - class Detector(nn.Module): - def __init__(self, input_features: int, output_features: int): - super().__init__() - self.conv = Conv2d(input_features, output_features, kernel_size=3, padding=1, - batch_norm=False, activation=None) - self.output = None - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - self.output = self.conv(input_data).permute(0, 2, 3, 1) - return self.output - - class DetectorMerge(nn.Module): - def __init__(self, location_dimmension: int): - super().__init__() - self.location_dim = location_dimmension - - def forward(self, detector_outputs: torch.Tensor) -> torch.Tensor: - return torch.cat( - [detector_outputs[:, :, :self.location_dim], - torch.softmax(detector_outputs[:, :, self.location_dim:], dim=2)], dim=2) - - class AnchorInfo: - def __init__(self, center: tuple[float, float], size: tuple[float], - index: int, layer_index: int, map_index: tuple[int, int], color_index: int, - ratio: float, size_factor: float): - self.index = index - self.layer_index = layer_index - self.map_index = map_index - self.color_index = color_index - self.ratio = ratio - self.size_factor = size_factor - self.center = center - self.size = size - self.box = check_rectangle( - center[0] - (size[0] / 2), center[1] - (size[1] / 2), - center[0] + (size[0] / 2), center[1] + (size[1] / 2)) - - def __repr__(self): - return (f'{self.__class__.__name__}' - f'(index:{self.index}, layer:{self.layer_index}, coord:{self.map_index}' - f', center:({self.center[0]:.03f}, {self.center[1]:.03f})' - f', size:({self.size[0]:.03f}, {self.size[1]:.03f})' - f', ratio:{self.ratio:.03f}, size_factor:{self.size_factor:.03f})' - f', y:[{self.box[0]:.03f}:{self.box[2]:.03f}]' - f', x:[{self.box[1]:.03f}:{self.box[3]:.03f}])') - - def __array__(self): - return np.array([*self.center, *self.size]) - - def __init__(self, base_network: nn.Module, input_sample: torch.Tensor, classes: list[str], - location_dimmension: int, layer_channels: list[int], layer_box_ratios: list[float], layer_args: dict, - box_size_factors: list[float]): - super().__init__() - - self.location_dim = location_dimmension - self.classes = ['none'] + classes - self.class_count = len(self.classes) - self.base_input_shape = input_sample.numpy().shape[1:] - self.base_network = base_network - sample_output = base_network(input_sample) - self.base_output_shape = list(sample_output.detach().numpy().shape)[-3:] - - layer_convs: list[nn.Module] = [] - layer_detectors: list[SSD.Detector] = [] - last_feature_count = self.base_output_shape[0] - for layer_index, (output_features, kwargs) in enumerate(zip(layer_channels, layer_args)): - if 'disable' not in kwargs: - layer_convs.append(Conv2d(last_feature_count, output_features, **kwargs)) - layer_detectors.append(SSD.Detector( - last_feature_count, (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index]))) - # layers.append(SSD.Layer( - # last_feature_count, output_features, - # (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index]), - # **kwargs)) - last_feature_count = output_features - self.layer_convs = nn.ModuleList(layer_convs) - self.layer_detectors = nn.ModuleList(layer_detectors) - - self.merge = self.DetectorMerge(location_dimmension) - - self.anchors_numpy, self.anchor_info, self.box_colors = self._create_anchors( - sample_output, self.layer_convs, self.layer_detectors, layer_box_ratios, box_size_factors, - input_sample.shape[3] / input_sample.shape[2]) - self.anchors = torch.from_numpy(self.anchors_numpy) - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - head = self.base_network(input_data) - detector_outputs = [] - for layer_index, detector in enumerate(self.layer_detectors): - detector_out = detector(head) - detector_outputs.append(detector_out.reshape( - detector_out.size(0), -1, self.class_count + self.location_dim)) - if layer_index < len(self.layer_convs): - head = self.layer_convs[layer_index](head) - detector_outputs = torch.cat(detector_outputs, 1) - return self.merge(detector_outputs) - # base_output = self.base_network(input_data) - # head = base_output - # outputs = [] - # for layer in self.layers: - # head, detector_output = layer(head) - # outputs.append(detector_output.reshape(base_output.size(0), -1, self.class_count + self.location_dim)) - # outputs = torch.cat(outputs, 1) - # return torch.cat( - # [outputs[:, :, :self.location_dim], torch.softmax(outputs[:, :, self.location_dim:], dim=2)], dim=2) - - def _apply(self, fn): - super()._apply(fn) - self.anchors = fn(self.anchors) - return self - - @staticmethod - def _create_anchors( - base_output: torch.Tensor, layers: nn.ModuleList, detectors: nn.ModuleList, layer_box_ratios: list[float], - box_size_factors: list[float], image_ratio: float) -> tuple[np.ndarray, np.ndarray, list[np.ndarray]]: - anchors = [] - anchor_info: list[SSD.AnchorInfo] = [] - box_colors: list[np.ndarray] = [] - head = base_output - - for layer_index, detector in enumerate(detectors): - detector_output = detector(head) # detector output shape : NCRSHW (Ratio, Size) - if layer_index < len(layers): - head = layers[layer_index](head) - - detector_rows = detector_output.size()[1] - detector_cols = detector_output.size()[2] - color_index = 0 - layer_ratios = layer_box_ratios[layer_index] - for index_y in range(detector_rows): - center_y = (index_y + 0.5) / detector_rows - for index_x in range(detector_cols): - center_x = (index_x + 0.5) / detector_cols - for ratio, size_factor in zip(layer_ratios, box_size_factors): - box_colors.append((np.asarray(colorsys.hsv_to_rgb( - color_index / len(layer_ratios), 1.0, 1.0)) * 255).astype(np.uint8)) - color_index += 1 - unit_box_size = size_factor / max(detector_rows, detector_cols) - anchor_width = unit_box_size * math.sqrt(ratio / image_ratio) - anchor_height = unit_box_size / math.sqrt(ratio / image_ratio) - anchor_info.append(SSD.AnchorInfo( - (center_y, center_x), - (anchor_height, anchor_width), - len(anchors), - layer_index, - (index_y, index_x), - len(box_colors) - 1, - ratio, - size_factor - )) - anchors.append([center_y, center_x, anchor_height, anchor_width]) - return np.asarray(anchors, dtype=np.float32), anchor_info, box_colors diff --git a/transformer/vision_transformer.py b/transformer/vision_transformer.py deleted file mode 100644 index bdca511..0000000 --- a/transformer/vision_transformer.py +++ /dev/null @@ -1,188 +0,0 @@ -""" -Data efficent image transformer (deit) -from https://github.com/facebookresearch/deit, https://arxiv.org/abs/2012.12877 -And Vit : https://arxiv.org/abs/2010.11929 -""" - - -from functools import partial -import math - -import numpy as np -import torch -import torch.nn as nn - -from ..layers import DropPath, Layer - - -class PatchEmbed(nn.Module): - def __init__(self, image_shape: tuple[int, int], patch_size: int = 16, - in_channels: int = 3, embed_dim: int = 768): - super().__init__() - patch_count = (image_shape[0] // patch_size) * (image_shape[1] // patch_size) - self.image_shape = image_shape - self.patch_size = patch_size - self.patch_count = patch_count - - self.projector = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size) - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - return self.projector(input_data).flatten(2).transpose(1, 2) - - -class SelfAttention(nn.Module): - def __init__(self, dim: int, head_count: int, qkv_bias: bool, qk_scale: float, - attention_drop_rate: float, projection_drop_rate: float): - super().__init__() - self.head_count = head_count - head_dim = dim // head_count - self.scale = qk_scale or head_dim ** -0.5 - - self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) - self.attention_drop = nn.Dropout(attention_drop_rate) - self.projector = nn.Linear(dim, dim) - self.projection_drop = nn.Dropout(projection_drop_rate) - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - batch_size, sequence_length, channel_count = input_data.shape - qkv = self.qkv(input_data).reshape( - batch_size, sequence_length, 3, self.head_count, channel_count // self.head_count).permute( - 2, 0, 3, 1, 4) - # (output shape : 3, batch_size, head_count, sequence_lenght, channel_count / head_count) - query, key, value = qkv[0], qkv[1], qkv[2] - attention = self.attention_drop(((query @ key.transpose(-2, -1)) * self.scale).softmax(dim=-1)) - return self.projection_drop(self.projector( - (attention @ value).transpose(1, 2).reshape(batch_size, sequence_length, channel_count))) - - -class Block(nn.Module): - def __init__(self, dim: int, head_count: int, mlp_ratio: float, - qkv_bias: bool, qk_scale: float, drop_rate: float, - attention_drop_rate: float, drop_path_rate: float, - norm_layer=0, activation=0): - super().__init__() - - self.norm1 = norm_layer(dim) - self.attention = SelfAttention(dim, head_count, qkv_bias, qk_scale, attention_drop_rate, drop_rate) - self.drop_path = DropPath(drop_path_rate) if drop_path_rate > 0.0 else nn.Identity() - self.norm2 = norm_layer(dim) - self.mlp = nn.Sequential( - nn.Linear(dim, int(dim * mlp_ratio)), - activation(), - nn.Linear(int(dim * mlp_ratio), dim), - nn.Dropout(drop_rate)) - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - out = input_data + self.drop_path(self.attention(self.norm1(input_data))) - return out + self.drop_path(self.mlp(self.norm2(out))) - - -class VissionTransformer(nn.Module): - QK_SCALE = None - ACTIVATION = 0 - NORM_LAYER = nn.LayerNorm - - def __init__(self, image_shape: tuple[int, int, int], class_count: int, depth: int, - patch_size: int = 16, embed_dim: int = 768, - head_count: int = 8, mlp_ratio: float = 4.0, qkv_bias: bool = True, qk_scale: float = None, - representation_size=None, distilled: bool = False, drop_rate: float = 0.0, - attention_drop_rate: float = 0.0, drop_path_rate: float = 0.0, embed_layer=PatchEmbed, - norm_layer=0, activation=0): - super().__init__() - qk_scale = qk_scale if qk_scale is not None else self.QK_SCALE - activation = activation if activation != 0 else self.ACTIVATION - activation = activation if activation != 0 else Layer.ACTIVATION - norm_layer = norm_layer if norm_layer != 0 else self.NORM_LAYER - - self.class_count = class_count - self.feature_count = self.embed_dim = embed_dim - self.distilled = distilled - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - - self.patch_embed = embed_layer(image_shape[1:], patch_size=patch_size, - in_channels=image_shape[0], embed_dim=embed_dim) - patch_count = self.patch_embed.patch_count - token_count = 2 if distilled else 1 - - self.class_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) - self.distillation_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None - self.position_embeddings = nn.Parameter(torch.zeros(1, patch_count + token_count, embed_dim)) - self.position_drop = nn.Dropout(drop_rate) if drop_rate > 0.0 else nn.Identity() - - depth_path_drop_rates = np.linspace(0, drop_path_rate, depth) if drop_path_rate > 0.0 else [0.0] * depth - self.blocks = nn.Sequential(*[ - Block(embed_dim, head_count, mlp_ratio, qkv_bias, qk_scale, drop_rate, attention_drop_rate, - pdr, norm_layer, activation) for pdr in depth_path_drop_rates]) - self.norm = norm_layer(embed_dim) - - # Representation Layer - if representation_size and not distilled: - self.feature_count = representation_size - self.pre_logits = nn.Sequential( - nn.Linear(embed_dim, representation_size), - nn.Tanh()) - else: - self.pre_logits = nn.Identity() - - # Final classifier - self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity() - self.head_distilled = nn.Linear( - self.embed_dim, self.class_count) if class_count > 0 and distilled else nn.Identity() - - # Init weights - nn.init.trunc_normal_(self.class_token, std=0.02) - nn.init.trunc_normal_(self.position_embeddings, std=0.02) - if self.distilled: - nn.init.trunc_normal_(self.distillation_token, std=0.02) - - # Applying weights initialization made no difference so far - self.apply(partial(self._init_weights, head_bias=-math.log(self.class_count))) - - @torch.jit.ignore - def no_weight_decay(self) -> dict: - return {'class_token', 'distillation_token', 'position_embeddings'} - - def get_classifier(self): - return self.head if self.distillation_token is None else (self.head, self.head_distilled) - - def reset_classifier(self, class_count: int): - self.class_count = class_count - self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity() - self.head_distilled = nn.Linear( - self.embed_dim, self.class_count) if class_count > 0 and self.distilled else nn.Identity() - - def forward(self, input_data: torch.Tensor) -> torch.Tensor: - embeddings = self.patch_embed(input_data) - class_token = self.class_token.expand(embeddings.shape[0], -1, -1) - - if self.distilled: - block_output = self.norm(self.blocks(self.position_drop( - torch.cat((class_token, self.distillation_token.expand(embeddings.shape[0], -1, -1), embeddings), dim=1) - + self.position_embeddings))) - distilled_head_output = self.head_distilled(block_output[:, 1]) - head_output = self.head(block_output[:, 0]) - if self.training and not torch.jit.is_scripting(): - return head_output, distilled_head_output - return (head_output + distilled_head_output) / 2.0 - - block_output = self.norm(self.blocks(self.position_drop( - torch.cat((class_token, embeddings), dim=1) + self.position_embeddings))) - return self.head(self.pre_logits(block_output[:, 0])) - - @staticmethod - def _init_weights(module: nn.Module, name: str = '', head_bias: float = 0.0): - if isinstance(module, nn.Linear): - if name.startswith('head'): - nn.init.zeros_(module.weight) - nn.init.constant_(module.bias, head_bias) - elif name.startswith('pre_logits'): - nn.init.xavier_normal_(module.weight) - nn.init.zeros_(module.bias) - # pytorch init for conv is fine - # elif isinstance(module, nn.Conv2d): - # nn.init.xavier_normal_(module.weight) - # if module.bias is not None: - # nn.init.zeros_(module.bias) - elif isinstance(module, nn.LayerNorm): - nn.init.ones_(module.weight) - nn.init.zeros_(module.bias) diff --git a/utils/batch_generator.py b/utils/batch_generator.py index ad02b55..b8257ef 100644 --- a/utils/batch_generator.py +++ b/utils/batch_generator.py @@ -3,6 +3,7 @@ from multiprocessing import shared_memory import os from typing import Callable, Iterable, Optional, Tuple +import h5py import numpy as np @@ -19,7 +20,6 @@ class BatchGenerator: self.num_workers = num_workers self.flip_data = flip_data self.pipeline = pipeline - self.process_id = 'NA' if not preload: self.data_processor = data_processor @@ -37,7 +37,6 @@ class BatchGenerator: os.makedirs(os.path.dirname(save_path)) if save and os.path.exists(save_path): - import h5py with h5py.File(save_path, 'r') as h5_file: self.data = np.asarray(h5_file['data']) self.label = np.asarray(h5_file['label']) @@ -81,6 +80,7 @@ class BatchGenerator: self.batch_data = first_data self.batch_label = first_label + self.process_id = 'NA' if self.prefetch or self.num_workers > 1: self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes) self.cache_indices = np.ndarray( diff --git a/utils/sequence_batch_generator.py b/utils/sequence_batch_generator.py index 947a0b2..005caf3 100644 --- a/utils/sequence_batch_generator.py +++ b/utils/sequence_batch_generator.py @@ -3,6 +3,7 @@ from multiprocessing import shared_memory import os from typing import Callable, Iterable, Optional +import h5py import numpy as np try: @@ -24,13 +25,12 @@ class SequenceGenerator(BatchGenerator): self.prefetch = prefetch and not preload self.num_workers = num_workers self.pipeline = pipeline - self.process_id = 'NA' if not preload: self.data_processor = data_processor self.label_processor = label_processor - self.data = np.asarray(data, dtype=np.object) - self.label = np.asarray(label, dtype=np.object) + self.data = np.asarray(data) + self.label = np.asarray(label) else: self.data_processor = None self.label_processor = None @@ -42,7 +42,6 @@ class SequenceGenerator(BatchGenerator): os.makedirs(os.path.dirname(save_path)) if save and os.path.exists(save_path): - import h5py with h5py.File(save_path, 'r') as h5_file: data_len = np.asarray(h5_file['data_len']) self.data = [] @@ -50,23 +49,22 @@ class SequenceGenerator(BatchGenerator): for sequence_index in range(data_len): self.data.append(np.asarray(h5_file[f'data_{sequence_index}'])) self.label.append(np.asarray(h5_file[f'label_{sequence_index}'])) - self.data = np.asarray(self.data, dtype=np.object) - self.label = np.asarray(self.label, dtype=np.object) + self.data = np.asarray(self.data) + self.label = np.asarray(self.label) else: if data_processor: self.data = np.asarray( [np.asarray([data_processor(entry) for entry in serie]) for serie in data], dtype=np.object if len(data) > 1 else None) else: - self.data = np.asarray(data, dtype=np.object) + self.data = np.asarray(data) if label_processor: self.label = np.asarray( [np.asarray([label_processor(entry) for entry in serie]) for serie in label], dtype=np.object if len(label) > 1 else None) else: - self.label = np.asarray(label, dtype=np.object) + self.label = np.asarray(label) if save: - import h5py with h5py.File(save_path, 'w') as h5_file: h5_file.create_dataset('data_len', data=len(self.data)) for sequence_index in range(len(self.data)): @@ -135,6 +133,7 @@ class SequenceGenerator(BatchGenerator): self.batch_data = first_data self.batch_label = first_label + self.process_id = 'NA' if self.prefetch or self.num_workers > 1: self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes) self.cache_indices = np.ndarray(