論文參考:https://arxiv.org/abs/1604.07316
demo
今天主要來看一個如何通過圖像直接到控制的自動駕駛端到端的項目,首先需要配置好我的仿真環境,下載軟件udacity:
https://d17h27t6h515a5.cloudfront.net/topher/2016/November/5831f3a4_simulator-windows-64/simulator-windows-64.zip
現在好的解壓即可
運行時打開終端,然后將文件拖入終端中運行
選擇合適的窗口大小
成功進入界面
然后配置一下python環境,對于已經有conda環境后,直接
conda create -n cnn python=3.8
conda activate cnn
安裝以下依賴:
astor==0.8.1
bidict==0.21.2
certifi==2021.5.30
charset-normalizer==2.0.4
click==8.0.1
colorama==0.4.4
cycler==0.10.0
decorator==5.0.9
dnspython==1.16.0
eventlet==0.31.1
Flask==2.0.1
gast==0.3.3
greenlet==1.1.0
idna==3.2
itsdangerous==2.0.1
Jinja2==3.0.1
joblib==1.0.1
kiwisolver==1.3.1
MarkupSafe==2.0.1
matplotlib==3.4.2
numpy==1.19.3
opencv-python==4.5.3.56
paddlepaddle==2.1.2
pandas==1.3.1
Pillow==8.3.1
protobuf==3.17.3
pyparsing==2.4.7
python-dateutil==2.8.2
python-engineio==3.13.0
python-socketio==4.6.1
pytz==2021.1
requests==2.26.0
scikit-learn==0.24.2
scipy==1.7.1
six==1.16.0
threadpoolctl==2.2.0
urllib3==1.26.6
Werkzeug==2.0.1
代碼
git clone https://github.com/chan-yuu/end_to_end_ws.git
數據集可以自己駕駛udacity的車輛收集,也可以直接下載下來訓練師需要指定文件夾的路徑,這里我使用的是一個csv文件來指定我的文件夾
主要文件放置的結構要寫入csv文件中,分別是左中右的攝像頭圖片路徑,方向盤,油門,剎車,速度信息。
打開仿真環境,進入training模式
通過鍵盤即可控制車輛運行。
可以點擊record記錄這個過程中的數據,之后就能自動生成需要的數據集內容
python train.py -d xxx.csv
我仔細研讀了一下這個代碼
import paddle
import argparse
import numpy as np
import paddle.nn as nnfrom paddle.optimizer import Adam
from paddle.callbacks import ModelCheckpoint, EarlyStoppingfrom car.model import build_model
from car.utils import CarDataset, load_data# 設置隨機種子,確保結果可復現
np.random.seed(0)def train_model(model, args, X_train, X_valid, y_train, y_valid):"""Train the model"""# 創建一個模型檢查點回調,用于在訓練過程中保存模型checkpoint = ModelCheckpoint(save_dir=args.save_dir)# 創建一個早停回調,當監控的指標(這里是損失值)在一定輪數(patience)內沒有改善時,停止訓練earlystopping = EarlyStopping(monitor='loss',mode='min', # 監控損失值,希望其越小越好patience=10, # 允許損失值在 10 個 epoch 內沒有改善verbose=1, # 打印早停信息min_delta=0, # 損失值的最小改善量baseline=None, # 基線值,這里不使用save_best_model=True) # 保存最佳模型# 根據命令行參數決定是否使用早停回調if args.early_stop:cbs = [checkpoint, earlystopping]else:cbs = [checkpoint]# 創建 Adam 優化器,用于更新模型的參數opt = Adam(learning_rate=args.learning_rate, parameters=model.parameters())# 將模型包裝為 paddle.Model 對象,方便進行訓練和評估model = paddle.Model(model)# 配置模型的損失函數和優化器model.prepare(loss=nn.MSELoss(), optimizer=opt)# 創建訓練數據集對象train_dataset = CarDataset(args.data_dir, X_train, y_train, True)# 創建驗證數據集對象val_dataset = CarDataset(args.data_dir, X_valid, y_valid, False)# 開始訓練模型model.fit(train_data=train_dataset, # 訓練數據集eval_data=val_dataset, # 驗證數據集epochs=args.nb_epoch, # 訓練的輪數batch_size=args.batch_size, # 每個批次的樣本數量save_dir=args.save_dir, # 模型保存的目錄callbacks=cbs, # 回調函數列表verbose=1) # 打印訓練進度信息def s2b(s):"""Converts a string to boolean value"""# 將字符串轉換為小寫s = s.lower()# 判斷字符串是否表示真return s == 'true' or s == 'yes' or s == 'y' or s == '1'def main():"""Load train/validation data set and train the model"""# 創建命令行參數解析器parser = argparse.ArgumentParser(description='Behavioral Cloning Training Program')# 添加數據目錄參數,默認值為 'data'parser.add_argument('-d',help='data directory',dest='data_dir',type=str,default='data')# 添加模型保存目錄參數,默認值為 'save'parser.add_argument('-s',help='save directory',dest='save_dir',type=str,default='save')# 添加測試集大小比例參數,默認值為 0.2parser.add_argument('-t',help='test size fraction',dest='test_size',type=float,default=0.2)# 添加 Dropout 概率參數,默認值為 0.5parser.add_argument('-k',help='drop out probability',dest='keep_prob',type=float,default=0.5)# 添加訓練輪數參數,默認值為 100parser.add_argument('-n',help='number of epochs',dest='nb_epoch',type=int,default=100)# 添加批次大小參數,默認值為 40parser.add_argument('-b',help='batch size',dest='batch_size',type=int,default=40)# 添加學習率參數,默認值為 1.0e-4parser.add_argument('-l',help='learning rate',dest='learning_rate',type=float,default=1.0e-4)# 添加早停參數,默認值為 Falseparser.add_argument('-e',help='early stop',dest='early_stop',type=bool,default=False)# 解析命令行參數args = parser.parse_args()print('-' * 30)print('Parameters')print('-' * 30)# 打印所有命令行參數for key, value in vars(args).items():print('{:<20} := {}'.format(key, value))print('-' * 30)# 加載訓練數據和驗證數據data = load_data(args)# 構建模型model = build_model(args.keep_prob)# 調用訓練函數進行模型訓練train_model(model, args, *data)if __name__ == '__main__':# 程序入口,調用 main 函數main()
訓練結束后可以得到對應的模型
使用這個模型進行測試
打開仿真軟件的auto模式
此時是無法記錄的,然后我可以加載模型并駕駛車輛
python drive.py ./pretrained_models/model_paddle_test2.pdparams
同樣,自己寫一邊這個代碼更容易理解:
import os
import base64
import paddle
import shutil
import argparse
import socketio
import eventlet
import numpy
import eventlet.wsgi
from PIL import Imagefrom io import BvtesIO
from flask import Flask
from datatime import datatime
from car.model import build_model
from car.utils import preprocess# 創建一個socket.IO服務器
sio = socket.Server()
app = Flask(__name__)
# 初始化模型變量,用于后續加載模型
model = None
# 初始化上一幀圖像數組變量,用于記錄上一幀的圖像數據
prev_image_array = None# 定義最大速度和最小速度
MAX_SPEED = 25
MIN_SPEED = 10# 初始化速度限制為最大速度
speed_limit = MAX_SPEED# 定義一個事件處理函數,當接收到 'telemetry' 事件時觸發
@sio.on('telemetry')
def telemetry(sid, data):if data:# 從接收到的數據中提取當前汽車的轉向角度steering_angle = float(data["steering_angle"])# 從接收到的數據中提取當前汽車的油門值throttle = float(data["throttle"])# 從接收到的數據中提取當前汽車的速度speed = float(data["speed"])# 從接收到的數據中提取當前汽車中心攝像頭的圖像,并將其解碼為 PIL 圖像對象image = Image.open(BytesIO(base64.b64decode(data["image"])))# 如果指定了圖像保存文件夾,則保存當前幀圖像if args.image_folder != '':# 生成當前時間戳,用于作為圖像文件名timestamp = datetime.utcnow().strftime('%Y_%m_%d_%H_%M_%S_%f')[:-3]# 構建圖像文件的完整路徑image_filename = os.path.join(args.image_folder, timestamp)# 保存圖像為 JPEG 格式image.save('{}.jpg'.format(image_filename))try:# 將 PIL 圖像對象轉換為 NumPy 數組image = np.asarray(image)# 對圖像進行預處理,例如裁剪、歸一化等操作image = preprocess(image)# 為圖像數組添加一個維度,使其成為 4D 數組,以滿足模型輸入要求image = np.array([image])# 使用模型對圖像進行預測,得到轉向角度的預測值steering_angle = model(image.astype('float32') / 127.5 - 1.0).item()# 根據當前速度調整速度限制和油門值global speed_limitif speed > speed_limit:# 如果當前速度超過速度限制,則將速度限制降低到最小速度,以減速speed_limit = MIN_SPEEDelse:# 如果當前速度低于速度限制,則將速度限制恢復到最大速度speed_limit = MAX_SPEED# 根據轉向角度和速度計算油門值throttle = 1.0 - steering_angle**2 - (speed / speed_limit)**2# 打印當前的轉向角度、油門值和速度print(f'steering_angle={steering_angle:.3f}, throttle={throttle:.3f}, speed={speed:.3f}')# 發送控制指令,包括轉向角度和油門值send_control(steering_angle, throttle)except Exception as e:# 打印異常信息print(e)else:# 如果沒有接收到數據,則發送手動控制指令sio.emit('manual', data={}, skip_sid=True)# 定義一個事件處理函數,當有新的客戶端連接時觸發
@sio.on('connect')
def connect(sid, environ):# 打印連接信息print("connect ", sid)# 發送初始控制指令,將轉向角度和油門值都設為 0send_control(0, 0)# 定義一個函數,用于發送控制指令
def send_control(steering_angle, throttle):# 向客戶端發送 'steer' 事件,包含轉向角度和油門值sio.emit("steer",data={'steering_angle': steering_angle.__str__(),'throttle': throttle.__str__()},skip_sid=True)if __name__ == '__main__':# 創建一個命令行參數解析器parser = argparse.ArgumentParser(description='Remote Driving')# 添加一個必需的命令行參數,用于指定模型文件的路徑parser.add_argument('model',type=str,help='Path to model h5 file. Model should be on the same path.')# 添加一個可選的命令行參數,用于指定圖像保存文件夾的路徑parser.add_argument('image_folder',type=str,nargs='?',default='',help='Path to image folder. This is where the images from the run will be saved.')# 解析命令行參數args = parser.parse_args()# 構建模型model = build_model()# 加載模型的參數params = paddle.load(args.model)# 將加載的參數設置到模型中model.set_dict(params)# 將模型轉換為靜態圖模式,以提高推理速度model = paddle.jit.to_static(model)# 將模型設置為評估模式model.eval()# 如果指定了圖像保存文件夾if args.image_folder != '':# 打印創建圖像文件夾的信息print("Creating image folder at {}".format(args.image_folder))# 如果文件夾不存在,則創建它if not os.path.exists(args.image_folder):os.makedirs(args.image_folder)else:# 如果文件夾已存在,則先刪除它,再重新創建shutil.rmtree(args.image_folder)os.makedirs(args.image_folder)# 打印記錄運行信息print("RECORDING THIS RUN ...")else:# 如果沒有指定圖像保存文件夾,則打印不記錄運行信息print("NOT RECORDING THIS RUN ...")# 使用 Socket.IO 中間件包裝 Flask 應用app = socketio.Middleware(sio, app)# 使用 eventlet 啟動一個 WSGI 服務器,監聽 4567 端口eventlet.wsgi.server(eventlet.listen(('', 4567)), app)
即可實現基于視覺的自動駕駛功能。后面這篇文章還會繼續完善論文中的一些觀點和代碼的一些學習過程。
https://github.com/naokishibuya/car-behavioral-cloning?tab=readme-ov-file