Compare commits
14 commits: 9b0c8ec49d ... 6359258061
| Author | SHA1 | Date |
|---|---|---|
|  | 6359258061 |  |
|  | 1bac46219b |  |
|  | 0cf142571b |  |
|  | 06db437aa4 |  |
|  | 90abb84710 |  |
|  | 770a9a4f82 |  |
|  | fe11f3e6d5 |  |
|  | d87bb89e6c |  |
|  | 092f4acc3b |  |
|  | 8d13de5711 |  |
|  | ce6314bf5e |  |
|  | a4280a1b78 |  |
|  | 54000b6c34 |  |
|  | 7a6f5821bd |  |
8 changed files with 657 additions and 95 deletions
layers.py (106 changed lines)

```diff
@@ -2,36 +2,25 @@ from typing import Union, Tuple
 import torch
 import torch.nn as nn
 import torch.nn.functional as F

 from .utils.logger import DummyLogger


 class LayerInfo():
     def __init__(self):
         self.memory = 0.0
         self.ops = 0.0
         self.output = 0.0


 class Layer(nn.Module):
     # Default layer arguments
-    ACTIVATION = F.leaky_relu
+    ACTIVATION = torch.nn.LeakyReLU
     ACTIVATION_KWARGS = {"negative_slope": 0.1}

-    BATCH_NORM = True
+    USE_BATCH_NORM = True
     BATCH_NORM_TRAINING = True
     BATCH_NORM_MOMENTUM = 0.01

     IS_TRAINING = False
     METRICS = False
     VERBOSE = 0
     LOGGER = DummyLogger()

-    def __init__(self, activation, batch_norm):
+    def __init__(self, activation):
         super().__init__()
         self.name = 'Layer'
         self.info = LayerInfo()

         # Preload default
         if activation == 0:
             activation = Layer.ACTIVATION
@@ -39,74 +28,93 @@ class Layer(nn.Module):
             self.activation = activation()
         else:
             self.activation = activation
-        self.batch_norm = Layer.BATCH_NORM if batch_norm is None else batch_norm
+        self.batch_norm: torch.nn._BatchNorm

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         output = input_data
-        if self.activation is not None:
+        if self.activation:
             output = self.activation(output)
-        if self.batch_norm is not None:
+        if self.batch_norm:
             output = self.batch_norm(output)
         return output

     @staticmethod
     def add_weight_decay(module: nn.Module, weight_decay: float, exclude=()):
         decay = []
         no_decay = []
         for name, param in module.named_parameters():
             if not param.requires_grad:
                 continue
             if len(param.shape) == 1 or name.endswith('.bias') or name in exclude:
                 no_decay.append(param)
             else:
                 decay.append(param)
         return [
             {'params': no_decay, 'weight_decay': 0.0},
             {'params': decay, 'weight_decay': weight_decay}]


 class Linear(Layer):
-    def __init__(self, in_channels: int, out_channels: int, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+    def __init__(self, in_channels: int, out_channels: int, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)

-        self.fc = nn.Linear(in_channels, out_channels, **kwargs)
+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
+        self.linear = nn.Linear(in_channels, out_channels, bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm1d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
-        return super().forward(self.fc(input_data))
+        return super().forward(self.linear(input_data))


 class Conv1d(Layer):
     def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)

+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm1d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))


 class Conv2d(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+    def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int]] = 3,
+                 stride: Union[int, tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)

+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm2d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))


 class Conv3d(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+    def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, tuple[int, int, int]] = 3,
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)

+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride=stride,
-                              bias=not self.batch_norm, **kwargs)
+                              bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm3d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.conv(input_data))
@@ -114,16 +122,32 @@ class Conv3d(Layer):

 class Deconv2d(Layer):
     def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=0, batch_norm=None, **kwargs):
-        super().__init__(activation, batch_norm)
+                 stride: Union[int, Tuple[int, int]] = 1, activation=0, use_batch_norm: bool = None, **kwargs):
+        super().__init__(activation)

+        use_batch_norm = Layer.USE_BATCH_NORM if use_batch_norm is None else use_batch_norm
         self.deconv = nn.ConvTranspose2d(
             in_channels, out_channels, kernel_size, stride=stride,
-            bias=not self.batch_norm, **kwargs)
+            bias=not use_batch_norm, **kwargs)
         self.batch_norm = nn.BatchNorm2d(
             out_channels,
             momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            track_running_stats=Layer.BATCH_NORM_TRAINING) if use_batch_norm else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
         return super().forward(self.deconv(input_data))


+class DropPath(nn.Module):
+    def __init__(self, drop_prob=None):
+        super().__init__()
+        self.drop_prob = drop_prob
+
+    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
+        if self.drop_prob == 0.0:
+            return input_data
+        keep_prob = 1 - self.drop_prob
+        # Per-sample Bernoulli mask, broadcast over all non-batch dimensions
+        shape = (input_data.shape[0],) + (1,) * (input_data.ndim - 1)
+        random_tensor = keep_prob + torch.rand(shape, dtype=input_data.dtype, device=input_data.device)
+        random_tensor.floor_()  # binarize
+        return input_data.div(keep_prob) * random_tensor
```
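The refactor's net effect: `Layer` subclasses now take `use_batch_norm` instead of threading a `batch_norm` flag through the base class, and the wrapped op drops its bias whenever a norm follows it (the norm's learned shift makes a separate bias redundant). A minimal usage sketch of the new API; the import path is hypothetical since the diff only shows the file name:

```python
import torch

from layers import Conv2d, Layer  # hypothetical import path; only `layers.py` is shown in the diff

conv = Conv2d(3, 16, kernel_size=3, padding=1)                         # BatchNorm2d attached, conv bias off
plain = Conv2d(3, 16, kernel_size=3, padding=1, use_batch_norm=False)  # plain conv, bias on

x = torch.randn(8, 3, 32, 32)
assert conv(x).shape == plain(x).shape == (8, 16, 32, 32)

# add_weight_decay splits parameters so biases and norm weights skip decay:
optimizer = torch.optim.SGD(Layer.add_weight_decay(conv, weight_decay=1e-4), lr=0.01)
```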
residual.py (74 changed lines)

```diff
@@ -3,65 +3,51 @@ from typing import Union, Tuple
 import torch
 import torch.nn as nn

-from .layers import LayerInfo, Layer
+from .layers import Conv2d, LayerInfo, Layer


 class ResBlock(Layer):
-    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
-                 activation=None, **kwargs):
-        super().__init__(activation if activation is not None else 0, False)
+    def __init__(self, in_channels: int, out_channels: int = -1, kernel_size: int = 3, padding: int = 1,
+                 stride: Union[int, Tuple[int, int]] = 1, activation=None, batch_norm=None, **kwargs):
+        super().__init__(activation if activation is not None else 0)
+        self.batch_norm = None
+        if out_channels == -1:
+            out_channels = in_channels

         self.seq = nn.Sequential(
-            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, bias=False, **kwargs),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, bias=False, padding=1),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
-        self.batch_norm = nn.BatchNorm2d(
-            out_channels,
-            momentum=Layer.BATCH_NORM_MOMENTUM,
-            track_running_stats=not Layer.BATCH_NORM_TRAINING) if self.batch_norm else None
+            Conv2d(in_channels, in_channels, kernel_size=kernel_size, stride=stride, padding=padding, **kwargs),
+            Conv2d(in_channels, out_channels, kernel_size=3, padding=1,
+                   activation=None, use_batch_norm=batch_norm))
+        self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if (
+            out_channels != in_channels or stride != 1) else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
+        if self.residual is not None:
+            return super().forward(self.residual(input_data) + self.seq(input_data))
         return super().forward(input_data + self.seq(input_data))


 class ResBottleneck(Layer):
-    def __init__(self, in_channels: int, out_channels: int, planes: int = 1, kernel_size: int = 3,
-                 stride: Union[int, Tuple[int, int]] = 1, activation=None, **kwargs):
-        super().__init__(activation if activation is not None else 0, False)
+    def __init__(self, in_channels: int, out_channels: int = -1, bottleneck_channels: int = -1, kernel_size: int = 3,
+                 stride: Union[int, Tuple[int, int]] = 1, padding=1,
+                 activation=None, batch_norm=None, **kwargs):
+        super().__init__(activation if activation is not None else 0)
+        self.batch_norm = None
+        if out_channels == -1:
+            out_channels = in_channels
+        if bottleneck_channels == -1:
+            bottleneck_channels = in_channels // 4

         self.seq = nn.Sequential(
-            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, bias=False, **kwargs),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING),
-            torch.nn.LeakyReLU(),
-            nn.Conv2d(out_channels, planes * out_channels, kernel_size=1, bias=False),
-            nn.BatchNorm2d(
-                out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
-        self.downsample = nn.Sequential(
-            nn.Conv2d(in_channels, planes * out_channels, stride=stride, kernel_size=1),
-            nn.BatchNorm2d(
-                planes * out_channels,
-                momentum=Layer.BATCH_NORM_MOMENTUM,
-                track_running_stats=not Layer.BATCH_NORM_TRAINING))
+            Conv2d(in_channels, bottleneck_channels, kernel_size=1),
+            Conv2d(bottleneck_channels, bottleneck_channels, kernel_size=kernel_size,
+                   stride=stride, padding=padding, **kwargs),
+            Conv2d(bottleneck_channels, out_channels, kernel_size=1,
+                   activation=None, use_batch_norm=batch_norm))
+        self.residual = Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, activation=None) if (
+            out_channels != in_channels or stride != 1) else None

     def forward(self, input_data: torch.Tensor) -> torch.Tensor:
-        return super().forward(self.downsample(input_data) + self.seq(input_data))
+        if self.residual is not None:
+            return super().forward(self.residual(input_data) + self.seq(input_data))
+        return super().forward(input_data + self.seq(input_data))
```
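The structural change in both blocks: the hand-rolled conv/BN/activation stacks are replaced by the `Conv2d` wrapper, and the unconditional `downsample` branch becomes a `residual` 1x1 projection created only when the output shape differs from the input (channel change or stride > 1). A quick sketch of both shortcut modes, again with a hypothetical import path:

```python
import torch

from residual import ResBlock  # hypothetical import path

same = ResBlock(32)                # identity shortcut: channels and stride unchanged
down = ResBlock(32, 64, stride=2)  # 1x1 stride-2 projection aligns the shortcut

x = torch.randn(4, 32, 16, 16)
assert same(x).shape == (4, 32, 16, 16)
assert down(x).shape == (4, 64, 8, 8)
```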
ssd/box.py (86 lines, new file)

```python
import numpy as np


def create_box(y_pos: float, x_pos: float, height: float, width: float) -> tuple[float, float, float, float]:
    y_min, x_min, y_max, x_max = check_rectangle(
        y_pos - (height / 2), x_pos - (width / 2), y_pos + (height / 2), x_pos + (width / 2))
    return (y_min + y_max) / 2, (x_min + x_max) / 2, y_max - y_min, x_max - x_min


def check_rectangle(y_min: float, x_min: float, y_max: float, x_max: float) -> tuple[float, float, float, float]:
    if y_min < 0:
        y_min = 0
    if x_min < 0:
        x_min = 0
    if y_min > 1:
        y_min = 1
    if x_min > 1:
        x_min = 1
    if y_max < 0:
        y_max = 0
    if x_max < 0:
        x_max = 0
    if y_max >= 1:
        y_max = 1
    if x_max >= 1:
        x_max = 1
    return y_min, x_min, y_max, x_max


def get_boxes(predictions: np.ndarray, anchors: np.ndarray, class_index: int) -> np.ndarray:
    boxes = np.zeros(anchors.shape)
    boxes[:, 0] = (predictions[:, 0] * anchors[:, 2]) + anchors[:, 0]
    boxes[:, 1] = (predictions[:, 1] * anchors[:, 3]) + anchors[:, 1]
    boxes[:, 2] = np.exp(predictions[:, 2]) * anchors[:, 2]
    boxes[:, 3] = np.exp(predictions[:, 3]) * anchors[:, 3]
    boxes = np.asarray([create_box(*box) for box in boxes])

    # return np.insert(boxes, 4, predictions[:, class_index], axis=-1)
    return np.concatenate([boxes, predictions[:, class_index:class_index + 1]], axis=1)


def fast_nms(boxes: np.ndarray, min_iou: float) -> np.ndarray:
    # if there are no boxes, return an empty list
    if len(boxes) == 0:
        return []

    # initialize the list of picked indexes
    pick = []

    # grab the coordinates of the bounding boxes
    y_min = boxes[:, 0] - (boxes[:, 2] / 2)
    y_max = boxes[:, 0] + (boxes[:, 2] / 2)
    x_min = boxes[:, 1] - (boxes[:, 3] / 2)
    x_max = boxes[:, 1] + (boxes[:, 3] / 2)
    scores = boxes[:, 4]

    # compute the area of the bounding boxes and sort the bounding boxes by the scores
    areas = (x_max - x_min) * (y_max - y_min)
    idxs = np.argsort(scores)

    # keep looping while some indexes still remain in the indexes list
    while len(idxs) > 0:
        # grab the last index in the indexes list and add the
        # index value to the list of picked indexes
        last = len(idxs) - 1
        i = idxs[last]
        pick.append(i)

        inter_tops = np.maximum(y_min[i], y_min[idxs[:last]])
        inter_bottoms = np.minimum(y_max[i], y_max[idxs[:last]])
        inter_lefts = np.maximum(x_min[i], x_min[idxs[:last]])
        inter_rights = np.minimum(x_max[i], x_max[idxs[:last]])
        # clamp at zero so disjoint boxes contribute no intersection area
        inter_areas = np.maximum(0, inter_rights - inter_lefts) * np.maximum(0, inter_bottoms - inter_tops)

        # compute the ratio of overlap
        union_area = (areas[idxs[:last]] + areas[i]) - inter_areas
        overlap = inter_areas / union_area

        # delete all indexes from the index list that have more overlap than min_iou
        idxs = np.delete(
            idxs, np.concatenate(([last], np.where(overlap > min_iou)[0])))

    # return only the bounding boxes that were picked
    return boxes[pick]
```
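`fast_nms` is the classic score-sorted greedy suppression: boxes are stored as `(center_y, center_x, height, width, score)` in normalized coordinates, the highest-scoring remaining box is kept, and everything overlapping it above `min_iou` is discarded. With the zero-clamped intersection above, disjoint boxes contribute no overlap and survive. A small sketch:

```python
import numpy as np

from ssd.box import fast_nms  # hypothetical import path for the new ssd/box.py

# (center_y, center_x, height, width, score): the first two boxes overlap
# heavily, the third is disjoint.
boxes = np.array([
    [0.50, 0.50, 0.20, 0.20, 0.90],
    [0.51, 0.50, 0.20, 0.20, 0.60],
    [0.20, 0.80, 0.10, 0.10, 0.80],
])
print(fast_nms(boxes, min_iou=0.5)[:, 4])  # [0.9 0.8]: the 0.6 box is suppressed
```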
ssd/loss.py (112 lines, new file)

```python
import torch
import torch.nn as nn


class JacardOverlap(nn.Module):
    def forward(self, anchors: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Assuming rank 2 (number of boxes, locations), location is (y, x, h, w)
        Jaccard overlap : A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)
        Return:
            jaccard overlap: (tensor) Shape: [anchors.size(0), labels.size(0)]
        """
        anchors_count = anchors.size(0)
        labels_count = labels.size(0)

        # Getting coords (y_min, x_min, y_max, x_max) repeated to fill (anchor count, label count)
        anchor_coords = torch.cat([
            anchors[:, :2] - (anchors[:, 2:] / 2),
            anchors[:, :2] + (anchors[:, 2:] / 2)], 1).unsqueeze(1).expand(anchors_count, labels_count, 4)
        label_coords = torch.cat([
            labels[:, :2] - (labels[:, 2:] / 2),
            labels[:, :2] + (labels[:, 2:] / 2)], 1).unsqueeze(0).expand(anchors_count, labels_count, 4)

        mins = torch.max(anchor_coords, label_coords)[:, :, :2]
        maxes = torch.min(anchor_coords, label_coords)[:, :, 2:]

        inter_coords = torch.clamp(maxes - mins, min=0)
        inter_area = inter_coords[:, :, 0] * inter_coords[:, :, 1]

        anchor_areas = (anchors[:, 2] * anchors[:, 3]).unsqueeze(1).expand_as(inter_area)
        label_areas = (labels[:, 2] * labels[:, 3]).unsqueeze(0).expand_as(inter_area)

        union_area = anchor_areas + label_areas - inter_area
        return inter_area / union_area


class SSDLoss(nn.Module):
    def __init__(self, anchors: torch.Tensor, label_per_image: int,
                 negative_mining_ratio: int, matching_iou: float,
                 location_dimmension: int = 4, localization_loss_weight: float = 1.0):
        super().__init__()
        self.anchors = anchors
        self.anchor_count = anchors.size(0)
        self.label_per_image = label_per_image
        self.location_dimmension = location_dimmension
        self.negative_mining_ratio = negative_mining_ratio
        self.matching_iou = matching_iou
        self.localization_loss_weight = localization_loss_weight

        self.overlap = JacardOverlap()
        self.matches = []
        # self.negative_matches = []
        self.positive_class_loss = torch.Tensor()
        self.negative_class_loss = torch.Tensor()
        self.localization_loss = torch.Tensor()
        self.class_loss = torch.Tensor()
        self.final_loss = torch.Tensor()

    def forward(self, input_data: torch.Tensor, input_labels: torch.Tensor) -> torch.Tensor:
        batch_size = input_data.size(0)
        expanded_anchors = self.anchors[:, :4].unsqueeze(0).unsqueeze(2).expand(
            batch_size, self.anchor_count, self.label_per_image, 4)
        expanded_labels = input_labels[:, :, :self.location_dimmension].unsqueeze(1).expand(
            batch_size, self.anchor_count, self.label_per_image, self.location_dimmension)
        objective_pos = (expanded_labels[:, :, :, :2] - expanded_anchors[:, :, :, :2]) / (
            expanded_anchors[:, :, :, 2:])
        objective_size = torch.log(expanded_labels[:, :, :, 2:] / expanded_anchors[:, :, :, 2:])

        positive_objectives = []
        positive_predictions = []
        positive_class_loss = []
        negative_class_loss = []
        self.matches = []
        # self.negative_matches = []
        for batch_index in range(batch_size):
            predictions = input_data[batch_index]
            labels = input_labels[batch_index]
            overlaps = self.overlap(self.anchors[:, :4], labels[:, :4])
            mask = (overlaps >= self.matching_iou).long()
            match_indices = torch.nonzero(mask, as_tuple=False)
            self.matches.append(match_indices.detach().cpu())

            mining_count = int(self.negative_mining_ratio * len(self.matches[-1]))
            masked_prediction = predictions[:, self.location_dimmension] + torch.max(mask, dim=1)[0]
            non_match_indices = torch.argsort(masked_prediction, dim=-1, descending=False)[:mining_count]
            # self.negative_matches.append(non_match_indices.detach().cpu())

            for anchor_index, label_index in match_indices:
                positive_predictions.append(predictions[anchor_index])
                positive_objectives.append(
                    torch.cat((
                        objective_pos[batch_index, anchor_index, label_index],
                        objective_size[batch_index, anchor_index, label_index]), dim=-1))
                positive_class_loss.append(torch.log(
                    predictions[anchor_index, self.location_dimmension + labels[label_index, -1].long()]))

            for anchor_index in non_match_indices:
                negative_class_loss.append(
                    torch.log(predictions[anchor_index, self.location_dimmension]))

        if not positive_predictions:
            return None
        positive_predictions = torch.stack(positive_predictions)
        positive_objectives = torch.stack(positive_objectives)
        self.positive_class_loss = -torch.sum(torch.stack(positive_class_loss))
        self.negative_class_loss = -torch.sum(torch.stack(negative_class_loss))
        self.localization_loss = nn.functional.smooth_l1_loss(
            positive_predictions[:, :self.location_dimmension],
            positive_objectives)
        self.class_loss = self.positive_class_loss + self.negative_class_loss
        self.final_loss = (self.localization_loss_weight * self.localization_loss) + self.class_loss
        return self.final_loss
```
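`SSDLoss` matches anchors to labels by Jaccard overlap, regresses `(Δy/h, Δx/w, log h-ratio, log w-ratio)` offsets for the matches, and hard-mines `negative_mining_ratio` background anchors per positive. A minimal smoke-test sketch; shapes follow the code above (each prediction row carries 4 location values followed by already-softmaxed class confidences, and each label ends with a class index where 0 is background), and the import path is hypothetical:

```python
import torch

from ssd.loss import SSDLoss  # hypothetical import path for the new ssd/loss.py

anchor_count, class_count, loc_dim = 6, 3, 4
anchors = torch.tensor([[0.5, 0.5, 0.4, 0.4]]).repeat(anchor_count, 1)
criterion = SSDLoss(anchors, label_per_image=2, negative_mining_ratio=3, matching_iou=0.5)

predictions = torch.rand(1, anchor_count, loc_dim + class_count).clamp(0.05, 0.95)
labels = torch.tensor([[[0.5, 0.5, 0.38, 0.42, 1.0],   # (y, x, h, w, class)
                        [0.5, 0.5, 0.40, 0.40, 2.0]]])
loss = criterion(predictions, labels)  # None when no anchor clears matching_iou
```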
ssd/ssd.py (165 lines, new file)

```python
import colorsys
import math

import numpy as np
import torch
import torch.nn as nn

from .box import check_rectangle
from ..layers import Conv2d


class SSD(nn.Module):

    class Detector(nn.Module):
        def __init__(self, input_features: int, output_features: int):
            super().__init__()
            self.conv = Conv2d(input_features, output_features, kernel_size=3, padding=1,
                               use_batch_norm=False, activation=None)
            self.output = None

        def forward(self, input_data: torch.Tensor) -> torch.Tensor:
            self.output = self.conv(input_data).permute(0, 2, 3, 1)
            return self.output

    class DetectorMerge(nn.Module):
        def __init__(self, location_dimmension: int):
            super().__init__()
            self.location_dim = location_dimmension

        def forward(self, detector_outputs: torch.Tensor) -> torch.Tensor:
            return torch.cat(
                [detector_outputs[:, :, :self.location_dim],
                 torch.softmax(detector_outputs[:, :, self.location_dim:], dim=2)], dim=2)

    class AnchorInfo:
        def __init__(self, center: tuple[float, float], size: tuple[float],
                     index: int, layer_index: int, map_index: tuple[int, int], color_index: int,
                     ratio: float, size_factor: float):
            self.index = index
            self.layer_index = layer_index
            self.map_index = map_index
            self.color_index = color_index
            self.ratio = ratio
            self.size_factor = size_factor
            self.center = center
            self.size = size
            self.box = check_rectangle(
                center[0] - (size[0] / 2), center[1] - (size[1] / 2),
                center[0] + (size[0] / 2), center[1] + (size[1] / 2))

        def __repr__(self):
            return (f'{self.__class__.__name__}'
                    f'(index:{self.index}, layer:{self.layer_index}, coord:{self.map_index}'
                    f', center:({self.center[0]:.03f}, {self.center[1]:.03f})'
                    f', size:({self.size[0]:.03f}, {self.size[1]:.03f})'
                    f', ratio:{self.ratio:.03f}, size_factor:{self.size_factor:.03f})'
                    f', y:[{self.box[0]:.03f}:{self.box[2]:.03f}]'
                    f', x:[{self.box[1]:.03f}:{self.box[3]:.03f}])')

        def __array__(self):
            return np.array([*self.center, *self.size])

    def __init__(self, base_network: nn.Module, input_sample: torch.Tensor, classes: list[str],
                 location_dimmension: int, layer_channels: list[int], layer_box_ratios: list[float], layer_args: dict,
                 box_size_factors: list[float]):
        super().__init__()

        self.location_dim = location_dimmension
        self.classes = ['none'] + classes
        self.class_count = len(self.classes)
        self.base_input_shape = input_sample.numpy().shape[1:]
        self.base_network = base_network
        sample_output = base_network(input_sample)
        self.base_output_shape = list(sample_output.detach().numpy().shape)[-3:]

        layer_convs: list[nn.Module] = []
        layer_detectors: list[SSD.Detector] = []
        last_feature_count = self.base_output_shape[0]
        for layer_index, (output_features, kwargs) in enumerate(zip(layer_channels, layer_args)):
            if 'disable' not in kwargs:
                layer_convs.append(Conv2d(last_feature_count, output_features, **kwargs))
            layer_detectors.append(SSD.Detector(
                last_feature_count, (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index])))
            # layers.append(SSD.Layer(
            #     last_feature_count, output_features,
            #     (self.class_count + self.location_dim) * len(layer_box_ratios[layer_index]),
            #     **kwargs))
            last_feature_count = output_features
        self.layer_convs = nn.ModuleList(layer_convs)
        self.layer_detectors = nn.ModuleList(layer_detectors)

        self.merge = self.DetectorMerge(location_dimmension)

        self.anchors_numpy, self.anchor_info, self.box_colors = self._create_anchors(
            sample_output, self.layer_convs, self.layer_detectors, layer_box_ratios, box_size_factors,
            input_sample.shape[3] / input_sample.shape[2])
        self.anchors = torch.from_numpy(self.anchors_numpy)

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        head = self.base_network(input_data)
        detector_outputs = []
        for layer_index, detector in enumerate(self.layer_detectors):
            detector_out = detector(head)
            detector_outputs.append(detector_out.reshape(
                detector_out.size(0), -1, self.class_count + self.location_dim))
            if layer_index < len(self.layer_convs):
                head = self.layer_convs[layer_index](head)
        detector_outputs = torch.cat(detector_outputs, 1)
        return self.merge(detector_outputs)
        # base_output = self.base_network(input_data)
        # head = base_output
        # outputs = []
        # for layer in self.layers:
        #     head, detector_output = layer(head)
        #     outputs.append(detector_output.reshape(base_output.size(0), -1, self.class_count + self.location_dim))
        # outputs = torch.cat(outputs, 1)
        # return torch.cat(
        #     [outputs[:, :, :self.location_dim], torch.softmax(outputs[:, :, self.location_dim:], dim=2)], dim=2)

    def _apply(self, fn):
        super()._apply(fn)
        self.anchors = fn(self.anchors)
        return self

    @staticmethod
    def _create_anchors(
            base_output: torch.Tensor, layers: nn.ModuleList, detectors: nn.ModuleList, layer_box_ratios: list[float],
            box_size_factors: list[float], image_ratio: float) -> tuple[np.ndarray, np.ndarray, list[np.ndarray]]:
        anchors = []
        anchor_info: list[SSD.AnchorInfo] = []
        box_colors: list[np.ndarray] = []
        head = base_output

        for layer_index, detector in enumerate(detectors):
            detector_output = detector(head)  # detector output shape : NCRSHW (Ratio, Size)
            if layer_index < len(layers):
                head = layers[layer_index](head)

            detector_rows = detector_output.size()[1]
            detector_cols = detector_output.size()[2]
            color_index = 0
            layer_ratios = layer_box_ratios[layer_index]
            for index_y in range(detector_rows):
                center_y = (index_y + 0.5) / detector_rows
                for index_x in range(detector_cols):
                    center_x = (index_x + 0.5) / detector_cols
                    for ratio, size_factor in zip(layer_ratios, box_size_factors):
                        box_colors.append((np.asarray(colorsys.hsv_to_rgb(
                            color_index / len(layer_ratios), 1.0, 1.0)) * 255).astype(np.uint8))
                        color_index += 1
                        unit_box_size = size_factor / max(detector_rows, detector_cols)
                        anchor_width = unit_box_size * math.sqrt(ratio / image_ratio)
                        anchor_height = unit_box_size / math.sqrt(ratio / image_ratio)
                        anchor_info.append(SSD.AnchorInfo(
                            (center_y, center_x),
                            (anchor_height, anchor_width),
                            len(anchors),
                            layer_index,
                            (index_y, index_x),
                            len(box_colors) - 1,
                            ratio,
                            size_factor))
                        anchors.append([center_y, center_x, anchor_height, anchor_width])
        return np.asarray(anchors, dtype=np.float32), anchor_info, box_colors
```
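Anchor bookkeeping is the subtle part of `_create_anchors`: each detector map of R rows and C cols contributes R x C x len(layer_ratios) anchors, and the box size comes from `size_factor / max(R, C)` corrected by `sqrt(ratio / image_ratio)`, so `ratio` is an aspect ratio in pixels rather than in normalized coordinates. The geometry, mirrored standalone for clarity:

```python
import math

# Mirrors the anchor-size formulas in SSD._create_anchors.
def anchor_size(ratio: float, size_factor: float, rows: int, cols: int,
                image_ratio: float) -> tuple[float, float]:
    unit = size_factor / max(rows, cols)
    width = unit * math.sqrt(ratio / image_ratio)
    height = unit / math.sqrt(ratio / image_ratio)
    return height, width  # normalized (h, w); h * w == unit ** 2 for any ratio

# Square pixel box (ratio 1) on a 5x5 map over a 4:3 image:
print(anchor_size(1.0, 1.0, 5, 5, image_ratio=4 / 3))  # ≈ (0.231, 0.173)
```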
transformer/vision_transformer.py (188 lines, new file)

```python
"""
Data efficient image transformer (DeiT)
from https://github.com/facebookresearch/deit, https://arxiv.org/abs/2012.12877
And ViT : https://arxiv.org/abs/2010.11929
"""


from functools import partial
import math

import numpy as np
import torch
import torch.nn as nn

from ..layers import DropPath, Layer


class PatchEmbed(nn.Module):
    def __init__(self, image_shape: tuple[int, int], patch_size: int = 16,
                 in_channels: int = 3, embed_dim: int = 768):
        super().__init__()
        patch_count = (image_shape[0] // patch_size) * (image_shape[1] // patch_size)
        self.image_shape = image_shape
        self.patch_size = patch_size
        self.patch_count = patch_count

        self.projector = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        return self.projector(input_data).flatten(2).transpose(1, 2)


class SelfAttention(nn.Module):
    def __init__(self, dim: int, head_count: int, qkv_bias: bool, qk_scale: float,
                 attention_drop_rate: float, projection_drop_rate: float):
        super().__init__()
        self.head_count = head_count
        head_dim = dim // head_count
        self.scale = qk_scale or head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attention_drop = nn.Dropout(attention_drop_rate)
        self.projector = nn.Linear(dim, dim)
        self.projection_drop = nn.Dropout(projection_drop_rate)

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        batch_size, sequence_length, channel_count = input_data.shape
        qkv = self.qkv(input_data).reshape(
            batch_size, sequence_length, 3, self.head_count, channel_count // self.head_count).permute(
            2, 0, 3, 1, 4)
        # (output shape : 3, batch_size, head_count, sequence_length, channel_count / head_count)
        query, key, value = qkv[0], qkv[1], qkv[2]
        attention = self.attention_drop(((query @ key.transpose(-2, -1)) * self.scale).softmax(dim=-1))
        return self.projection_drop(self.projector(
            (attention @ value).transpose(1, 2).reshape(batch_size, sequence_length, channel_count)))


class Block(nn.Module):
    def __init__(self, dim: int, head_count: int, mlp_ratio: float,
                 qkv_bias: bool, qk_scale: float, drop_rate: float,
                 attention_drop_rate: float, drop_path_rate: float,
                 norm_layer=0, activation=0):
        super().__init__()

        self.norm1 = norm_layer(dim)
        self.attention = SelfAttention(dim, head_count, qkv_bias, qk_scale, attention_drop_rate, drop_rate)
        self.drop_path = DropPath(drop_path_rate) if drop_path_rate > 0.0 else nn.Identity()
        self.norm2 = norm_layer(dim)
        self.mlp = nn.Sequential(
            nn.Linear(dim, int(dim * mlp_ratio)),
            activation(),
            nn.Linear(int(dim * mlp_ratio), dim),
            nn.Dropout(drop_rate))

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        out = input_data + self.drop_path(self.attention(self.norm1(input_data)))
        return out + self.drop_path(self.mlp(self.norm2(out)))


class VissionTransformer(nn.Module):
    QK_SCALE = None
    ACTIVATION = 0
    NORM_LAYER = nn.LayerNorm

    def __init__(self, image_shape: tuple[int, int, int], class_count: int, depth: int,
                 patch_size: int = 16, embed_dim: int = 768,
                 head_count: int = 8, mlp_ratio: float = 4.0, qkv_bias: bool = True, qk_scale: float = None,
                 representation_size=None, distilled: bool = False, drop_rate: float = 0.0,
                 attention_drop_rate: float = 0.0, drop_path_rate: float = 0.0, embed_layer=PatchEmbed,
                 norm_layer=0, activation=0):
        super().__init__()
        qk_scale = qk_scale if qk_scale is not None else self.QK_SCALE
        activation = activation if activation != 0 else self.ACTIVATION
        activation = activation if activation != 0 else Layer.ACTIVATION
        norm_layer = norm_layer if norm_layer != 0 else self.NORM_LAYER

        self.class_count = class_count
        self.feature_count = self.embed_dim = embed_dim
        self.distilled = distilled
        norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)

        self.patch_embed = embed_layer(image_shape[1:], patch_size=patch_size,
                                       in_channels=image_shape[0], embed_dim=embed_dim)
        patch_count = self.patch_embed.patch_count
        token_count = 2 if distilled else 1

        self.class_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.distillation_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None
        self.position_embeddings = nn.Parameter(torch.zeros(1, patch_count + token_count, embed_dim))
        self.position_drop = nn.Dropout(drop_rate) if drop_rate > 0.0 else nn.Identity()

        depth_path_drop_rates = np.linspace(0, drop_path_rate, depth) if drop_path_rate > 0.0 else [0.0] * depth
        self.blocks = nn.Sequential(*[
            Block(embed_dim, head_count, mlp_ratio, qkv_bias, qk_scale, drop_rate, attention_drop_rate,
                  pdr, norm_layer, activation) for pdr in depth_path_drop_rates])
        self.norm = norm_layer(embed_dim)

        # Representation Layer
        if representation_size and not distilled:
            self.feature_count = representation_size
            self.pre_logits = nn.Sequential(
                nn.Linear(embed_dim, representation_size),
                nn.Tanh())
        else:
            self.pre_logits = nn.Identity()

        # Final classifier
        self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity()
        self.head_distilled = nn.Linear(
            self.embed_dim, self.class_count) if class_count > 0 and distilled else nn.Identity()

        # Init weights
        nn.init.trunc_normal_(self.class_token, std=0.02)
        nn.init.trunc_normal_(self.position_embeddings, std=0.02)
        if self.distilled:
            nn.init.trunc_normal_(self.distillation_token, std=0.02)

        # Applying weights initialization made no difference so far
        self.apply(partial(self._init_weights, head_bias=-math.log(self.class_count)))

    @torch.jit.ignore
    def no_weight_decay(self) -> dict:
        return {'class_token', 'distillation_token', 'position_embeddings'}

    def get_classifier(self):
        return self.head if self.distillation_token is None else (self.head, self.head_distilled)

    def reset_classifier(self, class_count: int):
        self.class_count = class_count
        self.head = nn.Linear(self.feature_count, class_count) if class_count > 0 else nn.Identity()
        self.head_distilled = nn.Linear(
            self.embed_dim, self.class_count) if class_count > 0 and self.distilled else nn.Identity()

    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        embeddings = self.patch_embed(input_data)
        class_token = self.class_token.expand(embeddings.shape[0], -1, -1)

        if self.distilled:
            block_output = self.norm(self.blocks(self.position_drop(
                torch.cat((class_token, self.distillation_token.expand(embeddings.shape[0], -1, -1), embeddings), dim=1)
                + self.position_embeddings)))
            distilled_head_output = self.head_distilled(block_output[:, 1])
            head_output = self.head(block_output[:, 0])
            if self.training and not torch.jit.is_scripting():
                return head_output, distilled_head_output
            return (head_output + distilled_head_output) / 2.0

        block_output = self.norm(self.blocks(self.position_drop(
            torch.cat((class_token, embeddings), dim=1) + self.position_embeddings)))
        return self.head(self.pre_logits(block_output[:, 0]))

    @staticmethod
    def _init_weights(module: nn.Module, name: str = '', head_bias: float = 0.0):
        if isinstance(module, nn.Linear):
            if name.startswith('head'):
                nn.init.zeros_(module.weight)
                nn.init.constant_(module.bias, head_bias)
            elif name.startswith('pre_logits'):
                nn.init.xavier_normal_(module.weight)
                nn.init.zeros_(module.bias)
        # pytorch init for conv is fine
        # elif isinstance(module, nn.Conv2d):
        #     nn.init.xavier_normal_(module.weight)
        #     if module.bias is not None:
        #         nn.init.zeros_(module.bias)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)
```
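The module composes in the usual ViT order (patch embed, class and optional distillation tokens plus learned position embeddings, transformer blocks, final norm, classifier head), with `DropPath` from the new `layers.py` providing stochastic depth. A minimal instantiation sketch, with a hypothetical import path:

```python
import torch

from transformer.vision_transformer import VissionTransformer  # hypothetical import path

model = VissionTransformer(image_shape=(3, 224, 224), class_count=10, depth=4,
                           patch_size=16, embed_dim=192, head_count=3)
logits = model(torch.randn(2, 3, 224, 224))
print(logits.shape)  # torch.Size([2, 10]); (224 / 16)^2 = 196 patches + 1 class token
```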
BatchGenerator

```diff
@@ -3,7 +3,6 @@ from multiprocessing import shared_memory
 import os
 from typing import Callable, Iterable, Optional, Tuple

-import h5py
 import numpy as np


@@ -20,6 +19,7 @@ class BatchGenerator:
         self.num_workers = num_workers
         self.flip_data = flip_data
         self.pipeline = pipeline
+        self.process_id = 'NA'

         if not preload:
             self.data_processor = data_processor
@@ -37,6 +37,7 @@ class BatchGenerator:
             os.makedirs(os.path.dirname(save_path))

         if save and os.path.exists(save_path):
+            import h5py
             with h5py.File(save_path, 'r') as h5_file:
                 self.data = np.asarray(h5_file['data'])
                 self.label = np.asarray(h5_file['label'])
@@ -80,7 +81,6 @@ class BatchGenerator:
         self.batch_data = first_data
         self.batch_label = first_label

-        self.process_id = 'NA'
         if self.prefetch or self.num_workers > 1:
             self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes)
             self.cache_indices = np.ndarray(
```
SequenceGenerator

```diff
@@ -3,7 +3,6 @@ from multiprocessing import shared_memory
 import os
 from typing import Callable, Iterable, Optional

-import h5py
 import numpy as np

 try:
@@ -25,12 +24,13 @@ class SequenceGenerator(BatchGenerator):
         self.prefetch = prefetch and not preload
         self.num_workers = num_workers
         self.pipeline = pipeline
+        self.process_id = 'NA'

         if not preload:
             self.data_processor = data_processor
             self.label_processor = label_processor
-            self.data = np.asarray(data)
-            self.label = np.asarray(label)
+            self.data = np.asarray(data, dtype=np.object)
+            self.label = np.asarray(label, dtype=np.object)
         else:
             self.data_processor = None
             self.label_processor = None
@@ -42,6 +42,7 @@ class SequenceGenerator(BatchGenerator):
             os.makedirs(os.path.dirname(save_path))

         if save and os.path.exists(save_path):
+            import h5py
             with h5py.File(save_path, 'r') as h5_file:
                 data_len = np.asarray(h5_file['data_len'])
                 self.data = []
@@ -49,22 +50,23 @@ class SequenceGenerator(BatchGenerator):
             for sequence_index in range(data_len):
                 self.data.append(np.asarray(h5_file[f'data_{sequence_index}']))
                 self.label.append(np.asarray(h5_file[f'label_{sequence_index}']))
-            self.data = np.asarray(self.data)
-            self.label = np.asarray(self.label)
+            self.data = np.asarray(self.data, dtype=np.object)
+            self.label = np.asarray(self.label, dtype=np.object)
         else:
             if data_processor:
                 self.data = np.asarray(
                     [np.asarray([data_processor(entry) for entry in serie]) for serie in data],
                     dtype=np.object if len(data) > 1 else None)
             else:
-                self.data = np.asarray(data)
+                self.data = np.asarray(data, dtype=np.object)
             if label_processor:
                 self.label = np.asarray(
                     [np.asarray([label_processor(entry) for entry in serie]) for serie in label],
                     dtype=np.object if len(label) > 1 else None)
             else:
-                self.label = np.asarray(label)
+                self.label = np.asarray(label, dtype=np.object)
             if save:
+                import h5py
                 with h5py.File(save_path, 'w') as h5_file:
                     h5_file.create_dataset('data_len', data=len(self.data))
                     for sequence_index in range(len(self.data)):
@@ -133,7 +135,6 @@ class SequenceGenerator(BatchGenerator):
         self.batch_data = first_data
         self.batch_label = first_label

-        self.process_id = 'NA'
        if self.prefetch or self.num_workers > 1:
             self.cache_memory_indices = shared_memory.SharedMemory(create=True, size=self.index_list.nbytes)
             self.cache_indices = np.ndarray(
```
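Both generator files get the same two touches: `import h5py` moves from module scope into the save/load branches, so h5py is only required when HDF5 caching is actually used, and `self.process_id = 'NA'` moves into `__init__` so the attribute exists even when no worker pool is spawned. The `dtype=np.object` additions keep ragged per-sequence arrays from being collapsed into one rectangular array. A distilled sketch of the deferred-import pattern (not code from the diff):

```python
def load_cached(save_path: str):
    import h5py  # deferred: h5py becomes an optional dependency of this code path
    with h5py.File(save_path, 'r') as h5_file:
        return h5_file['data'][()], h5_file['label'][()]
```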