# PyTorch之數據加載和處理 (PyTorch: data loading and processing)
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib. pyplot as plt
from torch. utils. data import Dataset, DataLoader
from torchvision import transforms, utils
import warnings

# Silence library warnings so the tutorial output stays readable.
warnings.filterwarnings("ignore")

# Turn on matplotlib's interactive mode.
plt.ion()

# --- Reading the dataset ---
# The annotated points stored in the CSV are read into an (N, 2) array,
# where N is the number of landmarks. The loading code follows.
landmarks_frame = pd.read_csv('data/faces/faces/face_landmarks.csv')

n = 65  # row of the CSV inspected below
img_name = landmarks_frame.iloc[n, 0]  # first column: image file name
# Remaining columns hold the flattened coordinates; regroup them as (N, 2).
landmarks = landmarks_frame.iloc[n, 1:].values
landmarks = landmarks.astype('float').reshape(-1, 2)

print('Image name: {}'.format(img_name))
print('Landmarks shape: {}'.format(landmarks.shape))
print('First 4 Landmarks: {}'.format(landmarks[:4]))


# A small helper that shows one image together with its annotated points.
def show_landmarks(image, landmarks):
    """Show an image with its landmarks drawn on top.

    Args:
        image: Image data accepted by ``plt.imshow``.
        landmarks: 2-D array; column 0 is used as the x coordinate and
            column 1 as the y coordinate of each point.
    """
    plt.imshow(image)
    plt.scatter(landmarks[:, 0], landmarks[:, 1], s=10, marker='.', c='r')
    plt.pause(5)  # pause so that the plot gets drawn/updated


plt.figure()
show_landmarks(io.imread(os.path.join('data/faces/faces/', img_name)),
               landmarks)
plt.show()

# --- Building the dataset class ---
class FaceLandmarksDataset(Dataset):
    """Face landmarks dataset backed by an annotation CSV and an image folder."""

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform applied to
                each sample.
        """
        self.landmarks_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the annotation file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict: ``{'image': ndarray, 'landmarks': float ndarray of shape
            (-1, 2)}``, passed through ``self.transform`` when one is set.
        """
        # A 0-dim tensor index (e.g. an element of torch.randperm) is turned
        # into a plain Python int so that pandas ``iloc`` accepts it.
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.landmarks_frame.iloc[idx, 0])
        image = io.imread(img_name)

        landmarks = self.landmarks_frame.iloc[idx, 1:]
        landmarks = np.array([landmarks])
        landmarks = landmarks.astype('float').reshape(-1, 2)
        sample = {'image': image, 'landmarks': landmarks}

        if self.transform:
            sample = self.transform(sample)

        return sample


# --- Visualising the data ---
face_dataset = FaceLandmarksDataset(csv_file='data/faces/faces/face_landmarks.csv',
                                    root_dir='data/faces/faces/')
fig = plt.figure()

# Preview the first four samples side by side.
for i in range(len(face_dataset)):
    sample = face_dataset[i]

    print(i, sample['image'].shape, sample['landmarks'].shape)

    ax = plt.subplot(1, 4, i + 1)
    plt.tight_layout()
    ax.set_title('Sample #{}'.format(i))
    ax.axis('off')
    show_landmarks(**sample)

    if i == 3:
        plt.show()
        break


# --- Data transforms / preprocessing ---
# Three transforms are created:
#   * Rescale:    scale the image
#   * RandomCrop: crop the image randomly (a data-augmentation step)
#   * ToTensor:   convert numpy images to torch images (axes are swapped)


class Rescale(object):
    """Rescale the image in a sample to a given size.

    Args:
        output_size (tuple or int): Desired output size. If tuple, the output
            is matched to output_size. If int, the smaller image edge is
            matched to output_size, keeping the aspect ratio the same.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, sample):
        image, landmarks = sample['image'], sample['landmarks']

        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            # Pin the shorter edge to output_size and scale the other one.
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size

        new_h, new_w = int(new_h), int(new_w)

        img = transform.resize(image, (new_h, new_w))

        # Landmarks are (x, y) while image axes are (row, col), so the width
        # ratio scales column 0 and the height ratio scales column 1.
        landmarks = landmarks * [new_w / w, new_h / h]

        return {'image': img, 'landmarks': landmarks}


class RandomCrop(object):
    """Crop the image in a sample at a random position.

    Args:
        output_size (tuple or int): Desired output size. If int, a square
            crop is made.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            self.output_size = output_size

    def __call__(self, sample):
        image, landmarks = sample['image'], sample['landmarks']

        h, w = image.shape[:2]
        new_h, new_w = self.output_size

        # randint's upper bound is exclusive: "+ 1" keeps the last valid
        # offset reachable and avoids a ValueError when the crop is exactly
        # as large as the image along an axis.
        top = np.random.randint(0, h - new_h + 1)
        left = np.random.randint(0, w - new_w + 1)

        image = image[top: top + new_h,
                      left: left + new_w]

        # Shift the (x, y) landmarks into the crop's coordinate frame.
        landmarks = landmarks - [left, top]

        return {'image': image, 'landmarks': landmarks}


class ToTensor(object):
    """Convert the ndarrays in a sample to Tensors."""

    def __call__(self, sample):
        image, landmarks = sample['image'], sample['landmarks']

        # Swap the colour axis: numpy image is H x W x C,
        # torch image is C x H x W.
        image = image.transpose((2, 0, 1))
        return {'image': torch.from_numpy(image),
                'landmarks': torch.from_numpy(landmarks)}


scale = Rescale(256)
crop = RandomCrop(128)
composed = transforms.Compose([Rescale(256),
                               RandomCrop(224)])

# --- Applying the transforms ---
fig = plt.figure()
sample = face_dataset[65]

# Apply each transform to the same sample and show the results side by side.
for i, tsfrm in enumerate([scale, crop, composed]):
    transformed_sample = tsfrm(sample)

    ax = plt.subplot(1, 3, i + 1)
    plt.tight_layout()
    ax.set_title(type(tsfrm).__name__)
    show_landmarks(**transformed_sample)

plt.show()

# --- Iterating with a DataLoader ---
# Every time the dataset is sampled:
#   * an image is read from file on the fly,
#   * the transforms are applied to the image that was read,
#   * because one transform is random (RandomCrop), the data is augmented.
# torch.utils.data.DataLoader is an iterator that provides these features
# on top of the dataset.
transformed_dataset = FaceLandmarksDataset(csv_file='data/faces/faces/face_landmarks.csv',
                                           root_dir='data/faces/faces/',
                                           transform=transforms.Compose([
                                               Rescale(256),
                                               RandomCrop(224),
                                               ToTensor()
                                           ]))

# NOTE(review): with num_workers > 0 the loader uses worker subprocesses; on
# platforms that spawn workers this module-level usage may need an
# ``if __name__ == '__main__':`` guard or num_workers=0 -- confirm.
dataloader = DataLoader(transformed_dataset, batch_size=4,
                        shuffle=True, num_workers=4)
def show_landmarks_batch(sample_batched):
    """Show image with landmarks for a batch of samples.

    Args:
        sample_batched: Mapping with an ``'image'`` batch tensor and a
            ``'landmarks'`` batch tensor, as produced by the dataloader.
    """
    images_batch, landmarks_batch = \
        sample_batched['image'], sample_batched['landmarks']
    batch_size = len(images_batch)
    # Images sit next to each other horizontally in the grid, so the x offset
    # of image i depends on the image *width* (last dimension).
    im_width = images_batch.size(3)
    # Border drawn around each image; presumably matches make_grid's default
    # padding -- confirm against the torchvision version in use.
    grid_border_size = 2

    grid = utils.make_grid(images_batch)
    plt.imshow(grid.numpy().transpose((1, 2, 0)))

    for i in range(batch_size):
        plt.scatter(landmarks_batch[i, :, 0].numpy() + i * im_width + (i + 1) * grid_border_size,
                    landmarks_batch[i, :, 1].numpy() + grid_border_size,
                    s=10, marker='.', c='r')

    # Set once; the title does not depend on the loop variable.
    plt.title('Batch from dataloader')


for i_batch, sample_batched in enumerate(dataloader):
    print(i_batch, sample_batched['image'].size(),
          sample_batched['landmarks'].size())

    # Show the 4th batch, then stop.
    if i_batch == 3:
        plt.figure()
        show_landmarks_batch(sample_batched)
        plt.axis('off')
        plt.ioff()
        plt.show()
        break
# Pytorch小試牛刀 (PyTorch warm-up: learning by example)

# --- NumPy: hand-written forward and backward pass fitting random data ---
import numpy as np

# N: batch size; D_in: input dimension; H: hidden dimension; D_out: output dimension.
N, D_in, H, D_out = 64, 1000, 100, 10

# Random input and target data.
x = np.random.randn(N, D_in)
y = np.random.randn(N, D_out)

# Randomly initialised weights.
w1 = np.random.randn(D_in, H)
w2 = np.random.randn(H, D_out)

learning_rate = 1e-6
for t in range(500):
    # Forward pass: compute the prediction.
    h = x.dot(w1)
    h_relu = np.maximum(h, 0)
    y_pred = h_relu.dot(w2)

    # Sum-of-squares loss.
    loss = np.square(y_pred - y).sum()
    print(t, loss)

    # Backward pass: gradients of the loss w.r.t. w1 and w2.
    grad_y_pred = 2.0 * (y_pred - y)
    grad_w2 = h_relu.T.dot(grad_y_pred)
    grad_h_relu = grad_y_pred.dot(w2.T)
    grad_h = grad_h_relu.copy()
    grad_h[h < 0] = 0  # ReLU passes no gradient where its input was negative
    grad_w1 = x.T.dot(grad_h)

    # Gradient-descent update.
    w1 -= learning_rate * grad_w1
    w2 -= learning_rate * grad_w2

# --- PyTorch tensors: hand-written forward and backward pass ---
import torch

dtype = torch.float
device = torch.device("cpu")

# N: batch size; D_in: input dimension; H: hidden dimension; D_out: output dimension.
N, D_in, H, D_out = 64, 1000, 100, 10

# Random input and target data.
x = torch.randn(N, D_in, device=device, dtype=dtype)
y = torch.randn(N, D_out, device=device, dtype=dtype)

# Randomly initialised weights.
w1 = torch.randn(D_in, H, device=device, dtype=dtype)
w2 = torch.randn(H, D_out, device=device, dtype=dtype)

learning_rate = 1e-6
for t in range(500):
    # Forward pass: compute the prediction.
    h = x.mm(w1)
    h_relu = h.clamp(min=0)
    y_pred = h_relu.mm(w2)

    # Sum-of-squares loss, extracted as a Python number.
    loss = (y_pred - y).pow(2).sum().item()
    print(t, loss)

    # Backward pass: gradients of the loss w.r.t. w1 and w2.
    grad_y_pred = 2.0 * (y_pred - y)
    grad_w2 = h_relu.t().mm(grad_y_pred)
    grad_h_relu = grad_y_pred.mm(w2.t())
    grad_h = grad_h_relu.clone()
    grad_h[h < 0] = 0  # ReLU passes no gradient where its input was negative
    grad_w1 = x.t().mm(grad_h)

    # Gradient-descent update.
    w1 -= learning_rate * grad_w1
    w2 -= learning_rate * grad_w2

# --- PyTorch tensors + autograd: the same two-layer network ---
import torch

dtype = torch.float
device = torch.device("cpu")

# N: batch size; D_in: input dimension; H: hidden dimension; D_out: output dimension.
N, D_in, H, D_out = 64, 1000, 100, 10

# Random input and target data.
x = torch.randn(N, D_in, device=device, dtype=dtype)
y = torch.randn(N, D_out, device=device, dtype=dtype)

# requires_grad=True asks autograd to compute gradients for the weights.
w1 = torch.randn(D_in, H, device=device, dtype=dtype, requires_grad=True)
w2 = torch.randn(H, D_out, device=device, dtype=dtype, requires_grad=True)

learning_rate = 1e-6
for t in range(500):
    # Forward pass; intermediate values are not kept because autograd
    # handles the backward pass.
    y_pred = x.mm(w1).clamp(min=0).mm(w2)

    loss = (y_pred - y).pow(2).sum()
    print(t, loss.item())

    # Populates w1.grad and w2.grad.
    loss.backward()

    # Update the weights outside of autograd tracking, then reset the
    # gradients for the next iteration.
    with torch.no_grad():
        w1 -= learning_rate * w1.grad
        w2 -= learning_rate * w2.grad

        w1.grad.zero_()
        w2.grad.zero_()

# --- Custom nn modules ---
# Build the two-layer network as a custom Module subclass.
import torch


class TwoLayerNet(torch.nn.Module):
    """Two linear layers with a ReLU (clamp at 0) in between."""

    def __init__(self, D_in, H, D_out):
        """Instantiate two nn.Linear modules and store them as members."""
        super(TwoLayerNet, self).__init__()
        self.linear1 = torch.nn.Linear(D_in, H)
        self.linear2 = torch.nn.Linear(H, D_out)

    def forward(self, x):
        """Map an input tensor to an output tensor.

        The modules defined in the constructor can be combined with
        arbitrary (differentiable) tensor operations.
        """
        h_relu = self.linear1(x).clamp(min=0)
        y_pred = self.linear2(h_relu)
        return y_pred
# N: batch size; D_in: input dimension; H: hidden dimension; D_out: output dimension.
N, D_in, H, D_out = 64, 1000, 100, 10

# Random input and target data.
x = torch.randn(N, D_in)
y = torch.randn(N, D_out)

model = TwoLayerNet(D_in, H, D_out)

# Summed (not averaged) squared error, optimised with plain SGD.
loss_fn = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
for t in range(500):
    # Forward pass.
    y_pred = model(x)

    loss = loss_fn(y_pred, y)
    print(t, loss.item())

    # Clear old gradients, backpropagate, update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# --- Control flow and weight sharing ---
# Ordinary Python control flow implements the loop, and weight sharing among
# the innermost layers is obtained by reusing the same module several times
# in the forward pass.
import random
import torch


class DynamicNet(torch.nn.Module):
    """Network whose hidden layer is applied a random number of times."""

    def __init__(self, D_in, H, D_out):
        """Construct the three nn.Linear instances used in the forward pass."""
        super(DynamicNet, self).__init__()
        self.input_linear = torch.nn.Linear(D_in, H)
        self.middle_linear = torch.nn.Linear(H, H)
        self.output_linear = torch.nn.Linear(H, D_out)

    def forward(self, x):
        """Run the model with 0 to 3 applications of ``middle_linear``.

        The number of hidden applications is drawn with
        ``random.randint(0, 3)`` on every call, and the same
        ``middle_linear`` module (hence the same weights) is reused each
        time. Because every forward pass builds its own dynamic graph,
        regular Python control flow such as loops and conditionals can be
        used here, and reusing one module several times in the graph is safe.
        """
        h_relu = self.input_linear(x).clamp(min=0)
        for _ in range(random.randint(0, 3)):
            h_relu = self.middle_linear(h_relu).clamp(min=0)
        y_pred = self.output_linear(h_relu)
        return y_pred
# N: batch size; D_in: input dimension; H: hidden dimension; D_out: output dimension.
N, D_in, H, D_out = 64, 1000, 100, 10

# Random input and target data.
x = torch.randn(N, D_in)
y = torch.randn(N, D_out)

model = DynamicNet(D_in, H, D_out)

# Summed squared error, optimised with SGD plus momentum.
criterion = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)
for t in range(500):
    # Forward pass.
    y_pred = model(x)

    loss = criterion(y_pred, y)
    print(t, loss.item())

    # Clear old gradients, backpropagate, update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
# PyTorch之遷移學習 (PyTorch: transfer learning)
from __future__ import print_function, division
import torch
import torch. nn as nn
import torch. optim as optim
from torch. optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib. pyplot as plt
import time
import os
import copy

# Turn on matplotlib's interactive mode.
plt.ion()

# --- Loading the data ---
# Random crop + flip for training; deterministic resize + centre crop for
# validation. Both end with the same per-channel normalisation.
# NOTE(review): the mean/std values are presumably the statistics the
# pretrained model was trained with -- confirm.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = 'data/hymenoptera_data'
# One ImageFolder dataset and one DataLoader per split, keyed by split name.
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                              shuffle=True, num_workers=4)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

# Use the first GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# --- Visualising a few images ---
def imshow(inp, title=None):
    """Display an image tensor with matplotlib.

    Args:
        inp: Tensor laid out channel-first; it is transposed to
            channel-last before plotting.
        title: Optional title for the plot.
    """
    inp = inp.numpy().transpose((1, 2, 0))
    # Undo the per-channel normalisation, then clamp to the displayable range.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(1)  # pause so that the plot gets drawn/updated
# Grab one batch of training data and show it as a grid.
inputs, classes = next(iter(dataloaders['train']))

out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])


# --- Training the model ---
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and return it with the best validation weights loaded.

    Each epoch runs a 'train' phase followed by a 'val' phase over the
    module-level ``dataloaders``; per-phase loss and accuracy are printed.
    The weights of the epoch with the highest validation accuracy are kept
    and restored at the end.

    Args:
        model: Network to train (already moved to ``device``).
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the trainable parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object, with the best validation weights loaded.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # loss is multiplied by the batch size so that the epoch
                # figure below is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                # Advance the LR schedule once per epoch, *after* this
                # epoch's optimizer updates; stepping first would skip the
                # initial value of the schedule.
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Snapshot the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # Restore the best weights seen during validation.
    model.load_state_dict(best_model_wts)
    return model


# --- Visualising the model's predictions ---
def visualize_model(model, num_images=6):
    """Plot predictions for the first ``num_images`` validation images.

    The model is switched to eval mode for the duration of the call and its
    previous training/eval mode is restored before returning, on every exit
    path.

    Args:
        model: Network whose predictions are shown (already on ``device``).
        num_images: Number of images to display, two per row.
    """
    was_training = model.training
    model.eval()
    images_so_far = 0
    # Round up so that an odd ``num_images`` still gets enough subplot slots.
    n_rows = (num_images + 1) // 2
    plt.figure()

    try:
        with torch.no_grad():
            for inputs, _labels in dataloaders['val']:
                inputs = inputs.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                for j in range(inputs.size()[0]):
                    images_so_far += 1
                    ax = plt.subplot(n_rows, 2, images_so_far)
                    ax.axis('off')
                    ax.set_title('predicted: {}'.format(class_names[preds[j]]))
                    imshow(inputs.cpu().data[j])

                    if images_so_far == num_images:
                        return
    finally:
        model.train(mode=was_training)


# --- Scenario 1: fine-tuning the ConvNet ---
# Load a pretrained model and reset the final fully connected layer.
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
# Replace the classifier head with one output per dataset class.
model_ft.fc = nn.Linear(num_ftrs, len(class_names))

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# All parameters of the network are optimised.
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,
                       num_epochs=25)

visualize_model(model_ft)

# --- Scenario 2: ConvNet as a fixed feature extractor ---
# Everything except the final layer is frozen: setting requires_grad = False
# on a parameter means its gradient is not computed during backward().
model_conv = torchvision.models.resnet18(pretrained=True)
# Freeze every pretrained parameter.
for param in model_conv.parameters():
    param.requires_grad = False

# The freshly constructed head is created after the freeze loop, so the
# loop above does not touch its parameters.
num_ftrs = model_conv.fc.in_features
model_conv.fc = nn.Linear(num_ftrs, len(class_names))

model_conv = model_conv.to(device)

criterion = nn.CrossEntropyLoss()

# Only the parameters of the final layer are handed to the optimizer.
optimizer_conv = optim.SGD(model_conv.fc.parameters(), lr=0.001, momentum=0.9)

# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=7, gamma=0.1)

model_conv = train_model(model_conv, criterion, optimizer_conv,
                         exp_lr_scheduler, num_epochs=25)

visualize_model(model_conv)

plt.ioff()
plt.show()
# 保存和加載模型 (Saving and loading models)
import torch
import torch. nn as nn
import torch. functional as F
import torch. utils. data as Data
import torchvision
import torch.optim as optim
# The activation functions live in torch.nn.functional; the earlier
# ``import torch.functional as F`` binds a module without ``relu``, which
# would make ``forward`` fail with AttributeError. Rebind F here.
import torch.nn.functional as F


# --- What is a state_dict? ---
class TheModelClass(nn.Module):
    """Small CNN: two conv + max-pool stages followed by three linear layers."""

    def __init__(self):
        super(TheModelClass, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return a 10-value output per input image.

        The conv output is flattened to 16 * 5 * 5 features per image before
        the linear layers, so the input must be sized to produce 16 x 5 x 5
        feature maps (e.g. 3 x 32 x 32 images).
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# Build the model and an SGD optimizer over its parameters.
model = TheModelClass()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# List every entry of the model's state_dict with the size of its tensor.
print("Model's state_dict:")
for param_tensor, tensor in model.state_dict().items():
    print(param_tensor, "\t", tensor.size())

# List every entry of the optimizer's state_dict.
print("Optimizer's state_dict:")
for var_name, value in optimizer.state_dict().items():
    print(var_name, "\t", value)