import torch
from torch import Tensor
from collections import OrderedDict
import torch.nn.functional as F
from torch import nn
from torch.jit.annotations import Tuple, List, Dict


class Bottleneck(nn.Module):
    """ResNet bottleneck residual block.

    The main path is 1x1 conv -> 3x3 conv (carrying ``stride``) -> 1x1 conv,
    each followed by a normalization layer. The last 1x1 conv expands the
    channel count to ``out_channel * expansion``. The block output is
    ``relu(main_path(x) + identity)``, where ``identity`` is ``x`` itself or
    ``downsample(x)`` when a downsample module is supplied.
    """

    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, norm_layer=None):
        """
        :param in_channel: number of channels of the block input
        :param out_channel: number of channels inside the bottleneck; the block
            outputs ``out_channel * expansion`` channels
        :param stride: stride of the 3x3 convolution
        :param downsample: optional module applied to the input to build the
            shortcut branch
        :param norm_layer: normalization layer factory, defaults to ``nn.BatchNorm2d``
        """
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        # squeeze channels
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=(1, 1), stride=(1, 1), bias=False)
        self.bn1 = norm_layer(out_channel)
        # -----------------------------------------
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=(3, 3), stride=(stride, stride), bias=False, padding=(1, 1))
        self.bn2 = norm_layer(out_channel)
        # -----------------------------------------
        # unsqueeze channels
        self.conv3 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel * self.expansion,
                               kernel_size=(1, 1), stride=(1, 1), bias=False)
        self.bn3 = norm_layer(out_channel * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Run the residual block on ``x`` and return the activated sum."""
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    """ResNet trunk: stem (7x7 conv, norm, ReLU, max-pool) followed by four stages.

    When ``include_top`` is true an adaptive average pool and a linear
    classifier are appended.
    """

    def __init__(self, block, blocks_num, num_classes=1000, include_top=True, norm_layer=None):
        """
        :param block: residual block class (must expose ``expansion``)
        :param blocks_num: number of blocks in each of the four stages
        :param num_classes: number of output classes of the classifier
        :param include_top: whether to build the avgpool + fc classification head
        :param norm_layer: normalization layer factory, defaults to ``nn.BatchNorm2d``
        """
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.include_top = include_top
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channel = 64

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=self.in_channel, kernel_size=(7, 7), stride=(2, 2),
                               padding=(3, 3), bias=False)
        self.bn1 = norm_layer(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Weight initialization for every convolution in the network.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage made of ``block_num`` blocks.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1-conv + norm downsample shortcut.
        ``self.in_channel`` is updated to the stage's output channel count.
        """
        norm_layer = self._norm_layer
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion,
                          kernel_size=(1, 1), stride=(stride, stride), bias=False),
                norm_layer(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel, channel, downsample=downsample,
                            stride=stride, norm_layer=norm_layer))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel, channel, norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the stem and the four stages, plus the classifier head if present."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)

        return x


class IntermediateLayerGetter(nn.ModuleDict):
    """
    Module wrapper that returns intermediate layers from a model.

    It has a strong assumption that the modules have been registered
    into the model in the same order as they are used.
    This means that one should **not** reuse the same nn.Module
    twice in the forward if you want this to work.

    Additionally, it is only able to query submodules that are directly
    assigned to the model. So if `model` is passed, `model.feature1` can
    be returned, but not `model.feature1.layer2`.

    Arguments:
        model (nn.Module): model on which we will extract the features
        return_layers (Dict[name, new_name]): a dict containing the names
            of the modules for which the activations will be returned as
            the key of the dict, and the value of the dict is the name
            of the returned activation (which the user can specify).

    Raises:
        ValueError: if a key of ``return_layers`` is not a direct child of ``model``.
    """
    __annotations__ = {
        "return_layers": Dict[str, str],
    }

    def __init__(self, model, return_layers):
        if not set(return_layers).issubset([name for name, _ in model.named_children()]):
            raise ValueError("return_layers are not present in models")

        # e.g. {'layer1': '0', 'layer2': '1', 'layer3': '2', 'layer4': '3'}
        orig_return_layers = return_layers
        # Work on a copy: entries are removed below as they are found.
        return_layers = {k: v for k, v in return_layers.items()}
        layers = OrderedDict()

        # Walk the children in registration order and keep everything up to and
        # including the last requested layer; later children are dropped.
        for name, module in model.named_children():
            layers[name] = module
            if name in return_layers:
                del return_layers[name]
            if not return_layers:
                break

        super().__init__(layers)
        self.return_layers = orig_return_layers

    def forward(self, x):
        """Run the kept children in order and collect the requested outputs."""
        out = OrderedDict()
        # Feed x through every kept child in turn, recording the output of
        # each child listed in return_layers under its user-chosen name.
        for name, module in self.named_children():
            x = module(x)
            if name in self.return_layers:
                out_name = self.return_layers[name]
                out[out_name] = x
        return out


class FeaturePyramidNetwork(nn.Module):
    """
    Module that adds a FPN from on top of a set of feature maps. This is based on
    `"Feature Pyramid Network for Object Detection" <https://arxiv.org/abs/1612.03144>`_.

    The feature maps are currently supposed to be in increasing depth
    order.

    The input to the model is expected to be an OrderedDict[Tensor], containing
    the feature maps on top of which the FPN will be added.

    Arguments:
        in_channels_list (list[int]): number of channels for each feature map that
            is passed to the module
        out_channels (int): number of channels of the FPN representation
        extra_blocks (ExtraFPNBlock or None): if provided, extra operations will
            be performed. It is expected to take the fpn features, the original
            features and the names of the original features as input, and returns
            a new list of feature maps and their corresponding names
    """

    def __init__(self, in_channels_list, out_channels, extra_blocks=None):
        super().__init__()
        # 1x1 convs that bring each backbone feature map to out_channels.
        self.inner_blocks = nn.ModuleList()
        # 3x3 convs applied to the merged maps to produce the prediction feature maps.
        self.layer_blocks = nn.ModuleList()
        for in_channels in in_channels_list:
            # Entries with zero channels get no conv pair.
            if in_channels == 0:
                continue
            inner_block_module = nn.Conv2d(in_channels, out_channels, (1, 1))
            layer_block_module = nn.Conv2d(out_channels, out_channels, (3, 3), padding=(1, 1))
            self.inner_blocks.append(inner_block_module)
            self.layer_blocks.append(layer_block_module)

        # initialize parameters now to avoid modifying the initialization of top_blocks
        # (extra_blocks is attached only after this loop).
        # modules() is required here: the direct children are the two ModuleList
        # containers, so iterating children() would never reach a Conv2d.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_uniform_(m.weight, a=1)
                nn.init.constant_(m.bias, 0)

        self.extra_blocks = extra_blocks

    def get_result_from_inner_blocks(self, x, idx):
        # type: (Tensor, int) -> Tensor
        """
        This is equivalent to self.inner_blocks[idx](x),
        but torchscript doesn't support this yet
        """
        num_blocks = len(self.inner_blocks)
        if idx < 0:
            idx += num_blocks
        i = 0
        out = x
        for module in self.inner_blocks:
            if i == idx:
                out = module(x)
            i += 1
        return out

    def get_result_from_layer_blocks(self, x, idx):
        # type: (Tensor, int) -> Tensor
        """
        This is equivalent to self.layer_blocks[idx](x),
        but torchscript doesn't support this yet
        """
        num_blocks = len(self.layer_blocks)
        if idx < 0:
            idx += num_blocks
        i = 0
        out = x
        for module in self.layer_blocks:
            if i == idx:
                out = module(x)
            i += 1
        return out

    def forward(self, x):
        # type: (Dict[str, Tensor]) -> Dict[str, Tensor]
        """
        Computes the FPN for a set of feature maps.

        Arguments:
            x (OrderedDict[Tensor]): feature maps for each feature level.

        Returns:
            results (OrderedDict[Tensor]): feature maps after FPN layers.
                They are ordered from highest resolution first.
        """
        # unpack OrderedDict into two lists for easier handling
        names = list(x.keys())
        x = list(x.values())

        # Bring the deepest feature map to out_channels
        # (equivalent to self.inner_blocks[-1](x[-1])).
        last_inner = self.get_result_from_inner_blocks(x[-1], -1)

        # results holds one prediction feature map per level.
        results = []
        # 3x3 conv on the deepest lateral map gives its prediction feature map
        # (equivalent to self.layer_blocks[-1](last_inner)).
        results.append(self.get_result_from_layer_blocks(last_inner, -1))

        # Walk the remaining levels from deep to shallow (the deepest one is
        # already done), merging each lateral map with the upsampled map above it.
        for idx in range(len(x) - 2, -1, -1):
            inner_lateral = self.get_result_from_inner_blocks(x[idx], idx)
            feat_shape = inner_lateral.shape[-2:]
            inner_top_down = F.interpolate(last_inner, size=feat_shape, mode="nearest")
            last_inner = inner_lateral + inner_top_down
            results.insert(0, self.get_result_from_layer_blocks(last_inner, idx))

        # Optionally derive additional levels from the computed feature maps.
        if self.extra_blocks is not None:
            results, names = self.extra_blocks(results, names)

        # make it back an OrderedDict
        out = OrderedDict([(k, v) for k, v in zip(names, results)])

        return out


class LastLevelMaxPool(torch.nn.Module):
    """
    Applies a max_pool2d on top of the last feature map
    """

    def forward(self, x, names):
        # type: (List[Tensor], List[str]) -> Tuple[List[Tensor], List[str]]
        # Both lists are extended in place and returned.
        names.append("pool")
        # kernel_size=1, stride=2, padding=0: plain 2x spatial subsampling.
        x.append(F.max_pool2d(x[-1], 1, 2, 0))
        return x, names


class BackboneWithFPN(nn.Module):
    """
    Adds a FPN on top of a model.

    Internally, it uses IntermediateLayerGetter to extract a submodel that
    returns the feature maps specified in return_layers.
    The same limitations of IntermediateLayerGetter apply here.

    Arguments:
        backbone (nn.Module)
        return_layers (Dict[name, new_name]): a dict containing the names
            of the modules for which the activations will be returned as
            the key of the dict, and the value of the dict is the name
            of the returned activation (which the user can specify).
        in_channels_list (List[int]): number of channels for each feature map
            that is returned, in the order they are present in the OrderedDict
        out_channels (int): number of channels in the FPN.

    Attributes:
        out_channels (int): the number of channels in the FPN
    """

    def __init__(self, backbone, return_layers, in_channels_list, out_channels):
        """
        :param backbone: feature extraction network
        :param return_layers: mapping from backbone child name to output name
        :param in_channels_list: channel count of each returned feature map
        :param out_channels: channel count of every FPN output
        """
        super().__init__()
        # Wrapper that returns the selected backbone outputs as an ordered dict.
        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.fpn = FeaturePyramidNetwork(
            in_channels_list=in_channels_list,
            out_channels=out_channels,
            extra_blocks=LastLevelMaxPool(),
        )
        self.out_channels = out_channels

    def forward(self, x):
        """Extract backbone features and pass them through the FPN."""
        x = self.body(x)
        x = self.fpn(x)
        return x


def resnet50_fpn_backbone():
    """Build a ResNet-50 (no classifier head) wrapped with an FPN.

    The stem and ``layer1`` parameters are frozen; ``layer2``-``layer4``
    remain trainable. The returned module produces 256-channel feature maps.
    """
    # NOTE: the original author mentions misc.FrozenBatchNorm2d (like
    # BatchNorm2d but with non-updatable parameters) as a possible norm_layer;
    # the default BatchNorm2d is what is actually used here.
    resnet_backbone = ResNet(Bottleneck, [3, 4, 6, 3],
                             include_top=False)

    # freeze layers
    # Freeze layer1 and everything before it (low-level, generic features).
    for name, parameter in resnet_backbone.named_parameters():
        if 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
            # Frozen weights do not take part in training.
            parameter.requires_grad_(False)

    # Backbone child name -> name of the returned feature map.
    return_layers = {'layer1': '0', 'layer2': '1', 'layer3': '2', 'layer4': '3'}

    # in_channel now holds layer4's output channel count (2048).
    in_channels_stage2 = resnet_backbone.in_channel // 8
    in_channels_list = [
        in_channels_stage2,  # layer1 out_channel=256
        in_channels_stage2 * 2,  # layer2 out_channel=512
        in_channels_stage2 * 4,  # layer3 out_channel=1024
        in_channels_stage2 * 8,  # layer4 out_channel=2048
    ]
    out_channels = 256
    return BackboneWithFPN(resnet_backbone, return_layers, in_channels_list, out_channels)


if __name__ == '__main__':
    net = resnet50_fpn_backbone()
    x = torch.randn(1, 3, 224, 224)
    for key, value in net(x).items():
        print(key, value.shape)
# 騰訊云的一個分析,明顯是參考了這篇論文和草案。最新的版本應該是:A Google Congestion Control Algorithm for Real-Time Communication (draft-ietf-rmcat-gcc-02);下面這個應該已經過期了:draft-alvestrand-rmcat-congestion-03