Finetuning an object detection model based on torchvision 0.3
"""
Writing a class for the dataset
"""
import os
import numpy as np
import torch
from PIL import Image

class PennFudanDataset(object):
    def __init__(self, root, transforms):
        self.root = root
        self.transforms = transforms
        # load all image files, sorting them to ensure they line up with the masks
        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

    def __getitem__(self, idx):
        # load the image and its segmentation mask
        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path)
        mask = np.array(mask)
        # instances are encoded as different values; 0 is the background
        obj_ids = np.unique(mask)
        obj_ids = obj_ids[1:]
        # split the color-encoded mask into a set of binary masks
        masks = mask == obj_ids[:, None, None]
        # get a bounding box for each mask
        num_objs = len(obj_ids)
        boxes = []
        for i in range(num_objs):
            pos = np.where(masks[i])
            xmin = np.min(pos[1])
            xmax = np.max(pos[1])
            ymin = np.min(pos[0])
            ymax = np.max(pos[0])
            boxes.append([xmin, ymin, xmax, ymax])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # there is only one foreground class (person)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        masks = torch.as_tensor(masks, dtype=torch.uint8)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # assume all instances are not crowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        return img, target

    def __len__(self):
        return len(self.imgs)
"""
The first case is when we want to start from a pretrained model and just finetune the last layer.
The other is when we want to replace the backbone of the model with a different one (for example, for faster predictions).
Both cases are handled below; a minimal sketch of the backbone swap follows this note.
"""
"""
An instance segmentation model for the PennFudan dataset
"""
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pretrained on COCO
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
    # replace the box predictor head with one for our number of classes
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    # replace the mask predictor head as well
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)
    return model
"""
Writing helper functions for data augmentation / transformation:
"""
import transforms as T  # the helper transforms.py from the torchvision detection references

def get_transform(train):
    transforms = []
    transforms.append(T.ToTensor())
    if train:
        # randomly flip the training images and ground truth during training
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)
"""
Writing the main function that performs training and validation
"""
from engine import train_one_epoch, evaluate
import utils

def main():
    # train on the GPU if one is available, otherwise on the CPU
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    # our dataset has two classes only: background and person
    num_classes = 2
    dataset = PennFudanDataset('PennFudanPed', get_transform(train=True))
    dataset_test = PennFudanDataset('PennFudanPed', get_transform(train=False))
    # split the dataset into train and test sets
    indices = torch.randperm(len(dataset)).tolist()
    dataset = torch.utils.data.Subset(dataset, indices[:-50])
    dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])
    # define training and validation data loaders
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=2, shuffle=True, num_workers=4,
        collate_fn=utils.collate_fn)
    data_loader_test = torch.utils.data.DataLoader(
        dataset_test, batch_size=1, shuffle=False, num_workers=4,
        collate_fn=utils.collate_fn)
    model = get_model_instance_segmentation(num_classes)
    model.to(device)
    # construct an optimizer and a learning-rate scheduler
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
    num_epochs = 10
    for epoch in range(num_epochs):
        # train for one epoch, printing every 10 iterations
        train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
        lr_scheduler.step()
        # evaluate on the test dataset
        evaluate(model, data_loader_test, device=device)
    print("That's it!")
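"""
Once trained, the model can be inspected on a test image. A minimal inference
sketch, assuming `model`, `dataset_test`, and `device` are in scope (in the
code above they are local to main(), so you would need to expose them):
"""
model.eval()
with torch.no_grad():
    img, _ = dataset_test[0]
    prediction = model([img.to(device)])
# `prediction` is a list with one dict per input image, containing
# 'boxes', 'labels', 'scores', and 'masks' tensors
print(prediction[0]['boxes'].shape)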
Finetuning Torchvision models
from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
"""
Inputs
"""
data_dir = "./data/hymenoptera_data"
model_name = "squeezenet"
num_classes = 2
batch_size = 8
num_epochs = 15
# flag for feature extracting: when True, we only update the reshaped layer
# parameters; when False, we finetune the whole model
feature_extract = True
"""
Helper functions
"""
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
    since = time.time()
    val_acc_history = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        # each epoch has a training and a validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            running_corrects = 0
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                # track gradients only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    if is_inception and phase == 'train':
                        # Inception v3 has an auxiliary output during training
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4 * loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            # keep a deep copy of the weights of the best model seen so far
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)
        print()
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    # load the best model weights before returning
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False
"""
Initializing and reshaping the networks
When feature extracting, we only want to update the parameters of the layer we are reshaping, in other words, the last layer.
Therefore, we do not need to compute gradients for the parameters that are not changing, so for efficiency we set the .requires_grad attribute of every other layer to False.
This is important because this attribute is set to True by default. Then, when we initialize the new layer, its parameters have .requires_grad = True by default, so only the new layer's parameters are updated.
When finetuning, we can leave all of the .requires_grad attributes at their default value of True.
A minimal sketch of this pattern follows.
"""
"""
Reshaping code
""" def initialize_model ( model_name, num_classes, feature_extract, use_pretrained= True ) : model_ft = None input_size = 0 if model_name == "resnet" : """ Resnet18""" model_ft = models. resnet18( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. fc. in_featuresmodel_ft. fc = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "alexnet" : """ Alexnet""" model_ft = models. alexnet( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier[ 6 ] . in_featuresmodel_ft. classifier[ 6 ] = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "vgg" : """ VGG11_bn""" model_ft = models. vgg11_bn( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier[ 6 ] . in_featuresmodel_ft. classifier[ 6 ] = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "squeezenet" : """ Squeezenet""" model_ft = models. squeezenet1_0( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) model_ft. classifier[ 1 ] = nn. Conv2d( 512 , num_classes, kernel_size= ( 1 , 1 ) , stride= ( 1 , 1 ) ) model_ft. num_classes = num_classesinput_size = 224 elif model_name == "densenet" : """ Densenet""" model_ft = models. densenet121( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier. in_featuresmodel_ft. classifier = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "inception" : """ Inception v3Be careful, expects (299,299) sized images and has auxiliary output""" model_ft = models. inception_v3( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. AuxLogits. fc. in_featuresmodel_ft. AuxLogits. fc = nn. Linear( num_ftrs, num_classes) num_ftrs = model_ft. fc. in_featuresmodel_ft. fc = nn. Linear( num_ftrs, num_classes) input_size = 299 else : print ( "Invalid model name, exiting..." ) exit( ) return model_ft, input_size
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)
print(model_ft)
"""
Loading the data
"""
# data augmentation and normalization for training; just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

print("Initializing Datasets and Dataloaders...")
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}
dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'val']}
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
"""
Creating the optimizer
"""
model_ft = model_ft.to(device)
# gather the parameters to be optimized in this run: all parameters when
# finetuning, only the newly initialized ones when feature extracting
params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name, param in model_ft.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t", name)
else:
    for name, param in model_ft.named_parameters():
        if param.requires_grad == True:
            print("\t", name)
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)
"""
Running training and validation
"""
criterion = nn.CrossEntropyLoss()
model_ft, hist = train_model(model_ft, dataloaders_dict, criterion, optimizer_ft, num_epochs=num_epochs, is_inception=(model_name == "inception"))
"""
Comparison with a model trained from scratch
"""
scratch_model, _ = initialize_model(model_name, num_classes, feature_extract=False, use_pretrained=False)
scratch_model = scratch_model.to(device)
scratch_optimizer = optim.SGD(scratch_model.parameters(), lr=0.001, momentum=0.9)
scratch_criterion = nn.CrossEntropyLoss()
_, scratch_hist = train_model(scratch_model, dataloaders_dict, scratch_criterion, scratch_optimizer, num_epochs=num_epochs, is_inception=(model_name == "inception"))

# plot the validation accuracy histories of the pretrained and scratch models
ohist = [h.cpu().numpy() for h in hist]
shist = [h.cpu().numpy() for h in scratch_hist]

plt.title("Validation Accuracy vs. Number of Training Epochs")
plt.xlabel("Training Epochs")
plt.ylabel("Validation Accuracy")
plt.plot(range(1, num_epochs + 1), ohist, label="Pretrained")
plt.plot(range(1, num_epochs + 1), shist, label="Scratch")
plt.ylim((0, 1.))
plt.xticks(np.arange(1, num_epochs + 1, 1.0))
plt.legend()
plt.show()
Spatial transformer networks
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

plt.ion()  # interactive mode
"""
Loading the data
""" device = torch. device( "cuda" if torch. cuda. is_available( ) else "cpu" )
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST(root='./data/mnist/MNIST', train=True, download=False,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])), batch_size=64, shuffle=True, num_workers=4)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST(root='./data/mnist/MNIST', train=False,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])), batch_size=64, shuffle=True, num_workers=4)
"""
Spatial transformer networks:
Structure:
The localisation network is a regular CNN which regresses the transformation parameters. The transformation is never learned explicitly from this dataset; instead, the network automatically learns the spatial transformations that enhance global accuracy.
The grid generator generates a grid of coordinates in the input image corresponding to each pixel of the output image.
The sampler uses the parameters of the transformation and applies them to the input image.
(A minimal sketch of the grid generator and sampler follows this block.)
"""
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

        # localisation network: regresses the affine transformation parameters
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        # regressor for the 3 * 2 affine matrix
        self.fc_loc = nn.Sequential(
            nn.Linear(10 * 3 * 3, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )

        # initialize the weights/bias with the identity transformation
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def stn(self, x):
        # spatial transformer forward pass
        xs = self.localization(x)
        xs = xs.view(-1, 10 * 3 * 3)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)
        grid = F.affine_grid(theta, x.size())
        x = F.grid_sample(x, grid)
        return x

    def forward(self, x):
        # transform the input, then run the usual classification network
        x = self.stn(x)
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

model = Net().to(device)
"""
Training the model
""" optimizer = optim. SGD( model. parameters( ) , lr= 0.01 ) def train ( epoch) : model. train( ) for batch_idx, ( data, target) in enumerate ( train_loader) : data, target = data. to( device) , target. to( device) optimizer. zero_grad( ) output = model( data) loss = F. nll_loss( output, target) loss. backward( ) optimizer. step( ) if batch_idx % 500 == 0 : print ( 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}' . format ( epoch, batch_idx * len ( data) , len ( train_loader. dataset) , 100 . * batch_idx / len ( train_loader) , loss. item( ) ) )
def test():
    with torch.no_grad():
        model.eval()
        test_loss = 0
        correct = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # sum up the batch loss (size_average=False is deprecated in newer
            # PyTorch; reduction='sum' is the current equivalent)
            test_loss += F.nll_loss(output, target, size_average=False).item()
            # get the index of the max log-probability
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'
              .format(test_loss, correct, len(test_loader.dataset),
                      100. * correct / len(test_loader.dataset)))
"""
Visualizing the STN results
"""
def convert_image_np(inp):
    """Convert a Tensor to numpy image."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    return inp
def visualize_stn():
    with torch.no_grad():
        # fetch a batch of test data
        data = next(iter(test_loader))[0].to(device)
        input_tensor = data.cpu()
        transformed_input_tensor = model.stn(data).cpu()
        in_grid = convert_image_np(torchvision.utils.make_grid(input_tensor))
        out_grid = convert_image_np(torchvision.utils.make_grid(transformed_input_tensor))
        # plot the original and transformed batches side by side
        f, axarr = plt.subplots(1, 2)
        axarr[0].imshow(in_grid)
        axarr[0].set_title('Dataset Images')
        axarr[1].imshow(out_grid)
        axarr[1].set_title('Transformed Images')

for epoch in range(1, 20 + 1):
    train(epoch)
    test()
visualize_stn()

plt.ioff()
plt.show()
Neural Transfer with PyTorch
from __future__ import print_function

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from PIL import Image
import matplotlib.pyplot as plt

import torchvision.transforms as transforms
import torchvision.models as models

import copy
"""
Underlying principle
We define two distances, one for the content (D_C) and one for the style (D_S). D_C measures how different the content is between two images, while D_S measures how different the style is.
Then we take a third image, the input, and transform it in order to minimize both its content distance from the content image and its style distance from the style image.
Now we can start the style transfer.
""" device = torch. device( "cuda" if torch. cuda. is_available( ) else "cpu" ) """
Loading the images
The original PIL images have values between 0 and 255, but when transformed into torch tensors, their values are converted to lie between 0 and 1.
The images also need to be resized to the same dimensions.
An important detail to note is that neural networks from the torch library are trained with tensor values ranging from 0 to 1.
If you try to feed the network a 0-to-255 tensor image, the activated feature maps will be unable to sense the intended content and style.
However, pretrained networks from the Caffe library are trained with 0-to-255 tensor images. (A quick check of this scaling follows below.)
"""
imsize = 512 if torch.cuda.is_available() else 128  # use a small size if no GPU

loader = transforms.Compose([
    transforms.Resize(imsize),
    transforms.ToTensor()])

def image_loader(image_name):
    image = Image.open(image_name)
    # a fake batch dimension is required to fit the network's input shape
    image = loader(image).unsqueeze(0)
    return image.to(device, torch.float)

style_img = image_loader("./data/images//neural-style/picasso.jpg")
content_img = image_loader("./data/images//neural-style/dancing.jpg")

assert style_img.size() == content_img.size(), \
    "we need to import style and content images of the same size"
unloader = transforms.ToPILImage()  # reconvert into a PIL image

plt.ion()

def imshow(tensor, title=None):
    image = tensor.cpu().clone()  # clone the tensor so we don't change it
    image = image.squeeze(0)      # remove the fake batch dimension
    image = unloader(image)
    plt.imshow(image)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

plt.figure()
imshow(style_img, title='Style Image')

plt.figure()
imshow(content_img, title='Content Image')
"""
Content loss
The content loss is a function that represents a weighted version of the content distance for an individual layer.
""" class ContentLoss ( nn. Module) : def __init__ ( self, target, ) : super ( ContentLoss, self) . __init__( ) self. target = target. detach( ) def forward ( self, input ) : self. loss = F. mse_loss( input , self. target) return input """
Style loss
It acts as a transparent layer in the network and computes the style loss of the corresponding layer, using the Gram matrix of that layer's feature maps as the style representation. (A sanity check of the Gram normalization follows the code below.)
""" def gram_matrix ( input ) : a, b, c, d = input . size( ) features = input . view( a * b, c * d) G = torch. mm( features, features. t( ) ) return G. div( a * b * c * d) class StyleLoss ( nn. Module) : def __init__ ( self, target_feature) : super ( StyleLoss, self) . __init__( ) self. target = gram_matrix( target_feature) . detach( ) def forward ( self, input ) : G = gram_matrix( input ) self. loss = F. mse_loss( G, self. target) return input """
Importing the model
""" cnn = models. vgg19( pretrained= True ) . features. to( device) . eval ( ) """
Image preprocessing
"""
cnn_normalization_mean = torch.tensor([0.485, 0.456, 0.406]).to(device)
cnn_normalization_std = torch.tensor([0.229, 0.224, 0.225]).to(device)

class Normalization(nn.Module):
    def __init__(self, mean, std):
        super(Normalization, self).__init__()
        # reshape mean and std to [C, 1, 1] so they broadcast over image tensors
        self.mean = torch.tensor(mean).view(-1, 1, 1)
        self.std = torch.tensor(std).view(-1, 1, 1)

    def forward(self, img):
        return (img - self.mean) / self.std
"""
Create a new Sequential module with the content-loss and style-loss modules correctly inserted.
"""
content_layers_default = ['conv_4']
style_layers_default = ['conv_1', 'conv_2', 'conv_3', 'conv_4', 'conv_5']

def get_style_model_and_losses(cnn, normalization_mean, normalization_std,
                               style_img, content_img,
                               content_layers=content_layers_default,
                               style_layers=style_layers_default):
    cnn = copy.deepcopy(cnn)
    normalization = Normalization(normalization_mean, normalization_std).to(device)
    content_losses = []
    style_losses = []
    model = nn.Sequential(normalization)
    i = 0  # increment every time we see a conv layer
    for layer in cnn.children():
        if isinstance(layer, nn.Conv2d):
            i += 1
            name = 'conv_{}'.format(i)
        elif isinstance(layer, nn.ReLU):
            name = 'relu_{}'.format(i)
            # the in-place version doesn't play nicely with the loss modules below
            layer = nn.ReLU(inplace=False)
        elif isinstance(layer, nn.MaxPool2d):
            name = 'pool_{}'.format(i)
        elif isinstance(layer, nn.BatchNorm2d):
            name = 'bn_{}'.format(i)
        else:
            raise RuntimeError('Unrecognized layer: {}'.format(layer.__class__.__name__))
        model.add_module(name, layer)
        if name in content_layers:
            target = model(content_img).detach()
            content_loss = ContentLoss(target)
            model.add_module("content_loss_{}".format(i), content_loss)
            content_losses.append(content_loss)
        if name in style_layers:
            target_feature = model(style_img).detach()
            style_loss = StyleLoss(target_feature)
            model.add_module("style_loss_{}".format(i), style_loss)
            style_losses.append(style_loss)
    # trim the layers after the last content or style loss
    for i in range(len(model) - 1, -1, -1):
        if isinstance(model[i], ContentLoss) or isinstance(model[i], StyleLoss):
            break
    model = model[:(i + 1)]
    return model, style_losses, content_losses
"""
Selecting the input image
""" input_img = content_img. clone( )
plt. figure( )
imshow( input_img, title= 'Input Image' ) """
Gradient descent
""" def get_input_optimizer ( input_img) : optimizer = optim. LBFGS( [ input_img. requires_grad_( ) ] ) return optimizer"""
Each time the network is run, the input values are corrected (clamped) to the range 0 to 1.
""" def run_style_transfer ( cnn, normalization_mean, normalization_std, content_img, style_img, input_img, num_steps= 300 , style_weight= 1000000 , content_weight= 1 ) : """Run the style transfer.""" print ( 'Building the style transfer model..' ) model, style_losses, content_losses = get_style_model_and_losses( cnn, normalization_mean, normalization_std, style_img, content_img) optimizer = get_input_optimizer( input_img) print ( 'Optimizing..' ) run = [ 0 ] while run[ 0 ] <= num_steps: def closure ( ) : input_img. data. clamp_( 0 , 1 ) optimizer. zero_grad( ) model( input_img) style_score = 0 content_score = 0 for sl in style_losses: style_score += sl. lossfor cl in content_losses: content_score += cl. lossstyle_score *= style_weightcontent_score *= content_weightloss = style_score + content_scoreloss. backward( ) run[ 0 ] += 1 if run[ 0 ] % 50 == 0 : print ( "run {}:" . format ( run) ) print ( 'Style Loss : {:4f} Content Loss: {:4f}' . format ( style_score. item( ) , content_score. item( ) ) ) print ( ) return style_score + content_scoreoptimizer. step( closure) input_img. data. clamp_( 0 , 1 ) return input_img"""
Running the algorithm.
""" output = run_style_transfer( cnn, cnn_normalization_mean, cnn_normalization_std, content_img, style_img, input_img) plt. figure( )
imshow( output, title= 'Output Image' )
plt. ioff( )
plt. show( )
Adversarial example generation
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
"""
Inputs
"""
epsilons = [0, .05, .1, .15, .2, .25, .3]
pretrained_model = "data/lenet_mnist_model.pth"
use_cuda = True
"""
The model under attack
"""
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
# MNIST test dataset and dataloader
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, download=True,
                   transform=transforms.Compose([transforms.ToTensor(), ])),
    batch_size=1, shuffle=True)

print("CUDA Available: ", torch.cuda.is_available())
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")

# initialize the network and load the pretrained weights
model = Net().to(device)
model.load_state_dict(torch.load(pretrained_model, map_location='cpu'))

# set the model in evaluation mode (important for the dropout layers)
model.eval()
"""
The FGSM attack: perturbed_image = image + epsilon * sign(data_grad), i.e. each pixel is nudged by epsilon in the direction of the gradient of the loss with respect to the input. (A one-line sanity check follows the function below.)
"""
def fgsm_attack(image, epsilon, data_grad):
    # collect the element-wise sign of the data gradient
    sign_data_grad = data_grad.sign()
    # create the perturbed image by adjusting each pixel of the input image
    perturbed_image = image + epsilon * sign_data_grad
    # clip to maintain the [0, 1] range
    perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image
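"""
A one-line sanity check of fgsm_attack (values are arbitrary): sign(-2.0) is
-1, so the pixel moves from 0.5 to 0.5 + 0.1 * (-1) = 0.4.
"""
img = torch.tensor([[0.5]])
grad = torch.tensor([[-2.0]])
print(fgsm_attack(img, 0.1, grad))  # tensor([[0.4000]])
"""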
Test function
"""
def test(model, device, test_loader, epsilon):
    correct = 0
    adv_examples = []
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        # we need the gradient with respect to the input data
        data.requires_grad = True
        output = model(data)
        init_pred = output.max(1, keepdim=True)[1]
        # skip samples that are already misclassified
        if init_pred.item() != target.item():
            continue
        loss = F.nll_loss(output, target)
        model.zero_grad()
        loss.backward()
        data_grad = data.grad.data
        # perturb the input and re-classify it
        perturbed_data = fgsm_attack(data, epsilon, data_grad)
        output = model(perturbed_data)
        final_pred = output.max(1, keepdim=True)[1]
        if final_pred.item() == target.item():
            correct += 1
            # save some epsilon-0 examples for visualization
            if (epsilon == 0) and (len(adv_examples) < 5):
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
        else:
            # save some successful adversarial examples for visualization
            if len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
    final_acc = correct / float(len(test_loader))
    print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(
        epsilon, correct, len(test_loader), final_acc))
    return final_acc, adv_examples
"""
Running the attack
"""
accuracies = []
examples = []

# run the test for each epsilon
for eps in epsilons:
    acc, ex = test(model, device, test_loader, eps)
    accuracies.append(acc)
    examples.append(ex)
"""
Accuracy vs. epsilon
"""
plt.figure(figsize=(5, 5))
plt.plot(epsilons, accuracies, "*-")
plt.yticks(np.arange(0, 1.1, step=0.1))
plt.xticks(np.arange(0, .35, step=0.05))
plt.title("Accuracy vs Epsilon")
plt.xlabel("Epsilon")
plt.ylabel("Accuracy")
plt.show()
"""
Sample adversarial examples
"""
# plot several adversarial samples at each epsilon
cnt = 0
plt.figure(figsize=(8, 10))
for i in range(len(epsilons)):
    for j in range(len(examples[i])):
        cnt += 1
        plt.subplot(len(epsilons), len(examples[0]), cnt)
        plt.xticks([], [])
        plt.yticks([], [])
        if j == 0:
            plt.ylabel("Eps: {}".format(epsilons[i]), fontsize=14)
        orig, adv, ex = examples[i][j]
        plt.title("{} -> {}".format(orig, adv))
        plt.imshow(ex, cmap="gray")
plt.tight_layout()
plt.show()