Compare commits

..

14 commits

Author SHA1 Message Date
Corentin Risselin
6359258061 Small fix, making h5py optional 2023-01-04 16:58:48 +09:00
Corentin
1bac46219b Fix dropouts and typos in ViT 2021-08-17 15:54:35 +09:00
Corentin
0cf142571b Fix typos 2021-05-25 14:06:22 +09:00
Corentin
06db437aa4 Vision Transformer 2021-05-22 01:18:39 +09:00
Corentin
90abb84710 Merge branch 'hoel-bagard/torch_utils-BatchNormModifications' 2021-05-21 16:00:53 +09:00
Corentin
770a9a4f82 Avoid use_batch_norm as layers instance variable 2021-05-21 16:00:16 +09:00
Corentin
fe11f3e6d5 Merge branch 'master' into 'BatchNormModifications'
# Conflicts:
#   layers.py
2021-05-21 06:53:31 +00:00
Corentin
d87bb89e6c SSDLoss implementation 2021-05-21 15:14:14 +09:00
Corentin
092f4acc3b Add SSD 2021-05-17 21:05:15 +09:00
Corentin
8d13de5711 Improve ResNet layers 2021-04-29 19:45:32 +09:00
Hoel Bagard
ce6314bf5e Fixed bias 2021-01-22 12:48:33 +09:00
Hoel Bagard
a4280a1b78 Fixed issues: layers now use self.use_batch_norm instead of default value, fixed Layer's forward 2021-01-22 12:38:07 +09:00
Hoel Bagard
54000b6c34 Fixed default use_batch_norm value 2021-01-21 20:36:22 +09:00
Hoel Bagard
7a6f5821bd Introduced the use_batch_norm variable, removed old code 2021-01-21 16:10:10 +09:00
8 changed files with 657 additions and 95 deletions

layers.py (106 changed lines)

@@ -2,36 +2,25 @@ from typing import Union, Tuple
 import torch
 import torch.nn as nn
-import torch.nn.functional as F
 from .utils.logger import DummyLogger
-class LayerInfo():
-    def __init__(self):
-        self.memory = 0.0
-        self.ops = 0.0
-        self.output = 0.0
 class Layer(nn.Module):
     # Default layer arguments
-    ACTIVATION = F.leaky_relu
+    ACTIVATION = torch.nn.LeakyReLU
+    ACTIVATION_KWARGS = {"negative_slope": 0.1}
-    BATCH_NORM = True
+    USE_BATCH_NORM = True
     BATCH_NORM_TRAINING = True
     BATCH_NORM_MOMENTUM = 0.01
     IS_TRAINING = False
     METRICS = False
-    VERBOSE = 0
     LOGGER = DummyLogger()
-    def __init__(self, activation, batch_norm):
+    def __init__(self, activation):
         super().__init__()
-        self.name = 'Layer'
-        self.info = LayerInfo()
         # Preload default
         if activation == 0:
             activation = Layer.ACTIVATION
@@ -39,74 +28,93 @@ class Layer(nn.Module):
             self.activation = activation()
         else:
             self.activation = activation
-        self.batch_norm = Layer.BATCH_NORM if batch_norm is None else batch_norm
+        self.batch_norm: torch.nn._BatchNorm
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         output = input_data
-        if self.activation is not None:
+        if self.activation:
             output = self.activation(output)
-        if self.batch_norm is not None:
+        if self.batch_norm:
             output = self.batch_norm(output)
         return output
+    @staticmethod
+    def add_weight_decay(module: nn.Module, weight_decay: float, exclude=()):
+        decay = []
+        no_decay = []
+        for name, param in module.named_parameters():
+            if not param.requires_grad:
+                continue
+            if len(param.shape) == 1 or name.endswith('.bias') or name in exclude:
+                no_decay.append(param)
+            else:
+                decay.append(param)
+        return [
+            {'params': no_decay, 'weight_decay': 0.0},
+            {'params': decay, 'weight_decay': weight_decay}]
 class Linear(Layer):
-    def __init__(self, in_channels: int, out_channels: int, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
-        self.fc = nn.Linear(in_channels, out_channels, **kwargs)
+    def __init__(self, in_channels: int, out_channels: int, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
+        self.linear = nn.Linear(in_channels, out_channels, bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm1d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
-        return super().forward(self.fc(input_data))
+        return super().forward(self.linear(input_data))
 class Conv1d(Layer):
     def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm1d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))
 class Conv2d(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+    def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int]] = 3,
+                 stride: Union[int, tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm2d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))
 class Conv3d(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+    def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int, int]] = 3,
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm3d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))
@@ -114,16 +122,32 @@ class Conv3d(Layer):
 class Deconv2d(Layer):
     def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.deconv = nn.ConvTranspose2d(
             in_channels, out_channels, kernel_size, stride=stride,
-            bias=not self.batch_norm, **kwargs)
+            bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm2d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.deconv(input_data))
+class DropPath(nn.Module):
+    def __init__(self, drop_prob=None):
+        super().__init__()
+        self.drop_prob = drop_prob
+    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
+        if self.drop_prob == 0.0:
+            return input_data
+        keep_prob = 1 - self.drop_prob
+        shape = (input_data.shape[0],) + (1,) * (input_data.ndim - 1)
+        random_tensor = keep_prob + torch.rand(shape, dtype=input_data.dtype, device=input_data.device)
+        random_tensor.floor_()  # binarize
+        return input_data.div(keep_prob) * random_tensor
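The DropPath module added above is the standard stochastic-depth trick: during training, each sample's residual branch is dropped with probability drop_prob and the survivors are rescaled. A minimal standalone sketch of the same masking logic (illustrative shapes, not code from this repository):

    import torch

    drop_prob = 0.2
    keep_prob = 1 - drop_prob
    x = torch.ones(4, 8, 16, 16)                     # (batch, channels, height, width)
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)      # one mask value per sample
    mask = (keep_prob + torch.rand(shape)).floor_()  # 1 with probability keep_prob, else 0
    out = x.div(keep_prob) * mask                    # survivors rescaled by 1/keep_prob
    print(mask.flatten())                            # e.g. tensor([1., 1., 0., 1.])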


@@ -3,65 +3,51 @@ from typing import Union, Tuple
 import torch
 import torch.nn as nn
-from .layers import LayerInfo, Layer
+from .layers import Conv2d, LayerInfo, Layer
 class ResBlock(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 activation=None, **kwargs):
+    def __init__(self, in_channels: int, out_channels: int = -1, kernel_size: int = 3, padding: int = 1,
+                 stride: Union[int, Tuple[int, int]] = 1, activation=None, batch_norm=None, **kwargs):
         super().__init__(activation if activation is not None else 0, False)
+        self.batch_norm = None
+        if out_channels == -1:
+            out_channels = in_channels
         self.seq = nn.Sequential(
-            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, bias=False, **kwargs),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, bias=False, padding=1),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
-        self.batch_norm = nn.BatchNorm2d(
-            out_channels,
-            momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=not Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            Conv2d(in_channels, in_channels, kernel_size=kernel_size, stride=stride, padding=padding, **kwargs),
+            Conv2d(in_channels, out_channels, kernel_size=3, padding=1,
+                   activation=None, batch_norm=batch_norm))
+        self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if (
+            out_channels != in_channels or stride != 1) else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
+        if self.residual is not None:
+            return super().forward(self.residual(input_data) + self.seq(input_data))
         return super().forward(input_data + self.seq(input_data))
 class ResBottleneck(Layer):
-    def __init__(self, in_channels: int, out_channels: int, planes: int = 1, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=None, **kwargs):
+    def __init__(self, in_channels: int, out_channels: int = -1, bottleneck_channels: int = -1, kernel_size: int = 3,
+                 stride: Union[int, Tuple[int, int]] = 1, padding=1,
+                 activation=None, batch_norm=None, **kwargs):
         super().__init__(activation if activation is not None else 0, False)
         self.batch_norm = None
+        if out_channels == -1:
+            out_channels = in_channels
+        if bottleneck_channels == -1:
+            bottleneck_channels = in_channels // 4
         self.seq = nn.Sequential(
-            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, bias=False, **kwargs),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, planes * out_channels, kernel_size=1, bias=False),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
-        self.downsample = nn.Sequential(
-            nn.Conv2d(in_channels, planes * out_channels, stride=stride, kernel_size=1),
-            nn.BatchNorm2d(
-                planes * out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
+            Conv2d(in_channels, bottleneck_channels, kernel_size=1),
+            Conv2d(bottleneck_channels, bottleneck_channels, kernel_size=kernel_size,
+                   stride=stride, padding=padding, **kwargs),
+            Conv2d(bottleneck_channels, out_channels, kernel_size=1,
+                   activation=None, batch_norm=batch_norm))
+        self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if (
+            out_channels != in_channels or stride != 1) else None
     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
-        return super().forward(self.downsample(input_data) + self.seq(input_data))
+        if self.residual is not None:
+            return super().forward(self.residual(input_data) + self.seq(input_data))
+        return super().forward(input_data + self.seq(input_data))
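The rewritten blocks follow the standard ResNet pattern: a 1x1 projection on the skip path whenever the main branch changes channel count or stride. A bare-bones sketch with plain torch.nn modules (the repository's Conv2d wrapper adds activation and batch norm on top; the values are illustrative):

    import torch
    import torch.nn as nn

    in_channels, out_channels, stride = 64, 128, 2
    seq = nn.Sequential(
        nn.Conv2d(in_channels, in_channels, 3, stride=stride, padding=1),
        nn.Conv2d(in_channels, out_channels, 3, padding=1))
    # 1x1 projection so the skip connection matches the main branch's output shape.
    residual = nn.Conv2d(in_channels, out_channels, 1, stride=stride)
    x = torch.randn(1, in_channels, 32, 32)
    print((residual(x) + seq(x)).shape)  # torch.Size([1, 128, 16, 16])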

ssd/box.py (new file, 86 lines)

@@ -0,0 +1,86 @@
import numpy as np
def create_box(y_pos: float, x_pos: float, height: float, width: float) -> tuple[float, float, float, float]:
    y_min, x_min, y_max, x_max = check_rectangle(
        y_pos - (height / 2), x_pos - (width / 2), y_pos + (height / 2), x_pos + (width / 2))
    return (y_min + y_max) / 2, (x_min + x_max) / 2, y_max - y_min, x_max - x_min
def check_rectangle(y_min: float, x_min: float, y_max: float, x_max: float) -> tuple[float, float, float, float]:
    if y_min < 0:
        y_min = 0
    if x_min < 0:
        x_min = 0
    if y_min > 1:
        y_min = 1
    if x_min > 1:
        x_min = 1
    if y_max < 0:
        y_max = 0
    if x_max < 0:
        x_max = 0
    if y_max >= 1:
        y_max = 1
    if x_max >= 1:
        x_max = 1
    return y_min, x_min, y_max, x_max
def get_boxes(predictions: np.ndarray, anchors: np.ndarray, class_index: int) -> np.ndarray:
    boxes = np.zeros(anchors.shape)
    boxes[:, 0] = (predictions[:, 0] * anchors[:, 2]) + anchors[:, 0]
    boxes[:, 1] = (predictions[:, 1] * anchors[:, 3]) + anchors[:, 1]
    boxes[:, 2] = np.exp(predictions[:, 2]) * anchors[:, 2]
    boxes[:, 3] = np.exp(predictions[:, 3]) * anchors[:, 3]
    boxes = np.asarray([create_box(*box) for box in boxes])
    # return np.insert(boxes, 4, predictions[:, class_index], axis=-1)
    return np.concatenate([boxes, predictions[:, class_index:class_index + 1]], axis=1)
def fast_nms(boxes: np.ndarray, min_iou: float) -> np.ndarray:
    # if there are no boxes, return an empty list
    if len(boxes) == 0:
        return []
    # initialize the list of picked indexes
    pick = []
    # grab the coordinates of the bounding boxes
    y_min = boxes[:, 0] - (boxes[:, 2] / 2)
    y_max = boxes[:, 0] + (boxes[:, 2] / 2)
    x_min = boxes[:, 1] - (boxes[:, 3] / 2)
    x_max = boxes[:, 1] + (boxes[:, 3] / 2)
    scores = boxes[:, 4]
    # compute the area of the bounding boxes and sort the bounding boxes by the scores
    areas = (x_max - x_min) * (y_max - y_min)
    idxs = np.argsort(scores)
    # keep looping while some indexes still remain in the indexes list
    while len(idxs) > 0:
        # grab the last index in the indexes list and add the
        # index value to the list of picked indexes
        last = len(idxs) - 1
        i = idxs[last]
        pick.append(i)
        inter_tops = np.maximum(y_min[i], y_min[idxs[:last]])
        inter_bottoms = np.minimum(y_max[i], y_max[idxs[:last]])
        inter_lefts = np.maximum(x_min[i], x_min[idxs[:last]])
        inter_rights = np.minimum(x_max[i], x_max[idxs[:last]])
        inter_areas = (inter_rights - inter_lefts) * (inter_bottoms - inter_tops)
        # compute the ratio of overlap
        union_area = (areas[idxs[:last]] + areas[i]) - inter_areas
        overlap = inter_areas / union_area
        # delete all indexes from the index list that have less overlap than min_iou
        idxs = np.delete(
            idxs, np.concatenate(([last], np.where(overlap > min_iou)[0])))
    # return only the bounding boxes that were picked
    return boxes[pick]
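For reference, a small sanity check of fast_nms; the import path is an assumption based on the file location, and the boxes are made up. Boxes are (y_center, x_center, height, width, score), so the two overlapping boxes collapse to the higher-scoring one:

    import numpy as np
    from ssd.box import fast_nms  # assumed import path

    boxes = np.array([
        [0.50, 0.50, 0.20, 0.20, 0.90],
        [0.51, 0.50, 0.20, 0.20, 0.60],  # near-duplicate of the first box
        [0.10, 0.10, 0.10, 0.10, 0.80],
    ])
    kept = fast_nms(boxes, min_iou=0.5)
    print(kept[:, 4])  # [0.9 0.8] -- the 0.6 duplicate is suppressed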

ssd/loss.py (new file, 112 lines)

@@ -0,0 +1,112 @@
import torch
import torch.nn as nn
class JacardOverlap(nn.Module):
    def forward(self, anchors: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Assuming rank 2 (number of boxes, locations), location is (y, x, h, w)
        Jaccard overlap: A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)
        Return:
            jaccard overlap: (tensor) Shape: [anchors.size(0), labels.size(0)]
        """
        anchors_count = anchors.size(0)
        labels_count = labels.size(0)
        # Getting coords (y_min, x_min, y_max, x_max) repeated to fill (anchor count, label count)
        anchor_coords = torch.cat([
            anchors[:, :2] - (anchors[:, 2:] / 2),
            anchors[:, :2] + (anchors[:, 2:] / 2)], 1).unsqueeze(1).expand(anchors_count, labels_count, 4)
        label_coords = torch.cat([
            labels[:, :2] - (labels[:, 2:] / 2),
            labels[:, :2] + (labels[:, 2:] / 2)], 1).unsqueeze(0).expand(anchors_count, labels_count, 4)
        mins = torch.max(anchor_coords, label_coords)[:, :, :2]
        maxes = torch.min(anchor_coords, label_coords)[:, :, 2:]
        inter_coords = torch.clamp(maxes - mins, min=0)
        inter_area = inter_coords[:, :, 0] * inter_coords[:, :, 1]
        anchor_areas = (anchors[:, 2] * anchors[:, 3]).unsqueeze(1).expand_as(inter_area)
        label_areas = (labels[:, 2] * labels[:, 3]).unsqueeze(0).expand_as(inter_area)
        union_area = anchor_areas + label_areas - inter_area
        return inter_area / union_area
class SSDLoss(nn.Module):
    def __init__(self, anchors: torch.Tensor, label_per_image: int,
                 negative_mining_ratio: int, matching_iou: float,
                 location_dimmension: int = 4, localization_loss_weight: float = 1.0):
        super().__init__()
        self.anchors = anchors
        self.anchor_count = anchors.size(0)
        self.label_per_image = label_per_image
        self.location_dimmension = location_dimmension
        self.negative_mining_ratio = negative_mining_ratio
        self.matching_iou = matching_iou
        self.localization_loss_weight = localization_loss_weight
        self.overlap = JacardOverlap()
        self.matches = []
        # self.negative_matches = []
        self.positive_class_loss = torch.Tensor()
        self.negative_class_loss = torch.Tensor()
        self.localization_loss = torch.Tensor()
        self.class_loss = torch.Tensor()
        self.final_loss = torch.Tensor()
    def forward(self, input_data: torch.Tensor, input_labels: torch.Tensor) -> torch.Tensor:
        batch_size = input_data.size(0)
        expanded_anchors = self.anchors[:, :4].unsqueeze(0).unsqueeze(2).expand(
            batch_size, self.anchor_count, self.label_per_image, 4)
        expanded_labels = input_labels[:, :, :self.location_dimmension].unsqueeze(1).expand(
            batch_size, self.anchor_count, self.label_per_image, self.location_dimmension)
        objective_pos = (expanded_labels[:, :, :, :2] - expanded_anchors[:, :, :, :2]) / (
            expanded_anchors[:, :, :, 2:])
        objective_size = torch.log(expanded_labels[:, :, :, 2:] / expanded_anchors[:, :, :, 2:])
        positive_objectives = []
        positive_predictions = []
        positive_class_loss = []
        negative_class_loss = []
        self.matches = []
        # self.negative_matches = []
        for batch_index in range(batch_size):
            predictions = input_data[batch_index]
            labels = input_labels[batch_index]
            overlaps = self.overlap(self.anchors[:, :4], labels[:, :4])
            mask = (overlaps >= self.matching_iou).long()
            match_indices = torch.nonzero(mask, as_tuple=False)
            self.matches.append(match_indices.detach().cpu())
            mining_count = int(self.negative_mining_ratio * len(self.matches[-1]))
            masked_prediction = predictions[:, self.location_dimmension] + torch.max(mask, dim=1)[0]
            non_match_indices = torch.argsort(masked_prediction, dim=-1, descending=False)[:mining_count]
            # self.negative_matches.append(non_match_indices.detach().cpu())
            for anchor_index, label_index in match_indices:
                positive_predictions.append(predictions[anchor_index])
                positive_objectives.append(
                    torch.cat((
                        objective_pos[batch_index, anchor_index, label_index],
                        objective_size[batch_index, anchor_index, label_index]), dim=-1))
                positive_class_loss.append(torch.log(
                    predictions[anchor_index, self.location_dimmension + labels[label_index, -1].long()]))
            for anchor_index in non_match_indices:
                negative_class_loss.append(
                    torch.log(predictions[anchor_index, self.location_dimmension]))
        if not positive_predictions:
            return None
        positive_predictions = torch.stack(positive_predictions)
        positive_objectives = torch.stack(positive_objectives)
        self.positive_class_loss = -torch.sum(torch.stack(positive_class_loss))
        self.negative_class_loss = -torch.sum(torch.stack(negative_class_loss))
        self.localization_loss = nn.functional.smooth_l1_loss(
            positive_predictions[:, self.location_dimmension],
            positive_objectives)
        self.class_loss = self.positive_class_loss + self.negative_class_loss
        self.final_loss = (self.localization_loss_weight * self.localization_loss) + self.class_loss
        return self.final_loss
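JacardOverlap can be checked in isolation; a quick sketch with hand-picked boxes (import path assumed from the file location):

    import torch
    from ssd.loss import JacardOverlap  # assumed import path

    overlap = JacardOverlap()
    anchors = torch.tensor([[0.5, 0.5, 0.2, 0.2]])  # one anchor, (y, x, h, w)
    labels = torch.tensor([[0.5, 0.5, 0.2, 0.2],    # identical box -> IoU 1.0
                           [0.5, 0.6, 0.2, 0.2]])   # shifted by half a width -> IoU 1/3
    print(overlap(anchors, labels))  # tensor([[1.0000, 0.3333]])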

ssd/ssd.py (new file, 165 lines)

@@ -0,0 +1,165 @@
import colorsys
import math
import numpy as np
import torch
import torch.nn as nn
from .box import check_rectangle
from ..layers import Conv2d
class SSD(nn.Module):
    class Detector(nn.Module):
        def __init__(self, input_features: int, output_features: int):
            super().__init__()
            self.conv = Conv2d(input_features, output_features, kernel_size=3, padding=1,
                               batch_norm=False, activation=None)
            self.output = None
        def forward(self, input_data: torch.Tensor) -> torch.Tensor:
            self.output = self.conv(input_data).permute(0, 2, 3, 1)
            return self.output
    class DetectorMerge(nn.Module):
        def __init__(self, location_dimmension: int):
            super().__init__()
            self.location_dim = location_dimmension
        def forward(self, detector_outputs: torch.Tensor) -> torch.Tensor:
            return torch.cat(
                [detector_outputs[:, :, :self.location_dim],
                 torch.softmax(detector_outputs[:, :, self.location_dim:], dim=2)], dim=2)
    class AnchorInfo:
        def __init__(self, center: tuple[float, float], size: tuple[float],
                     index: int, layer_index: int, map_index: tuple[int, int], color_index: int,
                     ratio: float, size_factor: float):
            self.index = index
            self.layer_index = layer_index
            self.map_index = map_index
            self.color_index = color_index
            self.ratio = ratio
            self.size_factor = size_factor
            self.center = center
            self.size = size
            self.box = check_rectangle(
                center[0] - (size[0] / 2), center[1] - (size[1] / 2),
                center[0] + (size[0] / 2), center[1] + (size[1] / 2))
        def __repr__(self):
            return (f'{self.__class__.__name__}'
                    f'(index:{self.index}, layer:{self.layer_index}, coord:{self.map_index}'
                    f', center:({self.center[0]:.03f}, {self.center[1]:.03f})'
                    f', size:({self.size[0]:.03f}, {self.size[1]:.03f})'
                    f', ratio:{self.ratio:.03f}, size_factor:{self.size_factor:.03f})'
                    f', y:[{self.box[0]:.03f}:{self.box[2]:.03f}]'
                    f', x:[{self.box[1]:.03f}:{self.box[3]:.03f}])')
        def __array__(self):
            return np.array([*self.center, *self.size])
    def __init__(self, base_network: nn.Module, input_sample: torch.Tensor, classes: list[str],
                 location_dimmension: int, layer_channels: list[int], layer_box_ratios: list[float],
                 layer_args: dict, box_size_factors: list[float]):
        super().__init__()
        self.location_dim = location_dimmension
        self.classes = ['none'] + classes
        self.class_count = len(self.classes)
        self.base_input_shape = input_sample.numpy().shape[1:]
        self.base_network = base_network
        sample_output = base_network(input_sample)
        self.base_output_shape = list(sample_output.detach().numpy().shape)[-3:]
        layer_convs: list[nn.Module] = []
        layer_detectors: list[SSD.Detector] = []
        last_feature_count = self.base_output_shape[0]
        for layer_index, (output_features, kwargs) in enumerate(zip(layer_channels, layer_args)):
            if 'disable' not in kwargs:
                layer_convs.append(Conv2d(last_feature_count, output_features, **kwargs))
            layer_detectors.append(SSD.Detector(
                last_feature_count, (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index])))
            # layers.append(SSD.Layer(
            #     last_feature_count, output_features,
            #     (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index]),
            #     **kwargs))
            last_feature_count = output_features
        self.layer_convs = nn.ModuleList(layer_convs)
        self.layer_detectors = nn.ModuleList(layer_detectors)
        self.merge = self.DetectorMerge(location_dimmension)
        self.anchors_numpy, self.anchor_info, self.box_colors = self._create_anchors(
            sample_output, self.layer_convs, self.layer_detectors, layer_box_ratios, box_size_factors,
            input_sample.shape[3] / input_sample.shape[2])
        self.anchors = torch.from_numpy(self.anchors_numpy)
    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        head = self.base_network(input_data)
        detector_outputs = []
        for layer_index, detector in enumerate(self.layer_detectors):
            detector_out = detector(head)
            detector_outputs.append(detector_out.reshape(
                detector_out.size(0), -1, self.class_count + self.location_dim))
            if layer_index < len(self.layer_convs):
                head = self.layer_convs[layer_index](head)
        detector_outputs = torch.cat(detector_outputs, 1)
        return self.merge(detector_outputs)
        # base_output = self.base_network(input_data)
        # head = base_output
        # outputs = []
        # for layer in self.layers:
        #     head, detector_output = layer(head)
        #     outputs.append(detector_output.reshape(base_output.size(0), -1, self.class_count + self.location_dim))
        # outputs = torch.cat(outputs, 1)
        # return torch.cat(
        #     [outputs[:, :, :self.location_dim], torch.softmax(outputs[:, :, self.location_dim:], dim=2)], dim=2)
    def _apply(self, fn):
        super()._apply(fn)
        self.anchors = fn(self.anchors)
        return self
    @staticmethod
    def _create_anchors(
            base_output: torch.Tensor, layers: nn.ModuleList, detectors: nn.ModuleList,
            layer_box_ratios: list[float], box_size_factors: list[float],
            image_ratio: float) -> tuple[np.ndarray, np.ndarray, list[np.ndarray]]:
        anchors = []
        anchor_info: list[SSD.AnchorInfo] = []
        box_colors: list[np.ndarray] = []
        head = base_output
        for layer_index, detector in enumerate(detectors):
            detector_output = detector(head)  # detector output shape: NCRSHW (Ratio, Size)
            if layer_index < len(layers):
                head = layers[layer_index](head)
            detector_rows = detector_output.size()[1]
            detector_cols = detector_output.size()[2]
            color_index = 0
            layer_ratios = layer_box_ratios[layer_index]
            for index_y in range(detector_rows):
                center_y = (index_y + 0.5) / detector_rows
                for index_x in range(detector_cols):
                    center_x = (index_x + 0.5) / detector_cols
                    for ratio, size_factor in zip(layer_ratios, box_size_factors):
                        box_colors.append((np.asarray(colorsys.hsv_to_rgb(
                            color_index / len(layer_ratios), 1.0, 1.0)) * 255).astype(np.uint8))
                        color_index += 1
                        unit_box_size = size_factor / max(detector_rows, detector_cols)
                        anchor_width = unit_box_size * math.sqrt(ratio / image_ratio)
                        anchor_height = unit_box_size / math.sqrt(ratio / image_ratio)
                        anchor_info.append(SSD.AnchorInfo(
                            (center_y, center_x),
                            (anchor_height, anchor_width),
                            len(anchors),
                            layer_index,
                            (index_y, index_x),
                            len(box_colors) - 1,
                            ratio,
                            size_factor
                        ))
                        anchors.append([center_y, center_x, anchor_height, anchor_width])
        return np.asarray(anchors, dtype=np.float32), anchor_info, box_colors
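The anchor geometry in _create_anchors reduces to a few lines per (ratio, size_factor) pair; a standalone sketch of just that computation, with illustrative values:

    import math

    detector_rows, detector_cols = 5, 5
    image_ratio = 16 / 9           # image width / height, as passed by SSD.__init__
    ratio, size_factor = 2.0, 1.0  # illustrative values
    unit_box_size = size_factor / max(detector_rows, detector_cols)
    anchor_width = unit_box_size * math.sqrt(ratio / image_ratio)
    anchor_height = unit_box_size / math.sqrt(ratio / image_ratio)
    center_y = (0 + 0.5) / detector_rows  # cell (0, 0)
    center_x = (0 + 0.5) / detector_cols
    print([center_y, center_x, anchor_height, anchor_width])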


@ -0,0 +1,188 @@
"""
Data efficent image transformer (deit)
from https://github.com/facebookresearch/deit, https://arxiv.org/abs/2012.12877
And Vit : https://arxiv.org/abs/2010.11929
"""
from functools import partial
import math
import numpy as np
import torch
import torch.nn as nn
from ..layers import DropPath, Layer
class PatchEmbed(nn.Module):
def __init__(self, image_shape: tuple[int, int], patch_size: int = 16,
in_channels: int = 3, embed_dim: int = 768):
super().__init__()
patch_count = (image_shape[0] // patch_size) * (image_shape[1] // patch_size)
self.image_shape = image_shape
self.patch_size = patch_size
self.patch_count = patch_count
self.projector = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
def forward(self, input_data: torch.Tensor) -> torch.Tensor:
return self.projector(input_data).flatten(2).transpose(1, 2)
class SelfAttention(nn.Module):
def __init__(self, dim: int, head_count: int, qkv_bias: bool, qk_scale: float,
attention_drop_rate: float, projection_drop_rate: float):
super().__init__()
self.head_count = head_count
head_dim = dim // head_count
self.scale = qk_scale or head_dim ** -0.5
self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
self.attention_drop = nn.Dropout(attention_drop_rate)
self.projector = nn.Linear(dim, dim)
self.projection_drop = nn.Dropout(projection_drop_rate)
def forward(self, input_data: torch.Tensor) -> torch.Tensor:
batch_size, sequence_length, channel_count = input_data.shape
qkv = self.qkv(input_data).reshape(
batch_size, sequence_length, 3, self.head_count, channel_count // self.head_count).permute(
2, 0, 3, 1, 4)
# (output shape : 3, batch_size, head_count, sequence_lenght, channel_count / head_count)
query, key, value = qkv[0], qkv[1], qkv[2]
attention = self.attention_drop(((query @ key.transpose(-2, -1)) * self.scale).softmax(dim=-1))
return self.projection_drop(self.projector(
(attention @ value).transpose(1, 2).reshape(batch_size, sequence_length, channel_count)))
class Block(nn.Module):
def __init__(self, dim: int, head_count: int, mlp_ratio: float,
qkv_bias: bool, qk_scale: float, drop_rate: float,
attention_drop_rate: float, drop_path_rate: float,
norm_layer=0, activation=0):
super().__init__()
self.norm1 = norm_layer(dim)
self.attention = SelfAttention(dim, head_count, qkv_bias, qk_scale, attention_drop_rate, drop_rate)
self.drop_path = DropPath(drop_path_rate) if drop_path_rate > 0.0 else nn.Identity()
self.norm2 = norm_layer(dim)
self.mlp = nn.Sequential(
nn.Linear(dim, int(dim * mlp_ratio)),
activation(),
nn.Linear(int(dim * mlp_ratio), dim),
nn.Dropout(drop_rate))
def forward(self, input_data: torch.Tensor) -> torch.Tensor:
out = input_data + self.drop_path(self.attention(self.norm1(input_data)))
return out + self.drop_path(self.mlp(self.norm2(out)))
class VissionTransformer(nn.Module):
QK_SCALE = None
ACTIVATION = 0
NORM_LAYER = nn.LayerNorm
def __init__(self, image_shape: tuple[int, int, int], class_count: int, depth: int,
patch_size: int = 16, embed_dim: int = 768,
head_count: int = 8, mlp_ratio: float = 4.0, qkv_bias: bool = True, qk_scale: float = None,
representation_size=None, distilled: bool = False, drop_rate: float = 0.0,
attention_drop_rate: float = 0.0, drop_path_rate: float = 0.0, embed_layer=PatchEmbed,
norm_layer=0, activation=0):
super().__init__()
qk_scale = qk_scale if qk_scale is not None else self.QK_SCALE
activation = activation if activation != 0 else self.ACTIVATION
activation = activation if activation != 0 else Layer.ACTIVATION
norm_layer = norm_layer if norm_layer != 0 else self.NORM_LAYER
self.class_count = class_count
self.feature_count = self.embed_dim = embed_dim
self.distilled = distilled
norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
self.patch_embed = embed_layer(image_shape[1:], patch_size=patch_size,
in_channels=image_shape[0], embed_dim=embed_dim)
patch_count = self.patch_embed.patch_count
token_count = 2 if distilled else 1
self.class_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
self.distillation_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None
self.position_embeddings = nn.Parameter(torch.zeros(1, patch_count + token_count, embed_dim))
self.position_drop = nn.Dropout(drop_rate) if drop_rate > 0.0 else nn.Identity()
depth_path_drop_rates = np.linspace(0, drop_path_rate, depth) if drop_path_rate > 0.0 else [0.0] * depth
self.blocks = nn.Sequential(*[
Block(embed_dim, head_count, mlp_ratio, qkv_bias, qk_scale, drop_rate, attention_drop_rate,
pdr, norm_layer, activation) for pdr in depth_path_drop_rates])
self.norm = norm_layer(embed_dim)
# Representation Layer
if representation_size and not distilled:
self.feature_count = representation_size
self.pre_logits = nn.Sequential(
nn.Linear(embed_dim, representation_size),
nn.Tanh())
else:
self.pre_logits = nn.Identity()
# Final classifier
self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity()
self.head_distilled = nn.Linear(
self.embed_dim, self.class_count) if class_count > 0 and distilled else nn.Identity()
# Init weights
nn.init.trunc_normal_(self.class_token, std=0.02)
nn.init.trunc_normal_(self.position_embeddings, std=0.02)
if self.distilled:
nn.init.trunc_normal_(self.distillation_token, std=0.02)
# Applying weights initialization made no difference so far
self.apply(partial(self._init_weights, head_bias=-math.log(self.class_count)))
@torch.jit.ignore
def no_weight_decay(self) -> dict:
return {'class_token', 'distillation_token', 'position_embeddings'}
def get_classifier(self):
return self.head if self.distillation_token is None else (self.head, self.head_distilled)
def reset_classifier(self, class_count: int):
self.class_count = class_count
self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity()
self.head_distilled = nn.Linear(
self.embed_dim, self.class_count) if class_count > 0 and self.distilled else nn.Identity()
def forward(self, input_data: torch.Tensor) -> torch.Tensor:
embeddings = self.patch_embed(input_data)
class_token = self.class_token.expand(embeddings.shape[0], -1, -1)
if self.distilled:
block_output = self.norm(self.blocks(self.position_drop(
torch.cat((class_token, self.distillation_token.expand(embeddings.shape[0], -1, -1), embeddings), dim=1)
+ self.position_embeddings)))
distilled_head_output = self.head_distilled(block_output[:, 1])
head_output = self.head(block_output[:, 0])
if self.training and not torch.jit.is_scripting():
return head_output, distilled_head_output
return (head_output + distilled_head_output) / 2.0
block_output = self.norm(self.blocks(self.position_drop(
torch.cat((class_token, embeddings), dim=1) + self.position_embeddings)))
return self.head(self.pre_logits(block_output[:, 0]))
@staticmethod
def _init_weights(module: nn.Module, name: str = '', head_bias: float = 0.0):
if isinstance(module, nn.Linear):
if name.startswith('head'):
nn.init.zeros_(module.weight)
nn.init.constant_(module.bias, head_bias)
elif name.startswith('pre_logits'):
nn.init.xavier_normal_(module.weight)
nn.init.zeros_(module.bias)
# pytorch init for conv is fine
# elif isinstance(module, nn.Conv2d):
# nn.init.xavier_normal_(module.weight)
# if module.bias is not None:
# nn.init.zeros_(module.bias)
elif isinstance(module, nn.LayerNorm):
nn.init.ones_(module.weight)
nn.init.zeros_(module.bias)
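A minimal instantiation sketch; the import path is hypothetical since this compare view does not show the file name, and the model sizes are illustrative:

    import torch
    from networks.vit import VissionTransformer  # hypothetical path

    model = VissionTransformer(
        image_shape=(3, 224, 224), class_count=10, depth=4,
        patch_size=16, embed_dim=192, head_count=3)
    logits = model(torch.randn(2, 3, 224, 224))
    print(logits.shape)  # torch.Size([2, 10])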


@@ -3,7 +3,6 @@ from multiprocessing import shared_memory
 import os
 from typing import Callable, Iterable, Optional, Tuple
-import h5py
 import numpy as np
@@ -20,6 +19,7 @@ class BatchGenerator:
         self.num_workers = num_workers
         self.flip_data = flip_data
         self.pipeline = pipeline
+        self.process_id = 'NA'
         if not preload:
             self.data_processor = data_processor
@@ -37,6 +37,7 @@ class BatchGenerator:
             os.makedirs(os.path.dirname(save_path))
         if save and os.path.exists(save_path):
+            import h5py
             with h5py.File(save_path, 'r') as h5_file:
                 self.data = np.asarray(h5_file['data'])
                 self.label = np.asarray(h5_file['label'])
@@ -80,7 +81,6 @@ class BatchGenerator:
         self.batch_data = first_data
         self.batch_label = first_label
-        self.process_id = 'NA'
         if self.prefetch or self.num_workers > 1:
             self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes)
             self.cache_indices = np.ndarray(


@@ -3,7 +3,6 @@ from multiprocessing import shared_memory
 import os
 from typing import Callable, Iterable, Optional
-import h5py
 import numpy as np
 try:
@@ -25,12 +24,13 @@ class SequenceGenerator(BatchGenerator):
         self.prefetch = prefetch and not preload
         self.num_workers = num_workers
         self.pipeline = pipeline
+        self.process_id = 'NA'
         if not preload:
             self.data_processor = data_processor
             self.label_processor = label_processor
-            self.data = np.asarray(data)
-            self.label = np.asarray(label)
+            self.data = np.asarray(data, dtype=np.object)
+            self.label = np.asarray(label, dtype=np.object)
         else:
             self.data_processor = None
             self.label_processor = None
@@ -42,6 +42,7 @@ class SequenceGenerator(BatchGenerator):
             os.makedirs(os.path.dirname(save_path))
         if save and os.path.exists(save_path):
+            import h5py
             with h5py.File(save_path, 'r') as h5_file:
                 data_len = np.asarray(h5_file['data_len'])
                 self.data = []
@@ -49,22 +50,23 @@ class SequenceGenerator(BatchGenerator):
                 for sequence_index in range(data_len):
                     self.data.append(np.asarray(h5_file[f'data_{sequence_index}']))
                     self.label.append(np.asarray(h5_file[f'label_{sequence_index}']))
-                self.data = np.asarray(self.data)
-                self.label = np.asarray(self.label)
+                self.data = np.asarray(self.data, dtype=np.object)
+                self.label = np.asarray(self.label, dtype=np.object)
         else:
             if data_processor:
                 self.data = np.asarray(
                     [np.asarray([data_processor(entry) for entry in serie]) for serie in data],
                     dtype=np.object if len(data) > 1 else None)
             else:
-                self.data = np.asarray(data)
+                self.data = np.asarray(data, dtype=np.object)
             if label_processor:
                 self.label = np.asarray(
                     [np.asarray([label_processor(entry) for entry in serie]) for serie in label],
                     dtype=np.object if len(label) > 1 else None)
             else:
-                self.label = np.asarray(label)
+                self.label = np.asarray(label, dtype=np.object)
         if save:
+            import h5py
             with h5py.File(save_path, 'w') as h5_file:
                 h5_file.create_dataset('data_len', data=len(self.data))
                 for sequence_index in range(len(self.data)):
@@ -133,7 +135,6 @@ class SequenceGenerator(BatchGenerator):
         self.batch_data = first_data
         self.batch_label = first_label
-        self.process_id = 'NA'
         if self.prefetch or self.num_workers > 1:
             self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes)
             self.cache_indices = np.ndarray(
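Both generators now use the same deferred-import pattern, which is what makes h5py optional (per the 2023 "Small fix, making h5py optional" commit): the package is only imported inside the branches that actually read or write an HDF5 cache. A generic sketch of the pattern with a hypothetical helper:

    def load_cache(save_path: str) -> dict:
        # Hypothetical helper; defers the h5py import until a cache is read.
        try:
            import h5py
        except ImportError as error:
            raise ImportError('h5py is required to load cached datasets') from error
        with h5py.File(save_path, 'r') as h5_file:
            return {key: h5_file[key][()] for key in h5_file}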