# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license """Transformer modules.""" import math import torch import torch.nn as nn import torch.nn.functional as F from torch.nn.init import constant_, xavier_uniform_ from .conv import Conv from .utils import _get_clones, inverse_sigmoid, multi_scale_deformable_attn_pytorch __all__ = ( "TransformerEncoderLayer", "TransformerLayer", "TransformerBlock", "MLPBlock", "LayerNorm2d", "AIFI", "DeformableTransformerDecoder", "DeformableTransformerDecoderLayer", "MSDeformAttn", "MLP", ) class TransformerEncoderLayer(nn.Module): """ Defines a single layer of the transformer encoder. Attributes: ma (nn.MultiheadAttention): Multi-head attention module. fc1 (nn.Linear): First linear layer in the feedforward network. fc2 (nn.Linear): Second linear layer in the feedforward network. norm1 (nn.LayerNorm): Layer normalization after attention. norm2 (nn.LayerNorm): Layer normalization after feedforward network. dropout (nn.Dropout): Dropout layer for the feedforward network. dropout1 (nn.Dropout): Dropout layer after attention. dropout2 (nn.Dropout): Dropout layer after feedforward network. act (nn.Module): Activation function. normalize_before (bool): Whether to apply normalization before attention and feedforward. """ def __init__(self, c1, cm=2048, num_heads=8, dropout=0.0, act=nn.GELU(), normalize_before=False): """ Initialize the TransformerEncoderLayer with specified parameters. Args: c1 (int): Input dimension. cm (int): Hidden dimension in the feedforward network. num_heads (int): Number of attention heads. dropout (float): Dropout probability. act (nn.Module): Activation function. normalize_before (bool): Whether to apply normalization before attention and feedforward. """ super().__init__() from ...utils.torch_utils import TORCH_1_9 if not TORCH_1_9: raise ModuleNotFoundError( "TransformerEncoderLayer() requires torch>=1.9 to use nn.MultiheadAttention(batch_first=True)." ) self.ma = nn.MultiheadAttention(c1, num_heads, dropout=dropout, batch_first=True) # Implementation of Feedforward model self.fc1 = nn.Linear(c1, cm) self.fc2 = nn.Linear(cm, c1) self.norm1 = nn.LayerNorm(c1) self.norm2 = nn.LayerNorm(c1) self.dropout = nn.Dropout(dropout) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.act = act self.normalize_before = normalize_before @staticmethod def with_pos_embed(tensor, pos=None): """Add position embeddings to the tensor if provided.""" return tensor if pos is None else tensor + pos def forward_post(self, src, src_mask=None, src_key_padding_mask=None, pos=None): """ Perform forward pass with post-normalization. Args: src (torch.Tensor): Input tensor. src_mask (torch.Tensor, optional): Mask for the src sequence. src_key_padding_mask (torch.Tensor, optional): Mask for the src keys per batch. pos (torch.Tensor, optional): Positional encoding. Returns: (torch.Tensor): Output tensor after attention and feedforward. """ q = k = self.with_pos_embed(src, pos) src2 = self.ma(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0] src = src + self.dropout1(src2) src = self.norm1(src) src2 = self.fc2(self.dropout(self.act(self.fc1(src)))) src = src + self.dropout2(src2) return self.norm2(src) def forward_pre(self, src, src_mask=None, src_key_padding_mask=None, pos=None): """ Perform forward pass with pre-normalization. Args: src (torch.Tensor): Input tensor. src_mask (torch.Tensor, optional): Mask for the src sequence. src_key_padding_mask (torch.Tensor, optional): Mask for the src keys per batch. pos (torch.Tensor, optional): Positional encoding. Returns: (torch.Tensor): Output tensor after attention and feedforward. """ src2 = self.norm1(src) q = k = self.with_pos_embed(src2, pos) src2 = self.ma(q, k, value=src2, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0] src = src + self.dropout1(src2) src2 = self.norm2(src) src2 = self.fc2(self.dropout(self.act(self.fc1(src2)))) return src + self.dropout2(src2) def forward(self, src, src_mask=None, src_key_padding_mask=None, pos=None): """ Forward propagates the input through the encoder module. Args: src (torch.Tensor): Input tensor. src_mask (torch.Tensor, optional): Mask for the src sequence. src_key_padding_mask (torch.Tensor, optional): Mask for the src keys per batch. pos (torch.Tensor, optional): Positional encoding. Returns: (torch.Tensor): Output tensor after transformer encoder layer. """ if self.normalize_before: return self.forward_pre(src, src_mask, src_key_padding_mask, pos) return self.forward_post(src, src_mask, src_key_padding_mask, pos) class AIFI(TransformerEncoderLayer): """ Defines the AIFI transformer layer. This class extends TransformerEncoderLayer to work with 2D data by adding positional embeddings. """ def __init__(self, c1, cm=2048, num_heads=8, dropout=0, act=nn.GELU(), normalize_before=False): """ Initialize the AIFI instance with specified parameters. Args: c1 (int): Input dimension. cm (int): Hidden dimension in the feedforward network. num_heads (int): Number of attention heads. dropout (float): Dropout probability. act (nn.Module): Activation function. normalize_before (bool): Whether to apply normalization before attention and feedforward. """ super().__init__(c1, cm, num_heads, dropout, act, normalize_before) def forward(self, x): """ Forward pass for the AIFI transformer layer. Args: x (torch.Tensor): Input tensor with shape [B, C, H, W]. Returns: (torch.Tensor): Output tensor with shape [B, C, H, W]. """ c, h, w = x.shape[1:] pos_embed = self.build_2d_sincos_position_embedding(w, h, c) # Flatten [B, C, H, W] to [B, HxW, C] x = super().forward(x.flatten(2).permute(0, 2, 1), pos=pos_embed.to(device=x.device, dtype=x.dtype)) return x.permute(0, 2, 1).view([-1, c, h, w]).contiguous() @staticmethod def build_2d_sincos_position_embedding(w, h, embed_dim=256, temperature=10000.0): """ Build 2D sine-cosine position embedding. Args: w (int): Width of the feature map. h (int): Height of the feature map. embed_dim (int): Embedding dimension. temperature (float): Temperature for the sine/cosine functions. Returns: (torch.Tensor): Position embedding with shape [1, embed_dim, h*w]. """ assert embed_dim % 4 == 0, "Embed dimension must be divisible by 4 for 2D sin-cos position embedding" grid_w = torch.arange(w, dtype=torch.float32) grid_h = torch.arange(h, dtype=torch.float32) grid_w, grid_h = torch.meshgrid(grid_w, grid_h, indexing="ij") pos_dim = embed_dim // 4 omega = torch.arange(pos_dim, dtype=torch.float32) / pos_dim omega = 1.0 / (temperature**omega) out_w = grid_w.flatten()[..., None] @ omega[None] out_h = grid_h.flatten()[..., None] @ omega[None] return torch.cat([torch.sin(out_w), torch.cos(out_w), torch.sin(out_h), torch.cos(out_h)], 1)[None] class TransformerLayer(nn.Module): """Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance).""" def __init__(self, c, num_heads): """ Initialize a self-attention mechanism using linear transformations and multi-head attention. Args: c (int): Input and output channel dimension. num_heads (int): Number of attention heads. """ super().__init__() self.q = nn.Linear(c, c, bias=False) self.k = nn.Linear(c, c, bias=False) self.v = nn.Linear(c, c, bias=False) self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads) self.fc1 = nn.Linear(c, c, bias=False) self.fc2 = nn.Linear(c, c, bias=False) def forward(self, x): """ Apply a transformer block to the input x and return the output. Args: x (torch.Tensor): Input tensor. Returns: (torch.Tensor): Output tensor after transformer layer. """ x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x return self.fc2(self.fc1(x)) + x class TransformerBlock(nn.Module): """ Vision Transformer https://arxiv.org/abs/2010.11929. Attributes: conv (Conv, optional): Convolution layer if input and output channels differ. linear (nn.Linear): Learnable position embedding. tr (nn.Sequential): Sequential container of transformer layers. c2 (int): Output channel dimension. """ def __init__(self, c1, c2, num_heads, num_layers): """ Initialize a Transformer module with position embedding and specified number of heads and layers. Args: c1 (int): Input channel dimension. c2 (int): Output channel dimension. num_heads (int): Number of attention heads. num_layers (int): Number of transformer layers. """ super().__init__() self.conv = None if c1 != c2: self.conv = Conv(c1, c2) self.linear = nn.Linear(c2, c2) # learnable position embedding self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers))) self.c2 = c2 def forward(self, x): """ Forward propagates the input through the bottleneck module. Args: x (torch.Tensor): Input tensor with shape [b, c1, w, h]. Returns: (torch.Tensor): Output tensor with shape [b, c2, w, h]. """ if self.conv is not None: x = self.conv(x) b, _, w, h = x.shape p = x.flatten(2).permute(2, 0, 1) return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h) class MLPBlock(nn.Module): """Implements a single block of a multi-layer perceptron.""" def __init__(self, embedding_dim, mlp_dim, act=nn.GELU): """ Initialize the MLPBlock with specified embedding dimension, MLP dimension, and activation function. Args: embedding_dim (int): Input and output dimension. mlp_dim (int): Hidden dimension. act (nn.Module): Activation function. """ super().__init__() self.lin1 = nn.Linear(embedding_dim, mlp_dim) self.lin2 = nn.Linear(mlp_dim, embedding_dim) self.act = act() def forward(self, x: torch.Tensor) -> torch.Tensor: """ Forward pass for the MLPBlock. Args: x (torch.Tensor): Input tensor. Returns: (torch.Tensor): Output tensor after MLP block. """ return self.lin2(self.act(self.lin1(x))) class MLP(nn.Module): """ Implements a simple multi-layer perceptron (also called FFN). Attributes: num_layers (int): Number of layers in the MLP. layers (nn.ModuleList): List of linear layers. sigmoid (bool): Whether to apply sigmoid to the output. act (nn.Module): Activation function. """ def __init__(self, input_dim, hidden_dim, output_dim, num_layers, act=nn.ReLU, sigmoid=False): """ Initialize the MLP with specified input, hidden, output dimensions and number of layers. Args: input_dim (int): Input dimension. hidden_dim (int): Hidden dimension. output_dim (int): Output dimension. num_layers (int): Number of layers. act (nn.Module): Activation function. sigmoid (bool): Whether to apply sigmoid to the output. """ super().__init__() self.num_layers = num_layers h = [hidden_dim] * (num_layers - 1) self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim])) self.sigmoid = sigmoid self.act = act() def forward(self, x): """ Forward pass for the entire MLP. Args: x (torch.Tensor): Input tensor. Returns: (torch.Tensor): Output tensor after MLP. """ for i, layer in enumerate(self.layers): x = getattr(self, "act", nn.ReLU())(layer(x)) if i < self.num_layers - 1 else layer(x) return x.sigmoid() if getattr(self, "sigmoid", False) else x class LayerNorm2d(nn.Module): """ 2D Layer Normalization module inspired by Detectron2 and ConvNeXt implementations. Original implementations in https://github.com/facebookresearch/detectron2/blob/main/detectron2/layers/batch_norm.py and https://github.com/facebookresearch/ConvNeXt/blob/main/models/convnext.py. Attributes: weight (nn.Parameter): Learnable scale parameter. bias (nn.Parameter): Learnable bias parameter. eps (float): Small constant for numerical stability. """ def __init__(self, num_channels, eps=1e-6): """ Initialize LayerNorm2d with the given parameters. Args: num_channels (int): Number of channels in the input. eps (float): Small constant for numerical stability. """ super().__init__() self.weight = nn.Parameter(torch.ones(num_channels)) self.bias = nn.Parameter(torch.zeros(num_channels)) self.eps = eps def forward(self, x): """ Perform forward pass for 2D layer normalization. Args: x (torch.Tensor): Input tensor. Returns: (torch.Tensor): Normalized output tensor. """ u = x.mean(1, keepdim=True) s = (x - u).pow(2).mean(1, keepdim=True) x = (x - u) / torch.sqrt(s + self.eps) return self.weight[:, None, None] * x + self.bias[:, None, None] class MSDeformAttn(nn.Module): """ Multiscale Deformable Attention Module based on Deformable-DETR and PaddleDetection implementations. https://github.com/fundamentalvision/Deformable-DETR/blob/main/models/ops/modules/ms_deform_attn.py Attributes: im2col_step (int): Step size for im2col operations. d_model (int): Model dimension. n_levels (int): Number of feature levels. n_heads (int): Number of attention heads. n_points (int): Number of sampling points per attention head per feature level. sampling_offsets (nn.Linear): Linear layer for generating sampling offsets. attention_weights (nn.Linear): Linear layer for generating attention weights. value_proj (nn.Linear): Linear layer for projecting values. output_proj (nn.Linear): Linear layer for projecting output. """ def __init__(self, d_model=256, n_levels=4, n_heads=8, n_points=4): """ Initialize MSDeformAttn with the given parameters. Args: d_model (int): Model dimension. n_levels (int): Number of feature levels. n_heads (int): Number of attention heads. n_points (int): Number of sampling points per attention head per feature level. """ super().__init__() if d_model % n_heads != 0: raise ValueError(f"d_model must be divisible by n_heads, but got {d_model} and {n_heads}") _d_per_head = d_model // n_heads # Better to set _d_per_head to a power of 2 which is more efficient in a CUDA implementation assert _d_per_head * n_heads == d_model, "`d_model` must be divisible by `n_heads`" self.im2col_step = 64 self.d_model = d_model self.n_levels = n_levels self.n_heads = n_heads self.n_points = n_points self.sampling_offsets = nn.Linear(d_model, n_heads * n_levels * n_points * 2) self.attention_weights = nn.Linear(d_model, n_heads * n_levels * n_points) self.value_proj = nn.Linear(d_model, d_model) self.output_proj = nn.Linear(d_model, d_model) self._reset_parameters() def _reset_parameters(self): """Reset module parameters.""" constant_(self.sampling_offsets.weight.data, 0.0) thetas = torch.arange(self.n_heads, dtype=torch.float32) * (2.0 * math.pi / self.n_heads) grid_init = torch.stack([thetas.cos(), thetas.sin()], -1) grid_init = ( (grid_init / grid_init.abs().max(-1, keepdim=True)[0]) .view(self.n_heads, 1, 1, 2) .repeat(1, self.n_levels, self.n_points, 1) ) for i in range(self.n_points): grid_init[:, :, i, :] *= i + 1 with torch.no_grad(): self.sampling_offsets.bias = nn.Parameter(grid_init.view(-1)) constant_(self.attention_weights.weight.data, 0.0) constant_(self.attention_weights.bias.data, 0.0) xavier_uniform_(self.value_proj.weight.data) constant_(self.value_proj.bias.data, 0.0) xavier_uniform_(self.output_proj.weight.data) constant_(self.output_proj.bias.data, 0.0) def forward(self, query, refer_bbox, value, value_shapes, value_mask=None): """ Perform forward pass for multiscale deformable attention. https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py Args: query (torch.Tensor): Tensor with shape [bs, query_length, C]. refer_bbox (torch.Tensor): Tensor with shape [bs, query_length, n_levels, 2], range in [0, 1], top-left (0,0), bottom-right (1, 1), including padding area. value (torch.Tensor): Tensor with shape [bs, value_length, C]. value_shapes (List): List with shape [n_levels, 2], [(H_0, W_0), (H_1, W_1), ..., (H_{L-1}, W_{L-1})]. value_mask (torch.Tensor, optional): Tensor with shape [bs, value_length], True for non-padding elements, False for padding elements. Returns: (torch.Tensor): Output tensor with shape [bs, Length_{query}, C]. """ bs, len_q = query.shape[:2] len_v = value.shape[1] assert sum(s[0] * s[1] for s in value_shapes) == len_v value = self.value_proj(value) if value_mask is not None: value = value.masked_fill(value_mask[..., None], float(0)) value = value.view(bs, len_v, self.n_heads, self.d_model // self.n_heads) sampling_offsets = self.sampling_offsets(query).view(bs, len_q, self.n_heads, self.n_levels, self.n_points, 2) attention_weights = self.attention_weights(query).view(bs, len_q, self.n_heads, self.n_levels * self.n_points) attention_weights = F.softmax(attention_weights, -1).view(bs, len_q, self.n_heads, self.n_levels, self.n_points) # N, Len_q, n_heads, n_levels, n_points, 2 num_points = refer_bbox.shape[-1] if num_points == 2: offset_normalizer = torch.as_tensor(value_shapes, dtype=query.dtype, device=query.device).flip(-1) add = sampling_offsets / offset_normalizer[None, None, None, :, None, :] sampling_locations = refer_bbox[:, :, None, :, None, :] + add elif num_points == 4: add = sampling_offsets / self.n_points * refer_bbox[:, :, None, :, None, 2:] * 0.5 sampling_locations = refer_bbox[:, :, None, :, None, :2] + add else: raise ValueError(f"Last dim of reference_points must be 2 or 4, but got {num_points}.") output = multi_scale_deformable_attn_pytorch(value, value_shapes, sampling_locations, attention_weights) return self.output_proj(output) class DeformableTransformerDecoderLayer(nn.Module): """ Deformable Transformer Decoder Layer inspired by PaddleDetection and Deformable-DETR implementations. https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py https://github.com/fundamentalvision/Deformable-DETR/blob/main/models/deformable_transformer.py Attributes: self_attn (nn.MultiheadAttention): Self-attention module. dropout1 (nn.Dropout): Dropout after self-attention. norm1 (nn.LayerNorm): Layer normalization after self-attention. cross_attn (MSDeformAttn): Cross-attention module. dropout2 (nn.Dropout): Dropout after cross-attention. norm2 (nn.LayerNorm): Layer normalization after cross-attention. linear1 (nn.Linear): First linear layer in the feedforward network. act (nn.Module): Activation function. dropout3 (nn.Dropout): Dropout in the feedforward network. linear2 (nn.Linear): Second linear layer in the feedforward network. dropout4 (nn.Dropout): Dropout after the feedforward network. norm3 (nn.LayerNorm): Layer normalization after the feedforward network. """ def __init__(self, d_model=256, n_heads=8, d_ffn=1024, dropout=0.0, act=nn.ReLU(), n_levels=4, n_points=4): """ Initialize the DeformableTransformerDecoderLayer with the given parameters. Args: d_model (int): Model dimension. n_heads (int): Number of attention heads. d_ffn (int): Dimension of the feedforward network. dropout (float): Dropout probability. act (nn.Module): Activation function. n_levels (int): Number of feature levels. n_points (int): Number of sampling points. """ super().__init__() # Self attention self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout) self.dropout1 = nn.Dropout(dropout) self.norm1 = nn.LayerNorm(d_model) # Cross attention self.cross_attn = MSDeformAttn(d_model, n_levels, n_heads, n_points) self.dropout2 = nn.Dropout(dropout) self.norm2 = nn.LayerNorm(d_model) # FFN self.linear1 = nn.Linear(d_model, d_ffn) self.act = act self.dropout3 = nn.Dropout(dropout) self.linear2 = nn.Linear(d_ffn, d_model) self.dropout4 = nn.Dropout(dropout) self.norm3 = nn.LayerNorm(d_model) @staticmethod def with_pos_embed(tensor, pos): """Add positional embeddings to the input tensor, if provided.""" return tensor if pos is None else tensor + pos def forward_ffn(self, tgt): """ Perform forward pass through the Feed-Forward Network part of the layer. Args: tgt (torch.Tensor): Input tensor. Returns: (torch.Tensor): Output tensor after FFN. """ tgt2 = self.linear2(self.dropout3(self.act(self.linear1(tgt)))) tgt = tgt + self.dropout4(tgt2) return self.norm3(tgt) def forward(self, embed, refer_bbox, feats, shapes, padding_mask=None, attn_mask=None, query_pos=None): """ Perform the forward pass through the entire decoder layer. Args: embed (torch.Tensor): Input embeddings. refer_bbox (torch.Tensor): Reference bounding boxes. feats (torch.Tensor): Feature maps. shapes (List): Feature shapes. padding_mask (torch.Tensor, optional): Padding mask. attn_mask (torch.Tensor, optional): Attention mask. query_pos (torch.Tensor, optional): Query position embeddings. Returns: (torch.Tensor): Output tensor after decoder layer. """ # Self attention q = k = self.with_pos_embed(embed, query_pos) tgt = self.self_attn(q.transpose(0, 1), k.transpose(0, 1), embed.transpose(0, 1), attn_mask=attn_mask)[ 0 ].transpose(0, 1) embed = embed + self.dropout1(tgt) embed = self.norm1(embed) # Cross attention tgt = self.cross_attn( self.with_pos_embed(embed, query_pos), refer_bbox.unsqueeze(2), feats, shapes, padding_mask ) embed = embed + self.dropout2(tgt) embed = self.norm2(embed) # FFN return self.forward_ffn(embed) class DeformableTransformerDecoder(nn.Module): """ Implementation of Deformable Transformer Decoder based on PaddleDetection. https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py Attributes: layers (nn.ModuleList): List of decoder layers. num_layers (int): Number of decoder layers. hidden_dim (int): Hidden dimension. eval_idx (int): Index of the layer to use during evaluation. """ def __init__(self, hidden_dim, decoder_layer, num_layers, eval_idx=-1): """ Initialize the DeformableTransformerDecoder with the given parameters. Args: hidden_dim (int): Hidden dimension. decoder_layer (nn.Module): Decoder layer module. num_layers (int): Number of decoder layers. eval_idx (int): Index of the layer to use during evaluation. """ super().__init__() self.layers = _get_clones(decoder_layer, num_layers) self.num_layers = num_layers self.hidden_dim = hidden_dim self.eval_idx = eval_idx if eval_idx >= 0 else num_layers + eval_idx def forward( self, embed, # decoder embeddings refer_bbox, # anchor feats, # image features shapes, # feature shapes bbox_head, score_head, pos_mlp, attn_mask=None, padding_mask=None, ): """ Perform the forward pass through the entire decoder. Args: embed (torch.Tensor): Decoder embeddings. refer_bbox (torch.Tensor): Reference bounding boxes. feats (torch.Tensor): Image features. shapes (List): Feature shapes. bbox_head (nn.Module): Bounding box prediction head. score_head (nn.Module): Score prediction head. pos_mlp (nn.Module): Position MLP. attn_mask (torch.Tensor, optional): Attention mask. padding_mask (torch.Tensor, optional): Padding mask. Returns: dec_bboxes (torch.Tensor): Decoded bounding boxes. dec_cls (torch.Tensor): Decoded classification scores. """ output = embed dec_bboxes = [] dec_cls = [] last_refined_bbox = None refer_bbox = refer_bbox.sigmoid() for i, layer in enumerate(self.layers): output = layer(output, refer_bbox, feats, shapes, padding_mask, attn_mask, pos_mlp(refer_bbox)) bbox = bbox_head[i](output) refined_bbox = torch.sigmoid(bbox + inverse_sigmoid(refer_bbox)) if self.training: dec_cls.append(score_head[i](output)) if i == 0: dec_bboxes.append(refined_bbox) else: dec_bboxes.append(torch.sigmoid(bbox + inverse_sigmoid(last_refined_bbox))) elif i == self.eval_idx: dec_cls.append(score_head[i](output)) dec_bboxes.append(refined_bbox) break last_refined_bbox = refined_bbox refer_bbox = refined_bbox.detach() if self.training else refined_bbox return torch.stack(dec_bboxes), torch.stack(dec_cls)