文章總覽
為什么做備份更新
為機器人控制器設計一套打包備份更新機制,為控制器的批量生產和產品與項目落地做準備。
當某個模塊出現bug需要升級時,用戶可以快速獲取正確的bak包并導入到控制器中重啟生效。
如果沒有做好軟件的備份更新機制,解決問題時,需要重新燒錄整個系統、或者費時費力地從源代碼開始找問題然后修改編譯,期間系統完全癱瘓。
哪些包計劃更新
1、機器人控制器內置的web IDE服務(功能:調整參數、標定、可視化配置、掃圖和地圖操作、任務下發和狀態監控、腳本二次開發)。通常有go、python等后端和vue等前端。
2、導航算法(功能:定位建圖算法、導航避障控制算法),格式為ros包
3、傳感器驅動程序(功能:相機、雷達、IMU等模塊),格式為ROS包
4、通訊層程序(功能:將ROS topic和service轉為websocket,提供API接口服務,用于與第三方系統通信),格式為ROS包
5、控制模塊(功能:接收上層控制指令,實現底層電機等運動控制),格式為ROS包
打包流程
構建機(開發機)和部署機使用同樣的處理器型號,所以在開發機完成開發和編譯后,可以得到install 目錄下的編譯結果(可執行文件、庫、Python pycache、配置文件等)
1 首先在構建機上執行命令
colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release
2 創建一個發布包目錄,只復制我們需要的編譯產物
mkdir -p /tmp/robot_update_pkg_v1.1/install
cp -r install/ /tmp/robot_update_pkg_v1.1/# 刪除所有的 .bak 文件(舊的備份)
find /tmp/robot_update_pkg_v1.1/ -name "*.bak" -delete
# 刪除所有的編譯中間文件(如果在install目錄里有的話)
find /tmp/robot_update_pkg_v1.1/ -name "*.o" -delete
find /tmp/robot_update_pkg_v1.1/ -name "*.cmake" -delete
find /tmp/robot_update_pkg_v1.1/ -name "Makefile" -delete
# 刪除文檔、測試等可能不需要的文件
rm -rf /tmp/robot_update_pkg_v1.1/install/**/test/
rm -rf /tmp/robot_update_pkg_v1.1/install/**/share/doc/
3 打包命令
cd /tmp
tar -czvf robot_update_pkg_v1.1.tar.gz robot_update_pkg_v1.1/
在web頁面上傳備份包并自動部署
系統架構
- 1 Web上傳服務 (ide_web_service):運行在控制器上,提供一個網頁界面和API接口,用于接收和保存用戶上傳的 .bak 更新包。
- 2 自動部署腳本 (auto_deploy.py):作為系統服務(如 systemd)在控制器啟動時運行,或在收到Web服務的通知后運行。它負責檢查、解壓、驗證并執行部署。
- 3 更新包結構:.bak 包實際上是一個 .tar.gz 壓縮包,包含編譯好的 install 目錄和部署腳本。
robot_controller/
├── uploads/ # Web服務存放上傳的包
│ ├── robot_update_v1.1.tar.gz.bak
│ └── robot_update_v1.2.tar.gz.bak
├── current_version/ # 當前運行的版本(install目錄的軟鏈接或拷貝)
│ └── ... (install目錄的內容)
├── backups/ # 部署過程中備份的文件
│ └── ...
├── ide_web_service/ # 您的Web服務包
│ └── app/
│ ├── main.py # 這是我們將要修改的Flask應用
│ └── ...
└── auto_deploy.py # 自動部署腳本
第一部分:Web上傳服務
這個服務提供上傳界面和處理邏輯。
from flask import Flask, request, jsonify, render_template
import os
from werkzeug.utils import secure_filename
import logging
from datetime import datetimeapp = Flask(__name__)# 配置
app.config['UPLOAD_FOLDER'] = '/home/robot/uploads'
app.config['MAX_CONTENT_LENGTH'] = 200 * 1024 * 1024 # 200MB 限制
ALLOWED_EXTENSIONS = {'bak', 'gz'}# 確保上傳目錄存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)# 設置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = app.loggerdef allowed_file(filename):"""檢查文件擴展名是否合法"""return '.' in filename and \filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS@app.route('/')
def index():"""顯示上傳頁面"""return render_template('upload.html')@app.route('/api/upload', methods=['POST'])
def upload_file():"""API接口:處理文件上傳"""if 'file' not in request.files:return jsonify({'error': 'No file part'}), 400file = request.files['file']if file.filename == '':return jsonify({'error': 'No selected file'}), 400if file and allowed_file(file.filename):# 生成安全的文件名,并加上時間戳original_filename = secure_filename(file.filename)timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")save_filename = f"{timestamp}_{original_filename}"save_path = os.path.join(app.config['UPLOAD_FOLDER'], save_filename)try:file.save(save_path)logger.info(f"File uploaded successfully: {save_filename}")# 觸發自動部署(可選:可以改為由系統服務監聽文件變化)# try:# subprocess.run(["python3", "/home/robot/auto_deploy.py", "--file", save_path], check=False, timeout=5)# except Exception as e:# logger.error(f"Failed to trigger auto-deploy: {e}")return jsonify({'message': 'File uploaded successfully!','filename': save_filename,'next_step': 'Please restart the controller to apply the update.'}), 200except Exception as e:logger.error(f"File save failed: {e}")return jsonify({'error': 'File save failed'}), 500else:return jsonify({'error': 'Invalid file type'}), 400@app.route('/api/list_uploads')
def list_uploads():"""API接口:列出所有已上傳的更新包"""files = []for f in os.listdir(app.config['UPLOAD_FOLDER']):if f.endswith('.bak'):file_path = os.path.join(app.config['UPLOAD_FOLDER'], f)files.append({'name': f,'size': os.path.getsize(file_path),'mtime': os.path.getmtime(file_path)})# 按修改時間倒序排列files.sort(key=lambda x: x['mtime'], reverse=True)return jsonify(files)if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, debug=False)
對應的HTML模板 (templates/upload.html):
<!DOCTYPE html>
<html>
<head><title>Robot Controller Update</title><style>body { font-family: Arial, sans-serif; margin: 40px; }.upload-form { margin: 20px 0; padding: 20px; border: 1px solid #ccc; }.progress { display: none; margin: 10px 0; }.message { margin: 10px 0; padding: 10px; border-radius: 4px; }.success { background: #d4edda; color: #155724; }.error { background: #f8d7da; color: #721c24; }</style>
</head>
<body><h1>Upload System Update Package</h1><div class="upload-form"><input type="file" id="fileInput" accept=".bak,.gz"><button onclick="uploadFile()">Upload Update Package</button><div id="progress" class="progress">Uploading... <progress id="progressBar" value="0" max="100"></progress></div><div id="message"></div></div><script>async function uploadFile() {const fileInput = document.getElementById('fileInput');const progressDiv = document.getElementById('progress');const progressBar = document.getElementById('progressBar');const messageDiv = document.getElementById('message');if (!fileInput.files[0]) {showMessage('Please select a file first.', 'error');return;}const formData = new FormData();formData.append('file', fileInput.files[0]);try {progressDiv.style.display = 'block';messageDiv.innerHTML = '';const response = await fetch('/api/upload', {method: 'POST',body: formData});const result = await response.json();if (response.ok) {showMessage(`Upload successful! ${result.message} ${result.next_step}`, 'success');} else {showMessage(`Upload failed: ${result.error}`, 'error');}} catch (error) {showMessage('Upload failed: ' + error.message, 'error');} finally {progressDiv.style.display = 'none';}}function showMessage(text, type) {const messageDiv = document.getElementById('message');messageDiv.innerHTML = text;messageDiv.className = `message ${type}`;}</script>
</body>
</html>
第二部分:自動部署腳本 (auto_deploy.py)
這個腳本會在系統啟動時運行,檢查并部署最新的更新包。
#!/usr/bin/env python3
"""
自動部署腳本:在系統啟動時運行,查找并應用最新的更新包
"""
import os
import tarfile
import logging
import shutil
import subprocess
import glob
from datetime import datetime# 配置
UPLOAD_DIR = "/home/robot/uploads"
TARGET_INSTALL_DIR = "/home/robot/ros2_ws/install"
BACKUP_DIR = "/home/robot/backups"
LOG_FILE = "/var/log/auto_deploy.log"# 設置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(LOG_FILE),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)def find_latest_update_package():"""查找最新的更新包"""pattern = os.path.join(UPLOAD_DIR, "*.bak")update_files = glob.glob(pattern)if not update_files:logger.info("No update packages found.")return None# 按修改時間獲取最新的文件latest_file = max(update_files, key=os.path.getmtime)logger.info(f"Found latest update package: {latest_file}")return latest_filedef backup_current_version():"""備份當前運行的版本"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")backup_path = os.path.join(BACKUP_DIR, f"backup_{timestamp}")try:os.makedirs(BACKUP_DIR, exist_ok=True)if os.path.exists(TARGET_INSTALL_DIR):shutil.copytree(TARGET_INSTALL_DIR, backup_path)logger.info(f"Backup created at: {backup_path}")return backup_pathelse:logger.warning("Target install directory does not exist, skipping backup.")return Noneexcept Exception as e:logger.error(f"Backup failed: {e}")return Nonedef deploy_update_package(package_path):"""部署更新包"""# 創建臨時解壓目錄extract_dir = "/tmp/update_extract"if os.path.exists(extract_dir):shutil.rmtree(extract_dir)os.makedirs(extract_dir)try:# 解壓更新包logger.info(f"Extracting package: {package_path}")with tarfile.open(package_path, 'r:gz') as tar:tar.extractall(path=extract_dir)# 檢查解壓后的內容extracted_install = os.path.join(extract_dir, "install")if not os.path.exists(extracted_install):logger.error("No 'install' directory found in the update package!")return False# 備份當前版本backup_path = backup_current_version()# 部署新版本:先清空目標目錄,然后拷貝新文件if os.path.exists(TARGET_INSTALL_DIR):shutil.rmtree(TARGET_INSTALL_DIR)shutil.copytree(extracted_install, TARGET_INSTALL_DIR)logger.info(f"Update deployed successfully to: {TARGET_INSTALL_DIR}")# 可選:將已部署的包移動到已部署目錄或刪除deployed_dir = os.path.join(UPLOAD_DIR, "deployed")os.makedirs(deployed_dir, exist_ok=True)shutil.move(package_path, os.path.join(deployed_dir, os.path.basename(package_path)))return Trueexcept Exception as e:logger.error(f"Deployment failed: {e}")# 嘗試回滾if backup_path and os.path.exists(backup_path):try:if os.path.exists(TARGET_INSTALL_DIR):shutil.rmtree(TARGET_INSTALL_DIR)shutil.copytree(backup_path, TARGET_INSTALL_DIR)logger.info("Rollback to backup completed due to deployment failure.")except Exception as rollback_error:logger.error(f"Rollback also failed: {rollback_error}")return Falsefinally:# 清理臨時目錄if os.path.exists(extract_dir):shutil.rmtree(extract_dir)def main():logger.info("=== Auto Deployment Script Started ===")# 查找最新更新包latest_package = find_latest_update_package()if not latest_package:logger.info("No updates to deploy.")return# 部署更新success = deploy_update_package(latest_package)if success:logger.info("Update deployed successfully! Please restart ROS nodes.")# 這里可以添加自動重啟ROS節點的邏輯# try:# subprocess.run(["systemctl", "restart", "robot-core.service"], check=True)# except Exception as e:# logger.error(f"Failed to restart service: {e}")else:logger.error("Update deployment failed!")logger.info("=== Auto Deployment Script Finished ===")if __name__ == "__main__":main()
第三部分:系統服務配置
創建systemd服務,讓自動部署腳本在啟動時運行。
創建服務文件 /etc/systemd/system/auto-deploy.service:
[Unit]
Description=Robot Auto Deployment Service
After=network.target
Wants=network.target[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/bin/python3 /home/robot/auto_deploy.py
User=robot
Group=robot
WorkingDirectory=/home/robot[Install]
WantedBy=multi-user.target
啟用服務:
sudo systemctl daemon-reload
sudo systemctl enable auto-deploy.service
完整工作流程
- 用戶操作:在IDE的網頁界面中上傳 robot_update_v1.2.tar.gz.bak 文件。
- Web服務:接收文件,保存到 /home/robot/uploads/ 目錄。
- 重啟控制器:用戶通過網頁或物理方式重啟控制器。
- 自動部署:
- 系統啟動時,auto-deploy.service 運行 auto_deploy.py。
- 腳本查找最新的 .bak 包,解壓并部署到 install 目錄。
- 部署成功后,自動重啟ROS節點(可選)。 - 狀態驗證:用戶通過Web界面或ROS工具驗證新版本是否正常運行。